View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.rmi;
20  
21  import java.io.InputStream;
22  import java.io.ObjectInputStream;
23  import java.io.ObjectOutputStream;
24  import java.io.OutputStream;
25  import java.lang.reflect.InvocationHandler;
26  import java.lang.reflect.Method;
27  
28  import jgroup.core.Callback;
29  import jgroup.core.InternalGMIListener;
30  import jgroup.core.InternalGMIService;
31  import jgroup.core.JgroupException;
32  import jgroup.core.MemberId;
33  import jgroup.core.multicast.MulticastListener;
34  import jgroup.core.multicast.MulticastService;
35  import jgroup.relacs.gmi.GroupAckListener;
36  import jgroup.util.Abort;
37  
38  import org.apache.log4j.Logger;
39  
40  /**
41   *  Handler for group internal invocations.  It is used to dynamically
42   *  generate a proxy for internal GMI.
43   *
44   *  Note that the <code>InternalGMIService</code> interface is not
45   *  implemented by the <code>IntGroupHandler</code> layer, but rather by
46   *  the dynamically generated proxy object.  The interface is included
47   *  here, only to follow the rules of the layer configuration structure
48   *  which does not deal with proxy based layers as is.  That is a layer
49   *  must implement its service interface
50   *  (<code>InternalGMIService</code> in this case), and since the
51   *  dynamically generated proxy is the object actually implementing the
52   *  service interface, we must do this trick.  The method from the
53   *  <code>InternalGMIService</code> interface simply throws an
54   *  <code>UnsupportedOperationException</code> and thus should never be
55   *  called.
56   *
57   *  @author Alberto Montresor
58   *  @author Hein Meling
59   *  @since Jgroup 0.9
60   */
61  public class IntGroupHandler
62    implements InternalGMIService, InvocationHandler, MulticastListener
63  {
64  
65    ////////////////////////////////////////////////////////////////////////////////////////////
66    // Logger
67    ////////////////////////////////////////////////////////////////////////////////////////////
68  
69    /** Obtain logger for this class */
70    private static final Logger log = Logger.getLogger(IntGroupHandler.class);
71  
72  
73    ////////////////////////////////////////////////////////////////////////////////////////////
74    // Static section
75    ////////////////////////////////////////////////////////////////////////////////////////////
76  
77    /** Protocol name used to distinguish messages from other protocols. */
78    private static final String PROTOCOL_NAME = "IGMI";
79  
80    /** Method invoke of interface InternalGMIService */
81    private static Method invokeMethod;
82  
83    static {
84      try {
85        Class[] types = new Class[] { Method.class, Object[].class, Callback.class };
86        Class cl = InternalGMIService.class;
87        invokeMethod = cl.getDeclaredMethod("invoke", types);
88      } catch (NoSuchMethodException e) {
89        // Can't happen, unless InternalGMIService has been modified
90        Abort.exit("Method invoke() in class InternalGMIService not found", e, 1);
91      }
92    }
93  
94  
95    ////////////////////////////////////////////////////////////////////////////////////////////
96    // Fields
97    ////////////////////////////////////////////////////////////////////////////////////////////
98  
99    /**
100    *  The method table holds entries for all IGMI methods implemented
101    *  by the server and/or layer that is registered as a listener for
102    *  IGMI events.
103    */
104   private ServerMethodTable methodTable;
105 
106   /** The multicast service */
107   private MulticastService multicastService;
108 
109 
110   ////////////////////////////////////////////////////////////////////////////////////////////
111   // Constructors
112   ////////////////////////////////////////////////////////////////////////////////////////////
113 
114   /**
115    *  Constructs a new <code>IntGroupHandler</code> object.
116    */
117   private IntGroupHandler(MulticastService multicastService)
118   {
119     this.multicastService = multicastService;
120   }
121 
122 
123   ////////////////////////////////////////////////////////////////////////////////////////////
124   // Static factory
125   ////////////////////////////////////////////////////////////////////////////////////////////
126 
127   public static IntGroupHandler getLayer(MulticastService mcastService)
128   {
129     return new IntGroupHandler(mcastService);
130   }
131 
132 
133   ////////////////////////////////////////////////////////////////////////////////////////////
134   // Layer interface methods
135   ////////////////////////////////////////////////////////////////////////////////////////////
136 
137   /**
138    *  Add a server/layer that is listening for internal group method
139    *  invocations.
140    *
141    *  @exception IllegalStateException
142    *    Thrown if this method has already been invoked with another
143    *    listener for group method invocations.  It is not allowed to
144    *    have multiple local listeners for group method invocations.
145    *  @exception IllegalArgumentException
146    *    Raised if the specified listener is not an InternalGMIListener.
147    */
148   public void addListener(Object listener)
149   {
150     if (listener instanceof InternalGMIListener) {
151       /*
152        * Compute the method table for the internal interfaces
153        * implemented by the provided listener.
154        */
155       //FIXME use the new method table once it supports IGMI
156       if (methodTable == null)
157         methodTable = new ServerMethodTable();
158       methodTable.addMethods(listener, InternalGMIListener.class);
159     } else {
160       /*
161        * Since the InternalGMIListener is a marker interface, we cannot
162        * throw an exception here since the application using the IGMI
163        * service may actually use IGMI only in a layer or in the server
164        * object, and thus the addListener() method may get invoked also
165        * for objects (server or layer) that does not actually implement
166        * a InternalGMIListener interface.  Thus, we just print a log
167        * message indicating this.
168        */
169       if (log.isDebugEnabled()) {
170         log.debug(listener.getClass().getName() + " does not implement an InternalGMIListener");
171       }
172     }
173   }
174 
175 
176   ////////////////////////////////////////////////////////////////////////////////////////////
177   // Methods from InternalGMIService interface
178   ////////////////////////////////////////////////////////////////////////////////////////////
179 
180   /**
181    *  Dummy method.  The method of the InternalGMIService interface is
182    *  not implemented IntGroupHandler layer, but the interface is
183    *  implemented by the dynamically generated proxy object.  This
184    *  method is included here, only to follow the rules of the layer
185    *  configuration structure which does not deal with proxy based
186    *  layers as is.  That is a layer must implement its service
187    *  interface (InternalGMIService in this case), and since the
188    *  dynamically generated proxy is the object actually implementing
189    *  the service interface, we must do this trick.
190    */
191   public Object invoke(Method m, Object[] args, Callback callback)
192     throws Exception
193   {
194     throw new UnsupportedOperationException();
195   }
196 
197 
198   ////////////////////////////////////////////////////////////////////////////////////////////
199   // Methods from InvocationHandler interface
200   ////////////////////////////////////////////////////////////////////////////////////////////
201 
202   /**
203    * Processes a method invocation on a proxy instance and returns
204    * the result.  This method will be invoked on an invocation handler
205    * when a method is invoked on a proxy instance that it is
206    * associated with.
207    */
208   public Object invoke(Object proxy, Method m, Object[] args)
209     throws Throwable
210   {
211     if (log.isDebugEnabled())
212       log.debug("igmi: " + m);
213     boolean asynchronous = false;
214     Callback callback = null;
215 
216     /*
217      * Check whether the method has asynchronous semantics; if so, read
218      * the arguments.  Otherwise, just use the method directly with
219      * synchronous semantics.
220      */
221     if (m.equals(invokeMethod)) {
222       asynchronous = true;
223       m = (Method) args[0];
224       callback = (Callback) args[2];
225       args = (Object[]) args[1];
226     }
227 
228     try {
229       /*
230        * Prepare message
231        */
232       long hash = methodTable.get(m);
233       OutputStream message = multicastService.getMessage(PROTOCOL_NAME);
234       ObjectOutputStream out = new MarshalOutputStream(message);
235       out.writeLong(hash);
236       int len = (args == null ? 0 : args.length); 
237       out.writeInt(len);
238       for (int i=0; i < len; i++)
239         out.writeObject(args[i]);
240       out.flush();
241 
242       if (asynchronous) {
243         // Asynchronous semantics
244         if (callback != null)
245           multicastService.mcast(message,
246               new AsynchAckListener(callback).getRemoteAckListener());
247         else
248           multicastService.mcast(message, null);
249         return null;
250       } else {
251         // Synchronous semantics
252         GroupAckListener listener = new GroupAckListener(this);
253         multicastService.mcast(message, listener.getRemoteAckListener());
254         return listener.getResults();
255       }
256 
257     } catch (JgroupException e) {
258       throw e;
259     } catch (Exception e) {
260       log.warn("Unable to perform internal group method invocation", e);
261       throw e;
262     }
263   }
264 
265   ////////////////////////////////////////////////////////////////////////////////////////////
266   // Methods from MulticastListener
267   ////////////////////////////////////////////////////////////////////////////////////////////
268 
269   /**
270    *  Returns a string naming the protocol implemented by this multicast
271    *  listener.
272    */
273   public String getProtocolName() 
274   {
275     return PROTOCOL_NAME;
276   }
277 
278 
279   /**
280    *  Upcall that is invoked by Jgroup to deliver a message
281    *  <code>msg</code> related to the <i>IGMI</i> protocol.
282    *
283    *  @param inmsg      the stream from which the message may be read.
284    */
285   public Object deliverStream(InputStream inmsg, MemberId sender, int seqNo)
286   {
287     if (log.isDebugEnabled())
288       log.debug("IntGroupHandler: received stream message");
289     try {
290       long hash;
291       Object[] args;
292       synchronized (inmsg) {
293         /*
294          * This must be synchronized on the inmsg object so as to avoid
295          * concurrent access to the inmsg object, since most methods in 
296          * the InMessage class modify its state (the position field in
297          * particular), including methods such as seek(), read(), insert()
298          * and so on.  Such concurrent access may occur when two competing
299          * threads (Ehandler and Dispatcher) want to access the inmsg object.
300          * For example, one thread may be reading the inmsg object for
301          * delivery to an upper-layer (see IntGroupHandler.deliverStream()),
302          * while another thread wants to send the inmsg to another member of
303          * the group, and thus needs to seek to the START_DATA position;
304          * see MsgMcast.getInMessage().
305          */
306         ObjectInputStream in = new MarshalInputStream(inmsg);
307         hash = in.readLong();
308         int size = in.readInt();
309         args = new Object[size];
310         for (int i=0; i < args.length; i++)
311           args[i] = in.readObject();
312       }
313 
314       /* Obtain the method from the method hash */
315       Method m = methodTable.get(hash);
316       Object server = methodTable.getServer(hash);
317       try {
318         return m.invoke(server, args);
319       } catch (Exception e) {
320         return e;
321       }
322     } catch (Exception e) {
323       /*
324        * If we for some reason received an incorrectly formated message;
325        * we simply log the raised exception and ignore the message since
326        * we cannot know its origin.  That is it may be spam.
327        */
328       log.warn("IntGroupHandler: Unable to unmarshal stream message", e);
329       return null;
330     }
331   }
332 
333 
334   /**
335    *  Upcall that is invoked by Jgroup to deliver a message <code> obj</code>
336    *  related to the group identified by <code> group</code>. In this case,
337    *  the message is an object marshalled and unmarshalled through java
338    *  serialization.
339    *
340    *  @param obj        the object containing the message.
341    */
342   public Object deliverObject(Object obj, MemberId sender, int seqNo)
343   {
344     throw new UnsupportedOperationException();
345   }
346 
347 } // END IntGroupHandler