1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package jgroup.relacs.mss;
19
20 import java.net.DatagramPacket;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26
27 import jgroup.relacs.events.Event;
28 import jgroup.util.ThreadMonitor;
29
30 import org.apache.log4j.Logger;
31
32
33
34
35
36
37
38
39
40 public class EventHandler
41 implements NIListener
42 {
43
44
45
46
47
48
49 private static final Logger log = Logger.getLogger(EventHandler.class);
50
51
52
53
54
55
56
57 private final EhandlerUser ehuser;
58
59
60 private final ScheduledExecutorService executorService;
61
62
63
64
65
66
67
68
69
70
71
72
73 EventHandler(EhandlerUser ehuser)
74 {
75
76 this.ehuser = ehuser;
77
78 ThreadFactory tFactory = new ThreadFactory() {
79 public Thread newThread(Runnable r) {
80 Thread ehandler = new Thread(r, "MssEventHandler");
81 ehandler.setDaemon(true);
82 ThreadMonitor.add(ehandler);
83 return ehandler;
84 }
85 };
86 executorService = Executors.newSingleThreadScheduledExecutor(tFactory);
87 }
88
89
90
91
92
93
94
95
96
97 void kill()
98 {
99 executorService.shutdown();
100 }
101
102
103
104
105
106
107 public void rnotify(final DatagramPacket msg)
108 {
109 Runnable task = new Runnable() {
110
111 public void run() {
112 if (log.isDebugEnabled())
113 log.debug("remote msg: " + msg);
114 ehuser.rreceive(msg);
115 if (log.isDebugEnabled())
116 log.debug("remote msg execution completed");
117 }
118 };
119 executorService.submit(task);
120 if (log.isDebugEnabled())
121 log.debug("remote msg submitted for execution");
122 }
123
124
125
126
127
128
129 void lnotify(final Event event)
130 {
131 Runnable task = new Runnable() {
132
133 public void run() {
134 if (log.isDebugEnabled())
135 log.debug("local msg: " + event);
136 ehuser.lreceive(event);
137 if (log.isDebugEnabled())
138 log.debug("local msg execution completed");
139 }
140 };
141 executorService.submit(task);
142 if (log.isDebugEnabled())
143 log.debug("local msg submitted for execution");
144 }
145
146
147
148
149
150
151
152
153
154
155 ScheduledEvent setTimeout(long timeout, final ScheduledEvent event)
156 {
157 Runnable task = new Runnable() {
158
159 public void run() {
160 if (log.isDebugEnabled())
161 log.debug("Delivering timeout event: " + event);
162 ehuser.treceive(event);
163 if (log.isDebugEnabled())
164 log.debug("Timeout execution completed");
165 }
166 };
167 ScheduledFuture handle = executorService.schedule(task, timeout, TimeUnit.MILLISECONDS);
168 event.setTimeout(timeout, handle);
169 return event;
170 }
171
172
173
174
175
176 void abortTimeout(ScheduledEvent event)
177 {
178 if (event != null)
179 event.abortTimeout();
180 }
181
182 }