1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.gm;
20
21 import java.io.IOException;
22 import java.io.ObjectInputStream;
23 import java.io.ObjectOutputStream;
24 import java.io.OutputStream;
25 import java.util.HashMap;
26 import java.util.Map;
27
28 import jgroup.core.JgroupException;
29 import jgroup.core.MemberId;
30 import jgroup.core.MembershipService;
31 import jgroup.core.multicast.AckListener;
32 import jgroup.core.multicast.ChainIdentifier;
33 import jgroup.core.multicast.MulticastListener;
34 import jgroup.core.multicast.MulticastService;
35 import jgroup.relacs.daemon.DaemonInteraction;
36 import jgroup.relacs.daemon.MsgMcast;
37 import jgroup.relacs.events.DeliveryAck;
38 import jgroup.relacs.events.DeliveryEvent;
39 import jgroup.relacs.events.Event;
40 import jgroup.relacs.events.EventTags;
41 import jgroup.relacs.events.MulticastRequest;
42 import jgroup.util.InMessage;
43 import jgroup.util.Network;
44 import jgroup.util.OutMessage;
45
46 import org.apache.log4j.Logger;
47 import org.apache.log4j.NDC;
48
49
50
51
52
53
54
55
56 public final class MulticastLayer
57 implements MulticastService, DispatcherListener, EventTags
58 {
59
60
61
62
63
64
65 private static final Logger log = Logger.getLogger(MulticastLayer.class);
66
67
68
69
70
71
72
73 private final Map<String,MulticastListener> multicastListeners =
74 new HashMap<String,MulticastListener>();
75
76
77 private final MembershipService pgms;
78
79
80 private final MemberId me;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 private MulticastLayer(MembershipService pgms)
96 throws JgroupException
97 {
98 this.pgms = pgms;
99 this.me = pgms.getMyIdentifier();
100 }
101
102
103
104
105
106
107 public static MulticastLayer getLayer(DispatcherService ds, MembershipService pgms)
108 throws JgroupException
109 {
110 return new MulticastLayer(pgms);
111 }
112
113
114
115
116
117
118
119
120
121
122 public void addListener(Object listener)
123 {
124 if (listener instanceof MulticastListener) {
125 MulticastListener mListener = (MulticastListener) listener;
126 String protocol = mListener.getProtocolName();
127 Object currentListener = multicastListeners.get(protocol);
128 if (currentListener == null)
129 multicastListeners.put(protocol, mListener);
130 else if (!currentListener.equals(listener))
131 throw new IllegalArgumentException("Multiple multicast listeners for the same protocol is illegal: " + protocol);
132 } else {
133 throw new IllegalArgumentException("Specified listener does not implement a MulticastListener");
134 }
135 }
136
137
138
139
140
141
142
143
144
145
146 public void mcast(OutputStream out, AckListener ackl, ChainIdentifier chId)
147 throws JgroupException
148 {
149 doChecks();
150 if (!(out instanceof OutMessage))
151 throw new JgroupException("Only OutputStream obtained through getMessage(protocol) can be multicast");
152
153 if (log.isDebugEnabled())
154 log.debug("mcast output stream: " + out);
155
156 DaemonInteraction.addEvent(
157 new MulticastRequest(pgms.getGid(), false, (OutMessage) out, me, ackl, chId));
158 }
159
160
161
162
163 public void mcast(OutputStream stream, AckListener ackl)
164 throws JgroupException
165 {
166 doChecks();
167 if (!(stream instanceof OutMessage))
168 throw new JgroupException("Only OutputStream obtained through getMessage(protocol) can be multicast");
169
170 if (log.isDebugEnabled())
171 log.debug("mcast output stream: " + stream);
172
173 DaemonInteraction.addEvent(
174 new MulticastRequest(pgms.getGid(), false, (OutMessage) stream, me, ackl, null));
175 }
176
177
178
179
180 public void mcast(String protocol, Object obj, AckListener ackl)
181 throws JgroupException, IOException
182 {
183 doChecks();
184 if (!multicastListeners.containsKey(protocol))
185 throw new UnsupportedOperationException("Unsupported protocol: " + protocol);
186
187 if (log.isDebugEnabled())
188 log.debug("mcast object (" + protocol + ")");
189 OutMessage bout = (OutMessage) getMessage(protocol);
190 ObjectOutputStream out = new ObjectOutputStream(bout);
191 out.writeObject(obj);
192 out.close();
193
194
195 DaemonInteraction.addEvent(
196 new MulticastRequest(pgms.getGid(), true, bout, me, ackl, null));
197 }
198
199
200
201
202 public OutputStream getMessage(String protocol)
203 throws IOException
204 {
205 if (multicastListeners.isEmpty())
206 throw new UnsupportedOperationException("Multicast not supported");
207 if (!multicastListeners.containsKey(protocol))
208 throw new UnsupportedOperationException("Unsupported protocol: " + protocol);
209
210 OutMessage outmsg = (OutMessage) MsgMcast.createOutputStream();
211 outmsg.writeUTF(protocol);
212 return outmsg;
213 }
214
215 private void doChecks()
216 throws JgroupException
217 {
218 if (multicastListeners.isEmpty())
219 throw new UnsupportedOperationException("Multicast not supported");
220 if (!pgms.isMemberOrJoining())
221 throw new JgroupException("The gm is not member of any group (may be leaving)");
222 }
223
224
225
226
227
228
229
230
231
232 public int[] eventTypes()
233 {
234 return new int[] {
235 DELIVERY_EVENT
236 };
237 }
238
239
240
241
242
243 public void notifyEvent(Event event)
244 {
245 if (event.getTag() == DELIVERY_EVENT) {
246 handleDeliveryEvent((DeliveryEvent) event);
247 }
248 }
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264 private void handleDeliveryEvent(final DeliveryEvent event)
265 {
266 if (multicastListeners.isEmpty()) {
267 log.warn("Received a multicast message addressed to a non-multicast member");
268 return;
269 }
270 if (log.isDebugEnabled()) {
271 NDC.push(Network.getMachineName(event.getSender().getCanonicalHostName()));
272 log.debug("Handling " + event);
273 }
274
275 if (pgms.isMemberOrJoining()) {
276 final InMessage msg = event.getPayload();
277 final MemberId sender = event.getSender();
278 final int seqNo = event.getMessageId();
279 try {
280 Object result = null;
281 if (event.isObject()) {
282 ObjectInputStream in = new ObjectInputStream(msg);
283 String protocol = in.readUTF();
284 MulticastListener mcastListener = getListener(protocol);
285 Object obj = in.readObject();
286 result = mcastListener.deliverObject(obj, sender, seqNo);
287 } else {
288 msg.mark(0);
289 String protocol = msg.readUTF();
290 MulticastListener mcastListener = getListener(protocol);
291 result = mcastListener.deliverStream(msg, sender, seqNo);
292 }
293 if (!event.isAckRequired())
294 result = null;
295
296
297
298
299
300
301
302
303 DeliveryAck deliveryAck = new DeliveryAck(
304 pgms.getGid(), event.getHostIndex(), pgms.getMemberIndex(), pgms.getViewIndex(),
305 seqNo, sender, result, me);
306 DaemonInteraction.addEvent(deliveryAck);
307
308 } catch (Exception e) {
309
310
311
312
313 log.warn("Exception caught during message unmarshaling", e);
314 } finally {
315 if (log.isDebugEnabled()) {
316 log.debug("handleDeliveryEvent end");
317 NDC.pop();
318 }
319 }
320 }
321 }
322
323
324
325
326
327
328
329
330
331
332 private MulticastListener getListener(String protocol)
333 throws JgroupException
334 {
335 if (log.isDebugEnabled())
336 log.debug("Looking for protocol: " + protocol);
337 MulticastListener mcastListener = multicastListeners.get(protocol);
338 if (mcastListener == null) {
339 throw new JgroupException("Received message with unknown protocol: " + protocol);
340 } else {
341 return mcastListener;
342 }
343 }
344
345 }