View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  import java.net.DatagramPacket;
23  import java.util.Iterator;
24  import java.util.SortedSet;
25  import java.util.TreeSet;
26  
27  import jgroup.relacs.events.Event;
28  import jgroup.util.Queue;
29  import jgroup.util.ThreadMonitor;
30  
31  import org.apache.log4j.Logger;
32  
33  
34  /**
35   *  The <code>Ehandler</code> class handles all events generated
36   *  internally by the timer and externally by the local members and by
37   *  the network interface threads.
38   *
39   *  @author Alberto Montresor
40   *  @author Hein Meling
41   *  @since Jgroup 0.1
42   */
43  final class Ehandler
44    extends Thread
45    implements NIListener, MssConstants, MssTag
46  {
47  
48    ////////////////////////////////////////////////////////////////////////////////////////////
49    // Logger
50    ////////////////////////////////////////////////////////////////////////////////////////////
51  
52    /** Obtain logger for this class */
53    private static final Logger log = Logger.getLogger(Ehandler.class);
54  
55  
56    ////////////////////////////////////////////////////////////////////////////////////////////
57    // Fields
58    ////////////////////////////////////////////////////////////////////////////////////////////
59  
60    /** Upper layer */
61    private final EhandlerUser ehuser;
62    
63    /** Sorted list of scheduled events */
64    private final SortedSet<ScheduledEvent> events = new TreeSet<ScheduledEvent>();
65    
66    /** Local message queue */
67    private final Queue lmessages = new Queue();
68    
69    /** Remote message queue*/
70    private final Queue rmessages = new Queue();
71  
72    /** Next remote message */
73    private DatagramPacket nextRemoteMessage;
74  
75    /** Next local message */
76    private Event nextLocalMessage;
77  
78    /** Next nextScheduledEvent */
79    private ScheduledEvent nextScheduledEvent;
80  
81    /** Set to true when the Ehandler thread has to be stopped */
82    private boolean stop;
83  
84  
85    ////////////////////////////////////////////////////////////////////////////////////////////
86    // Constructor
87    ////////////////////////////////////////////////////////////////////////////////////////////
88  
89    /**
90     * Constructs and starts the nextScheduledEvent handler.
91     *  
92     * @param name        
93     *   the name of the nextScheduledEvent handler thread
94     * @param ehuser
95     *   reference to the nextScheduledEvent handler user (used to generate upcalls).
96     */
97    Ehandler(String name, EhandlerUser ehuser)
98      throws IOException
99    {
100     /* Thread initialization */
101     super(name);
102 
103     /* Data structure initialization */
104     this.ehuser = ehuser;
105 
106     /* Thread management */
107     stop = false;
108     this.setDaemon(true);
109     this.setPriority(EHANDLER_PRIORITY);
110     this.start();
111     ThreadMonitor.add(this);
112   }
113 
114   ////////////////////////////////////////////////////////////////////////////////////////////
115   // Methods
116   ////////////////////////////////////////////////////////////////////////////////////////////
117 
118   /** 
119    *  Stops the Ehandler thread by setting stop to true.
120    */
121   void kill()
122   {
123     ThreadMonitor.remove(this);
124     stop = true;
125   }
126 
127   /**
128    *  Inserts a new timeout event.  The event will be handled
129    *  appropriately at the given time.
130    *
131    *  @param timeout
132    *    Timeout length in milliseconds till the event should be handled.
133    *  @param event
134    *    The event description to associate this timeout value with.
135    */
136   synchronized ScheduledEvent setTimeout(long timeout, ScheduledEvent event)
137   {
138     long time = System.currentTimeMillis() + timeout;
139     event.setTimeout(time);
140     /*
141      * Check if the event set contains several events at the exact same
142      * time (millisecond granularity); if so, we avoid this problem by
143      * incrementing the time value until we find an open time value for
144      * the event to occur on.
145      */
146     while (events.contains(event)) {
147       time++;
148       event.setTimeout(time);
149     }
150     events.add(event);
151     notifyAll();
152     return event;
153   }
154 
155 
156   /**
157    *  Abort the timeout associated to the specified <CODE>event</CODE>.
158    *  If <CODE>event</CODE> is different from <CODE>null</CODE>, it
159    *  is removed from the event list.
160    */
161   synchronized void abortTimeout(ScheduledEvent event)
162   {
163     if (event != null)
164       events.remove(event);
165   }
166 
167   
168   /**
169    *  Sends messages for local members by storing them in the <code>
170    *  lmessages </code> queue.
171    *
172    *  @param msg
173    *    Local message to send
174    */
175   synchronized void lnotify(Object msg)
176   {
177     lmessages.insert(msg);
178     notifyAll();
179   }
180 
181 
182   /**
183    *  Sends messages for remote members by storing them in the <code>
184    *  rmessages </code> queue.
185    *
186    *  @param msg
187    *    Local message to send
188    */
189   public synchronized void rnotify(DatagramPacket packet)
190   {
191     rmessages.insert(packet);
192     notifyAll();
193   }
194 
195 
196   ////////////////////////////////////////////////////////////////////////////////////////////
197   // Run method of the thread
198   ////////////////////////////////////////////////////////////////////////////////////////////
199 
200   /**
201    *  Run method of the thread.  It remains blocked until (i) a Datagram
202    *  packet is received through the net; (ii) a message from a local
203    *  member is received; (iii) a scheduled timeout expires.
204    */
205   public void run()
206   {
207     while (!stop) {
208       await();
209       if (log.isDebugEnabled())
210         logEventQueue();
211       if (nextScheduledEvent != null) {
212         if (log.isDebugEnabled())
213           log.debug("Scheduling event: " + nextScheduledEvent);
214         ehuser.treceive(nextScheduledEvent);
215         yield();
216       }
217       if (nextRemoteMessage != null) {
218         if (log.isDebugEnabled())
219           log.debug("Scheduling remote mesg: " + nextRemoteMessage);
220         ehuser.rreceive(nextRemoteMessage);
221         nextRemoteMessage = null;
222         yield();
223       }
224       if (nextLocalMessage != null) {
225         if (log.isDebugEnabled())
226           log.debug("Scheduling local mesg: " + nextLocalMessage);
227         ehuser.lreceive(nextLocalMessage);
228         nextLocalMessage = null;
229         yield();
230       }
231     }
232   }
233 
234 
235   ////////////////////////////////////////////////////////////////////////////////////////////
236   // Private methods
237   ////////////////////////////////////////////////////////////////////////////////////////////
238 
239   /**
240    *  Select next nextScheduledEvent and next messages 
241    */
242   private synchronized void await()
243   {
244     /* Select next events and messages */
245     if (!events.isEmpty()) {
246       nextScheduledEvent = events.first();
247     }
248     nextRemoteMessage = (DatagramPacket) rmessages.removeFirst();
249     nextLocalMessage  = (Event) lmessages.removeFirst();
250 
251     if (nextRemoteMessage == null && nextLocalMessage == null) {
252       if (nextScheduledEvent == null) {
253         try { wait(); } 
254         catch (InterruptedException e) {
255           log.warn("Ehandler:await:withoutTimeout", e);
256         }
257       } else {
258         long timeout = nextScheduledEvent.getTimeout() - System.currentTimeMillis();
259         if (timeout > 0) {
260           try { wait(timeout); }
261           catch (InterruptedException e) {
262             log.warn("Ehandler:await:withTimeout", e);
263           }
264         }
265       }
266     }
267 
268     /*
269      *  Check if the first timeout is expired
270      */
271     if (nextScheduledEvent != null) {
272       long timeout = nextScheduledEvent.getTimeout() - System.currentTimeMillis();
273       if (timeout <= 0) {
274         events.remove(nextScheduledEvent);
275         if (log.isDebugEnabled())
276           log.debug("Timeout expired: " + (-timeout) + " msec ago");
277       } else {
278         nextScheduledEvent = null;
279       }
280     }
281   }
282 
283 
284   /**
285    * Returns true if the local message queue contains undelivered
286    * messages for the specified group; otherwise, false is returned.
287    */
288   public boolean hasUndeliveredLocalMsgs(int gid)
289   {
290     for (Iterator iter = lmessages.iterator(); iter.hasNext();) {
291       Event event = (Event) iter.next();
292       if (event.getGid() == gid) {
293         return true;
294       }
295     }
296     return false;
297   }
298 
299 
300   ////////////////////////////////////////////////////////////////////////////////////////////
301   // Methods from Object
302   ////////////////////////////////////////////////////////////////////////////////////////////
303 
304   /**
305    *  Returns a string representation of this object
306    */
307   public String toString()
308   {
309     StringBuilder buf = new StringBuilder();
310     buf.append("[Ehandler: LocalMsgs(");
311     buf.append(lmessages.size());
312     buf.append("), RemoteDatagrams(");
313     buf.append(rmessages.size());
314     buf.append("), ScheduledEvents(");
315     buf.append(events.size());
316     buf.append("), Events={ ");
317     for (ScheduledEvent scheduledEvent : events) {
318       buf.append(scheduledEvent);
319       buf.append(", ");
320     }
321     buf.append(" }]");
322     return buf.toString();
323   }
324 
325   //mos
326   /** last time show() was invoked or 0 */
327   private long lastShowTime = 0;
328 
329   /** interval between show times */
330   private final long interval = 2000;
331 
332   /**
333    *  Shows the contents of the event queues.  This method is called
334    *  from the run() loop of Ehandler.
335    */
336   private void logEventQueue()
337   {
338     long now = System.currentTimeMillis();
339     if (now-lastShowTime > interval) {
340       lastShowTime = now;
341       log.debug(this);
342     }
343   }
344 
345 } // END Ehandler