View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  
23  import jgroup.relacs.types.GroupIndex;
24  import jgroup.util.InMessage;
25  import jgroup.util.MsgFactory;
26  import jgroup.util.OutMessage;
27  
28  import org.apache.log4j.Logger;
29  
30  /**
31   *  The <code>MsgIamAlive</code> class
32   *
33   *  @author Hein Meling
34   *  @since Jgroup 1.2
35   */
36  final class MsgIamAlive
37    implements Msg, MssConstants, MssTag
38  {
39  
40    ////////////////////////////////////////////////////////////////////////////////////////////
41    // Logger
42    ////////////////////////////////////////////////////////////////////////////////////////////
43  
44    /** Obtain logger for this class */
45    private static Logger log = Logger.getLogger(MsgIamAlive.class);
46  
47  
48    ////////////////////////////////////////////////////////////////////////////////////////////
49    // Unique message instance
50    ////////////////////////////////////////////////////////////////////////////////////////////
51  
52    /**
53     *  As this message is never stored in a queue, a single message
54     *  instance is sufficient.  This allows us to save the cost of
55     *  allocating and garbage collecting messages.
56     */
57    private static final MsgIamAlive msg = new MsgIamAlive();
58  
59  
60    ////////////////////////////////////////////////////////////////////////////////////////////
61    // Message Fields (dynamic part)
62    ////////////////////////////////////////////////////////////////////////////////////////////
63  
64    /** Array of flow control entries */
65    private FCEntry[] fc;
66  
67  
68    ////////////////////////////////////////////////////////////////////////////////////////////
69    // Transient fields (recomputed during unmarshalling)
70    ////////////////////////////////////////////////////////////////////////////////////////////
71  
72    /** Message to be sent */
73    private transient OutMessage outmsg;
74  
75    /** Message received from the Mss */
76    private transient InMessage inmsg;
77  
78    /** The index of the flow control entry associated with the local host */
79    private transient int localFCIndex = UNDEF; 
80  
81    /** The local host (where this message resides; sender or receiver) */
82    private transient MssHost localHost;
83  
84    /** The message sender (obtained from the fragment header) */
85    private transient MssHost sender;
86  
87    /** The incarnation identifier for the sender (obtained from the fragment header) */
88    private transient int incarnationId;
89  
90  
91    ////////////////////////////////////////////////////////////////////////////////////////////
92    // Marshalling and unmarshalling methods
93    ////////////////////////////////////////////////////////////////////////////////////////////
94  
95    /**
96     *  Marshal an IAMALIVE message.
97     */
98    static MsgIamAlive marshal(FCEntry[] fc)
99      throws IOException
100   {
101     msg.fc     = fc;
102     msg.sender = HostTable.getLocalHost();
103 
104     /*
105      * Here we check if this <code>MsgIamAlive</code> object has already
106      * been initialized with an outmsg object, in which case we simply
107      * clear the message object and reuse it.  This avoids to create a
108      * new outmsg object (allocating additional memory) and there is no
109      * need to garbage collect the old one.
110      *
111      * Note that all <code>MsgIamAlive</code> messages are of the same
112      * size, including the flow control data of all hosts.
113      */
114     int size = GroupIndex.SIZE + fc.length*FCEntry.SIZE;
115     if (msg.outmsg == null) {
116       msg.outmsg = MsgFactory.get(size);
117     }
118     if (log.isDebugEnabled()) {
119       log.debug("MsgIamAlive.size=" + size);
120     }
121     msg.outmsg.clear(size);
122 
123     GroupIndex.marshal(msg.outmsg, fc.length);
124     for (int i = 0; i < fc.length; i++)
125       fc[i].writeExternal(msg.outmsg);
126     return msg;
127   }
128 
129 
130   /**
131    *  Unmarshal an IAMALIVE message from the given stream.
132    */
133   static MsgIamAlive unmarshal(InMessage inmsg, FragmentHeader header)
134     throws IOException, ClassNotFoundException
135   {
136     msg.inmsg         = inmsg;
137     msg.localHost     = HostTable.getLocalHost();
138     msg.sender        = header.getSender();
139     msg.incarnationId = header.getIncarnationId();
140 
141     int len = GroupIndex.unmarshal(inmsg);
142     msg.fc = new FCEntry[len];
143     for (int i = 0; i < len; i++) {
144       msg.fc[i] = new FCEntry();
145       msg.fc[i].readExternal(inmsg);
146       if (msg.localFCIndex == UNDEF && msg.fc[i].key.isLocal())
147         msg.localFCIndex = i;
148     }
149     return msg;
150   }
151 
152 
153   ////////////////////////////////////////////////////////////////////////////////////////////
154   // MsgIamAlive specific methods (receiver side only)
155   ////////////////////////////////////////////////////////////////////////////////////////////
156 
157   /**
158    *  Update the flow control data on the receiver side.
159    */
160   void updateFC(MssDS mssds)
161   {
162     if (sender.isLocal()) {
163 
164       localHost.flush();
165 
166     } else {
167 
168       for (int i = 0; i < fc.length; i++) {
169         MssHost host = mssds.hostLookup(fc[i].key);
170         if (host == null) {
171           log.error("Unavailable host: " + fc[i].key);
172           continue;
173         }
174         if (log.isDebugEnabled()) {
175           log.debug("host=" + host + ", lastMsgRcvd=" + fc[i].lastMsgRcvd
176                     + ", cindex=" + sender.getClusterIndex());
177         }
178         host.getMsgFlow().clusterWindow.set(sender.getClusterIndex(), fc[i].lastMsgRcvd);
179       }
180 
181     }
182   }
183 
184 
185   /**
186    *  Returns incarnation identifier for the sender of this message.
187    */
188   public int getIncarnationId()
189   {
190     return incarnationId;
191   }
192 
193 
194   ////////////////////////////////////////////////////////////////////////////////////////////
195   // Fragmentation handling (from Msg interface)
196   ////////////////////////////////////////////////////////////////////////////////////////////
197 
198   /*
199    * The fragmentation iterator instance for this message; it can be
200    * reused for multiple iterations over this message.
201    */
202   private transient MsgFragmentIterator fragIter = null;
203 
204 
205   /**
206    *  Returns a <code>FragmentIterator</code> for this
207    *  <code>MsgIamAlive</code> object.  This iterator allows to send the
208    *  entire message as multiple fragments of specified size (payload).
209    *  At the same time, it marks each fragment with a tag and message
210    *  identifier provided through the <code>next()</code> method of the
211    *  iterator.
212    *
213    *  If there is already an iterator for this message, we reuse it.
214    */
215   public FragmentIterator iterator(MsgCntrl msgCntrl)
216   {
217     if (fragIter == null)
218       fragIter = new MsgFragmentIterator(this, msgCntrl);
219     else 
220       fragIter.reset(outmsg);
221     return fragIter;
222   }
223 
224 
225   ////////////////////////////////////////////////////////////////////////////////////////////
226   // Msg interface methods
227   ////////////////////////////////////////////////////////////////////////////////////////////
228 
229   /**
230    *  Returns the tag associated with this message.
231    */
232   public byte getTag()
233   {
234     return IAMALIVE;
235   }
236 
237 
238   /**
239    *  Returns the message identifier for this message; all IAMALIVE
240    *  messages have <code>UNDEF</code> as their message identifier.
241    */
242   public int getMid()
243   {
244     return UNDEF;
245   }
246 
247 
248   /**
249    *  Returns the sender of this message.
250    */
251   public MssHost getSender()
252   {
253     return sender;
254   }
255 
256 
257   /**
258    *  Returns false always, since topology messages should never be routed.
259    */
260   public boolean hasToBeRouted()
261   {
262     return false;
263   }
264 
265 
266   /**
267    *  Returns true if the sender of this message is the local host.
268    */
269   boolean isLocal()
270   {
271     return sender.isLocal();
272   }
273 
274 
275   /**
276    *  Returns the message flow controller for the sender side.
277    */
278   public MsgFlowSndrSide getMsgFlow()
279   {
280     return sender.getCluster().getMsgFlow();
281   }
282 
283 
284   /**
285    *  Returns the <code>OutMessage</code> associated with this message.
286    */
287   public OutMessage getOutMessage()
288   {
289     return outmsg;
290   }
291 
292 
293   ////////////////////////////////////////////////////////////////////////////////////////////
294   // Methods from Object
295   ////////////////////////////////////////////////////////////////////////////////////////////
296 
297   /**
298    *  Returns a string representation of this object
299    */
300   public String toString()
301   {
302     StringBuilder buf = new StringBuilder();
303     buf.append("IAMALIVE, ");
304     for (int i=0; i < fc.length; i++)
305       buf.append(fc[i]);
306     return buf.toString();
307   }
308   
309 } // END MsgIamAlive