1 /*
2 * Copyright (c) 1998-2005 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18 package jgroup.relacs.mss;
19
20 import java.net.DatagramPacket;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26
27 import jgroup.relacs.events.Event;
28 import jgroup.util.ThreadMonitor;
29
30 import org.apache.log4j.Logger;
31
32 /**
33 * The <code>EventHandler</code> class handles all events, packets and
34 * timeout events generated by the various mss layer components and by
35 * the network interface threads.
36 *
37 * @author Hein Meling
38 * @since Jgroup 3.0
39 */
40 public class EventHandler
41 implements NIListener
42 {
43
44 ////////////////////////////////////////////////////////////////////////////////////////////
45 // Logger
46 ////////////////////////////////////////////////////////////////////////////////////////////
47
48 /** Obtain logger for this class */
49 private static final Logger log = Logger.getLogger(EventHandler.class);
50
51
52 ////////////////////////////////////////////////////////////////////////////////////////////
53 // Fields
54 ////////////////////////////////////////////////////////////////////////////////////////////
55
56 /** Upper layer */
57 private final EhandlerUser ehuser;
58
59 /** The executor used to schedule events, timeouts and packets */
60 private final ScheduledExecutorService executorService;
61
62
63 ////////////////////////////////////////////////////////////////////////////////////////////
64 // Constructor
65 ////////////////////////////////////////////////////////////////////////////////////////////
66
67 /**
68 * Constructs and starts the event handler.
69 *
70 * @param ehuser
71 * reference to the receiver of upcall events.
72 */
73 EventHandler(EhandlerUser ehuser)
74 {
75 /* Initialize the upper layer reference */
76 this.ehuser = ehuser;
77 /* Thread management */
78 ThreadFactory tFactory = new ThreadFactory() {
79 public Thread newThread(Runnable r) {
80 Thread ehandler = new Thread(r, "MssEventHandler");
81 ehandler.setDaemon(true);
82 ThreadMonitor.add(ehandler);
83 return ehandler;
84 }
85 };
86 executorService = Executors.newSingleThreadScheduledExecutor(tFactory);
87 }
88
89
90 ////////////////////////////////////////////////////////////////////////////////////////////
91 // Methods
92 ////////////////////////////////////////////////////////////////////////////////////////////
93
94 /**
95 * Stops the EventHandler task execution.
96 */
97 void kill()
98 {
99 executorService.shutdown();
100 }
101
102 /**
103 * Schedule remote packet for delivery up to the mss layer.
104 *
105 * @see jgroup.relacs.mss.NIListener#rnotify(java.net.DatagramPacket)
106 */
107 public void rnotify(final DatagramPacket msg)
108 {
109 Runnable task = new Runnable() {
110 /* Insert the packet in the upcall queue */
111 public void run() {
112 if (log.isDebugEnabled())
113 log.debug("remote msg: " + msg);
114 ehuser.rreceive(msg);
115 if (log.isDebugEnabled())
116 log.debug("remote msg execution completed");
117 }
118 };
119 executorService.submit(task);
120 if (log.isDebugEnabled())
121 log.debug("remote msg submitted for execution");
122 }
123
124 /**
125 * Schedule event for delivery down to the mss layer.
126 *
127 * @param event the event to be passed to the mss layer.
128 */
129 void lnotify(final Event event)
130 {
131 Runnable task = new Runnable() {
132 /* Insert the event in the downcall queue */
133 public void run() {
134 if (log.isDebugEnabled())
135 log.debug("local msg: " + event);
136 ehuser.lreceive(event);
137 if (log.isDebugEnabled())
138 log.debug("local msg execution completed");
139 }
140 };
141 executorService.submit(task);
142 if (log.isDebugEnabled())
143 log.debug("local msg submitted for execution");
144 }
145
146 /**
147 * Schedule a new timeout event. The event will be handled
148 * appropriately at the given time.
149 *
150 * @param timeout
151 * Timeout length in milliseconds till the event should be handled.
152 * @param event
153 * The event description to associate this timeout value with.
154 */
155 ScheduledEvent setTimeout(long timeout, final ScheduledEvent event)
156 {
157 Runnable task = new Runnable() {
158 /* Insert the event in the downcall queue */
159 public void run() {
160 if (log.isDebugEnabled())
161 log.debug("Delivering timeout event: " + event);
162 ehuser.treceive(event);
163 if (log.isDebugEnabled())
164 log.debug("Timeout execution completed");
165 }
166 };
167 ScheduledFuture handle = executorService.schedule(task, timeout, TimeUnit.MILLISECONDS);
168 event.setTimeout(timeout, handle);
169 return event;
170 }
171
172
173 /**
174 * Abort the timeout associated to the specified event.
175 */
176 void abortTimeout(ScheduledEvent event)
177 {
178 if (event != null)
179 event.abortTimeout();
180 }
181
182 } // END EventHandler