View Javadoc

1   /*
2    * Copyright (c) 1998-2006 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gmi.protocols;
20  
21  import java.io.BufferedInputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  
26  import jgroup.core.JgroupException;
27  import jgroup.core.MemberId;
28  import jgroup.core.MembershipService;
29  import jgroup.core.multicast.MulticastListener;
30  import jgroup.core.multicast.MulticastService;
31  import jgroup.relacs.gmi.GroupAckListener;
32  import jgroup.relacs.gmi.GroupInvocationDispatcher;
33  import jgroup.relacs.gmi.InvocationResult;
34  import jgroup.relacs.gmi.MethodSemantics;
35  
36  import org.apache.log4j.Logger;
37  import org.apache.log4j.MDC;
38  
39  /**
40   * Multicast represents a protocol that dispatch method invocations
41   * received through the local endpoint to all the other members of
42   * the group, including the local server.  The result from all members
43   * is returned to the local endpoint whom first received the method
44   * invocation request, and one of them is selected and passed back to
45   * the client.
46   *
47   * @author Hein Meling <hein.meling@uis.no>
48   */
49  public class Multicast
50    extends BasicDispatcher
51    implements MulticastListener
52  {
53  
54    ////////////////////////////////////////////////////////////////////////////////////////////
55    // Logger
56    ////////////////////////////////////////////////////////////////////////////////////////////
57  
58    /** Obtain logger for this class */
59    private static final Logger log = Logger.getLogger(Multicast.class);
60  
61  
62    ////////////////////////////////////////////////////////////////////////////////////////////
63    // Constants
64    ////////////////////////////////////////////////////////////////////////////////////////////
65  
66    /** Protocol name for demux in the multicast layer */
67    private static final String PROTOCOL_NAME = MethodSemantics.MULTICAST.toString();
68  
69  
70    ////////////////////////////////////////////////////////////////////////////////////////////
71    // Fields
72    ////////////////////////////////////////////////////////////////////////////////////////////
73  
74    /** The multicast service interface */
75    private MulticastService mcast;
76  
77    /** The membership service interface */
78    private MembershipService pgms;
79  
80  
81    ////////////////////////////////////////////////////////////////////////////////////////////
82    // Constructor
83    ////////////////////////////////////////////////////////////////////////////////////////////
84  
85    public Multicast(GroupInvocationDispatcher dispatcher,
86        MulticastService mcast, MembershipService pgms)
87    {
88      super(dispatcher);
89      this.mcast = mcast;
90      this.pgms = pgms;
91      /*
92       * We need to listen to multicast events, and since this is
93       * not a layer it will not be handled automatically by the
94       * group manager construction mechanism.
95       */
96      mcast.addListener(this);
97    }
98  
99  
100   ////////////////////////////////////////////////////////////////////////////////////////////
101   // ProtocolDispatcher interface (overriding BasicDispatcher)
102   ////////////////////////////////////////////////////////////////////////////////////////////
103 
104   /**
105    * Handle inbound request to the local endpoint with <i>multicast</i>
106    * invocation semantics.
107    * 
108    * The local endpoint will send a multicast message to all the other group
109    * members for method invocation dispatching at each members.  They each
110    * return a result to the local endpoint, and one of these values are selected
111    * and returned to the client (GroupInvocationHandler).
112    * 
113    * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream)
114    */
115   public InvocationResult dispatch(InputStream in)
116     throws IOException
117   {
118     // Reduce overhead if group has a single member only
119     if (pgms.members() == 1)
120       return super.dispatch(in);
121 
122     //HACK: buffer the inputstream and mark the current position.
123     BufferedInputStream bis = new BufferedInputStream(in);
124     bis.mark(50000);
125     OutputStream mout = mcast.getMessage(PROTOCOL_NAME);
126     // Write request into multicast output stream
127     byte[] buf = new byte[1500];
128     int bytesRead;
129     while ((bytesRead = bis.read(buf)) != -1) {
130       mout.write(buf, 0, bytesRead);
131     }
132     // Do multicast
133     GroupAckListener ackListener = new GroupAckListener(this);
134     try {
135       if (log.isDebugEnabled())
136         log.debug("Multicast: multicasting invocation to group members");
137       mcast.mcast(mout, ackListener.getRemoteAckListener());
138     } catch (JgroupException jex) {
139       log.warn("Multicast invocation failed; member is not ready", jex);
140       return null;
141     }
142     // Wait for ack listener
143     boolean hasResults = ackListener.pendingCompletionOrTimeout(2);
144     if (hasResults) {
145       return (InvocationResult) ackListener.getResult();
146     } else {
147       //HACK: if no results received within timeout value; dispatch only locally.
148       bis.reset();
149       return super.dispatch(bis);
150     }
151   }
152 
153 
154   ////////////////////////////////////////////////////////////////////////////////////////////
155   // MulticastListener methods
156   ////////////////////////////////////////////////////////////////////////////////////////////
157 
158   /* (non-Javadoc)
159    * @see jgroup.core.multicast.MulticastListener#getProtocolName()
160    */
161   public String getProtocolName()
162   {
163     return PROTOCOL_NAME;
164   }
165 
166   /**
167    * Received a multicast message from a mediating endpoint for invocation
168    * dispatching at the local server.
169    * 
170    * @param msg
171    *   Multicast message received
172    * @return
173    *   The result of the multicast invocation, or <code>null</code>
174    *   if the invocation failed.
175    * 
176    * @see jgroup.core.multicast.MulticastListener#deliverStream(java.io.InputStream, jgroup.core.MemberId, int)
177    */
178   public Object deliverStream(InputStream msg, MemberId sender, int seqNo)
179   {
180     if (log.isDebugEnabled()) {
181       MDC.put("group", "[Group: " + pgms.getGid() + "]");
182       log.debug("deliverStream() start");
183     }
184     try {
185       // Perform the actual method invocation on the local server (BasicDispatcher)
186       return super.dispatch(msg);
187     } catch (IOException e) {
188       log.warn("Failed to dispatch multicast invocation", e);
189     }
190     return null;
191   }
192 
193   /* (non-Javadoc)
194    * @see jgroup.core.multicast.MulticastListener#deliverObject(java.lang.Object, jgroup.core.MemberId, int)
195    */
196   public Object deliverObject(Object msg, MemberId sender, int seqNo)
197   {
198     throw new UnsupportedOperationException();
199   }
200 
201 } // END Multicast