1 /*
2 * Copyright (c) 1998-2003 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18 package jgroup.relacs.gmi;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.ObjectOutputStream;
23 import java.io.OutputStream;
24 import java.util.EnumMap;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.locks.Condition;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31
32 import jgroup.core.MemberId;
33 import jgroup.relacs.gmi.protocols.ProtocolDispatcher;
34 import net.jini.jeri.Endpoint;
35 import net.jini.jeri.InboundRequest;
36 import net.jini.jeri.RequestDispatcher;
37 import net.jini.jeri.ServerEndpoint;
38 import net.jini.jeri.ServerEndpoint.ListenCookie;
39 import net.jini.jeri.ServerEndpoint.ListenEndpoint;
40 import net.jini.jeri.tcp.TcpServerEndpoint;
41
42 import org.apache.log4j.Logger;
43 import org.apache.log4j.MDC;
44 import org.apache.log4j.NDC;
45
46 /**
47 * Handles external group invocation requests.
48 *
49 * @author Tor Arve Stangeland
50 * @author Hein Meling
51 */
52 public class GroupRequestHandler
53 implements RequestDispatcher
54 {
55
56 ////////////////////////////////////////////////////////////////////////////////////////////
57 // Logger
58 ////////////////////////////////////////////////////////////////////////////////////////////
59
60 /** Obtain logger for this class */
61 private static final Logger log = Logger.getLogger(GroupRequestHandler.class);
62
63
64 ////////////////////////////////////////////////////////////////////////////////////////////
65 // Fields
66 ////////////////////////////////////////////////////////////////////////////////////////////
67
68 /** Invocation dispatcher */
69 private final Map<ListenEndpoint,Record> handles = new HashMap<ListenEndpoint,Record>();
70
71 /** Mapping from a method semantics to the corresponding protocol dispatcher. */
72 private final EnumMap<MethodSemantics,ProtocolDispatcher> protocols =
73 new EnumMap<MethodSemantics,ProtocolDispatcher>(MethodSemantics.class);
74
75 /** My member identifier */
76 private final MemberId me;
77
78 /** Server group view identifier */
79 private final int groupId;
80
81 /** Server endpoint */
82 private TcpServerEndpoint serverEndpoint;
83
84 /**
85 * True if invocation requests should be blocked awaiting a new view.
86 * Initially, we do not accept any invocations until we have installed a view.
87 */
88 private volatile boolean blockInvocation = true;
89
90 /** Lock and condition used to enable/disable method dispatching */
91 private final Lock lock = new ReentrantLock();
92 private final Condition notDispatching = lock.newCondition();
93
94
95 ////////////////////////////////////////////////////////////////////////////////////////////
96 // Constructors
97 ////////////////////////////////////////////////////////////////////////////////////////////
98
99 /**
100 * Constructs a new request handler for the group.
101 */
102 public GroupRequestHandler(int gid, MemberId me)
103 {
104 this.groupId = gid;
105 this.me = me;
106 }
107
108
109 ////////////////////////////////////////////////////////////////////////////////////////////
110 // Public methods
111 ////////////////////////////////////////////////////////////////////////////////////////////
112
113 /**
114 * Returns true if the given method semantics has registered its
115 * protocol instance.
116 */
117 public boolean hasProtocol(MethodSemantics semantics)
118 {
119 return protocols.containsKey(semantics);
120 }
121
122 /**
123 * Associate the given protocol with the given method invocation semantics
124 * as stored in the internal table of protocols. The method will override
125 * any existing mapping for the given invocation semantics.
126 */
127 public void addProtocol(MethodSemantics semantics, ProtocolDispatcher protocol)
128 {
129 protocols.put(semantics, protocol);
130 }
131
132
133 ////////////////////////////////////////////////////////////////////////////////////////////
134 // Methods from net.jini.jeri.RequestDispatcher
135 ////////////////////////////////////////////////////////////////////////////////////////////
136
137 /**
138 * Handles inbound requests to this local endpoint (a member of a group).
139 *
140 * @see net.jini.jeri.RequestDispatcher#dispatch(InboundRequest)
141 */
142 public void dispatch(InboundRequest request)
143 {
144 if (log.isDebugEnabled())
145 MDC.put("group", "[Group: " + groupId + "]");
146 doDispatch(request);
147 }
148
149
150 /**
151 * Handles inbound requests to this local endpoint (a member of a group).
152 *
153 * @see net.jini.jeri.RequestDispatcher#dispatch(InboundRequest)
154 */
155 public void BLOCKING_dispatch(InboundRequest request)
156 {
157 if (log.isDebugEnabled())
158 MDC.put("group", "[Group: " + groupId + "]");
159
160 /*
161 * Check if the invocation should be blocked to await a new view installation.
162 * Allowing layers (e.g. the merging layer) to complete the updating of the
163 * server state before performing new invocations which may itself update the
164 * server state.
165 *
166 * Note that multiple threads (inbound invocation requests) may be blocked
167 * here, pending execution on some local server. We have no control over
168 * which of the pending requests are executed first after the invocation
169 * dispatching is again unblocked.
170 */
171 lock.lock();
172 try {
173 while (blockInvocation) {
174 if (log.isDebugEnabled())
175 log.debug("Blocking invocation " + request);
176 // wait for signal to start dispatching again
177 // notDispatching.awaitUninterruptibly();
178
179 // wait for signal or timeout; and start dispatching again
180 try {
181 boolean elapsed = !notDispatching.await(5, TimeUnit.SECONDS);
182 if (elapsed) {
183 // Unblock since we haved waited for 5 seconds.
184 blockInvocation = false;
185 log.warn("UNBLOCKED DUE TO TIMEOUT");
186 }
187 } catch (InterruptedException e) { }
188
189 if (!blockInvocation && log.isDebugEnabled())
190 log.debug("Unblocking invocation " + request);
191 }
192 } finally {
193 lock.unlock();
194 }
195 doDispatch(request);
196 }
197
198
199 /**
200 * Handles inbound requests to this local endpoint (a member of a group).
201 *
202 * @see net.jini.jeri.RequestDispatcher#dispatch(InboundRequest)
203 */
204 public void CLIENT_BLOCKING_dispatch(InboundRequest request)
205 {
206 if (log.isDebugEnabled())
207 MDC.put("group", "[Group: " + groupId + "]");
208
209 /*
210 * Check if the invocation should be blocked to await a new view installation.
211 * Allowing layers (e.g. the merging layer) to complete the updating of the
212 * server state before performing new invocations which may itself update the
213 * server state.
214 *
215 * Note that multiple threads (inbound invocation requests) may be blocked
216 * here, pending execution on some local server. We have no control over
217 * which of the pending requests are executed first after the invocation
218 * dispatching is again unblocked.
219 */
220 if (blockInvocation) {
221 if (log.isDebugEnabled())
222 log.debug("Blocked invocation " + request);
223 OutputStream out = request.getResponseOutputStream();
224 try {
225 out.write(InvocationResult.INVOCATION_BLOCKED);
226 // Flush and close the return object stream
227 out.flush();
228 out.close();
229 } catch (IOException ioex) {
230 // If an exception is thrown, something is wrong with the network (?)
231 log.warn("Failed to perform method invocation; aborting...", ioex);
232 request.abort();
233 }
234 } else {
235 doDispatch(request);
236 }
237 }
238
239 /**
240 * Perform the method dispatching according to the protocol type incorporated
241 * within the inbound request object. Note that methods may be dispatched in
242 * such a way that they interleave with other invocations using the same or a
243 * different protocol at the local endpoint.
244 *
245 * Methods that cannot tolerate the above concurrency behavior must implement
246 * its own synchronization policy (locking synchronous access to its variables)
247 * at the server level, and may in addition use the Atomic protocol.
248 */
249 private void doDispatch(InboundRequest request)
250 {
251 // Get in/out streams from the inbound request object
252 InputStream in = request.getRequestInputStream();
253 OutputStream out = request.getResponseOutputStream();
254 try {
255 boolean invoked = false;
256 InvocationResult res = null;
257 int ordinal = in.read();
258 try {
259 MethodSemantics semantics = MethodSemantics.valueOf(ordinal);
260 // no need to synchronize access to 'protocols' since it is not modified after init
261 ProtocolDispatcher protocol = protocols.get(semantics);
262 if (protocol == null) {
263 log.warn("No protocol registered for: " + semantics);
264 out.write(InvocationResult.UNKNOWN_INVOCATION_SEMANTICS);
265 } else {
266 if (log.isDebugEnabled())
267 NDC.push(semantics.toString());
268 /*
269 * Note that we do not synchronize here to block invocations if other
270 * requests are being handled; this has to be handled by the server
271 * implementations themselves.
272 */
273 res = protocol.dispatch(in);
274 if (log.isDebugEnabled())
275 NDC.pop();
276 invoked = true;
277 }
278 } catch (ArrayIndexOutOfBoundsException e) {
279 log.warn("Unknown invocation semantics in call: " + ordinal);
280 out.write(InvocationResult.UNKNOWN_INVOCATION_SEMANTICS);
281 }
282
283 if (invoked) {
284 if (res != null) {
285 // Write result to stream if successful
286 out.write(InvocationResult.INVOCATION_COMPLETED);
287 ObjectOutputStream oout = new ObjectOutputStream(out);
288 oout.writeObject(res);
289 } else {
290 log.warn("Invocation failed!");
291 out.write(InvocationResult.INVOCATION_FAILED);
292 }
293 }
294
295 // Flush and close the return object stream
296 out.flush();
297 out.close();
298 } catch (IOException ioex) {
299 // If an exception is thrown, something is wrong with the network (?)
300 log.warn("Failed to perform method invocation; aborting...", ioex);
301 request.abort();
302 }
303 }
304
305
306 ////////////////////////////////////////////////////////////////////////////////////////////
307 // Public methods
308 ////////////////////////////////////////////////////////////////////////////////////////////
309
310 /**
311 * Update local view, and unblock invocation dispatching.
312 */
313 void enableDispatching()
314 {
315 if (log.isDebugEnabled())
316 log.debug("Enable invocation dispatching");
317 lock.lock();
318 try {
319 // Unblock all pending invocations
320 blockInvocation = false;
321 notDispatching.signalAll();
322 } finally {
323 lock.unlock();
324 }
325 }
326
327 /**
328 * To block invocation dispatching during view change periods.
329 */
330 void disableDispatching()
331 {
332 if (log.isDebugEnabled())
333 log.debug("Disable invocation dispatching");
334 lock.lock();
335 try {
336 // Block new invocations from now on and until enableDispatching is called
337 blockInvocation = true;
338 } finally {
339 lock.unlock();
340 }
341 }
342
343 Endpoint initServerEndpoint()
344 throws IOException
345 {
346 if (serverEndpoint == null) {
347 int port = me.getServerPort();
348 // Create a ServerEndpoint on the given port.
349 serverEndpoint = TcpServerEndpoint.getInstance(port);
350 if (log.isDebugEnabled())
351 log.debug("Created server endpoint on port: " + port);
352 }
353 // Enumerate endpoints
354 return serverEndpoint.enumerateListenEndpoints(new GroupRequestHandler.ListenContext());
355 }
356
357 ServerEndpoint getServerEndpoint()
358 {
359 return serverEndpoint;
360 }
361
362
363 ////////////////////////////////////////////////////////////////////////////////////////////
364 // Nested class for holding the listen handle and cookie
365 ////////////////////////////////////////////////////////////////////////////////////////////
366
367 /** Describe this class */
368 private class Record
369 {
370 public ServerEndpoint.ListenHandle handle;
371 public ServerEndpoint.ListenCookie cookie;
372 }
373
374
375 ////////////////////////////////////////////////////////////////////////////////////////////
376 // Nested class implementing the ServerEndpoint.ListenContext interface
377 ////////////////////////////////////////////////////////////////////////////////////////////
378
379 /**
380 * Internal listen context
381 */
382 private final class ListenContext
383 implements ServerEndpoint.ListenContext
384 {
385
386 /**
387 * @see net.jini.jeri.ServerEndpoint.ListenContext#addListenEndpoint(ListenEndpoint)
388 */
389 public ListenCookie addListenEndpoint(ListenEndpoint listenEndpoint)
390 throws IOException
391 {
392 Record r = handles.get(listenEndpoint);
393 if (r == null) {
394 // Start listening
395 r = new Record();
396 r.handle = listenEndpoint.listen(GroupRequestHandler.this);
397 r.cookie = r.handle.getCookie();
398 handles.put(listenEndpoint, r);
399 }
400 return r.cookie;
401 }
402
403 } // END ListenContext
404
405 } // END GroupRequestHandler