1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.gm;
20
21 import java.rmi.RemoteException;
22 import java.rmi.server.ExportException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ThreadFactory;
28
29 import jgroup.core.ConfigManager;
30 import jgroup.core.JgroupException;
31 import jgroup.relacs.config.TransportConfig;
32 import jgroup.relacs.events.Event;
33 import jgroup.util.ThreadMonitor;
34 import net.jini.export.Exporter;
35 import net.jini.jeri.BasicILFactory;
36 import net.jini.jeri.BasicJeriExporter;
37 import net.jini.jeri.tcp.TcpServerEndpoint;
38
39 import org.apache.log4j.Logger;
40 import org.apache.log4j.MDC;
41
42
43
44
45
46
47
48
49 public class NewDispatcherLayer
50 implements DispatcherService, RemoteDispatcher, PingListener
51 {
52
53
54
55
56
57
58 private static final Logger log = Logger.getLogger(NewDispatcherLayer.class);
59
60
61
62
63
64
65
66 private static final int MAX_EVENT_TYPES = 20;
67
68
69
70
71
72
73
74 private volatile long lastPingReceived;
75
76
77 private int daemonSuspectTimeout;
78
79
80 private ExecutorService executorService;
81
82
83 private final List<DispatcherListener> eventTypes;
84
85
86 private Exporter exporter;
87
88
89
90
91
92
93
94
95
96 private NewDispatcherLayer()
97 {
98 eventTypes = new ArrayList<DispatcherListener>(MAX_EVENT_TYPES);
99 for (int i = 0; i < MAX_EVENT_TYPES; i++)
100 eventTypes.add(null);
101
102
103 TransportConfig tconf =
104 (TransportConfig) ConfigManager.getConfig(TransportConfig.class);
105 daemonSuspectTimeout = tconf.getDaemonSuspectTimeout();
106 }
107
108
109
110
111
112
113 public static NewDispatcherLayer getLayer()
114 {
115 return new NewDispatcherLayer();
116 }
117
118
119
120
121
122
123
124
125
126 public void addListener(Object listener)
127 {
128 if (listener instanceof DispatcherListener) {
129 DispatcherListener dispListener = (DispatcherListener) listener;
130 int[] events = dispListener.eventTypes();
131 for (int i = 0; i < events.length; i++) {
132 eventTypes.set(events[i], dispListener);
133 }
134 } else {
135 throw new IllegalArgumentException("Specified listener is not a DispatcherListener");
136 }
137 }
138
139
140
141
142
143
144
145
146
147 public void start(final int gid)
148 {
149 ThreadFactory tFactory = new ThreadFactory() {
150 public Thread newThread(Runnable r) {
151 Thread dispatcher = new Thread(r, "Dispatcher-" + gid);
152 dispatcher.setDaemon(true);
153 ThreadMonitor.add(dispatcher);
154 return dispatcher;
155 }
156 };
157
158 executorService = Executors.newFixedThreadPool(3, tFactory);
159 }
160
161
162
163
164 public void halt()
165 {
166
167 exporter.unexport(true);
168 executorService.shutdown();
169 }
170
171
172
173
174 public void dispatch(Object obj)
175 {
176 if (log.isDebugEnabled())
177 log.debug("Blocking on " + obj);
178 synchronized (obj) {
179 try {
180 obj.wait(daemonSuspectTimeout);
181 } catch (InterruptedException e) {
182 if (log.isDebugEnabled())
183 log.debug("Interrupted on " + obj);
184 return;
185 }
186 }
187
188 long timePassed = System.currentTimeMillis() - lastPingReceived;
189 if (timePassed > daemonSuspectTimeout) {
190 log.warn("Daemon suspected after: " + timePassed);
191 } else {
192 if (log.isDebugEnabled())
193 log.debug("Unblocked on " + obj);
194 }
195 }
196
197
198
199
200 public RemoteDispatcher getRemoteDispatcher() throws JgroupException
201 {
202 exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
203 new BasicILFactory());
204 try {
205 return (RemoteDispatcher) exporter.export(this);
206 } catch (ExportException e) {
207 throw new JgroupException("Could not export the dispatcher", e);
208 }
209 }
210
211
212
213
214 public void addEvent(final Event event) throws RemoteException
215 {
216 lastPingReceived = System.currentTimeMillis();
217
218 if (log.isDebugEnabled()) {
219 MDC.put("group", "[Group: " + event.getGid() + "]");
220 log.debug("Received daemon event: " + lastPingReceived + " " + event);
221 }
222 Runnable task = new Runnable() {
223 public void run() {
224 if (log.isDebugEnabled()) {
225 MDC.put("group", "[Group: " + event.getGid() + "]");
226 log.debug("Executing event: " + event);
227 }
228 DispatcherListener listener = eventTypes.get(event.getTag());
229 listener.notifyEvent(event);
230 if (log.isDebugEnabled())
231 log.debug("Event execution completed");
232 }
233 };
234 executorService.submit(task);
235 if (log.isDebugEnabled())
236 log.debug("Event submitted for execution");
237 }
238
239
240
241
242 public void ping() throws RemoteException
243 {
244 lastPingReceived = System.currentTimeMillis();
245 if (log.isDebugEnabled()) {
246 log.debug("PingReceived: " + lastPingReceived);
247 }
248 }
249
250
251
252
253 public String toString()
254 {
255 return " DISP ";
256 }
257
258 }