1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22 import java.net.DatagramPacket;
23 import java.util.Iterator;
24 import java.util.SortedSet;
25 import java.util.TreeSet;
26
27 import jgroup.relacs.events.Event;
28 import jgroup.util.Queue;
29 import jgroup.util.ThreadMonitor;
30
31 import org.apache.log4j.Logger;
32
33
34 /**
35 * The <code>Ehandler</code> class handles all events generated
36 * internally by the timer and externally by the local members and by
37 * the network interface threads.
38 *
39 * @author Alberto Montresor
40 * @author Hein Meling
41 * @since Jgroup 0.1
42 */
43 final class Ehandler
44 extends Thread
45 implements NIListener, MssConstants, MssTag
46 {
47
48 ////////////////////////////////////////////////////////////////////////////////////////////
49 // Logger
50 ////////////////////////////////////////////////////////////////////////////////////////////
51
52 /** Obtain logger for this class */
53 private static final Logger log = Logger.getLogger(Ehandler.class);
54
55
56 ////////////////////////////////////////////////////////////////////////////////////////////
57 // Fields
58 ////////////////////////////////////////////////////////////////////////////////////////////
59
60 /** Upper layer */
61 private final EhandlerUser ehuser;
62
63 /** Sorted list of scheduled events */
64 private final SortedSet<ScheduledEvent> events = new TreeSet<ScheduledEvent>();
65
66 /** Local message queue */
67 private final Queue lmessages = new Queue();
68
69 /** Remote message queue*/
70 private final Queue rmessages = new Queue();
71
72 /** Next remote message */
73 private DatagramPacket nextRemoteMessage;
74
75 /** Next local message */
76 private Event nextLocalMessage;
77
78 /** Next nextScheduledEvent */
79 private ScheduledEvent nextScheduledEvent;
80
81 /** Set to true when the Ehandler thread has to be stopped */
82 private boolean stop;
83
84
85 ////////////////////////////////////////////////////////////////////////////////////////////
86 // Constructor
87 ////////////////////////////////////////////////////////////////////////////////////////////
88
89 /**
90 * Constructs and starts the nextScheduledEvent handler.
91 *
92 * @param name
93 * the name of the nextScheduledEvent handler thread
94 * @param ehuser
95 * reference to the nextScheduledEvent handler user (used to generate upcalls).
96 */
97 Ehandler(String name, EhandlerUser ehuser)
98 throws IOException
99 {
100 /* Thread initialization */
101 super(name);
102
103 /* Data structure initialization */
104 this.ehuser = ehuser;
105
106 /* Thread management */
107 stop = false;
108 this.setDaemon(true);
109 this.setPriority(EHANDLER_PRIORITY);
110 this.start();
111 ThreadMonitor.add(this);
112 }
113
114 ////////////////////////////////////////////////////////////////////////////////////////////
115 // Methods
116 ////////////////////////////////////////////////////////////////////////////////////////////
117
118 /**
119 * Stops the Ehandler thread by setting stop to true.
120 */
121 void kill()
122 {
123 ThreadMonitor.remove(this);
124 stop = true;
125 }
126
127 /**
128 * Inserts a new timeout event. The event will be handled
129 * appropriately at the given time.
130 *
131 * @param timeout
132 * Timeout length in milliseconds till the event should be handled.
133 * @param event
134 * The event description to associate this timeout value with.
135 */
136 synchronized ScheduledEvent setTimeout(long timeout, ScheduledEvent event)
137 {
138 long time = System.currentTimeMillis() + timeout;
139 event.setTimeout(time);
140 /*
141 * Check if the event set contains several events at the exact same
142 * time (millisecond granularity); if so, we avoid this problem by
143 * incrementing the time value until we find an open time value for
144 * the event to occur on.
145 */
146 while (events.contains(event)) {
147 time++;
148 event.setTimeout(time);
149 }
150 events.add(event);
151 notifyAll();
152 return event;
153 }
154
155
156 /**
157 * Abort the timeout associated to the specified <CODE>event</CODE>.
158 * If <CODE>event</CODE> is different from <CODE>null</CODE>, it
159 * is removed from the event list.
160 */
161 synchronized void abortTimeout(ScheduledEvent event)
162 {
163 if (event != null)
164 events.remove(event);
165 }
166
167
168 /**
169 * Sends messages for local members by storing them in the <code>
170 * lmessages </code> queue.
171 *
172 * @param msg
173 * Local message to send
174 */
175 synchronized void lnotify(Object msg)
176 {
177 lmessages.insert(msg);
178 notifyAll();
179 }
180
181
182 /**
183 * Sends messages for remote members by storing them in the <code>
184 * rmessages </code> queue.
185 *
186 * @param msg
187 * Local message to send
188 */
189 public synchronized void rnotify(DatagramPacket packet)
190 {
191 rmessages.insert(packet);
192 notifyAll();
193 }
194
195
196 ////////////////////////////////////////////////////////////////////////////////////////////
197 // Run method of the thread
198 ////////////////////////////////////////////////////////////////////////////////////////////
199
200 /**
201 * Run method of the thread. It remains blocked until (i) a Datagram
202 * packet is received through the net; (ii) a message from a local
203 * member is received; (iii) a scheduled timeout expires.
204 */
205 public void run()
206 {
207 while (!stop) {
208 await();
209 if (log.isDebugEnabled())
210 logEventQueue();
211 if (nextScheduledEvent != null) {
212 if (log.isDebugEnabled())
213 log.debug("Scheduling event: " + nextScheduledEvent);
214 ehuser.treceive(nextScheduledEvent);
215 yield();
216 }
217 if (nextRemoteMessage != null) {
218 if (log.isDebugEnabled())
219 log.debug("Scheduling remote mesg: " + nextRemoteMessage);
220 ehuser.rreceive(nextRemoteMessage);
221 nextRemoteMessage = null;
222 yield();
223 }
224 if (nextLocalMessage != null) {
225 if (log.isDebugEnabled())
226 log.debug("Scheduling local mesg: " + nextLocalMessage);
227 ehuser.lreceive(nextLocalMessage);
228 nextLocalMessage = null;
229 yield();
230 }
231 }
232 }
233
234
235 ////////////////////////////////////////////////////////////////////////////////////////////
236 // Private methods
237 ////////////////////////////////////////////////////////////////////////////////////////////
238
239 /**
240 * Select next nextScheduledEvent and next messages
241 */
242 private synchronized void await()
243 {
244 /* Select next events and messages */
245 if (!events.isEmpty()) {
246 nextScheduledEvent = events.first();
247 }
248 nextRemoteMessage = (DatagramPacket) rmessages.removeFirst();
249 nextLocalMessage = (Event) lmessages.removeFirst();
250
251 if (nextRemoteMessage == null && nextLocalMessage == null) {
252 if (nextScheduledEvent == null) {
253 try { wait(); }
254 catch (InterruptedException e) {
255 log.warn("Ehandler:await:withoutTimeout", e);
256 }
257 } else {
258 long timeout = nextScheduledEvent.getTimeout() - System.currentTimeMillis();
259 if (timeout > 0) {
260 try { wait(timeout); }
261 catch (InterruptedException e) {
262 log.warn("Ehandler:await:withTimeout", e);
263 }
264 }
265 }
266 }
267
268 /*
269 * Check if the first timeout is expired
270 */
271 if (nextScheduledEvent != null) {
272 long timeout = nextScheduledEvent.getTimeout() - System.currentTimeMillis();
273 if (timeout <= 0) {
274 events.remove(nextScheduledEvent);
275 if (log.isDebugEnabled())
276 log.debug("Timeout expired: " + (-timeout) + " msec ago");
277 } else {
278 nextScheduledEvent = null;
279 }
280 }
281 }
282
283
284 /**
285 * Returns true if the local message queue contains undelivered
286 * messages for the specified group; otherwise, false is returned.
287 */
288 public boolean hasUndeliveredLocalMsgs(int gid)
289 {
290 for (Iterator iter = lmessages.iterator(); iter.hasNext();) {
291 Event event = (Event) iter.next();
292 if (event.getGid() == gid) {
293 return true;
294 }
295 }
296 return false;
297 }
298
299
300 ////////////////////////////////////////////////////////////////////////////////////////////
301 // Methods from Object
302 ////////////////////////////////////////////////////////////////////////////////////////////
303
304 /**
305 * Returns a string representation of this object
306 */
307 public String toString()
308 {
309 StringBuilder buf = new StringBuilder();
310 buf.append("[Ehandler: LocalMsgs(");
311 buf.append(lmessages.size());
312 buf.append("), RemoteDatagrams(");
313 buf.append(rmessages.size());
314 buf.append("), ScheduledEvents(");
315 buf.append(events.size());
316 buf.append("), Events={ ");
317 for (ScheduledEvent scheduledEvent : events) {
318 buf.append(scheduledEvent);
319 buf.append(", ");
320 }
321 buf.append(" }]");
322 return buf.toString();
323 }
324
325 //mos
326 /** last time show() was invoked or 0 */
327 private long lastShowTime = 0;
328
329 /** interval between show times */
330 private final long interval = 2000;
331
332 /**
333 * Shows the contents of the event queues. This method is called
334 * from the run() loop of Ehandler.
335 */
336 private void logEventQueue()
337 {
338 long now = System.currentTimeMillis();
339 if (now-lastShowTime > interval) {
340 lastShowTime = now;
341 log.debug(this);
342 }
343 }
344
345 } // END Ehandler