View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  import java.io.ObjectInput;
23  import java.util.Iterator;
24  import java.util.List;
25  
26  import jgroup.core.EndPoint;
27  import jgroup.relacs.daemon.DaemonMsg;
28  import jgroup.relacs.types.EndPointImpl;
29  import jgroup.relacs.types.GroupIndex;
30  import jgroup.relacs.types.MessageId;
31  import jgroup.util.InMessage;
32  import jgroup.util.OutMessage;
33  
34  import org.apache.log4j.Logger;
35  
36  /**
37   *  The <code>MsgJG</code> class is the main <i>mss</i> level message,
38   *  used for passing Jgroup related multicast messages to group members.
39   *
40   *  @author Hein Meling
41   *  @since Jgroup 1.2
42   */
43  public final class MsgJG
44    implements Msg, MssConstants, MssTag
45  {
46  
47    ////////////////////////////////////////////////////////////////////////////////////////////
48    // Logger
49    ////////////////////////////////////////////////////////////////////////////////////////////
50  
51    /** Obtain logger for this class */
52    private static Logger log = Logger.getLogger(MsgJG.class);
53  
54  
55    ////////////////////////////////////////////////////////////////////////////////////////////
56    // Static section
57    ////////////////////////////////////////////////////////////////////////////////////////////
58  
59    /**
60     *  Constant null array used when no receivers are set; meaning that
61     *  all should receive the message.  Object reuse pattern; see Item 27
62     *  in Effective Java.
63     */
64    private final static EndPoint[] NULL_ENDPOINT_ARRAY = new EndPoint[0];
65  
66  
67    ////////////////////////////////////////////////////////////////////////////////////////////
68    // Message Identifier Generator
69    ////////////////////////////////////////////////////////////////////////////////////////////
70  
71    private static transient int fragIdGen = 0;
72  
73  
74    ////////////////////////////////////////////////////////////////////////////////////////////
75    // Message Fields (static part)
76    ////////////////////////////////////////////////////////////////////////////////////////////
77  
78    /** Fragment identifier */
79    private int fragId;
80  
81    /** Tag assigned in upper level (@see jgroup.relacs.daemon.Tag) */
82    private byte JGtag;
83  
84    /** The cluster to which the message will be sent (only endpoint is transmitted) */
85    private transient Cluster cluster;
86  
87    /**
88     *  Pointer to the message trailer, which contains the dynamic part of
89     *  the message (see below)
90     */
91    private int trailerStartPos;
92  
93  
94    ////////////////////////////////////////////////////////////////////////////////////////////
95    // Message Fields (dynamic part)
96    ////////////////////////////////////////////////////////////////////////////////////////////
97  
98    /**
99     *  Receivers array (zero length if ALL should receive this message).
100    *  This array contains all receivers, when accessed from the sending
101    *  side, while on the receiver side it contains only those whom was
102    *  marshalled into the message destined for a particular cluster.
103    */
104   private EndPoint[] receivers;
105 
106   /** Distinguish between local and external flow control information */
107   private byte FCtype;
108 
109   /** Array of flow control entries */
110   private FCEntry[] fc;
111 
112 
113   ////////////////////////////////////////////////////////////////////////////////////////////
114   // Transient fields (recomputed during unmarshalling)
115   ////////////////////////////////////////////////////////////////////////////////////////////
116 
117   /** The MssDS object */
118   private transient MssDS mssds;
119 
120   /** Message to be sent */
121   private transient OutMessage outmsg;
122 
123   /** Message received from the Mss */
124   private transient InMessage inmsg;
125 
126   /** True if this host is in the destination set of this message */
127   private transient boolean isForMe;
128 
129   /** The index of the flow control entry associated with the local host */
130   private transient int localFCIndex = UNDEF; 
131 
132   /** The local host (where this message resides; sender or receiver) */
133   private transient MssHost localHost;
134 
135   /** The sender host */
136   private transient MssHost sender;
137 
138   /** The start position of the cluster entry in the outmsg stream */
139   private transient int clusterStartPosition;
140 
141   /** The start position of the receiver set in the outmsg stream */
142   private transient int receiversStartPosition;
143 
144 
145   ////////////////////////////////////////////////////////////////////////////////////////////
146   // Constructors and marshalling methods
147   ////////////////////////////////////////////////////////////////////////////////////////////
148 
149   /**
150    *  Marshal a message with receiver set all.
151    */
152   static MsgJG marshal(DaemonMsg dmsg, byte JGtag, MssDS mssds)
153     throws IOException
154   {
155     return new MsgJG(dmsg, JGtag, NULL_ENDPOINT_ARRAY, mssds);
156   }
157 
158 
159   static MsgJG marshal(DaemonMsg dmsg, byte JGtag, EndPoint[] receivers, MssDS mssds)
160     throws IOException
161   {
162     return new MsgJG(dmsg, JGtag, receivers, mssds);
163   }
164 
165 
166   /**
167    *  Marshalling constructor: Creates a new <code>MsgJG</code>
168    *  instance, based on an upper level message originated by the Jgroup
169    *  daemon.
170    *
171    *  @param dmsg
172    *    A <code>DaemonMsg</code> from the upper level.  The set
173    *    of possible messages are in the @see jgroup.relacs.daemon
174    *    package.
175    *  @param JGtag
176    *    A one byte tag indicating an upper level message type that is
177    *    encapsulated within the given <code>outmsg</code>.  For the set
178    *    of possible values @see jgroup.relacs.daemon.Tag.
179    *  @param dest  a <code>Cluster</code> value
180    *  @param receivers  a <code>MssHost[]</code> value
181    *  @param mssds  a <code>MssDS</code> value
182    *
183    *  @exception IOException if an error occurs
184    */
185   private MsgJG(DaemonMsg dmsg, byte JGtag, EndPoint[] receivers, MssDS mssds)
186     throws IOException
187   {
188     localHost = HostTable.getLocalHost();
189     sender = localHost;
190     this.receivers = receivers;
191     this.mssds = mssds;
192     this.outmsg = dmsg.getOutMessage();
193 
194     this.fragId = fragIdGen++;
195     this.JGtag = JGtag;
196 
197     /*
198      * The static <i>mss</i> data associated with a <code>MsgJG</code>
199      * is kept in the beginning of the stream (in the header), while the
200      * dynamic <i>mss</i> level data is kept at the end of the
201      * <code>OutMessage</code> stream (in the trailer).  In addition to
202      * the static data, we need to keep the trailer index in the header
203      * to allow unmarshalling, starting from the beginning of the
204      * <code>OutMessage</code> stream.
205      *
206      * Note that the <code>dmsg.size()</code> returns the number of
207      * bytes that originates from the daemon layer, and since the first
208      * part of every Mss level message is <code>MSS_HEADER_SIZE</code>
209      * bytes, we have to add this to get the correct trailer position.
210      */
211     trailerStartPos = dmsg.size() + MSS_HEADER_SIZE;
212     log.assertLog(dmsg.size() != 0, "Illegal position for provided DaemonMsg");
213     outmsg.seek(0);
214 
215     /*
216      * Store the trailer index in the static section of the <i>mss</i>
217      * level data.  The trailer is written using the methods
218      * <code>setCluster()</code> and <code>setReceivers()</code>.
219      */
220     outmsg.writeInt(trailerStartPos);
221     MessageId.marshal(outmsg, fragId);
222     outmsg.writeByte(JGtag);
223     clusterStartPosition = outmsg.getPosition();
224   }
225 
226 
227   /**
228    *  Returns a <code>MsgJG</code> object that contains a partially
229    *  decoded m-received input stream, containing only the static part
230    *  of the message structure.  To decode the whole stream, including
231    *  the dynamic part of the <i>mss</i> level message, all message
232    *  fragments must have been added into the message object using the
233    *  <code>addFragment()<code> method.
234    *
235    *  @param inmsg
236    *    The message input stream to decode.
237    */
238   static MsgJG unmarshal(ObjectInput inmsg, FragmentHeader header, MssDS mssds)
239     throws IOException, ClassNotFoundException
240   {
241     return new MsgJG((InMessage) inmsg, header, mssds);
242   }
243 
244 
245   /**
246    *  Unmarshalling constructor: Constructs a new <code>MsgJG</code>
247    *  data structure from an m-received message.
248    *
249    *  @param inmsg
250    *    The m-received <code>InMessage</code> object
251    *
252    *  @exception IOException
253    *    Raised if there was problem unmarshalling the message.
254    */
255   private MsgJG(InMessage inmsg, FragmentHeader header, MssDS mssds)
256     throws IOException, ClassNotFoundException
257   {
258     this.inmsg = inmsg;
259     this.mssds = mssds;
260 
261     /* Get the local host of the receiver */
262     localHost = HostTable.getLocalHost();
263     sender = header.getSender();
264 
265     /*
266      * Unmarshalling static header part.
267      */
268     /*
269      * Retrive pointer to the trailer start position in which the
270      * <i>mss</i> level message data is collected.
271      */
272     trailerStartPos = inmsg.readInt();
273     fragId = MessageId.unmarshal(inmsg);
274     JGtag = inmsg.readByte();
275     EndPointImpl endpoint = new EndPointImpl();
276     endpoint.readExternal(inmsg);
277     cluster = mssds.getClusterTable().lookup(endpoint);
278   }
279 
280 
281   /**
282    *  Unmarshalling method to complete the unmarshalling of a
283    *  <code>MsgJG</code> object.  It assumes that all fragments has been
284    *  added to the stream using the <code>addFragment()</code> method.
285    */
286   InMessage complete()
287     throws IOException, ClassNotFoundException
288   {
289     // Seek to the start of the dynamic data; the trailer
290     inmsg.seek(trailerStartPos);
291 
292     // Flow control data
293     FCtype = inmsg.readByte();
294     int len = GroupIndex.unmarshal(inmsg);
295     fc = new FCEntry[len];
296     for (int i = 0; i < len; i++) {
297       fc[i] = new FCEntry();
298       fc[i].readExternal(inmsg);
299       if (localFCIndex == UNDEF && fc[i].key.isLocal())
300         localFCIndex = i;
301     }
302 
303     // Set of receivers
304     int nreceivers = GroupIndex.unmarshal(inmsg);
305     if (nreceivers == ALL) {
306       isForMe = true;
307       receivers = NULL_ENDPOINT_ARRAY;
308     } else {
309       isForMe = false;
310       receivers = new EndPoint[nreceivers];
311       for (int i = 0; i < nreceivers; i++) {
312         receivers[i] = new EndPointImpl();
313         receivers[i].readExternal(inmsg);
314         if (receivers[i].isLocal())
315           isForMe = true;
316       }
317     }
318 
319     /*
320      * We are now ready to update the flow control information, attached
321      * with this message.
322      */
323     updateFC();
324 
325     /*
326      * Seek to position <code>MSS_HEADER_SIZE</code> and mark that
327      * position so that any reset will return to that position
328      * instead; this is to avoid that external message users see the
329      * static header part of this message. <p>
330      *
331      * PS; Note that the parameter to the mark() method is not used.
332      */
333     inmsg.seek(MSS_HEADER_SIZE);
334     inmsg.mark(0);
335     return inmsg;
336   }
337 
338 
339   ////////////////////////////////////////////////////////////////////////////////////////////
340   // Fragmentation handling (from Msg interface)
341   ////////////////////////////////////////////////////////////////////////////////////////////
342 
343   /**
344    *  Returns a <code>FragmentIterator</code> for this
345    *  <code>MsgJG</code> object.  This iterator allows to send the
346    *  entire message as multiple fragments of specified size (payload).
347    *  At the same time, it marks each fragment with a tag and message
348    *  identifier provided through the <code>next()</code> method of the
349    *  iterator.
350    *
351    *  Note that <code>MsgJG</code> messages cannot reuse the same
352    *  iterator for sending to multiple clusters.  This is because, at
353    *  each iteration reuse, the fragment identifier is changed, causing
354    *  problems when doing lookup in the sent queue to resend a message.
355    */
356   public FragmentIterator iterator(MsgCntrl msgCntrl)
357   {
358     return new MsgFragmentIterator(this, msgCntrl);
359   }
360 
361 
362   ////////////////////////////////////////////////////////////////////////////////////////////
363   // Msg interface methods
364   ////////////////////////////////////////////////////////////////////////////////////////////
365 
366   /**
367    *  Returns the tag associated with this message.
368    */
369   public byte getTag()
370   {
371     return JG;
372   }
373 
374 
375   /**
376    *  Returns the message identifier for this message.
377    */
378   public int getMid()
379   {
380     return fragId;
381   }
382 
383 
384   /**
385    *  Returns the sender of this message.
386    */
387   public MssHost getSender()
388   {
389     return sender;
390   }
391 
392 
393   /**
394    *  Returns true if this message has to be routed to a different
395    *  cluster.
396    */
397   public boolean hasToBeRouted()
398   {
399     return !cluster.isLocal();
400   }
401 
402 
403   /**
404    *  Returns the message flow controller for the sender side.
405    */
406   public MsgFlowSndrSide getMsgFlow()
407   {
408     return cluster.getMsgFlow();
409   }
410 
411 
412   /**
413    *  Returns the <code>OutMessage</code> associated with this message.
414    */
415   public OutMessage getOutMessage()
416   {
417     return outmsg;
418   }
419 
420 
421   ////////////////////////////////////////////////////////////////////////////////////////////
422   // MsgJG specified methods
423   ////////////////////////////////////////////////////////////////////////////////////////////
424 
425   /**
426    *  Marshal receivers into the stream of this message.  If there are
427    *  no receivers specified (the <code>receivers</code> list is empty),
428    *  this means that all members should receive this message. <p>
429    *
430    *  This method is invoked for each cluster, to marshal the receivers
431    *  that belong to that cluster into the stream.  That means, not all
432    *  actual receivers may be marshalled into the stream.
433    *
434    *  @param receivers
435    *    List of receivers for this message; if the list is empty, all
436    *    should receive the message.
437    */
438   public void setReceivers(List receivers)
439   {
440     outmsg.seek(receiversStartPosition);
441     try {
442       GroupIndex.marshal(outmsg, receivers.size());
443       for (Iterator iter = receivers.iterator(); iter.hasNext(); ) {
444         EndPoint recv = (EndPoint) iter.next();
445         recv.writeExternal(outmsg);
446         if (log.isDebugEnabled()) {
447           log.debug("setReceivers: " + recv);
448         }
449       }
450     } catch (IOException e) {
451       log.warn("setReceivers: Could not update receiver information in message stream");
452     }
453   }
454 
455 
456   public void setCluster(Cluster cluster)
457   {
458     if (log.isDebugEnabled()) {
459       log.debug("setCluster: " + cluster);
460     }
461 
462     this.cluster = cluster;
463     if (cluster.isLocal()) {
464       FCtype = LOCALFC;
465       fc = mssds.getAllFCEntry();
466     } else {
467       FCtype = EXTERNFC;
468       fc = mssds.getClusterFCEntry(cluster.getEndPoint());
469     }
470 
471     /* Find the location for writting the cluster. */
472     outmsg.seek(clusterStartPosition);
473     try {
474       cluster.getEndPoint().writeExternal(outmsg);
475     } catch (IOException e) {
476       log.warn("setCluster: Could not update cluster in message stream");
477     }
478 
479     /*
480      * Search for the trailer location, and write out the flow control
481      * information.
482      */
483     outmsg.seek(trailerStartPos);
484     try {
485       outmsg.writeByte(FCtype);
486       GroupIndex.marshal(outmsg, fc.length);
487       for (int i = 0; i < fc.length; i++) {
488         fc[i].writeExternal(outmsg);
489       }
490     } catch (IOException e) {
491       log.warn("setCluster: Could not update flow control information in message stream");
492     }
493 
494     /* Store the location for the receiver set. */
495     receiversStartPosition = outmsg.getPosition();
496   }
497 
498 
499   /**
500    *  Returns the destination cluster of this message.
501    */
502   public Cluster getCluster()
503   {
504     return cluster;
505   }
506 
507 
508   /**
509    *  Returns the receiver set for this message.  On the sending side,
510    *  this includes all receivers, while on the receiver side this may
511    *  only include those that were marshalled into the message for a
512    *  particular cluster.
513    */
514   public EndPoint[] getReceivers()
515   {
516     return receivers;
517   }
518 
519 
520   private void updateFC()
521   {
522     switch (FCtype) {
523 
524     case EXTERNFC:
525       /* Check if there is a flow control entry for the local host */
526       if (localFCIndex != UNDEF) {
527         /* Update congestion information (sender side flow control) */
528         sender.flush(fc[localFCIndex].lastMsgRcvd);
529       }
530       break;
531 
532     case LOCALFC:
533       if (sender.isLocal()) {
534 
535         localHost.flush();
536 
537       } else {
538 
539         for (int i = 0; i < fc.length; i++) {
540           MssHost host = mssds.hostLookup(fc[i].key);
541           if (host == null) {
542             log.warn("Unavailable host: " + fc[i].key);
543             continue;
544           }
545           host.getMsgFlow().clusterWindow.set(sender.getClusterIndex(), fc[i].lastMsgRcvd);
546         }
547 
548       }
549       break;
550 
551     default:
552       log.warn("Erroneous flow control type");
553     }
554   }
555 
556 
557   /**
558    *  Returns the upper level Jgroup tag associated with this message.
559    *
560    *  @see jgroup.relacs.daemon.Tag
561    */
562   public byte getJGTag()
563   {
564     return JGtag;
565   }
566 
567 
568   /**
569    *  Returns true if this message is destined for the local host.
570    */
571   boolean isForMe()
572   {
573     return isForMe;
574   }
575 
576 
577   ////////////////////////////////////////////////////////////////////////////////////////////
578   // Methods from Object
579   ////////////////////////////////////////////////////////////////////////////////////////////
580 
581   /**
582    *  Returns a string representation of this object
583    */
584   public String toString()
585   {
586     StringBuilder buf = new StringBuilder("MsgJG: jgtag=");
587     buf.append(JGtag);
588     buf.append(", sender=");
589     buf.append(sender);
590     buf.append(", cluster=");
591     buf.append(cluster);
592     buf.append(", nreceivers=");
593     buf.append(receivers.length);
594     for (int i = 0; i < receivers.length; i++) {
595       buf.append(", ");
596       buf.append(receivers[i]);
597       if (receivers[i].isLocal())
598         buf.append(" (LOCAL)");
599     }
600     if (outmsg != null) {
601       buf.append(", ");
602       buf.append(outmsg.toString());
603     } else if (inmsg != null) {
604       buf.append(", ");
605       buf.append(inmsg.toString());
606     }
607     return buf.toString();
608   }
609 
610 }