1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.rmi;
20
21 import java.io.InputStream;
22 import java.io.ObjectInputStream;
23 import java.io.ObjectOutputStream;
24 import java.io.OutputStream;
25 import java.lang.reflect.InvocationHandler;
26 import java.lang.reflect.Method;
27
28 import jgroup.core.Callback;
29 import jgroup.core.InternalGMIListener;
30 import jgroup.core.InternalGMIService;
31 import jgroup.core.JgroupException;
32 import jgroup.core.MemberId;
33 import jgroup.core.multicast.MulticastListener;
34 import jgroup.core.multicast.MulticastService;
35 import jgroup.relacs.gmi.GroupAckListener;
36 import jgroup.util.Abort;
37
38 import org.apache.log4j.Logger;
39
40 /**
41 * Handler for group internal invocations. It is used to dynamically
42 * generate a proxy for internal GMI.
43 *
44 * Note that the <code>InternalGMIService</code> interface is not
45 * implemented by the <code>IntGroupHandler</code> layer, but rather by
46 * the dynamically generated proxy object. The interface is included
47 * here, only to follow the rules of the layer configuration structure
48 * which does not deal with proxy based layers as is. That is a layer
49 * must implement its service interface
50 * (<code>InternalGMIService</code> in this case), and since the
51 * dynamically generated proxy is the object actually implementing the
52 * service interface, we must do this trick. The method from the
53 * <code>InternalGMIService</code> interface simply throws an
54 * <code>UnsupportedOperationException</code> and thus should never be
55 * called.
56 *
57 * @author Alberto Montresor
58 * @author Hein Meling
59 * @since Jgroup 0.9
60 */
61 public class IntGroupHandler
62 implements InternalGMIService, InvocationHandler, MulticastListener
63 {
64
65 ////////////////////////////////////////////////////////////////////////////////////////////
66 // Logger
67 ////////////////////////////////////////////////////////////////////////////////////////////
68
69 /** Obtain logger for this class */
70 private static final Logger log = Logger.getLogger(IntGroupHandler.class);
71
72
73 ////////////////////////////////////////////////////////////////////////////////////////////
74 // Static section
75 ////////////////////////////////////////////////////////////////////////////////////////////
76
77 /** Protocol name used to distinguish messages from other protocols. */
78 private static final String PROTOCOL_NAME = "IGMI";
79
80 /** Method invoke of interface InternalGMIService */
81 private static Method invokeMethod;
82
83 static {
84 try {
85 Class[] types = new Class[] { Method.class, Object[].class, Callback.class };
86 Class cl = InternalGMIService.class;
87 invokeMethod = cl.getDeclaredMethod("invoke", types);
88 } catch (NoSuchMethodException e) {
89 // Can't happen, unless InternalGMIService has been modified
90 Abort.exit("Method invoke() in class InternalGMIService not found", e, 1);
91 }
92 }
93
94
95 ////////////////////////////////////////////////////////////////////////////////////////////
96 // Fields
97 ////////////////////////////////////////////////////////////////////////////////////////////
98
99 /**
100 * The method table holds entries for all IGMI methods implemented
101 * by the server and/or layer that is registered as a listener for
102 * IGMI events.
103 */
104 private ServerMethodTable methodTable;
105
106 /** The multicast service */
107 private MulticastService multicastService;
108
109
110 ////////////////////////////////////////////////////////////////////////////////////////////
111 // Constructors
112 ////////////////////////////////////////////////////////////////////////////////////////////
113
114 /**
115 * Constructs a new <code>IntGroupHandler</code> object.
116 */
117 private IntGroupHandler(MulticastService multicastService)
118 {
119 this.multicastService = multicastService;
120 }
121
122
123 ////////////////////////////////////////////////////////////////////////////////////////////
124 // Static factory
125 ////////////////////////////////////////////////////////////////////////////////////////////
126
127 public static IntGroupHandler getLayer(MulticastService mcastService)
128 {
129 return new IntGroupHandler(mcastService);
130 }
131
132
133 ////////////////////////////////////////////////////////////////////////////////////////////
134 // Layer interface methods
135 ////////////////////////////////////////////////////////////////////////////////////////////
136
137 /**
138 * Add a server/layer that is listening for internal group method
139 * invocations.
140 *
141 * @exception IllegalStateException
142 * Thrown if this method has already been invoked with another
143 * listener for group method invocations. It is not allowed to
144 * have multiple local listeners for group method invocations.
145 * @exception IllegalArgumentException
146 * Raised if the specified listener is not an InternalGMIListener.
147 */
148 public void addListener(Object listener)
149 {
150 if (listener instanceof InternalGMIListener) {
151 /*
152 * Compute the method table for the internal interfaces
153 * implemented by the provided listener.
154 */
155 //FIXME use the new method table once it supports IGMI
156 if (methodTable == null)
157 methodTable = new ServerMethodTable();
158 methodTable.addMethods(listener, InternalGMIListener.class);
159 } else {
160 /*
161 * Since the InternalGMIListener is a marker interface, we cannot
162 * throw an exception here since the application using the IGMI
163 * service may actually use IGMI only in a layer or in the server
164 * object, and thus the addListener() method may get invoked also
165 * for objects (server or layer) that does not actually implement
166 * a InternalGMIListener interface. Thus, we just print a log
167 * message indicating this.
168 */
169 if (log.isDebugEnabled()) {
170 log.debug(listener.getClass().getName() + " does not implement an InternalGMIListener");
171 }
172 }
173 }
174
175
176 ////////////////////////////////////////////////////////////////////////////////////////////
177 // Methods from InternalGMIService interface
178 ////////////////////////////////////////////////////////////////////////////////////////////
179
180 /**
181 * Dummy method. The method of the InternalGMIService interface is
182 * not implemented IntGroupHandler layer, but the interface is
183 * implemented by the dynamically generated proxy object. This
184 * method is included here, only to follow the rules of the layer
185 * configuration structure which does not deal with proxy based
186 * layers as is. That is a layer must implement its service
187 * interface (InternalGMIService in this case), and since the
188 * dynamically generated proxy is the object actually implementing
189 * the service interface, we must do this trick.
190 */
191 public Object invoke(Method m, Object[] args, Callback callback)
192 throws Exception
193 {
194 throw new UnsupportedOperationException();
195 }
196
197
198 ////////////////////////////////////////////////////////////////////////////////////////////
199 // Methods from InvocationHandler interface
200 ////////////////////////////////////////////////////////////////////////////////////////////
201
202 /**
203 * Processes a method invocation on a proxy instance and returns
204 * the result. This method will be invoked on an invocation handler
205 * when a method is invoked on a proxy instance that it is
206 * associated with.
207 */
208 public Object invoke(Object proxy, Method m, Object[] args)
209 throws Throwable
210 {
211 if (log.isDebugEnabled())
212 log.debug("igmi: " + m);
213 boolean asynchronous = false;
214 Callback callback = null;
215
216 /*
217 * Check whether the method has asynchronous semantics; if so, read
218 * the arguments. Otherwise, just use the method directly with
219 * synchronous semantics.
220 */
221 if (m.equals(invokeMethod)) {
222 asynchronous = true;
223 m = (Method) args[0];
224 callback = (Callback) args[2];
225 args = (Object[]) args[1];
226 }
227
228 try {
229 /*
230 * Prepare message
231 */
232 long hash = methodTable.get(m);
233 OutputStream message = multicastService.getMessage(PROTOCOL_NAME);
234 ObjectOutputStream out = new MarshalOutputStream(message);
235 out.writeLong(hash);
236 int len = (args == null ? 0 : args.length);
237 out.writeInt(len);
238 for (int i=0; i < len; i++)
239 out.writeObject(args[i]);
240 out.flush();
241
242 if (asynchronous) {
243 // Asynchronous semantics
244 if (callback != null)
245 multicastService.mcast(message,
246 new AsynchAckListener(callback).getRemoteAckListener());
247 else
248 multicastService.mcast(message, null);
249 return null;
250 } else {
251 // Synchronous semantics
252 GroupAckListener listener = new GroupAckListener(this);
253 multicastService.mcast(message, listener.getRemoteAckListener());
254 return listener.getResults();
255 }
256
257 } catch (JgroupException e) {
258 throw e;
259 } catch (Exception e) {
260 log.warn("Unable to perform internal group method invocation", e);
261 throw e;
262 }
263 }
264
265 ////////////////////////////////////////////////////////////////////////////////////////////
266 // Methods from MulticastListener
267 ////////////////////////////////////////////////////////////////////////////////////////////
268
269 /**
270 * Returns a string naming the protocol implemented by this multicast
271 * listener.
272 */
273 public String getProtocolName()
274 {
275 return PROTOCOL_NAME;
276 }
277
278
279 /**
280 * Upcall that is invoked by Jgroup to deliver a message
281 * <code>msg</code> related to the <i>IGMI</i> protocol.
282 *
283 * @param inmsg the stream from which the message may be read.
284 */
285 public Object deliverStream(InputStream inmsg, MemberId sender, int seqNo)
286 {
287 if (log.isDebugEnabled())
288 log.debug("IntGroupHandler: received stream message");
289 try {
290 long hash;
291 Object[] args;
292 synchronized (inmsg) {
293 /*
294 * This must be synchronized on the inmsg object so as to avoid
295 * concurrent access to the inmsg object, since most methods in
296 * the InMessage class modify its state (the position field in
297 * particular), including methods such as seek(), read(), insert()
298 * and so on. Such concurrent access may occur when two competing
299 * threads (Ehandler and Dispatcher) want to access the inmsg object.
300 * For example, one thread may be reading the inmsg object for
301 * delivery to an upper-layer (see IntGroupHandler.deliverStream()),
302 * while another thread wants to send the inmsg to another member of
303 * the group, and thus needs to seek to the START_DATA position;
304 * see MsgMcast.getInMessage().
305 */
306 ObjectInputStream in = new MarshalInputStream(inmsg);
307 hash = in.readLong();
308 int size = in.readInt();
309 args = new Object[size];
310 for (int i=0; i < args.length; i++)
311 args[i] = in.readObject();
312 }
313
314 /* Obtain the method from the method hash */
315 Method m = methodTable.get(hash);
316 Object server = methodTable.getServer(hash);
317 try {
318 return m.invoke(server, args);
319 } catch (Exception e) {
320 return e;
321 }
322 } catch (Exception e) {
323 /*
324 * If we for some reason received an incorrectly formated message;
325 * we simply log the raised exception and ignore the message since
326 * we cannot know its origin. That is it may be spam.
327 */
328 log.warn("IntGroupHandler: Unable to unmarshal stream message", e);
329 return null;
330 }
331 }
332
333
334 /**
335 * Upcall that is invoked by Jgroup to deliver a message <code> obj</code>
336 * related to the group identified by <code> group</code>. In this case,
337 * the message is an object marshalled and unmarshalled through java
338 * serialization.
339 *
340 * @param obj the object containing the message.
341 */
342 public Object deliverObject(Object obj, MemberId sender, int seqNo)
343 {
344 throw new UnsupportedOperationException();
345 }
346
347 } // END IntGroupHandler