View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.daemon;
20  
21  import java.io.IOException;
22  import java.io.ObjectInput;
23  import java.io.OutputStream;
24  
25  import jgroup.core.MemberId;
26  import jgroup.core.multicast.AckListener;
27  import jgroup.relacs.events.MulticastRequest;
28  import jgroup.relacs.mss.MssConstants;
29  import jgroup.relacs.types.Flag;
30  import jgroup.relacs.types.GroupId;
31  import jgroup.relacs.types.GroupIndex;
32  import jgroup.relacs.types.MemberIdImpl;
33  import jgroup.relacs.types.MessageId;
34  import jgroup.relacs.types.MessageLen;
35  import jgroup.relacs.types.ViewId;
36  import jgroup.util.InMessage;
37  import jgroup.util.MsgFactory;
38  import jgroup.util.OutMessage;
39  
40  /**
41   * The <code>MsgMcast</code> class
42   *
43   * @author  Alberto Montresor
44   * @since   Jgroup 0.1
45   */
46  public final class MsgMcast
47    implements MssConstants, DaemonMsg
48  {
49  
50    ////////////////////////////////////////////////////////////////////////////////////////////
51    // Position constants
52    ////////////////////////////////////////////////////////////////////////////////////////////
53  
54    private static final int START_FIXED = MSS_HEADER_SIZE;
55  
56    private static final int FIXED_SIZE  = GroupId.SIZE + Flag.SIZE*2 + MemberIdImpl.SIZE + MessageLen.SIZE;
57    private static final int START_VAR   = START_FIXED + FIXED_SIZE;
58  
59    private static final int VAR_SIZE    = GroupIndex.SIZE + MessageId.SIZE + ViewId.SIZE;
60    private static final int START_DATA  = START_VAR + VAR_SIZE;
61  
62  
63    ////////////////////////////////////////////////////////////////////////////////////////////
64    // Serialized fields
65    ////////////////////////////////////////////////////////////////////////////////////////////
66  
67    /** True if this message carries a single object */
68    boolean isObject;
69  
70    /** True if an ack is required */
71    boolean ackr;
72    
73    /** Identifier of the sender */
74    MemberId sender;
75    
76    /** Position of the sender in view identified by <code>vid</code> */
77    int hpos;
78    
79    /** View identifier */
80    long vid;
81    
82    /** Message identifier */
83    int mid;
84    
85    /** Ack listener */
86    AckListener ackl;
87  
88    /** Message to be sent */
89    private OutMessage outmsg;
90    
91    /** Message received from the Mss */
92    private InMessage inmsg;
93    
94    /** Set of acks associated with this message */
95    // FIXME: Should be moved in an external datastructure
96    private boolean[] ackset;
97  
98    /** Number of false values in ackset */
99    private int nfalse;
100 
101   /** MsgMcast message size (excluding the mss header) */
102   private int size;
103 
104 
105   ////////////////////////////////////////////////////////////////////////////////////////////
106   // Constructors and additional methods
107   ////////////////////////////////////////////////////////////////////////////////////////////
108 
109   /*
110    *  Constructs a <code>MsgMcast</code> data structure starting from a
111    *  m-received message.
112    */
113   private MsgMcast(InMessage inmsg)
114     throws IOException, ClassNotFoundException
115   {
116     isObject = Flag.unmarshal(inmsg);
117     ackr = Flag.unmarshal(inmsg);
118     sender = new MemberIdImpl();
119     sender.readExternal(inmsg);
120     size = MessageLen.unmarshal(inmsg);
121     hpos = GroupIndex.unmarshal(inmsg);
122     vid  = ViewId.unmarshal(inmsg);
123     mid  = MessageId.unmarshal(inmsg);
124 
125     this.inmsg = inmsg;
126     inmsg.seek(START_DATA);
127     outmsg = new OutMessage(inmsg);
128   }
129 
130 
131   /**
132    *  Creates an uncompleted <code>MsgMcast</code> object; other fields
133    *  will be set when the information will be available.
134    */
135   MsgMcast(MulticastRequest mcastRequest)
136     throws IOException
137   {
138     this.outmsg = mcastRequest.getPayload();
139     /*
140      * Computes the total size of this MsgMcast message, including that
141      * application level data of the mcast.  This excludes the size
142      * of the mss header fields.  Note that we can use the outmsg.size()
143      * method to compute the application data size, since only app data
144      * has been written to the outmsg stream.
145      */
146     size = outmsg.size() + FIXED_SIZE + VAR_SIZE;
147 
148     // Store info for local message
149     this.isObject = mcastRequest.isObject();
150     this.ackl = mcastRequest.getAckListener();
151     this.ackr = (ackl != null);
152     this.sender = mcastRequest.getSender();
153 
154     // Writes fixed part of the header
155     outmsg.seek(START_FIXED);
156     GroupId.marshal(outmsg, mcastRequest.getGid());
157     Flag.marshal(outmsg, isObject);
158     Flag.marshal(outmsg, ackr);
159     sender.writeExternal(outmsg);
160     MessageLen.marshal(outmsg, size);
161   }
162 
163 
164   /**
165    *  Complete the message structure with the data related to the view
166    *  in which the message is sent.
167    *
168    *  @param     hpos    host index in the view.
169    *  @param     vid       view identifier.
170    *  @param     mid     message identifier.
171    */
172   void complete(int hpos, long vid, int mid)
173     throws IOException
174   {
175     // Writes variable part of the header
176     outmsg.seek(START_VAR);
177     GroupIndex.marshal(outmsg, hpos);
178     ViewId.marshal(outmsg, vid);
179     MessageId.marshal(outmsg, mid);
180 
181     // Store info for local message
182     this.vid  = vid;
183     this.hpos = hpos;
184     this.mid  = mid;
185 
186     inmsg = new InMessage(outmsg);
187   }
188 
189 
190   ////////////////////////////////////////////////////////////////////////////////////////////
191   // Static methods
192   ////////////////////////////////////////////////////////////////////////////////////////////
193 
194   /**
195    *
196    */
197   public static OutputStream createOutputStream()
198   {
199     OutMessage stream = MsgFactory.get();
200     stream.seek(START_DATA);
201     return stream;
202   }
203 
204 
205   /**
206    *  Returns a <code>MsgMcast</code> object that contains the decoding
207    *  of the m-received input stream.
208    *
209    *  @param inmsg    the message input stream to decode
210    */
211   static MsgMcast unmarshal(ObjectInput inmsg)
212     throws IOException, ClassNotFoundException
213   {
214     return new MsgMcast((InMessage) inmsg);
215   }
216 
217 
218   /**
219    *
220    */
221   InMessage getInMessage()
222   {
223     /*
224      * This must be synchronized on the inmsg object so as to avoid
225      * concurrent access to the inmsg object, since most methods in 
226      * the InMessage class modify its state (the position field in
227      * particular), including methods such as seek(), read(), insert()
228      * and so on.  Such concurrent access may occur when two competing
229      * threads (Ehandler and Dispatcher) want to access the inmsg object.
230      * For example, one thread may be reading the inmsg object for
231      * delivery to an upper-layer (see IntGroupHandler.deliverStream()),
232      * while another thread wants to send the inmsg to another member of
233      * the group, and thus needs to seek to the START_DATA position;
234      * see MsgMcast.getInMessage().
235      */
236     synchronized (inmsg) {
237       inmsg.seek(START_DATA);
238       return inmsg;
239     }
240   }
241 
242 
243   ////////////////////////////////////////////////////////////////////////////////////////////
244   // Methods from DaemonMsg
245   ////////////////////////////////////////////////////////////////////////////////////////////
246 
247   public int size()
248   {
249     return size;
250   }
251 
252   public OutMessage getOutMessage()
253   {
254     return outmsg;
255   }
256 
257 
258   ////////////////////////////////////////////////////////////////////////////////////////////
259   // Ack management
260   ////////////////////////////////////////////////////////////////////////////////////////////
261 
262   void setAckArray(int len)
263   {
264     ackset = new boolean[len];
265     nfalse = len;
266   }
267   
268   void setAck(int pos)
269   {
270     if (!ackset[pos])
271       nfalse--;
272     ackset[pos] = true;
273   }
274 
275   boolean isComplete()
276   {
277     return (nfalse == 0);
278   }
279 
280 
281   /**
282    *  Returns a string representation of this object
283    */
284   public String toString()
285   {
286     StringBuilder b = new StringBuilder();
287     b.append("[MsgMcast: ");
288     b.append("mid=");
289     b.append(mid);
290     b.append(", sender=");
291     b.append(sender);
292     b.append(", vid=");
293     b.append(vid);
294     if (inmsg != null) {
295       b.append(", ");
296       b.append(inmsg);
297     }
298     b.append("]");
299     return b.toString();
300   }
301 
302 } // END MsgMcast