1 /*
2 * Copyright (c) 1998-2006 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.gmi.protocols;
20
21 import java.io.BufferedInputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25
26 import jgroup.core.JgroupException;
27 import jgroup.core.MemberId;
28 import jgroup.core.MembershipService;
29 import jgroup.core.multicast.MulticastListener;
30 import jgroup.core.multicast.MulticastService;
31 import jgroup.relacs.gmi.GroupAckListener;
32 import jgroup.relacs.gmi.GroupInvocationDispatcher;
33 import jgroup.relacs.gmi.InvocationResult;
34 import jgroup.relacs.gmi.MethodSemantics;
35
36 import org.apache.log4j.Logger;
37 import org.apache.log4j.MDC;
38
39 /**
40 * Multicast represents a protocol that dispatch method invocations
41 * received through the local endpoint to all the other members of
42 * the group, including the local server. The result from all members
43 * is returned to the local endpoint whom first received the method
44 * invocation request, and one of them is selected and passed back to
45 * the client.
46 *
47 * @author Hein Meling <hein.meling@uis.no>
48 */
49 public class Multicast
50 extends BasicDispatcher
51 implements MulticastListener
52 {
53
54 ////////////////////////////////////////////////////////////////////////////////////////////
55 // Logger
56 ////////////////////////////////////////////////////////////////////////////////////////////
57
58 /** Obtain logger for this class */
59 private static final Logger log = Logger.getLogger(Multicast.class);
60
61
62 ////////////////////////////////////////////////////////////////////////////////////////////
63 // Constants
64 ////////////////////////////////////////////////////////////////////////////////////////////
65
66 /** Protocol name for demux in the multicast layer */
67 private static final String PROTOCOL_NAME = MethodSemantics.MULTICAST.toString();
68
69
70 ////////////////////////////////////////////////////////////////////////////////////////////
71 // Fields
72 ////////////////////////////////////////////////////////////////////////////////////////////
73
74 /** The multicast service interface */
75 private MulticastService mcast;
76
77 /** The membership service interface */
78 private MembershipService pgms;
79
80
81 ////////////////////////////////////////////////////////////////////////////////////////////
82 // Constructor
83 ////////////////////////////////////////////////////////////////////////////////////////////
84
85 public Multicast(GroupInvocationDispatcher dispatcher,
86 MulticastService mcast, MembershipService pgms)
87 {
88 super(dispatcher);
89 this.mcast = mcast;
90 this.pgms = pgms;
91 /*
92 * We need to listen to multicast events, and since this is
93 * not a layer it will not be handled automatically by the
94 * group manager construction mechanism.
95 */
96 mcast.addListener(this);
97 }
98
99
100 ////////////////////////////////////////////////////////////////////////////////////////////
101 // ProtocolDispatcher interface (overriding BasicDispatcher)
102 ////////////////////////////////////////////////////////////////////////////////////////////
103
104 /**
105 * Handle inbound request to the local endpoint with <i>multicast</i>
106 * invocation semantics.
107 *
108 * The local endpoint will send a multicast message to all the other group
109 * members for method invocation dispatching at each members. They each
110 * return a result to the local endpoint, and one of these values are selected
111 * and returned to the client (GroupInvocationHandler).
112 *
113 * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream)
114 */
115 public InvocationResult dispatch(InputStream in)
116 throws IOException
117 {
118 // Reduce overhead if group has a single member only
119 if (pgms.members() == 1)
120 return super.dispatch(in);
121
122 //HACK: buffer the inputstream and mark the current position.
123 BufferedInputStream bis = new BufferedInputStream(in);
124 bis.mark(50000);
125 OutputStream mout = mcast.getMessage(PROTOCOL_NAME);
126 // Write request into multicast output stream
127 byte[] buf = new byte[1500];
128 int bytesRead;
129 while ((bytesRead = bis.read(buf)) != -1) {
130 mout.write(buf, 0, bytesRead);
131 }
132 // Do multicast
133 GroupAckListener ackListener = new GroupAckListener(this);
134 try {
135 if (log.isDebugEnabled())
136 log.debug("Multicast: multicasting invocation to group members");
137 mcast.mcast(mout, ackListener.getRemoteAckListener());
138 } catch (JgroupException jex) {
139 log.warn("Multicast invocation failed; member is not ready", jex);
140 return null;
141 }
142 // Wait for ack listener
143 boolean hasResults = ackListener.pendingCompletionOrTimeout(2);
144 if (hasResults) {
145 return (InvocationResult) ackListener.getResult();
146 } else {
147 //HACK: if no results received within timeout value; dispatch only locally.
148 bis.reset();
149 return super.dispatch(bis);
150 }
151 }
152
153
154 ////////////////////////////////////////////////////////////////////////////////////////////
155 // MulticastListener methods
156 ////////////////////////////////////////////////////////////////////////////////////////////
157
158 /* (non-Javadoc)
159 * @see jgroup.core.multicast.MulticastListener#getProtocolName()
160 */
161 public String getProtocolName()
162 {
163 return PROTOCOL_NAME;
164 }
165
166 /**
167 * Received a multicast message from a mediating endpoint for invocation
168 * dispatching at the local server.
169 *
170 * @param msg
171 * Multicast message received
172 * @return
173 * The result of the multicast invocation, or <code>null</code>
174 * if the invocation failed.
175 *
176 * @see jgroup.core.multicast.MulticastListener#deliverStream(java.io.InputStream, jgroup.core.MemberId, int)
177 */
178 public Object deliverStream(InputStream msg, MemberId sender, int seqNo)
179 {
180 if (log.isDebugEnabled()) {
181 MDC.put("group", "[Group: " + pgms.getGid() + "]");
182 log.debug("deliverStream() start");
183 }
184 try {
185 // Perform the actual method invocation on the local server (BasicDispatcher)
186 return super.dispatch(msg);
187 } catch (IOException e) {
188 log.warn("Failed to dispatch multicast invocation", e);
189 }
190 return null;
191 }
192
193 /* (non-Javadoc)
194 * @see jgroup.core.multicast.MulticastListener#deliverObject(java.lang.Object, jgroup.core.MemberId, int)
195 */
196 public Object deliverObject(Object msg, MemberId sender, int seqNo)
197 {
198 throw new UnsupportedOperationException();
199 }
200
201 } // END Multicast