View Javadoc

1   /*
2    * Copyright (c) 1998-2004 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gm;
20  
21  import java.rmi.RemoteException;
22  import java.rmi.server.ExportException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ThreadFactory;
28  
29  import jgroup.core.ConfigManager;
30  import jgroup.core.JgroupException;
31  import jgroup.relacs.config.TransportConfig;
32  import jgroup.relacs.events.Event;
33  import jgroup.util.ThreadMonitor;
34  import net.jini.export.Exporter;
35  import net.jini.jeri.BasicILFactory;
36  import net.jini.jeri.BasicJeriExporter;
37  import net.jini.jeri.tcp.TcpServerEndpoint;
38  
39  import org.apache.log4j.Logger;
40  import org.apache.log4j.MDC;
41  
42  /**
43   * Handle dispatching daemon events to other layers in the stack, e.g.,
44   * the <code>MembershipLayer</code> and <code>MulticastLayer</code>.
45   *
46   * @author Hein Meling
47   * @since Jgroup 0.1 (rewritten for Jgroup 3.0)
48   */
49  public class NewDispatcherLayer
50    implements DispatcherService, RemoteDispatcher, PingListener
51  {
52  
53    ////////////////////////////////////////////////////////////////////////////////////////////
54    // Logger
55    ////////////////////////////////////////////////////////////////////////////////////////////
56  
57    /** Obtain logger for this class */
58    private static final Logger log = Logger.getLogger(NewDispatcherLayer.class);
59  
60  
61    //////////////////////////////////////////////////////////////////////////////////////////
62    // Constants
63    //////////////////////////////////////////////////////////////////////////////////////////
64  
65    /** The maximum number of event types that we allow */
66    private static final int MAX_EVENT_TYPES = 20;
67  
68  
69    //////////////////////////////////////////////////////////////////////////////////////////
70    // Private fields
71    //////////////////////////////////////////////////////////////////////////////////////////
72  
73    /** Last time when the dispatcher had interaction with the daemon */
74    private volatile long lastPingReceived;
75  
76    /** Ping timeout value */
77    private int daemonSuspectTimeout;
78  
79    /** The executor used to schedule events to the dispatcher listeners */
80    private ExecutorService executorService;
81  
82    /** List of dispatcher listeners (indexed by their supported event types) */
83    private final List<DispatcherListener> eventTypes;
84  
85    /** The exporter for this remote dispatcher object */
86    private Exporter exporter;
87  
88  
89    //////////////////////////////////////////////////////////////////////////////////////////
90    // Constructors
91    //////////////////////////////////////////////////////////////////////////////////////////
92  
93    /**
94     *  Initializes a DispatcherLayer
95     */
96    private NewDispatcherLayer()
97    {
98      eventTypes = new ArrayList<DispatcherListener>(MAX_EVENT_TYPES);
99      for (int i = 0; i < MAX_EVENT_TYPES; i++)
100       eventTypes.add(null);
101 
102     /* Obtains configuration information */
103     TransportConfig tconf =
104       (TransportConfig) ConfigManager.getConfig(TransportConfig.class);
105     daemonSuspectTimeout = tconf.getDaemonSuspectTimeout();
106   }
107 
108 
109   ////////////////////////////////////////////////////////////////////////////////////////////
110   // Static factory
111   ////////////////////////////////////////////////////////////////////////////////////////////
112 
113   public static NewDispatcherLayer getLayer()
114   {
115     return new NewDispatcherLayer();
116   }
117 
118 
119   ////////////////////////////////////////////////////////////////////////////////////////////
120   // Layer methods
121   ////////////////////////////////////////////////////////////////////////////////////////////
122 
123   /* (non-Javadoc)
124    * @see jgroup.core.Layer#addListener(java.lang.Object)
125    */
126   public void addListener(Object listener)
127   {
128     if (listener instanceof DispatcherListener) {
129       DispatcherListener dispListener = (DispatcherListener) listener;
130       int[] events = dispListener.eventTypes();
131       for (int i = 0; i < events.length; i++) {
132         eventTypes.set(events[i], dispListener);
133       }
134     } else {
135       throw new IllegalArgumentException("Specified listener is not a DispatcherListener");
136     }
137   }
138 
139 
140   //////////////////////////////////////////////////////////////////////////////////////////
141   // Methods from DispatcherService
142   //////////////////////////////////////////////////////////////////////////////////////////
143 
144   /* (non-Javadoc)
145    * @see jgroup.relacs.gm.DispatcherService#start(int)
146    */
147   public void start(final int gid)
148   {
149     ThreadFactory tFactory = new ThreadFactory() {
150       public Thread newThread(Runnable r) {
151         Thread dispatcher = new Thread(r, "Dispatcher-" + gid);
152         dispatcher.setDaemon(true);
153         ThreadMonitor.add(dispatcher);
154         return dispatcher;
155       }
156     };
157 //    executorService = Executors.newSingleThreadExecutor(tFactory);
158     executorService = Executors.newFixedThreadPool(3, tFactory);
159   }
160 
161   /* (non-Javadoc)
162    * @see jgroup.relacs.gm.DispatcherService#halt()
163    */
164   public void halt()
165   {
166     // Unexport the remote dispatcher
167     exporter.unexport(true);
168     executorService.shutdown();
169   }
170 
171   /* (non-Javadoc)
172    * @see jgroup.relacs.gm.DispatcherService#dispatch(java.lang.Object)
173    */
174   public void dispatch(Object obj)
175   {
176     if (log.isDebugEnabled())
177       log.debug("Blocking on " + obj);
178     synchronized (obj) {
179       try {
180         obj.wait(daemonSuspectTimeout);
181       } catch (InterruptedException e) {
182         if (log.isDebugEnabled())
183           log.debug("Interrupted on " + obj);
184         return;
185       }
186     }
187     /* Check if the daemon is suspected to have failed. */
188     long timePassed = System.currentTimeMillis() - lastPingReceived; 
189     if (timePassed > daemonSuspectTimeout) {
190       log.warn("Daemon suspected after: " + timePassed);
191     } else {
192       if (log.isDebugEnabled())
193         log.debug("Unblocked on " + obj);
194     }
195   }
196 
197   /* (non-Javadoc)
198    * @see jgroup.relacs.gm.DispatcherService#getRemoteDispatcher()
199    */
200   public RemoteDispatcher getRemoteDispatcher() throws JgroupException
201   {
202     exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
203         new BasicILFactory());
204     try {
205       return (RemoteDispatcher) exporter.export(this);
206     } catch (ExportException e) {
207       throw new JgroupException("Could not export the dispatcher", e);
208     }
209   }
210 
211   /* (non-Javadoc)
212    * @see jgroup.relacs.gm.RemoteDispatcher#addEvent(jgroup.relacs.events.Event)
213    */
214   public void addEvent(final Event event) throws RemoteException
215   {
216     lastPingReceived = System.currentTimeMillis();
217     /* Insert the event in the upcall queue */
218     if (log.isDebugEnabled()) {
219       MDC.put("group", "[Group: " + event.getGid() + "]");
220       log.debug("Received daemon event: " + lastPingReceived + " " + event);
221     }
222     Runnable task = new Runnable() {
223       public void run() {
224         if (log.isDebugEnabled()) {
225           MDC.put("group", "[Group: " + event.getGid() + "]");
226           log.debug("Executing event: " + event);
227         }
228         DispatcherListener listener = eventTypes.get(event.getTag());
229         listener.notifyEvent(event);
230         if (log.isDebugEnabled())
231           log.debug("Event execution completed");
232       }
233     };
234     executorService.submit(task);
235     if (log.isDebugEnabled())
236       log.debug("Event submitted for execution");
237   }
238 
239   /* (non-Javadoc)
240    * @see jgroup.relacs.gm.PingListener#ping()
241    */
242   public void ping() throws RemoteException
243   {
244     lastPingReceived = System.currentTimeMillis();
245     if (log.isDebugEnabled()) {
246       log.debug("PingReceived: " + lastPingReceived);
247     }
248   }
249 
250   /* (non-Javadoc)
251    * @see java.lang.Object#toString()
252    */
253   public String toString()
254   {
255     return " DISP ";
256   }
257 
258 } // END NewDispatcherLayer