1 /*
2 * Copyright (c) 1998-2006 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.gmi.protocols;
20
21 import static jgroup.relacs.gmi.protocols.Leadercast.MessageType.LEADERCAST_MSG;
22 import static jgroup.relacs.gmi.protocols.Leadercast.MessageType.STATE_MSG;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.ObjectInputStream;
27 import java.io.ObjectOutputStream;
28 import java.io.OutputStream;
29
30 import jgroup.core.JgroupException;
31 import jgroup.core.MemberId;
32 import jgroup.core.MembershipService;
33 import jgroup.core.StateListener;
34 import jgroup.core.multicast.ChainIdentifier;
35 import jgroup.core.multicast.MulticastListener;
36 import jgroup.core.multicast.MulticastService;
37 import jgroup.relacs.gmi.GroupAckListener;
38 import jgroup.relacs.gmi.GroupInvocationDispatcher;
39 import jgroup.relacs.gmi.InvocationResult;
40 import jgroup.relacs.gmi.MethodSemantics;
41
42 import org.apache.log4j.Logger;
43 import org.apache.log4j.MDC;
44
45 /**
46 * Leadercast represents a protocol that will always try to invoke the the
47 * group leader first. The group leader will pass the servers post-invocation
48 * state to the other members of the group, if the invocation resulted in a
49 * state change. If a client tries to invoke a non-leader (follower) replica,
50 * that replica will forward the invocation to the leader, and will also mediate
51 * the result back to the client. In such failover scenarios, the client will
52 * invoke the leader directly on the next interaction.
53 *
54 * @author Hein Meling <hein.meling@uis.no>
55 */
56 public class Leadercast
57 extends BasicDispatcher
58 implements MulticastListener
59 {
60
61 ////////////////////////////////////////////////////////////////////////////////////////////
62 // Logger
63 ////////////////////////////////////////////////////////////////////////////////////////////
64
65 /** Obtain logger for this class */
66 private static final Logger log = Logger.getLogger(Leadercast.class);
67
68
69 ////////////////////////////////////////////////////////////////////////////////////////////
70 // Constants
71 ////////////////////////////////////////////////////////////////////////////////////////////
72
73 /** Message types supported by this protocol */
74 enum MessageType { STATE_MSG, LEADERCAST_MSG }
75
76 /** Protocol name for demux in the multicast layer */
77 private static final String PROTOCOL_NAME = MethodSemantics.LEADERCAST.toString();
78
79
80 ////////////////////////////////////////////////////////////////////////////////////////////
81 // Fields
82 ////////////////////////////////////////////////////////////////////////////////////////////
83
84 /** The multicast service interface */
85 private final MulticastService mcast;
86
87 /** The membership service interface */
88 private final MembershipService pgms;
89
90 /** The state listener object */
91 private StateListener stateListener;
92
93
94 ////////////////////////////////////////////////////////////////////////////////////////////
95 // Constructor
96 ////////////////////////////////////////////////////////////////////////////////////////////
97
98 public Leadercast(GroupInvocationDispatcher dispatcher,
99 MulticastService mcast, MembershipService pgms)
100 {
101 super(dispatcher);
102 this.mcast = mcast;
103 this.pgms = pgms;
104 /*
105 * We need to listen to multicast events, and since this is
106 * not a layer it will not be handled automatically by the
107 * group manager construction mechanism.
108 */
109 mcast.addListener(this);
110 }
111
112
113 ////////////////////////////////////////////////////////////////////////////////////////////
114 // ProtocolDispatcher interface (overriding BasicDispatcher)
115 ////////////////////////////////////////////////////////////////////////////////////////////
116
117 /**
118 * Handle inbound request to the local endpoint with <i>leadercast</i>
119 * invocation semantics.
120 *
121 * If the local endpoint is the group leader, then just compute the result
122 * value and return it to the client (GroupInvocationHandler).
123 *
124 * Otherwise, if the local endpoint is a follower, then we multicast the
125 * request to all group members, allowing the leader to pick it up
126 * (@see deliverStream below) and provide a return value back to this
127 * endpoint whom will pass it back to the client (GroupInvocationHandler).
128 *
129 * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream)
130 */
131 public InvocationResult dispatch(InputStream in)
132 throws IOException
133 {
134 if (pgms.isLeader()) {
135 /*
136 * Note that in this case, the chainId is set to null, since
137 * this request was received from a client directly, and not
138 * mediated through a multicast from another group member.
139 */
140 return dispatchAtLeader(in, null);
141 } else {
142 /*
143 * I'm a follower; multicast the request, await result from leader,
144 * and pass it back to the client (GroupInvocationHandler).
145 */
146 return dispatchLeadercast(in);
147 }
148 }
149
150
151 /**
152 * Set the server object for use by the <code>Leadercast</code> protocol.<p>
153 *
154 * This allows the leadercast protocol to obtain access to the <code>StateListener</code>
155 * interface of the server. This overrides the empty implementation in
156 * <code>BasicDispatcher</code>.
157 *
158 * If the provided server does not implement a <code>StateListener</code> interface,
159 * the call is ignored. This allows the server to use the leadercast protocol without
160 * having its state (if any) passed to the followers.
161 *
162 * @throws IllegalStateException
163 * If the server has already been set. The method should not be called multiple times.
164 *
165 * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#setServer(java.lang.Object)
166 */
167 public void setServer(Object server)
168 {
169 /*
170 * This method protects against updating the 'stateListener', and hence
171 * we do not need to synchronize on this method.
172 */
173 if (this.stateListener == null) {
174 if (server instanceof StateListener) {
175 stateListener = (StateListener) server;
176 }
177 } else {
178 throw new IllegalStateException("Multiple state listeners not supported.");
179 }
180 }
181
182
183 ////////////////////////////////////////////////////////////////////////////////////////////
184 // MulticastListener methods
185 ////////////////////////////////////////////////////////////////////////////////////////////
186
187 /* (non-Javadoc)
188 * @see jgroup.core.multicast.MulticastListener#getProtocolName()
189 */
190 public String getProtocolName()
191 {
192 return PROTOCOL_NAME;
193 }
194
195 /* (non-Javadoc)
196 * @see jgroup.core.multicast.MulticastListener#deliverStream(java.io.InputStream, jgroup.core.MemberId, int)
197 */
198 public Object deliverStream(InputStream msg, MemberId sender, int seqNo)
199 {
200 if (log.isDebugEnabled()) {
201 MDC.put("group", "[Group: " + pgms.getGid() + "]");
202 log.debug("deliverStream() start");
203 }
204 try {
205 int ordinal = msg.read();
206 MessageType msgType = MessageType.values()[ordinal];
207 switch (msgType) {
208
209 case LEADERCAST_MSG:
210 // Only the leader will process this leadercast (coming from a follower)
211 if (pgms.isLeader()) {
212 ChainIdentifier chainId = new ChainIdentifier(sender, seqNo);
213 return dispatchAtLeader(msg, chainId);
214 }
215 break;
216
217 case STATE_MSG:
218 // Only followers will process state objects
219 if (!pgms.isLeader() && stateListener != null) {
220 // Read state object from message stream
221 Object state = new ObjectInputStream(msg).readObject();
222 if (log.isDebugEnabled())
223 log.debug("Received state object from leader: " + state);
224 // Apply state object to the state listener (server replica)
225 stateListener.putState(state);
226 }
227 break;
228
229 default:
230 log.error("Unknown message type: " + msgType);
231 break;
232 }
233 } catch (ArrayIndexOutOfBoundsException e) {
234 log.warn("Unknown message type (ordinal from stream is wrong)", e);
235 } catch (Exception e) {
236 log.warn("Failed to deliver multicast message", e);
237 }
238 return null;
239 }
240
241 /* (non-Javadoc)
242 * @see jgroup.core.multicast.MulticastListener#deliverObject(java.lang.Object, jgroup.core.MemberId, int)
243 */
244 public Object deliverObject(Object msg, MemberId sender, int seqNo)
245 {
246 throw new UnsupportedOperationException();
247 }
248
249
250 ////////////////////////////////////////////////////////////////////////////////////////////
251 // Private methods
252 ////////////////////////////////////////////////////////////////////////////////////////////
253
254 /**
255 * Handles inbound requests with <i>leadercast</i> semantics received at a follower.
256 *
257 * Note that this should happen rarely, since the client should select the
258 * group leader as its communicating endpoint in most cases. However, in cases
259 * where failover is necessary, the endpoint selected by the client may not be
260 * the group leader.
261 *
262 * The inbound request will be multicasted to all members,
263 * but only the leader will perform the request. Thus only
264 * the leader will provide a result.
265 *
266 * @param in <code>InputStream</code> to read invocation from
267 *
268 * @see jgroup.relacs.gmi.JeriEGMILayer#deliverStream(java.io.InputStream)
269 */
270 private InvocationResult dispatchLeadercast(InputStream in)
271 throws IOException
272 {
273 OutputStream mout = mcast.getMessage(PROTOCOL_NAME);
274 mout.write(LEADERCAST_MSG.ordinal());
275 // Write request into multicast output stream
276 byte[] buf = new byte[1500];
277 int bytesRead;
278 while ((bytesRead = in.read(buf)) != -1) {
279 mout.write(buf, 0, bytesRead);
280 }
281 // Do multicast
282 GroupAckListener ackListener = new GroupAckListener(this);
283 try {
284 if (log.isDebugEnabled())
285 log.debug("Leadercast: multicasting invocation to group members");
286 mcast.mcast(mout, ackListener.getRemoteAckListener());
287 } catch (JgroupException jex) {
288 log.warn("Leadercast invocation failed; member is not ready", jex);
289 return null;
290 }
291 // Wait for ack listener
292 return (InvocationResult) ackListener.getLeaderResult();
293 }
294
295
296 /**
297 * Dispatch the received message to the local endpoint, which is the
298 * leader replica. The message may have been received through another
299 * mediating member (follower) or directly from the client (if the provided
300 * <code>ChainIdentifier</code> is <code>null</code>).
301 *
302 * Checks if there is a <code>StateListener</code>, in which case
303 * the state is passed on to the follower replicas. However, only
304 * if the state of the leader replica changed as a consequence of
305 * the invocation.
306 *
307 * @param in
308 * The input stream encapsulating the method invocation
309 * @param chainId
310 * The chain identifyer used to identify that a potential state message
311 * is associated with another multicast.
312 * @return
313 * The <code>InvocationResult</code> of the method invocation
314 * @throws IOException
315 * Raised if the state broadcast failed
316 */
317 private InvocationResult dispatchAtLeader(InputStream in, ChainIdentifier chainId)
318 throws IOException
319 {
320 if (log.isDebugEnabled())
321 log.debug("dispatchAtLeader()");
322 InvocationResult res = null;
323 // Avoid broadcasting state, if there is only a single members
324 if (stateListener == null || pgms.members() == 1) {
325 // Perform the actual method invocation on the local server (BasicDispatcher)
326 res = super.dispatch(in);
327 } else {
328 // Get the state prior to the invocation on the leader
329 Object preInvocState = stateListener.getState();
330 int preInvocHash = (preInvocState != null) ? preInvocState.hashCode() : 0;
331 if (log.isDebugEnabled())
332 log.debug(" preHash=" + preInvocHash + ", preState=" + preInvocState);
333
334 // Perform the actual method invocation on the local server (BasicDispatcher)
335 res = super.dispatch(in);
336
337 // Get the state after to the invocation
338 Object postInvocState = stateListener.getState();
339 int postInvocHash = (postInvocState != null) ? postInvocState.hashCode() : 0;
340 if (log.isDebugEnabled())
341 log.debug("postHash=" + postInvocHash + ", postState=" + postInvocState);
342
343 // Check if the method invocation changed the replica state
344 if (preInvocHash != postInvocHash) {
345 if (log.isDebugEnabled())
346 log.debug("State has changed at leader; broadcasting to followers:");
347 broadcastState(postInvocState, chainId);
348 }
349 }
350 return res;
351 }
352
353
354 /**
355 * Broadcast the state of the leader replica after an invocation
356 * has been completed.
357 *
358 * @param state
359 * The state of the leader replica
360 */
361 private void broadcastState(Object state, ChainIdentifier chainId)
362 throws IOException
363 {
364 OutputStream mout = mcast.getMessage(PROTOCOL_NAME);
365 mout.write(STATE_MSG.ordinal());
366 // Write the state object to the stream
367 new ObjectOutputStream(mout).writeObject(state);
368 // Do multicast
369 GroupAckListener ackListener = new GroupAckListener(this);
370 try {
371 if (log.isDebugEnabled())
372 log.debug("Multicasting state to group members (including leader)");
373 mcast.mcast(mout, ackListener.getRemoteAckListener(), chainId);
374 } catch (JgroupException jex) {
375 log.warn("Multicasting state failed; member is not ready", jex);
376 }
377 // Wait for ack listener (ignore the result which is null anyway)
378 ackListener.getResult();
379 }
380
381 } // END Leadercast