View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  
23  import jgroup.core.EndPoint;
24  import jgroup.relacs.types.GroupIndex;
25  import jgroup.util.InMessage;
26  import jgroup.util.MsgFactory;
27  import jgroup.util.OutMessage;
28  
29  import org.apache.log4j.Logger;
30  
31  /**
32   *  The <code>MsgRouting</code> class
33   *
34   *  @author Hein Meling
35   *  @since Jgroup 1.2
36   */
37  public final class MsgRouting
38    implements Msg, MssConstants, MssTag
39  {
40  
41    ////////////////////////////////////////////////////////////////////////////////////////////
42    // Logger
43    ////////////////////////////////////////////////////////////////////////////////////////////
44  
45    /** Obtain logger for this class */
46    private static Logger log = Logger.getLogger(MsgRouting.class);
47  
48  
49    ////////////////////////////////////////////////////////////////////////////////////////////
50    // Unique message instance
51    ////////////////////////////////////////////////////////////////////////////////////////////
52  
53    /**
54     *  As this message is never stored in a queue, a single message
55     *  instance is sufficient.  This allows us to save the cost of
56     *  allocating and garbage collecting messages.
57     */
58    private static final MsgRouting msg = new MsgRouting();
59  
60  
61    ////////////////////////////////////////////////////////////////////////////////////////////
62    // Message pointers
63    ////////////////////////////////////////////////////////////////////////////////////////////
64  
65    private transient int fcStartPosition;
66  
67  
68    ////////////////////////////////////////////////////////////////////////////////////////////
69    // Message Fields (static part)
70    ////////////////////////////////////////////////////////////////////////////////////////////
71  
72    /* Tag assigned in upper level (@see jgroup.relacs.mss.MssTag) */
73    private byte routingTag;
74  
75  
76    ////////////////////////////////////////////////////////////////////////////////////////////
77    // Message Fields (dynamic part)
78    ////////////////////////////////////////////////////////////////////////////////////////////
79  
80    /** Routing table */
81    private RoutingTable rt;
82  
83    /** Array of flow control entries */
84    private FCEntry[] fc;
85  
86  
87    ////////////////////////////////////////////////////////////////////////////////////////////
88    // Transient fields (recomputed during unmarshalling)
89    ////////////////////////////////////////////////////////////////////////////////////////////
90  
91    /** The MssDS object */
92    private transient MssDS mssds;
93  
94    /** Message to be sent */
95    private transient OutMessage outmsg;
96  
97    /** The index of the flow control entry associated with the local host */
98    private transient int localFCIndex = UNDEF; 
99  
100   /** The message sender (obtained from the fragment header) */
101   private transient MssHost sender;
102 
103 
104   ////////////////////////////////////////////////////////////////////////////////////////////
105   // Marshalling and unmarshalling methods
106   ////////////////////////////////////////////////////////////////////////////////////////////
107 
108   /**
109    *  Marshal a ROUTING message.
110    */
111   static MsgRouting marshal(MssDS mssds)
112     throws IOException
113   {
114     msg.routingTag = ROUTING;
115     msg.mssds      = mssds;
116     msg.sender     = HostTable.getLocalHost();
117     msg.rt         = mssds.getRoutingTable();
118 
119     /*
120      * Compute the max size for this message.
121      */
122     int size = GroupIndex.SIZE + mssds.size()*FCEntry.SIZE;
123     /*
124      * Here we check if this <code>MsgRouting</code> object has already
125      * been initialized with an outmsg object, in which case we simply
126      * clear the message object and reuse it.  This avoids to create a
127      * new outmsg object (allocating additional memory) and there is no
128      * need to garbage collect the old one.
129      */
130     if (msg.outmsg == null) {
131       int maxsize = size + RoutingTable.getMaxSize(mssds.size(), mssds.numOfClusters());
132       if (log.isDebugEnabled()) {
133         log.debug("MsgRouting.maxsize=" + maxsize);
134       }
135       msg.outmsg = MsgFactory.get(maxsize);
136     }
137     size += msg.rt.getMarshalSize();
138     if (log.isDebugEnabled()) {
139       log.debug("MsgRouting.size=" + size);
140     }
141     msg.outmsg.clear(size);
142 
143     /* Marshal the routing table */
144     msg.rt.writeExternal(msg.outmsg);
145     msg.fcStartPosition = msg.outmsg.getPosition();
146     /*
147      * Note that, we do not initialize the flow control here, since the
148      * mss will do so using the <code>setFCData()</code> method below,
149      * for each cluster; allowing the mss to reuse the same
150      * <code>MsgRouting</code> object for sending to all clusters.
151      */
152     return msg;
153   }
154 
155 
156   /**
157    *  Unmarshal an FWDROUTING message from the given stream.
158    */
159   static MsgRouting unmarshal(InMessage in, FragmentHeader header)
160     throws IOException, ClassNotFoundException
161   {
162     /* Obtain stuff from the header */
163     msg.routingTag = header.getTag();
164     msg.sender     = header.getSender();
165 
166     /* Obtain the routing table from the stream */
167     msg.rt = new RoutingTable();
168     msg.rt.readExternal(in);
169 
170     /* Obtain the flow control information table from the stream */
171     int len = GroupIndex.unmarshal(in);
172     msg.fc = new FCEntry[len];
173     for (int i=0; i < len; i++) {
174       msg.fc[i] = new FCEntry();
175       msg.fc[i].readExternal(in);
176       if (msg.localFCIndex == UNDEF && msg.fc[i].key.isLocal())
177         msg.localFCIndex = i;
178     }
179     return msg;
180   }
181 
182 
183   ////////////////////////////////////////////////////////////////////////////////////////////
184   // MsgRouting specific method
185   ////////////////////////////////////////////////////////////////////////////////////////////
186 
187   /**
188    *  Return the largest possible size that a <code>MsgRouting</code>
189    *  message may consume, for the given distributed system size.
190    */
191   public static int getMaxSize(int numOfHosts, int numOfClusters)
192   {
193     return GroupIndex.SIZE + numOfHosts*FCEntry.SIZE
194       + RoutingTable.getMaxSize(numOfHosts, numOfClusters);
195   }
196 
197 
198   /**
199    *  Update the flow control data on the receiver side.
200    */
201   void updateFC() 
202   {
203     if (routingTag == FWDROUTING) {
204 
205       /* Check if there is a flow control entry for the local host */
206       if (localFCIndex != UNDEF) {
207         /* Update congestion information (sender side flow control) */
208         sender.flush(fc[localFCIndex].lastMsgRcvd);
209       }
210 
211     } else {
212 
213       log.warn("Erroneous routing tag; can only process FWDROUTING: " + routingTag);
214 
215     }
216   }
217 
218 
219   /**
220    *  Returns the sender cluster of this message.
221    */
222   Cluster getCluster()
223   {
224     return sender.getCluster();
225   }
226 
227 
228   /**
229    *  Returns the topology table associated with the routing table
230    *  attached with this message.
231    */
232   public TopologyEntry[] getTopologyTable()
233   {
234     return rt.table;
235   }
236 
237 
238   /**
239    *  Negate the cost as reverse poison 
240    */
241   void splitHorizonOn(EndPoint key) 
242   {
243     rt.splitHorizonOn(key);
244   }
245 
246 
247   /**
248    *  
249    */
250   void splitHorizonOff() 
251   {
252     rt.splitHorizonOff();
253   }
254 
255 
256   void setFCData(EndPoint destCluster)
257   {
258     fc = mssds.getClusterFCEntry(destCluster);
259     outmsg.seek(fcStartPosition);
260     try {
261       GroupIndex.marshal(outmsg, fc.length);
262       for (int i=0; i < fc.length; i++)
263         fc[i].writeExternal(outmsg);
264     } catch (IOException e) {
265       log.error("setFCData: Could not update flow control information in message stream", e);
266     }
267   }
268 
269 
270   ////////////////////////////////////////////////////////////////////////////////////////////
271   // Fragmentation handling (from Msg interface)
272   ////////////////////////////////////////////////////////////////////////////////////////////
273 
274   /*
275    * The fragmentation iterator instance for this message; it can be
276    * reused for multiple iterations over this message.
277    */
278   private transient MsgFragmentIterator fragIter = null;
279 
280 
281   /**
282    *  Returns a <code>FragmentIterator</code> for this
283    *  <code>MsgRouting</code> object.  This iterator allows to send the
284    *  entire message as multiple fragments of specified size (payload).
285    *  At the same time, it marks each fragment with a tag and message
286    *  identifier provided through the <code>next()</code> method of the
287    *  iterator.
288    */
289   public FragmentIterator iterator(MsgCntrl msgCntrl)
290   {
291     /* If there is already an iterator for this message, we reuse it. */
292     if (fragIter == null)
293       fragIter = new MsgFragmentIterator(this, msgCntrl);
294     else 
295       fragIter.reset(outmsg);
296     return fragIter;
297   }
298 
299 
300   ////////////////////////////////////////////////////////////////////////////////////////////
301   // Msg interface methods
302   ////////////////////////////////////////////////////////////////////////////////////////////
303 
304   /**
305    *  Returns the tag associated to this message.
306    */
307   public byte getTag()
308   {
309     return routingTag;
310   }
311 
312 
313   /**
314    *  Returns the message identifier for this message; all routing
315    *  messages have <code>UNDEF</code> as their message identifier.
316    */
317   public int getMid()
318   {
319     return UNDEF;
320   }
321 
322 
323   /**
324    *  Returns the sender of this message.
325    */
326   public MssHost getSender()
327   {
328     return sender;
329   }
330 
331 
332   /**
333    *  Returns false always, since routing messages should never be routed.
334    */
335   public boolean hasToBeRouted()
336   {
337     return false;
338   }
339 
340 
341   /**
342    *  Returns true if the sender of this message is the local host.
343    */
344   boolean isLocal()
345   {
346     return sender.isLocal();
347   }
348 
349 
350   /**
351    *  Returns the message flow controller for the sender side.
352    */
353   public MsgFlowSndrSide getMsgFlow()
354   {
355     return sender.getCluster().getMsgFlow();
356   }
357 
358 
359   /**
360    *  Returns the <code>OutMessage</code> associated with this message.
361    */
362   public OutMessage getOutMessage()
363   {
364     return outmsg;
365   }
366 
367 
368   ////////////////////////////////////////////////////////////////////////////////////////////
369   // Methods from Object
370   ////////////////////////////////////////////////////////////////////////////////////////////
371 
372   /**
373    *  Returns a string representation of this object
374    */
375   public String toString()
376   {
377     StringBuilder buf = new StringBuilder();
378     switch (routingTag) {
379     case FWDROUTING:
380       buf.append("FWDROUTING, ");
381       buf.append(rt.toString());
382       for (int i=0; i < fc.length; i++)
383         buf.append(fc[i]);
384       break;
385 
386     case ROUTING:
387       buf.append("ROUTING, ");
388       buf.append(rt.toString());
389       for (int i=0; i < fc.length; i++)
390         buf.append(fc[i]);
391       break;
392 
393     default:
394       buf.append("Illegal routing type: " + routingTag);
395     }
396     return buf.toString();
397   }
398 
399 } // END MsgRouting