View Javadoc

1   /*
2    * Copyright (c) 1998-2003 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  package jgroup.relacs.gmi;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.ObjectOutputStream;
23  import java.io.OutputStream;
24  import java.util.EnumMap;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.locks.Condition;
29  import java.util.concurrent.locks.Lock;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import jgroup.core.MemberId;
33  import jgroup.relacs.gmi.protocols.ProtocolDispatcher;
34  import net.jini.jeri.Endpoint;
35  import net.jini.jeri.InboundRequest;
36  import net.jini.jeri.RequestDispatcher;
37  import net.jini.jeri.ServerEndpoint;
38  import net.jini.jeri.ServerEndpoint.ListenCookie;
39  import net.jini.jeri.ServerEndpoint.ListenEndpoint;
40  import net.jini.jeri.tcp.TcpServerEndpoint;
41  
42  import org.apache.log4j.Logger;
43  import org.apache.log4j.MDC;
44  import org.apache.log4j.NDC;
45  
46  /**
47   * Handles external group invocation requests.
48   *
49   * @author Tor Arve Stangeland
50   * @author Hein Meling
51   */
52  public class GroupRequestHandler
53    implements RequestDispatcher
54  {
55  
56    ////////////////////////////////////////////////////////////////////////////////////////////
57    // Logger
58    ////////////////////////////////////////////////////////////////////////////////////////////
59  
60    /** Obtain logger for this class */
61    private static final Logger log = Logger.getLogger(GroupRequestHandler.class);
62  
63  
64    ////////////////////////////////////////////////////////////////////////////////////////////
65    // Fields
66    ////////////////////////////////////////////////////////////////////////////////////////////
67  
68    /** Invocation dispatcher  */
69    private final Map<ListenEndpoint,Record> handles = new HashMap<ListenEndpoint,Record>();
70  
71    /** Mapping from a method semantics to the corresponding protocol dispatcher. */
72    private final EnumMap<MethodSemantics,ProtocolDispatcher> protocols =
73      new EnumMap<MethodSemantics,ProtocolDispatcher>(MethodSemantics.class);
74  
75    /** My member identifier */
76    private final MemberId me;
77  
78    /** Server group view identifier */
79    private final int groupId;
80  
81    /** Server endpoint */
82    private TcpServerEndpoint serverEndpoint;
83  
84    /**
85     * True if invocation requests should be blocked awaiting a new view.
86     * Initially, we do not accept any invocations until we have installed a view.
87     */
88    private volatile boolean blockInvocation = true;
89  
90    /** Lock and condition used to enable/disable method dispatching */
91    private final Lock lock = new ReentrantLock();
92    private final Condition notDispatching = lock.newCondition();
93  
94  
95    ////////////////////////////////////////////////////////////////////////////////////////////
96    // Constructors
97    ////////////////////////////////////////////////////////////////////////////////////////////
98  
99    /**
100    * Constructs a new request handler for the group.
101    */
102   public GroupRequestHandler(int gid, MemberId me)
103   {
104     this.groupId = gid;
105     this.me = me;
106   }
107 
108 
109   ////////////////////////////////////////////////////////////////////////////////////////////
110   // Public methods
111   ////////////////////////////////////////////////////////////////////////////////////////////
112 
113   /**
114    * Returns true if the given method semantics has registered its
115    * protocol instance.
116    */
117   public boolean hasProtocol(MethodSemantics semantics)
118   {
119     return protocols.containsKey(semantics);
120   }
121 
122   /**
123    * Associate the given protocol with the given method invocation semantics
124    * as stored in the internal table of protocols.  The method will override
125    * any existing mapping for the given invocation semantics.
126    */
127   public void addProtocol(MethodSemantics semantics, ProtocolDispatcher protocol)
128   {
129     protocols.put(semantics, protocol);
130   }
131 
132 
133   ////////////////////////////////////////////////////////////////////////////////////////////
134   // Methods from net.jini.jeri.RequestDispatcher
135   ////////////////////////////////////////////////////////////////////////////////////////////
136 
137   /**
138    * Handles inbound requests to this local endpoint (a member of a group).
139    * 
140    * @see net.jini.jeri.RequestDispatcher#dispatch(InboundRequest)
141    */
142   public void dispatch(InboundRequest request)
143   {
144     if (log.isDebugEnabled())
145       MDC.put("group", "[Group: " + groupId + "]");
146     doDispatch(request);
147   }
148 
149 
150   /**
151    * Handles inbound requests to this local endpoint (a member of a group).
152    * 
153    * @see net.jini.jeri.RequestDispatcher#dispatch(InboundRequest)
154    */
155   public void BLOCKING_dispatch(InboundRequest request)
156   {
157     if (log.isDebugEnabled())
158       MDC.put("group", "[Group: " + groupId + "]");
159 
160     /*
161      * Check if the invocation should be blocked to await a new view installation.
162      * Allowing layers (e.g. the merging layer) to complete the updating of the
163      * server state before performing new invocations which may itself update the
164      * server state.
165      * 
166      * Note that multiple threads (inbound invocation requests) may be blocked
167      * here, pending execution on some local server.  We have no control over
168      * which of the pending requests are executed first after the invocation
169      * dispatching is again unblocked.
170      */
171     lock.lock();
172     try {
173       while (blockInvocation) {
174         if (log.isDebugEnabled())
175           log.debug("Blocking invocation " + request);
176         // wait for signal to start dispatching again
177 //        notDispatching.awaitUninterruptibly();
178 
179         // wait for signal or timeout; and start dispatching again
180         try {
181           boolean elapsed = !notDispatching.await(5, TimeUnit.SECONDS);
182           if (elapsed) {
183             // Unblock since we haved waited for 5 seconds.
184             blockInvocation = false;
185             log.warn("UNBLOCKED DUE TO TIMEOUT");
186           }
187         } catch (InterruptedException e) { }
188 
189         if (!blockInvocation && log.isDebugEnabled())
190           log.debug("Unblocking invocation " + request);
191       }
192     } finally {
193       lock.unlock();
194     }
195     doDispatch(request);
196   }
197 
198 
199   /**
200    * Handles inbound requests to this local endpoint (a member of a group).
201    * 
202    * @see net.jini.jeri.RequestDispatcher#dispatch(InboundRequest)
203    */
204   public void CLIENT_BLOCKING_dispatch(InboundRequest request)
205   {
206     if (log.isDebugEnabled())
207       MDC.put("group", "[Group: " + groupId + "]");
208 
209     /*
210      * Check if the invocation should be blocked to await a new view installation.
211      * Allowing layers (e.g. the merging layer) to complete the updating of the
212      * server state before performing new invocations which may itself update the
213      * server state.
214      * 
215      * Note that multiple threads (inbound invocation requests) may be blocked
216      * here, pending execution on some local server.  We have no control over
217      * which of the pending requests are executed first after the invocation
218      * dispatching is again unblocked.
219      */
220     if (blockInvocation) {
221       if (log.isDebugEnabled())
222         log.debug("Blocked invocation " + request);
223       OutputStream out = request.getResponseOutputStream();
224       try {
225         out.write(InvocationResult.INVOCATION_BLOCKED);
226         // Flush and close the return object stream
227         out.flush();
228         out.close();
229       } catch (IOException ioex) {
230         // If an exception is thrown, something is wrong with the network (?)
231         log.warn("Failed to perform method invocation; aborting...", ioex);
232         request.abort();
233       }
234     } else {
235       doDispatch(request);
236     }
237   }
238 
239   /**
240    * Perform the method dispatching according to the protocol type incorporated
241    * within the inbound request object.  Note that methods may be dispatched in
242    * such a way that they interleave with other invocations using the same or a
243    * different protocol at the local endpoint.
244    * 
245    * Methods that cannot tolerate the above concurrency behavior must implement
246    * its own synchronization policy (locking synchronous access to its variables)
247    * at the server level, and may in addition use the Atomic protocol.
248    */
249   private void doDispatch(InboundRequest request)
250   {
251     // Get in/out streams from the inbound request object
252     InputStream in = request.getRequestInputStream();
253     OutputStream out = request.getResponseOutputStream();
254     try {
255       boolean invoked = false;
256       InvocationResult res = null;
257       int ordinal = in.read();
258       try {
259         MethodSemantics semantics = MethodSemantics.valueOf(ordinal);
260         // no need to synchronize access to 'protocols' since it is not modified after init
261         ProtocolDispatcher protocol = protocols.get(semantics);
262         if (protocol == null) {
263           log.warn("No protocol registered for: " + semantics);
264           out.write(InvocationResult.UNKNOWN_INVOCATION_SEMANTICS);
265         } else {
266           if (log.isDebugEnabled())
267             NDC.push(semantics.toString());
268           /*
269            * Note that we do not synchronize here to block invocations if other
270            * requests are being handled; this has to be handled by the server
271            * implementations themselves.
272            */
273           res = protocol.dispatch(in);
274           if (log.isDebugEnabled())
275             NDC.pop();
276           invoked = true;
277         }
278       } catch (ArrayIndexOutOfBoundsException e) {
279         log.warn("Unknown invocation semantics in call: " + ordinal);
280         out.write(InvocationResult.UNKNOWN_INVOCATION_SEMANTICS);
281       }
282 
283       if (invoked) {
284         if (res != null) {
285           // Write result to stream if successful
286           out.write(InvocationResult.INVOCATION_COMPLETED);
287           ObjectOutputStream oout = new ObjectOutputStream(out);
288           oout.writeObject(res);
289         } else {
290           log.warn("Invocation failed!");
291           out.write(InvocationResult.INVOCATION_FAILED);
292         }
293       }
294 
295       // Flush and close the return object stream
296       out.flush();
297       out.close();
298     } catch (IOException ioex) {
299       // If an exception is thrown, something is wrong with the network (?)
300       log.warn("Failed to perform method invocation; aborting...", ioex);
301       request.abort();
302     }
303   }
304 
305 
306   ////////////////////////////////////////////////////////////////////////////////////////////
307   // Public methods
308   ////////////////////////////////////////////////////////////////////////////////////////////
309 
310   /**
311    * Update local view, and unblock invocation dispatching.
312    */
313   void enableDispatching()
314   {
315     if (log.isDebugEnabled())
316       log.debug("Enable invocation dispatching");
317     lock.lock();
318     try {
319       // Unblock all pending invocations
320       blockInvocation = false;
321       notDispatching.signalAll();
322     } finally {
323       lock.unlock();
324     }
325   }
326 
327   /**
328    * To block invocation dispatching during view change periods.
329    */
330   void disableDispatching()
331   {
332     if (log.isDebugEnabled())
333       log.debug("Disable invocation dispatching");
334     lock.lock();
335     try {
336       // Block new invocations from now on and until enableDispatching is called
337       blockInvocation = true;
338     } finally {
339       lock.unlock();
340     }
341   }
342 
343   Endpoint initServerEndpoint()
344     throws IOException
345   {
346     if (serverEndpoint == null) {
347       int port = me.getServerPort();
348       // Create a ServerEndpoint on the given port.
349       serverEndpoint = TcpServerEndpoint.getInstance(port);
350       if (log.isDebugEnabled())
351         log.debug("Created server endpoint on port: " + port);
352     }
353     // Enumerate endpoints
354     return serverEndpoint.enumerateListenEndpoints(new GroupRequestHandler.ListenContext());
355   }
356 
357   ServerEndpoint getServerEndpoint()
358   {
359     return serverEndpoint;
360   }
361 
362 
363   ////////////////////////////////////////////////////////////////////////////////////////////
364   // Nested class for holding the listen handle and cookie
365   ////////////////////////////////////////////////////////////////////////////////////////////
366 
367   /** Describe this class */
368   private class Record
369   {
370     public ServerEndpoint.ListenHandle handle;
371     public ServerEndpoint.ListenCookie cookie;
372   }
373   
374 
375   ////////////////////////////////////////////////////////////////////////////////////////////
376   // Nested class implementing the ServerEndpoint.ListenContext interface
377   ////////////////////////////////////////////////////////////////////////////////////////////
378 
379   /**
380    * Internal listen context
381    */
382   private final class ListenContext
383     implements ServerEndpoint.ListenContext
384   {
385 
386     /**
387      * @see net.jini.jeri.ServerEndpoint.ListenContext#addListenEndpoint(ListenEndpoint)
388      */
389     public ListenCookie addListenEndpoint(ListenEndpoint listenEndpoint)
390       throws IOException 
391     {
392       Record r = handles.get(listenEndpoint);
393       if (r == null) {
394         // Start listening
395         r = new Record();
396         r.handle = listenEndpoint.listen(GroupRequestHandler.this);
397         r.cookie = r.handle.getCookie();
398         handles.put(listenEndpoint, r);
399       }
400       return r.cookie;
401     }
402 
403   } // END ListenContext
404 
405 } // END GroupRequestHandler