View Javadoc

1   /*
2    * Copyright (c) 1998-2006 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gmi.protocols;
20  
21  import static jgroup.relacs.gmi.protocols.Leadercast.MessageType.LEADERCAST_MSG;
22  import static jgroup.relacs.gmi.protocols.Leadercast.MessageType.STATE_MSG;
23  
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.ObjectInputStream;
27  import java.io.ObjectOutputStream;
28  import java.io.OutputStream;
29  
30  import jgroup.core.JgroupException;
31  import jgroup.core.MemberId;
32  import jgroup.core.MembershipService;
33  import jgroup.core.StateListener;
34  import jgroup.core.multicast.ChainIdentifier;
35  import jgroup.core.multicast.MulticastListener;
36  import jgroup.core.multicast.MulticastService;
37  import jgroup.relacs.gmi.GroupAckListener;
38  import jgroup.relacs.gmi.GroupInvocationDispatcher;
39  import jgroup.relacs.gmi.InvocationResult;
40  import jgroup.relacs.gmi.MethodSemantics;
41  
42  import org.apache.log4j.Logger;
43  import org.apache.log4j.MDC;
44  
45  /**
46   * Leadercast represents a protocol that will always try to invoke the the
47   * group leader first.  The group leader will pass the servers post-invocation
48   * state to the other members of the group, if the invocation resulted in a
49   * state change.  If a client tries to invoke a non-leader (follower) replica,
50   * that replica will forward the invocation to the leader, and will also mediate
51   * the result back to the client.  In such failover scenarios, the client will
52   * invoke the leader directly on the next interaction.  
53   *
54   * @author Hein Meling <hein.meling@uis.no>
55   */
56  public class Leadercast
57    extends BasicDispatcher
58    implements MulticastListener
59  {
60  
61    ////////////////////////////////////////////////////////////////////////////////////////////
62    // Logger
63    ////////////////////////////////////////////////////////////////////////////////////////////
64  
65    /** Obtain logger for this class */
66    private static final Logger log = Logger.getLogger(Leadercast.class);
67  
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Constants
71    ////////////////////////////////////////////////////////////////////////////////////////////
72  
73    /** Message types supported by this protocol */
74    enum MessageType { STATE_MSG, LEADERCAST_MSG }
75  
76    /** Protocol name for demux in the multicast layer */
77    private static final String PROTOCOL_NAME = MethodSemantics.LEADERCAST.toString();
78  
79  
80    ////////////////////////////////////////////////////////////////////////////////////////////
81    // Fields
82    ////////////////////////////////////////////////////////////////////////////////////////////
83  
84    /** The multicast service interface */
85    private final MulticastService mcast;
86  
87    /** The membership service interface */
88    private final MembershipService pgms;
89  
90    /** The state listener object */
91    private StateListener stateListener;
92  
93  
94    ////////////////////////////////////////////////////////////////////////////////////////////
95    // Constructor
96    ////////////////////////////////////////////////////////////////////////////////////////////
97  
98    public Leadercast(GroupInvocationDispatcher dispatcher,
99        MulticastService mcast, MembershipService pgms)
100   {
101     super(dispatcher);
102     this.mcast = mcast;
103     this.pgms = pgms;
104     /*
105      * We need to listen to multicast events, and since this is
106      * not a layer it will not be handled automatically by the
107      * group manager construction mechanism.
108      */
109     mcast.addListener(this);
110   }
111 
112 
113   ////////////////////////////////////////////////////////////////////////////////////////////
114   // ProtocolDispatcher interface (overriding BasicDispatcher)
115   ////////////////////////////////////////////////////////////////////////////////////////////
116 
117   /**
118    * Handle inbound request to the local endpoint with <i>leadercast</i>
119    * invocation semantics.
120    * 
121    * If the local endpoint is the group leader, then just compute the result
122    * value and return it to the client (GroupInvocationHandler).
123    * 
124    * Otherwise, if the local endpoint is a follower, then we multicast the
125    * request to all group members, allowing the leader to pick it up
126    * (@see deliverStream below) and provide a return value back to this
127    * endpoint whom will pass it back to the client (GroupInvocationHandler).
128    *
129    * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream)
130    */
131   public InvocationResult dispatch(InputStream in)
132     throws IOException
133   {
134     if (pgms.isLeader()) {
135       /*
136        * Note that in this case, the chainId is set to null, since
137        * this request was received from a client directly, and not
138        * mediated through a multicast from another group member.
139        */
140       return dispatchAtLeader(in, null);
141     } else {
142       /*
143        * I'm a follower; multicast the request, await result from leader,
144        * and pass it back to the client (GroupInvocationHandler).
145        */
146       return dispatchLeadercast(in);
147     }
148   }
149 
150   
151   /**
152    * Set the server object for use by the <code>Leadercast</code> protocol.<p>
153    * 
154    * This allows the leadercast protocol to obtain access to the <code>StateListener</code>
155    * interface of the server.  This overrides the empty implementation in
156    * <code>BasicDispatcher</code>.
157    * 
158    * If the provided server does not implement a <code>StateListener</code> interface,
159    * the call is ignored.  This allows the server to use the leadercast protocol without
160    * having its state (if any) passed to the followers.
161    * 
162    * @throws IllegalStateException
163    *   If the server has already been set.  The method should not be called multiple times.
164    *   
165    * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#setServer(java.lang.Object)
166    */
167   public void setServer(Object server)
168   {
169     /*
170      * This method protects against updating the 'stateListener', and hence
171      * we do not need to synchronize on this method.
172      */ 
173     if (this.stateListener == null) {
174       if (server instanceof StateListener) {
175         stateListener = (StateListener) server;
176       }
177     } else {
178       throw new IllegalStateException("Multiple state listeners not supported.");
179     }
180   }
181 
182 
183   ////////////////////////////////////////////////////////////////////////////////////////////
184   // MulticastListener methods
185   ////////////////////////////////////////////////////////////////////////////////////////////
186 
187   /* (non-Javadoc)
188    * @see jgroup.core.multicast.MulticastListener#getProtocolName()
189    */
190   public String getProtocolName()
191   {
192     return PROTOCOL_NAME;
193   }
194 
195   /* (non-Javadoc)
196    * @see jgroup.core.multicast.MulticastListener#deliverStream(java.io.InputStream, jgroup.core.MemberId, int)
197    */
198   public Object deliverStream(InputStream msg, MemberId sender, int seqNo)
199   {
200     if (log.isDebugEnabled()) {
201       MDC.put("group", "[Group: " + pgms.getGid() + "]");
202       log.debug("deliverStream() start");
203     }
204     try {
205       int ordinal = msg.read();
206       MessageType msgType = MessageType.values()[ordinal];
207       switch (msgType) {
208 
209         case LEADERCAST_MSG:
210           // Only the leader will process this leadercast (coming from a follower)
211           if (pgms.isLeader()) {
212             ChainIdentifier chainId = new ChainIdentifier(sender, seqNo);
213             return dispatchAtLeader(msg, chainId);
214           }
215           break;
216 
217         case STATE_MSG:
218           // Only followers will process state objects
219           if (!pgms.isLeader() && stateListener != null) {
220             // Read state object from message stream
221             Object state = new ObjectInputStream(msg).readObject();
222             if (log.isDebugEnabled())
223               log.debug("Received state object from leader: " + state);
224             // Apply state object to the state listener (server replica)
225             stateListener.putState(state);
226           }
227           break;
228 
229         default:
230           log.error("Unknown message type: " + msgType);
231           break;
232       }
233     } catch (ArrayIndexOutOfBoundsException e) {
234       log.warn("Unknown message type (ordinal from stream is wrong)", e);
235     } catch (Exception e) {
236       log.warn("Failed to deliver multicast message", e);
237     }
238     return null;
239   }
240 
241   /* (non-Javadoc)
242    * @see jgroup.core.multicast.MulticastListener#deliverObject(java.lang.Object, jgroup.core.MemberId, int)
243    */
244   public Object deliverObject(Object msg, MemberId sender, int seqNo)
245   {
246     throw new UnsupportedOperationException();
247   }
248 
249 
250   ////////////////////////////////////////////////////////////////////////////////////////////
251   // Private methods
252   ////////////////////////////////////////////////////////////////////////////////////////////
253 
254   /**
255    * Handles inbound requests with <i>leadercast</i> semantics received at a follower.
256    * 
257    * Note that this should happen rarely, since the client should select the
258    * group leader as its communicating endpoint in most cases.  However, in cases
259    * where failover is necessary, the endpoint selected by the client may not be
260    * the group leader.
261    * 
262    * The inbound request will be multicasted to all members,
263    * but only the leader will perform the request.  Thus only
264    * the leader will provide a result.
265    * 
266    * @param in <code>InputStream</code> to read invocation from
267    *
268    * @see jgroup.relacs.gmi.JeriEGMILayer#deliverStream(java.io.InputStream)
269    */
270   private InvocationResult dispatchLeadercast(InputStream in) 
271     throws IOException
272   {
273     OutputStream mout = mcast.getMessage(PROTOCOL_NAME);
274     mout.write(LEADERCAST_MSG.ordinal());
275     // Write request into multicast output stream
276     byte[] buf = new byte[1500];
277     int bytesRead;
278     while ((bytesRead = in.read(buf)) != -1) {
279       mout.write(buf, 0, bytesRead);
280     }
281     // Do multicast
282     GroupAckListener ackListener = new GroupAckListener(this);
283     try {
284       if (log.isDebugEnabled())
285         log.debug("Leadercast: multicasting invocation to group members");
286       mcast.mcast(mout, ackListener.getRemoteAckListener());
287     } catch (JgroupException jex) {
288       log.warn("Leadercast invocation failed; member is not ready", jex);
289       return null;
290     }
291     // Wait for ack listener
292     return (InvocationResult) ackListener.getLeaderResult();
293   }
294 
295 
296   /**
297    * Dispatch the received message to the local endpoint, which is the
298    * leader replica.  The message may have been received through another
299    * mediating member (follower) or directly from the client (if the provided
300    * <code>ChainIdentifier</code> is <code>null</code>).
301    * 
302    * Checks if there is a <code>StateListener</code>, in which case
303    * the state is passed on to the follower replicas.  However, only
304    * if the state of the leader replica changed as a consequence of
305    * the invocation.
306    * 
307    * @param in
308    *   The input stream encapsulating the method invocation
309    * @param chainId 
310    *   The chain identifyer used to identify that a potential state message
311    *   is associated with another multicast.
312    * @return
313    *   The <code>InvocationResult</code> of the method invocation
314    * @throws IOException
315    *   Raised if the state broadcast failed
316    */
317   private InvocationResult dispatchAtLeader(InputStream in, ChainIdentifier chainId)
318     throws IOException
319   {
320     if (log.isDebugEnabled())
321       log.debug("dispatchAtLeader()");
322     InvocationResult res = null;
323     // Avoid broadcasting state, if there is only a single members
324     if (stateListener == null || pgms.members() == 1) {
325       // Perform the actual method invocation on the local server (BasicDispatcher)
326       res = super.dispatch(in);
327     } else {
328       // Get the state prior to the invocation on the leader
329       Object preInvocState = stateListener.getState();
330       int preInvocHash = (preInvocState != null) ? preInvocState.hashCode() : 0;
331       if (log.isDebugEnabled())
332         log.debug(" preHash=" +  preInvocHash + ",  preState=" +  preInvocState);
333 
334       // Perform the actual method invocation on the local server (BasicDispatcher)
335       res = super.dispatch(in);
336 
337       // Get the state after to the invocation
338       Object postInvocState = stateListener.getState();
339       int postInvocHash = (postInvocState != null) ? postInvocState.hashCode() : 0;
340       if (log.isDebugEnabled())
341         log.debug("postHash=" + postInvocHash + ", postState=" + postInvocState);
342 
343       // Check if the method invocation changed the replica state 
344       if (preInvocHash != postInvocHash) {
345         if (log.isDebugEnabled())
346           log.debug("State has changed at leader; broadcasting to followers:");
347         broadcastState(postInvocState, chainId);
348       }
349     }
350     return res;
351   }
352 
353 
354   /**
355    *  Broadcast the state of the leader replica after an invocation
356    *  has been completed.
357    * 
358    *  @param state
359    *    The state of the leader replica
360    */
361   private void broadcastState(Object state, ChainIdentifier chainId) 
362     throws IOException
363   {
364     OutputStream mout = mcast.getMessage(PROTOCOL_NAME);
365     mout.write(STATE_MSG.ordinal());
366     // Write the state object to the stream
367     new ObjectOutputStream(mout).writeObject(state);
368     // Do multicast
369     GroupAckListener ackListener = new GroupAckListener(this);
370     try {
371       if (log.isDebugEnabled())
372         log.debug("Multicasting state to group members (including leader)");
373       mcast.mcast(mout, ackListener.getRemoteAckListener(), chainId);
374     } catch (JgroupException jex) {
375       log.warn("Multicasting state failed; member is not ready", jex);
376     }
377     // Wait for ack listener (ignore the result which is null anyway)
378     ackListener.getResult();
379   }
380 
381 } // END Leadercast