1 /* 2 * Copyright (c) 1998-2002 The Jgroup Team. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License version 2 as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU Lesser General Public License for more details. 12 * 13 * You should have received a copy of the GNU Lesser General Public License 14 * along with this program; if not, write to the Free Software 15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 16 * 17 */ 18 19 package jgroup.relacs.rmi; 20 21 import java.io.InputStream; 22 import java.io.ObjectInputStream; 23 import java.io.ObjectOutputStream; 24 import java.io.OutputStream; 25 import java.lang.reflect.InvocationHandler; 26 import java.lang.reflect.Method; 27 28 import jgroup.core.Callback; 29 import jgroup.core.InternalGMIListener; 30 import jgroup.core.InternalGMIService; 31 import jgroup.core.JgroupException; 32 import jgroup.core.MemberId; 33 import jgroup.core.multicast.MulticastListener; 34 import jgroup.core.multicast.MulticastService; 35 import jgroup.relacs.gmi.GroupAckListener; 36 import jgroup.util.Abort; 37 38 import org.apache.log4j.Logger; 39 40 /** 41 * Handler for group internal invocations. It is used to dynamically 42 * generate a proxy for internal GMI. 43 * 44 * Note that the <code>InternalGMIService</code> interface is not 45 * implemented by the <code>IntGroupHandler</code> layer, but rather by 46 * the dynamically generated proxy object. The interface is included 47 * here, only to follow the rules of the layer configuration structure 48 * which does not deal with proxy based layers as is. That is a layer 49 * must implement its service interface 50 * (<code>InternalGMIService</code> in this case), and since the 51 * dynamically generated proxy is the object actually implementing the 52 * service interface, we must do this trick. The method from the 53 * <code>InternalGMIService</code> interface simply throws an 54 * <code>UnsupportedOperationException</code> and thus should never be 55 * called. 56 * 57 * @author Alberto Montresor 58 * @author Hein Meling 59 * @since Jgroup 0.9 60 */ 61 public class IntGroupHandler 62 implements InternalGMIService, InvocationHandler, MulticastListener 63 { 64 65 //////////////////////////////////////////////////////////////////////////////////////////// 66 // Logger 67 //////////////////////////////////////////////////////////////////////////////////////////// 68 69 /** Obtain logger for this class */ 70 private static final Logger log = Logger.getLogger(IntGroupHandler.class); 71 72 73 //////////////////////////////////////////////////////////////////////////////////////////// 74 // Static section 75 //////////////////////////////////////////////////////////////////////////////////////////// 76 77 /** Protocol name used to distinguish messages from other protocols. */ 78 private static final String PROTOCOL_NAME = "IGMI"; 79 80 /** Method invoke of interface InternalGMIService */ 81 private static Method invokeMethod; 82 83 static { 84 try { 85 Class[] types = new Class[] { Method.class, Object[].class, Callback.class }; 86 Class cl = InternalGMIService.class; 87 invokeMethod = cl.getDeclaredMethod("invoke", types); 88 } catch (NoSuchMethodException e) { 89 // Can't happen, unless InternalGMIService has been modified 90 Abort.exit("Method invoke() in class InternalGMIService not found", e, 1); 91 } 92 } 93 94 95 //////////////////////////////////////////////////////////////////////////////////////////// 96 // Fields 97 //////////////////////////////////////////////////////////////////////////////////////////// 98 99 /** 100 * The method table holds entries for all IGMI methods implemented 101 * by the server and/or layer that is registered as a listener for 102 * IGMI events. 103 */ 104 private ServerMethodTable methodTable; 105 106 /** The multicast service */ 107 private MulticastService multicastService; 108 109 110 //////////////////////////////////////////////////////////////////////////////////////////// 111 // Constructors 112 //////////////////////////////////////////////////////////////////////////////////////////// 113 114 /** 115 * Constructs a new <code>IntGroupHandler</code> object. 116 */ 117 private IntGroupHandler(MulticastService multicastService) 118 { 119 this.multicastService = multicastService; 120 } 121 122 123 //////////////////////////////////////////////////////////////////////////////////////////// 124 // Static factory 125 //////////////////////////////////////////////////////////////////////////////////////////// 126 127 public static IntGroupHandler getLayer(MulticastService mcastService) 128 { 129 return new IntGroupHandler(mcastService); 130 } 131 132 133 //////////////////////////////////////////////////////////////////////////////////////////// 134 // Layer interface methods 135 //////////////////////////////////////////////////////////////////////////////////////////// 136 137 /** 138 * Add a server/layer that is listening for internal group method 139 * invocations. 140 * 141 * @exception IllegalStateException 142 * Thrown if this method has already been invoked with another 143 * listener for group method invocations. It is not allowed to 144 * have multiple local listeners for group method invocations. 145 * @exception IllegalArgumentException 146 * Raised if the specified listener is not an InternalGMIListener. 147 */ 148 public void addListener(Object listener) 149 { 150 if (listener instanceof InternalGMIListener) { 151 /* 152 * Compute the method table for the internal interfaces 153 * implemented by the provided listener. 154 */ 155 //FIXME use the new method table once it supports IGMI 156 if (methodTable == null) 157 methodTable = new ServerMethodTable(); 158 methodTable.addMethods(listener, InternalGMIListener.class); 159 } else { 160 /* 161 * Since the InternalGMIListener is a marker interface, we cannot 162 * throw an exception here since the application using the IGMI 163 * service may actually use IGMI only in a layer or in the server 164 * object, and thus the addListener() method may get invoked also 165 * for objects (server or layer) that does not actually implement 166 * a InternalGMIListener interface. Thus, we just print a log 167 * message indicating this. 168 */ 169 if (log.isDebugEnabled()) { 170 log.debug(listener.getClass().getName() + " does not implement an InternalGMIListener"); 171 } 172 } 173 } 174 175 176 //////////////////////////////////////////////////////////////////////////////////////////// 177 // Methods from InternalGMIService interface 178 //////////////////////////////////////////////////////////////////////////////////////////// 179 180 /** 181 * Dummy method. The method of the InternalGMIService interface is 182 * not implemented IntGroupHandler layer, but the interface is 183 * implemented by the dynamically generated proxy object. This 184 * method is included here, only to follow the rules of the layer 185 * configuration structure which does not deal with proxy based 186 * layers as is. That is a layer must implement its service 187 * interface (InternalGMIService in this case), and since the 188 * dynamically generated proxy is the object actually implementing 189 * the service interface, we must do this trick. 190 */ 191 public Object invoke(Method m, Object[] args, Callback callback) 192 throws Exception 193 { 194 throw new UnsupportedOperationException(); 195 } 196 197 198 //////////////////////////////////////////////////////////////////////////////////////////// 199 // Methods from InvocationHandler interface 200 //////////////////////////////////////////////////////////////////////////////////////////// 201 202 /** 203 * Processes a method invocation on a proxy instance and returns 204 * the result. This method will be invoked on an invocation handler 205 * when a method is invoked on a proxy instance that it is 206 * associated with. 207 */ 208 public Object invoke(Object proxy, Method m, Object[] args) 209 throws Throwable 210 { 211 if (log.isDebugEnabled()) 212 log.debug("igmi: " + m); 213 boolean asynchronous = false; 214 Callback callback = null; 215 216 /* 217 * Check whether the method has asynchronous semantics; if so, read 218 * the arguments. Otherwise, just use the method directly with 219 * synchronous semantics. 220 */ 221 if (m.equals(invokeMethod)) { 222 asynchronous = true; 223 m = (Method) args[0]; 224 callback = (Callback) args[2]; 225 args = (Object[]) args[1]; 226 } 227 228 try { 229 /* 230 * Prepare message 231 */ 232 long hash = methodTable.get(m); 233 OutputStream message = multicastService.getMessage(PROTOCOL_NAME); 234 ObjectOutputStream out = new MarshalOutputStream(message); 235 out.writeLong(hash); 236 int len = (args == null ? 0 : args.length); 237 out.writeInt(len); 238 for (int i=0; i < len; i++) 239 out.writeObject(args[i]); 240 out.flush(); 241 242 if (asynchronous) { 243 // Asynchronous semantics 244 if (callback != null) 245 multicastService.mcast(message, 246 new AsynchAckListener(callback).getRemoteAckListener()); 247 else 248 multicastService.mcast(message, null); 249 return null; 250 } else { 251 // Synchronous semantics 252 GroupAckListener listener = new GroupAckListener(this); 253 multicastService.mcast(message, listener.getRemoteAckListener()); 254 return listener.getResults(); 255 } 256 257 } catch (JgroupException e) { 258 throw e; 259 } catch (Exception e) { 260 log.warn("Unable to perform internal group method invocation", e); 261 throw e; 262 } 263 } 264 265 //////////////////////////////////////////////////////////////////////////////////////////// 266 // Methods from MulticastListener 267 //////////////////////////////////////////////////////////////////////////////////////////// 268 269 /** 270 * Returns a string naming the protocol implemented by this multicast 271 * listener. 272 */ 273 public String getProtocolName() 274 { 275 return PROTOCOL_NAME; 276 } 277 278 279 /** 280 * Upcall that is invoked by Jgroup to deliver a message 281 * <code>msg</code> related to the <i>IGMI</i> protocol. 282 * 283 * @param inmsg the stream from which the message may be read. 284 */ 285 public Object deliverStream(InputStream inmsg, MemberId sender, int seqNo) 286 { 287 if (log.isDebugEnabled()) 288 log.debug("IntGroupHandler: received stream message"); 289 try { 290 long hash; 291 Object[] args; 292 synchronized (inmsg) { 293 /* 294 * This must be synchronized on the inmsg object so as to avoid 295 * concurrent access to the inmsg object, since most methods in 296 * the InMessage class modify its state (the position field in 297 * particular), including methods such as seek(), read(), insert() 298 * and so on. Such concurrent access may occur when two competing 299 * threads (Ehandler and Dispatcher) want to access the inmsg object. 300 * For example, one thread may be reading the inmsg object for 301 * delivery to an upper-layer (see IntGroupHandler.deliverStream()), 302 * while another thread wants to send the inmsg to another member of 303 * the group, and thus needs to seek to the START_DATA position; 304 * see MsgMcast.getInMessage(). 305 */ 306 ObjectInputStream in = new MarshalInputStream(inmsg); 307 hash = in.readLong(); 308 int size = in.readInt(); 309 args = new Object[size]; 310 for (int i=0; i < args.length; i++) 311 args[i] = in.readObject(); 312 } 313 314 /* Obtain the method from the method hash */ 315 Method m = methodTable.get(hash); 316 Object server = methodTable.getServer(hash); 317 try { 318 return m.invoke(server, args); 319 } catch (Exception e) { 320 return e; 321 } 322 } catch (Exception e) { 323 /* 324 * If we for some reason received an incorrectly formated message; 325 * we simply log the raised exception and ignore the message since 326 * we cannot know its origin. That is it may be spam. 327 */ 328 log.warn("IntGroupHandler: Unable to unmarshal stream message", e); 329 return null; 330 } 331 } 332 333 334 /** 335 * Upcall that is invoked by Jgroup to deliver a message <code> obj</code> 336 * related to the group identified by <code> group</code>. In this case, 337 * the message is an object marshalled and unmarshalled through java 338 * serialization. 339 * 340 * @param obj the object containing the message. 341 */ 342 public Object deliverObject(Object obj, MemberId sender, int seqNo) 343 { 344 throw new UnsupportedOperationException(); 345 } 346 347 } // END IntGroupHandler