View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gm;
20  
21  import java.io.IOException;
22  import java.io.ObjectInputStream;
23  import java.io.ObjectOutputStream;
24  import java.io.OutputStream;
25  import java.util.HashMap;
26  import java.util.Map;
27  
28  import jgroup.core.JgroupException;
29  import jgroup.core.MemberId;
30  import jgroup.core.MembershipService;
31  import jgroup.core.multicast.AckListener;
32  import jgroup.core.multicast.ChainIdentifier;
33  import jgroup.core.multicast.MulticastListener;
34  import jgroup.core.multicast.MulticastService;
35  import jgroup.relacs.daemon.DaemonInteraction;
36  import jgroup.relacs.daemon.MsgMcast;
37  import jgroup.relacs.events.DeliveryAck;
38  import jgroup.relacs.events.DeliveryEvent;
39  import jgroup.relacs.events.Event;
40  import jgroup.relacs.events.EventTags;
41  import jgroup.relacs.events.MulticastRequest;
42  import jgroup.util.InMessage;
43  import jgroup.util.Network;
44  import jgroup.util.OutMessage;
45  
46  import org.apache.log4j.Logger;
47  import org.apache.log4j.NDC;
48  
49  /**
50   *  The <code>MulticastLayer</code> class implements the multicast service.
51   *
52   *  @author Alberto Montresor
53   *  @author Hein Meling
54   *  @since Jgroup 2.2
55   */
56  public final class MulticastLayer
57    implements MulticastService, DispatcherListener, EventTags
58  {
59  
60    ////////////////////////////////////////////////////////////////////////////////////////////
61    // Logger
62    ////////////////////////////////////////////////////////////////////////////////////////////
63  
64    /** Obtain logger for this class */
65    private static final Logger log = Logger.getLogger(MulticastLayer.class);
66  
67  
68    ////////////////////////////////////////////////////////////////////////////////////////////
69    // Fields
70    ////////////////////////////////////////////////////////////////////////////////////////////
71  
72    /** Dynamic map of multicast listeners; protocol name as key. */
73    private final Map<String,MulticastListener> multicastListeners =
74      new HashMap<String,MulticastListener>();
75  
76    /** The membership service */
77    private final MembershipService pgms;
78  
79    /** Member identifier */
80    private final MemberId me;
81  
82  
83    ////////////////////////////////////////////////////////////////////////////////////////////
84    // Constructor
85    ////////////////////////////////////////////////////////////////////////////////////////////
86  
87    /**
88     *  Initialize a new <code>MulticastLayer</code> object.
89     *
90     *  @param pgms
91     *    The membership service.
92     *  @exception JgroupException
93     *    Raised if the distributed system configuration was not available.
94     */
95    private MulticastLayer(MembershipService pgms)
96      throws JgroupException
97    {
98      this.pgms = pgms;
99      this.me   = pgms.getMyIdentifier();
100   }
101 
102 
103   ////////////////////////////////////////////////////////////////////////////////////////////
104   // Static factory
105   ////////////////////////////////////////////////////////////////////////////////////////////
106 
107   public static MulticastLayer getLayer(DispatcherService ds, MembershipService pgms)
108     throws JgroupException
109   {
110     return new MulticastLayer(pgms);
111   }
112 
113 
114   ////////////////////////////////////////////////////////////////////////////////////////////
115   // Layer interface methods
116   ////////////////////////////////////////////////////////////////////////////////////////////
117 
118   /**
119    *  Add a listener object for this layer to provide upcalls to in
120    *  response to multicast events.
121    */
122   public void addListener(Object listener)
123   {
124     if (listener instanceof MulticastListener) {
125       MulticastListener mListener = (MulticastListener) listener;
126       String protocol = mListener.getProtocolName();
127       Object currentListener = multicastListeners.get(protocol);
128       if (currentListener == null)
129         multicastListeners.put(protocol, mListener);
130       else if (!currentListener.equals(listener))
131         throw new IllegalArgumentException("Multiple multicast listeners for the same protocol is illegal: " + protocol);
132     } else {
133       throw new IllegalArgumentException("Specified listener does not implement a MulticastListener");
134     }
135   }
136 
137 
138   ////////////////////////////////////////////////////////////////////////////////////////////
139   // Methods from MulticastService
140   ////////////////////////////////////////////////////////////////////////////////////////////
141 
142 
143   /* (non-Javadoc)
144    * @see jgroup.core.multicast.MulticastService#mcast(java.io.OutputStream, jgroup.core.multicast.AckListener, jgroup.core.multicast.ChainIdentifier)
145    */
146   public void mcast(OutputStream out, AckListener ackl, ChainIdentifier chId)
147     throws JgroupException
148   {
149     doChecks();
150     if (!(out instanceof OutMessage))
151       throw new JgroupException("Only OutputStream obtained through getMessage(protocol) can be multicast");
152 
153     if (log.isDebugEnabled())
154       log.debug("mcast output stream: " + out);
155     /* Send the multicast request to the daemon */
156     DaemonInteraction.addEvent(
157       new MulticastRequest(pgms.getGid(), false, (OutMessage) out, me, ackl, chId));
158   }
159 
160   /* (non-Javadoc)
161    * @see jgroup.core.multicast.MulticastService#mcast(java.io.OutputStream, jgroup.core.multicast.AckListener)
162    */
163   public void mcast(OutputStream stream, AckListener ackl)
164     throws JgroupException
165   {
166     doChecks();
167     if (!(stream instanceof OutMessage))
168       throw new JgroupException("Only OutputStream obtained through getMessage(protocol) can be multicast");
169 
170     if (log.isDebugEnabled())
171       log.debug("mcast output stream: " + stream);
172     /* Send the multicast request to the daemon */
173     DaemonInteraction.addEvent(
174       new MulticastRequest(pgms.getGid(), false, (OutMessage) stream, me, ackl, null));
175   }
176 
177   /* (non-Javadoc)
178    * @see jgroup.core.multicast.MulticastService#mcast(java.lang.String, java.lang.Object, jgroup.core.multicast.AckListener)
179    */
180   public void mcast(String protocol, Object obj, AckListener ackl)
181     throws JgroupException, IOException
182   {
183     doChecks();
184     if (!multicastListeners.containsKey(protocol))
185       throw new UnsupportedOperationException("Unsupported protocol: " + protocol);
186 
187     if (log.isDebugEnabled())
188       log.debug("mcast object (" + protocol + ")");
189     OutMessage bout = (OutMessage) getMessage(protocol);
190     ObjectOutputStream out = new ObjectOutputStream(bout);
191     out.writeObject(obj);
192     out.close();
193 
194     /* Send the multicast request to the daemon */
195     DaemonInteraction.addEvent(
196       new MulticastRequest(pgms.getGid(), true, bout, me, ackl, null));
197   }
198 
199   /* (non-Javadoc)
200    * @see jgroup.core.multicast.MulticastService#getMessage(java.lang.String)
201    */
202   public OutputStream getMessage(String protocol)
203     throws IOException
204   {
205     if (multicastListeners.isEmpty())
206       throw new UnsupportedOperationException("Multicast not supported");
207     if (!multicastListeners.containsKey(protocol))
208       throw new UnsupportedOperationException("Unsupported protocol: " + protocol);
209 
210     OutMessage outmsg = (OutMessage) MsgMcast.createOutputStream();
211     outmsg.writeUTF(protocol);
212     return outmsg;
213   }
214 
215   private void doChecks()
216     throws JgroupException
217   {
218     if (multicastListeners.isEmpty())
219       throw new UnsupportedOperationException("Multicast not supported");
220     if (!pgms.isMemberOrJoining())
221       throw new JgroupException("The gm is not member of any group (may be leaving)");
222   }
223 
224 
225   ////////////////////////////////////////////////////////////////////////////////////////////
226   // Methods from DispatcherListener
227   ////////////////////////////////////////////////////////////////////////////////////////////
228 
229   /* (non-Javadoc)
230    * @see jgroup.relacs.gm.DispatcherListener#eventTypes()
231    */
232   public int[] eventTypes()
233   {
234     return new int[] {
235       DELIVERY_EVENT
236     };
237   }
238 
239 
240   /* (non-Javadoc)
241    * @see jgroup.relacs.gm.DispatcherListener#notifyEvent(jgroup.relacs.events.Event)
242    */
243   public void notifyEvent(Event event)
244   {
245     if (event.getTag() == DELIVERY_EVENT) {
246       handleDeliveryEvent((DeliveryEvent) event);
247     }
248   }
249 
250 
251   ////////////////////////////////////////////////////////////////////////////////////////////
252   // Private methods
253   ////////////////////////////////////////////////////////////////////////////////////////////
254 
255   /**
256    * Delivers the message to the application layer. Note that the
257    * byte structure are condivided up by all of the members.
258    * 
259    * Note that care must be taken regarding the update of state variables in
260    * a server (or layer) using both MulticastListener and MembershipListener.
261    * Such updates must be syncrhonized since there is no protection from
262    * any of these layers.
263    */
264   private void handleDeliveryEvent(final DeliveryEvent event)
265   {
266     if (multicastListeners.isEmpty()) { 
267       log.warn("Received a multicast message addressed to a non-multicast member");
268       return;
269     }
270     if (log.isDebugEnabled()) {
271       NDC.push(Network.getMachineName(event.getSender().getCanonicalHostName()));
272       log.debug("Handling " + event);
273     }
274 
275     if (pgms.isMemberOrJoining()) {
276       final InMessage msg = event.getPayload();
277       final MemberId sender = event.getSender();
278       final int seqNo = event.getMessageId();
279       try {
280         Object result = null;
281         if (event.isObject()) {
282           ObjectInputStream in = new ObjectInputStream(msg);
283           String protocol = in.readUTF();
284           MulticastListener mcastListener = getListener(protocol);
285           Object obj = in.readObject();
286           result = mcastListener.deliverObject(obj, sender, seqNo);
287         } else {
288           msg.mark(0);
289           String protocol = msg.readUTF();
290           MulticastListener mcastListener = getListener(protocol);
291           result = mcastListener.deliverStream(msg, sender, seqNo);
292         }
293         if (!event.isAckRequired())
294           result = null;
295 
296         /*
297          * Returns a local ack to the daemon, to confirm that the
298          * message has been delivered to the application. Note that
299          * this ack is a local ack for the daemon, different from
300          * the remote ack that later may be sent to the other members
301          * of the group, depending whether the ack is required or not.
302          */
303         DeliveryAck deliveryAck = new DeliveryAck(
304           pgms.getGid(), event.getHostIndex(), pgms.getMemberIndex(), pgms.getViewIndex(),
305           seqNo, sender, result, me);
306         DaemonInteraction.addEvent(deliveryAck);
307 
308       } catch (Exception e) {
309         /*
310          * An exception was thrown during message delivery; we just
311          * print a warning message together with the exception.
312          */ 
313         log.warn("Exception caught during message unmarshaling", e);
314       } finally {
315         if (log.isDebugEnabled()) {
316           log.debug("handleDeliveryEvent end");
317           NDC.pop();
318         }
319       }
320     }
321   }
322 
323 
324   /**
325    * Returns the <code>MulticastListener</code> for the given protocol so that
326    * it can be invoked appropriately.
327    * 
328    * @param protocol the protocol whose listener to obtain.
329    * @return the <code>MulticastListener</code> for the provided protocol.
330    * @throws JgroupException if the provided protocol has no registered listener.
331    */
332   private MulticastListener getListener(String protocol)
333     throws JgroupException
334   {
335     if (log.isDebugEnabled())
336       log.debug("Looking for protocol: " + protocol);
337     MulticastListener mcastListener = multicastListeners.get(protocol);
338     if (mcastListener == null) {
339       throw new JgroupException("Received message with unknown protocol: " + protocol);
340     } else {
341       return mcastListener;
342     }
343   }
344 
345 } // END MulticastLayer