View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  import java.net.DatagramPacket;
23  
24  import jgroup.core.ConfigurationException;
25  import jgroup.core.EndPoint;
26  import jgroup.core.JgroupException;
27  import jgroup.relacs.config.DistributedSystemConfig;
28  import jgroup.relacs.config.TransportConfig;
29  import jgroup.relacs.daemon.DaemonMsg;
30  import jgroup.relacs.events.Event;
31  import jgroup.util.InMessage;
32  
33  import org.apache.log4j.Logger;
34  
35  
36  /**
37   *  The <code>Mss</code> class implements a Multi-Send Service.
38   *
39   *  @author Salvatore Cammarata
40   *  @author Alberto Montresor
41   *  @author Hein Meling
42   *  @since Jgroup 1.2
43   */
44  public final class Mss
45    implements EhandlerUser, MssConstants, MssTag
46  {
47  
48    ////////////////////////////////////////////////////////////////////////////////////////////
49    // Logger
50    ////////////////////////////////////////////////////////////////////////////////////////////
51  
52    /** Obtain logger for this class */
53    private static final Logger log = Logger.getLogger(Mss.class);
54  
55  
56    ////////////////////////////////////////////////////////////////////////////////////////////
57    // Unique message instance
58    ////////////////////////////////////////////////////////////////////////////////////////////
59  
60    /**
61     *  As this message is never stored in a queue, a single message
62     *  instance is sufficient.  This allows us to save the cost of
63     *  allocating and garbage collecting messages.  In fact, this message
64     *  does not change at all.
65     */
66    private static byte[] sentFrag;
67  
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Private Fields
71    ////////////////////////////////////////////////////////////////////////////////////////////
72  
73    /** The host on which the daemon is running. */
74    private MssHost localHost;
75  
76    /** Upper layer */
77    private MssUser mssuser;
78    
79    /** Distributed System configuration */
80    private MssDS mssds;
81  
82    /** Event scheduler */
83    private EventHandler ehandler;
84  
85    /** Message Flow Control Layer */
86    private MsgCntrl msgCntrl;
87  
88    // Timeouts
89    
90    /** Routing timeout; how often the routing table are sent */
91    private long routingTimeout;
92  
93    /** Congestion timeout */
94    private long congestionTimeout;
95  
96    /** Last routing event scheduled; used to avoid having multiple timeouts */ 
97    private ScheduledEvent lastRoutingScheduled;
98   
99    // Fragmentation
100 
101   /** Payload size */
102   private int payload;
103 
104 
105   ////////////////////////////////////////////////////////////////////////////////////////////
106   // Constructors
107   ////////////////////////////////////////////////////////////////////////////////////////////
108 
109   /**
110    *  Constructs a new <code>Mss</code> object.
111    *
112    *  @param mssuser
113    *    The upper level layer, using this layer.
114    *  @param dsc
115    *    The distributed system configuration.
116    *  @param tconf
117    *    The transport parameter configuration.
118    *
119    *  @exception ConfigurationException
120    *    Thrown if the distributed system is incorrectly configured.
121    *  @exception IOException
122    *    Thrown if there are problems at socket layer.
123    */
124   public Mss(MssUser mssuser, DistributedSystemConfig dsc, TransportConfig tconf)
125     throws ConfigurationException, IOException
126   {
127     this.mssuser = mssuser;
128 
129     /* Timeout configuration */
130     routingTimeout = tconf.getRoutingTimeout();
131     congestionTimeout = tconf.getCongestionTimeout();
132 
133     /* Message configuration parameters */
134     payload = tconf.getPayload();
135 
136     /* Configure other <code>Mss</code> layers */
137     ehandler = new EventHandler(this);
138     mssds = new MssDS(ehandler, dsc, tconf);
139     localHost = HostTable.getLocalHost();
140 
141     /*
142      * Initialize the single instance SENT message.  Needs to be done
143      * after local host initialization, since FragmentHeader marshal
144      * needs to obtain the incarnation identifier from the local
145      * MssHost object.
146      */
147     sentFrag = FragmentHeader.marshal(SENT, false, UNDEF);
148 
149     /* Flow Control Layer initialization */
150     msgCntrl = new MsgCntrl(ehandler, mssuser, mssds);
151     mssds.setControl(msgCntrl);
152 
153     /*
154      * Compute the next routing timeout; the timeout are choosen in a
155      * probabilistic way in order to have only a routing table sent out
156      * in the cluster.
157      */
158     RandomGenerator.setRoutingTimeout(routingTimeout);
159 
160     /*
161      * Schedule the topology timeouts
162      */
163     iamAliveTimeout();
164     routingTimeout();
165     // congestion timeout
166     congTimeout();
167     // now we are visible on the net: start the network layer
168     mssds.doStart();
169   }
170 
171 
172   ////////////////////////////////////////////////////////////////////////////////////////////
173   // Multicast send methods
174   ////////////////////////////////////////////////////////////////////////////////////////////
175 
176   /**
177    *  Sends message the <code>msg</code> to a single host specified by
178    *  <code>receiver</code>.  This method is optimized for sending
179    *  directly to the given <code>receiver</code>, bypassing the cluster
180    *  leader.
181    */
182   public void send(byte tag, DaemonMsg msg, EndPoint receiver)
183   {
184     if (receiver.isLocal()) {
185       throw new IllegalArgumentException("Local message sending not allowed");
186     }
187 
188     /* Add the host to the receiver set for the cluster */
189     MssHost host = mssds.hostLookup(receiver);
190     Cluster cluster = host.getCluster();
191     cluster.addReceiver(receiver);
192 
193     /*
194      * Marshal the mss level data into the message stream.
195      */
196     MsgJG msgjg = null;
197     try {
198       msgjg = MsgJG.marshal(msg, tag, new EndPoint[] { receiver }, mssds);
199     } catch (IOException e) {
200       log.error("Cannot marshal the mss level data into the message stream", e);
201       return;
202     }
203     
204     /*
205      * If there exists at least one destination host belonging to the
206      * cluster that is reachable we attempt to send to the cluster.
207      */
208     if (cluster.isReachable()) {
209       /*
210        * Send the message to the cluster of the receiving host
211        */
212       cluster.send(msgjg);
213 
214     } else if (log.isDebugEnabled()) {
215       log.debug("msend failed: no reachable member for cluster " + cluster.getEndPoint());
216     }
217     cluster.clearReceivers();
218   }
219 
220 
221   /**
222    *  M-sends message <code>msg</code> to all the hosts in the
223    *  hosttable.  Only a single message is sent to each cluster leader,
224    *  and from there sent to the corresponding members in that cluster.
225    */
226   public void msend(byte tag, DaemonMsg msg)
227   {
228     /*
229      * Marshal the mss level data into the message stream.
230      */
231     MsgJG msgjg = null;
232     try {
233       msgjg = MsgJG.marshal(msg, tag, mssds);
234     } catch (IOException e) {
235       log.error("Cannot marshal the mss level data into the message stream", e);
236       return;
237     }
238 
239     /*
240      * Send the message to all reachable clusters.
241      */
242     ClusterTable clustertable = mssds.getClusterTable();
243     for (int i = 0, size = clustertable.size(); i < size; i++) {
244       Cluster cluster = clustertable.get(i);
245       if (cluster.isReachable()) {
246 
247         cluster.send(msgjg);
248 
249       } else if (log.isDebugEnabled()) {
250         log.debug("msend failed: no reachable member for cluster " + cluster.getEndPoint());
251       }
252     }
253   }
254 
255 
256   /**
257    *  M-sends message <code>msg</code> to the hosts in the destination
258    *  array <code>receivers</code>.
259    */
260   public void msend(byte tag, DaemonMsg msg, EndPoint[] receivers) 
261   {
262     /* Add each receiver to the destination set of its cluster */
263     for (int i = 0; i < receivers.length; i++) {
264       if (!receivers[i].isLocal()) {
265         MssHost host = mssds.hostLookup(receivers[i]);
266         host.getCluster().addReceiver(receivers[i]);
267       } else if (log.isDebugEnabled()) {
268         log.debug("Local host has been removed from the destination list");
269       }
270     }
271 
272     /*
273      * Marshal the mss level data into the message stream.
274      */
275     MsgJG msgjg = null;
276     try {
277       msgjg = MsgJG.marshal(msg, tag, receivers, mssds);
278     } catch (IOException e) {
279       log.error("Cannot marshal the mss level data into the message stream", e);
280       return;
281     }
282 
283     /*
284      * Send the message to all reachable clusters with receivers.
285      */
286     ClusterTable clustertable = mssds.getClusterTable();
287     for (int i = 0, size = clustertable.size(); i < size; i++) {
288       Cluster cluster = clustertable.get(i);
289       /*
290        * If there exists at least one destination host belonging to the
291        * cluster that is reachable we attempt to send to the cluster.
292        */
293       if (cluster.hasReceivers()) {
294         if (cluster.isReachable()) {
295 
296           cluster.send(msgjg);
297 
298         } else if (log.isDebugEnabled()) {
299           log.debug("msend failed: no reachable member for cluster " + cluster.getEndPoint());
300         }
301       }
302       cluster.clearReceivers();
303     }
304   }
305 
306   /**
307    * Returns true if the <code>Ehandler</code> local message queue
308    * contains undelivered messages; otherwise, false is returned.
309    */
310 //  public boolean hasUndeliveredLocalMsgs(int gid)
311 //  {
312 //    return ehandler.hasUndeliveredLocalMsgs(gid);
313 //  }
314 
315   /**
316    * Returns a string containing the current content of the queues.
317    */
318   public String toString()
319   {
320     return ehandler.toString();
321   }
322   
323   ////////////////////////////////////////////////////////////////////////////////////////////
  // Implementation of abstract methods inherited from NIListener
325   ////////////////////////////////////////////////////////////////////////////////////////////
326 
327   /**
328    *  Handles a message received from a remote host
329    */ 
330   public void rreceive(DatagramPacket packet)
331   {
332     byte[] buf = packet.getData();
333     int bufLen = packet.getLength();
334 
335     log.assertLog(buf.length <= payload + OVERHEAD_SIZE,
336       "Buffer length (" + buf.length + ") should not exceed the payload plus overhead ("
337       + (payload + OVERHEAD_SIZE) + ")");
338 
339     try {
340       FragmentHeader header = FragmentHeader.unmarshal(mssds, buf, bufLen);
341       if (log.isDebugEnabled())
342         log.debug("packet header=" + header);
343 
344       /*
345        *  This cluster is the destination for the message fragment.
346        */
347 
348       /* Check if the sender has the same incarnation */
349       if (mssds.hasNewIncarnation(header))
350         generateSuspect();
351 
352       switch(header.getTag()) {
353 
354       case SENT:
355         {
356           if (header.isLocal()) {
357             if (log.isDebugEnabled())
358               log.debug("Discarding locally sent SENT message");
359           } else {
360             log.assertLog(lastRoutingScheduled != null, "Last scheduled ROUTING event is null");
361 
362             if (log.isDebugEnabled())
363               log.debug("rreceived packet SENT header= " + header);
364             /* Delete the scheduled ROUTING timeout */
365             ehandler.abortTimeout(lastRoutingScheduled);
366 
367             /* Schedule a new timeout for the last ROUTING event */
368             long timeout = RandomGenerator.getRoutingTimeout();
369             lastRoutingScheduled = ehandler.setTimeout(timeout, new ScheduledEvent(ROUTING));
370           }
371         }
372         break;
373 
374       case IAMALIVE:
375         {
376           /*
377            * Note that we need to process local IAMALIVE messages.
378            *
379            * Note also that IAMALIVE messages are not fragmented since
380            * the message identifier for these messages are set to UNDEF,
381            * and this conflicts with the need to give each fragment a
382            * unique number.  This can potentially be a problem for large
383            * distributed systems, using a very low payload value.
384            *
385            * FIXME: We should add a check; if the payload is too low to
386            * keep an entire IAMALIVE message.
387            */
388           InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
389           inmsg.insert(buf, bufLen);
390           handleIamAliveMsg(MsgIamAlive.unmarshal(inmsg, header));
391         }
392         break;
393 
394       case ROUTING:
395         {
396           log.assertLog(!header.isLocal(),
397             "Received ROUTING msg sent by me; shouldn't happen.");
398 
399           /*
400            * Forward the message to my cluster directly, modifying only
401            * the fragment tag, indicating that it is a forwarded routing
402            * message.  This does not change the sender field of the
403            * message, so that the receiver will know the actual
404            * originator of the message.
405            */
406           header.setTag(FWDROUTING);
407           localHost.getCluster().forward(header.getFragment(), header.getFragmentLength());
408         }
409         break;
410 
411       case FWDROUTING:
412         {
413           /*
414            * Note that FWDROUTING messages are forwarded without
415            * modifying the sender field of the message fragment.
416            */
417           log.assertLog(!header.isLocal(),
418             "Received FWDROUTING msg sent by me; should this happen?");
419 
420           /*
421            * Note that (FWD)ROUTING messages are not fragmented since
422            * the message identifier for these messages are set to UNDEF,
423            * and this conflicts with the need to give each fragment a
424            * unique number.  This can potentially be a problem for large
425            * distributed systems, using a very low payload value.
426            *
427            * FIXME: We should add a check; if the payload is too low to
428            * keep an entire ROUTING message.
429            */
430           InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
431           inmsg.insert(buf, bufLen);
432           MsgRouting msg = MsgRouting.unmarshal(inmsg, header);
433           handleRoutingMsg(msg);
434         }
435         break;
436 
437       case NACK:
438         {
439           if (header.isLocal()) {
440             if (log.isDebugEnabled())
441               log.debug("Discarding locally sent NACK message");
442           } else {
443             /*
444              * NACK messages are expected to fit within a single message
445              * fragment; is it does not require fragmentation support.
446              */
447             InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
448             inmsg.insert(buf, bufLen);
449             MsgNACK nack = MsgNACK.unmarshal(inmsg, header, mssds);
450             handleNACKMsg(nack);
451           }
452         }
453         break;
454 
455       case SYN:
456         {
457           /*
458            * SYN messages are expected to fit within a single message
459            * fragment; it does not require fragmentation support.
460            */
461           InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
462           inmsg.insert(buf, bufLen);
463           MsgSYN syn = MsgSYN.unmarshal(inmsg, header);
464           handleSYNMsg(syn);
465         }
466         break;
467 
468       case NOTLASTFRAGMENT:
469       case JG:
470         {
471           /* Avoid delivering messages sent locally */
472           if (header.isLocal()) {
473             if (log.isDebugEnabled()) {
474               log.debug("Discarding local upper layer message (" + header + ")");
475             }
476             return;
477           }
478 
479           // FIXME Add support to guarantee order of messages at low level
480           if (!handleJGInfo(header)) {
481             /*
482              * If we fail to deliver the msg, reschedule it.
483              * 
484              * Note that most messages received here will be discarded
485              * since they are received in a different view; however,
486              * we may get important synchronization messages before
487              * we are able to handle them, thus we reschedule all
488              * of these messages.
489              */
490             ehandler.rnotify(packet);
491           }
492         }
493         break;
494 
495       default:
496         log.warn("Invalid packet tag: " + header.getTag());
497       }
498     } catch (IOException e) {
499       /* Malformed message, just return without trying to deliver it. */
500       log.error("rreceive: malformed packet", e);
501     } catch (ClassCastException e) {
502       log.error("rreceive: malformed packet", e);
503     } catch (ClassNotFoundException e) {
504       log.error("rreceive: malformed packet", e);
505     } catch (JgroupException e) {
506       log.error("rreceive: packet from unknown host; all hosts must be in the config.xml", e);
507     }
508   }
509 
510 
511   /**
512    *  Handles a message received by a local member
513    */
514   public void lreceive(Event event)
515   {
516     mssuser.localReceive(event);
517   }
518 
519 
520   /**
521    *  Handle expired timeout event
522    */
523   public void treceive(ScheduledEvent event)
524   {
525     switch (event.getTag()) {
526 
527       case IAMALIVE:
528         iamAliveTimeout();
529         break;
530 
531       case ROUTING:
532         routingTimeout();
533         break;
534 
535       case REMOTENACK:
536         nackTimeout((MsgNACK) event.getData());
537         break;  
538 
539       case CONGESTION:
540         congTimeout();
541         break;
542 
543       default:
544         log.warn("Unknown timeout event received");
545     }
546   }
547 
548 
549   public void lsend(Event msg)
550   {
551     ehandler.lnotify(msg);
552   }
553 
554 
555   /**
556    *  Returns the hosttable.
557    */
558   public HostTable getHostTable()
559   {
560     return mssds.getHostTable();
561   }
562 
563 
564   ////////////////////////////////////////////////////////////////////////////////////////////
565   // Message handlers
566   ////////////////////////////////////////////////////////////////////////////////////////////
567 
568   /**
569    *  Handle a synchronization message
570    */
571   private void handleSYNMsg(MsgSYN msg) 
572   {
573     if (log.isDebugEnabled())
574       log.debug("handleSYNMsg: " + msg);
575 
576     /* The source member cannot be unreachable here */
577 
578     MssHost sender = msg.getSender();
579     switch (msg.getSYNType()) {
580 
581       case QSYN:
582         /*
583          * (SYN_REQUEST) The msg sender is asking for a SYN msg from me.
584          * We send an ASYN message as reply.
585          */
586         try {
587           /*
588            * PATCH added by Hein 22/7-03.  The message flow was not
589            * initialized correctly after two clusters had been partitioned
590            * and were merging, that is they didn't synchronize.  Therefore,
591            * we check if the sender side message flow is already initialized;
592            * if not, we reset the message flow for the sending side.
593            */
594           MsgFlowSndrSide msgFlow = msg.getMsgFlow();
595           if (!msgFlow.isInitialized()) {
596             Cluster cluster = msg.getSender().getCluster();
597             cluster.resetMsgFlow();
598             if (log.isDebugEnabled())
599               log.debug("cluster.resetMsgFlow()");
600           }
601           MsgSYN syn = MsgSYN.marshal(ASYN, msg, msgFlow.getLastMsgSent());
602           sender.unicastRouteSend(syn);
603           if (log.isDebugEnabled())
604             log.debug("Sent " + syn);
605         } catch (IOException e) {
606           log.error("Unable to marshal ASYN message; nothing sent", e);
607         }
608         break;
609 
610       case ASYN:
611         /*
612          * (SYN_REPLY) The msg sender answer to our SYN request.  We
613          * check that the sender has the right nonce.
614          */
615         if (sender.checkSynId(msg.getMid())) {
616 
617           /* Abort timeout for synchronization since we received a reply. */
618           sender.abortTimeout();
619 
620           /* Reset message flow control (receiver side) */
621           sender.resetMsgFlow(msg.getLastMsgSent());
622 
623           /* PATCH: added by Hein 28/7-02. See email.
624            *
625            * Reset message flow control for the cluster (sender side) if
626            * it was previously unreachable; that is the message flow was
627            * previously reset.
628            */
629           Cluster cluster = sender.getCluster();
630           if (!cluster.getMsgFlow().isInitialized()) {
631             cluster.resetMsgFlow(msg.getLastMsgSent());
632           }
633 
634           /*
635            * Notify the upper layer that the sender of the ASYN message is
636            * now reachable, and we have synchronized our sequence numbers.
637            */
638           mssds.getUpperView().setAsReachable(sender);
639           if (log.isDebugEnabled())
640             log.debug("Synchronized to " + sender + " nonce (" + msg.getMid()
641                       + ") - Msg flow reset to lastMsgSent=" + msg.getLastMsgSent());
642 
643           /* The two members are now synchronized. */
644         }
645         break;
646 
647       default:
648         log.warn("Invalid synchronization message received");
649     }
650     if (log.isDebugEnabled())
651       log.debug("handleSYNMsg end");
652   }
653 
654 
655   /**
656    *  Handle NACK message
657    */
658   private void handleNACKMsg(MsgNACK nack) 
659   {
660     if (log.isDebugEnabled())
661       log.debug(nack);
662 
663     /* The source member cannot be unreachable here */
664 
665     if (NACKSUPPRESSION) {
666       /*
667        * NACK suppression sends only a single NACK for each cluster.
668        */
669 
670       MssHost src = nack.getSource();
671       if (src.isLocal()) {
672         /*
673          * I'm the source of the missing message; resend it and update
674          * the congestion information (NACK count).
675          */
676         resendMsg(nack);
677         nack.getMsgFlow().incNackCount();
678 
679       } else if (!nack.isRemoteNACK()) {
680         /*
681          * Another member of the cluster has sent a NACK message; check
682          * if also this member is waiting (have scheduled) a NACK event
683          * for this particular message.  In which case we abort the
684          * current NACK event and reschedule it at a later point in
685          * time, expecting to receive the missing message soon, due to
686          * the REMOTENACK sent by the NACK leader of this cluster.
687          */
688         ScheduledEvent eventNack = src.getMsgFlow().getScheduledEvent(nack.mid);
689         if (eventNack == null) {
690           log.warn("No scheduled NACK event for " + nack.mid);
691         } else {
692           /* Delete the scheduled NACK timeout */
693           ehandler.abortTimeout(eventNack);
694           /*
695            * Reschedule a new timeout for the NACK (to allow additional
696            * time for receiving the resent message)
697            */
698           long randomTimeout = RandomGenerator.getRandomTimeout(src.getTimeout());
699           ehandler.setTimeout(randomTimeout, eventNack);
700         }
701       }
702 
703     } else if (nack.isRemoteNACK()) {
704       /*
705        * Without NACK suppression the source of the missing message will
706        * receive <i>cluster.size</i> messages for a single missing
707        * message.  Even more NACKs will be sent if the message is
708        * missing also in other clusters.
709        */
710       resendMsg(nack);
711 
712     }
713     if (log.isDebugEnabled())
714       log.debug("handleNACKMsg end");
715   }
716 
717 
718   /**
719    *  Recover the message corresponding to the given NACK message, and
720    *  resend it to the destination cluster.
721    */
722   private void resendMsg(MsgNACK nack)
723   {
724     if (log.isDebugEnabled())
725       log.debug(nack);
726     Cluster destCluster = nack.getCluster();
727     MsgFlowSndrSide msgFlow = destCluster.getMsgFlow();
728 
729     FragmentIterator fragIter = msgFlow.getSentMsgFrag(nack);
730     if (fragIter != null) {
731       if (log.isDebugEnabled()) {
732         log.debug("fragIter=" + fragIter);
733       }
734       destCluster.resend(fragIter, nack.mid);
735       destCluster.clearReceivers();
736 
737       if (log.isDebugEnabled()) {
738         log.debug("Resent msg fragment (" + nack.mid + ") to cluster " + destCluster.getEndPoint());
739       }
740     } else {
741       log.warn("Cannot resend msg fragment (" + nack.mid + "); already removed from cluster queue "
742                + destCluster.getEndPoint());
743     }
744   }
745 
746 
747   /**
748    *  Handle routing message (forwarded from a cluster member).
749    */
750   private void handleRoutingMsg(MsgRouting msg)
751   {
752     if (log.isDebugEnabled())
753       log.debug("handleRoutingMsg start");
754 
755     /*
756      * Check the incarnations associated with the FWDROUTING message.
757      */
758     if (mssds.checkIncarnation(msg))
759       generateSuspect();
760 
761     /* Update routing table */
762     mssds.updateRoutingTable(msg);
763 
764     /* Update Flow Control information */
765     msg.updateFC();
766     generateSuspect();
767 
768     if (log.isDebugEnabled())
769       log.debug("handleRoutingMsg end");
770   }
771 
772 
773   /**
774    * Handle I am alive message.
775    */
776   private void handleIamAliveMsg(MsgIamAlive msg)
777   {
778     if (log.isDebugEnabled())
779       log.debug("handleIamAliveMsg start");
780 
781     /* Update reachability info if message sender is not local */
782     if (!msg.isLocal())
783       mssds.updateReachability(msg);
784     /* Update Flow Control information */
785     msg.updateFC(mssds);
786     generateSuspect();
787 
788     if (log.isDebugEnabled())
789       log.debug("handleIamAliveMsg end");
790   }
791 
792 
793   /**
794    * Send synchronization requests to all newly reachable hosts,
795    * and reset the synchronization status for the new unreachable
796    * hosts.  Finally, notify the upper layer if the view changed.
797    */
798   private void generateSuspect() 
799   {
800 
801     /* Obtain the view */  
802     DSView view = mssds.getView();
803 
804     if (log.isDebugEnabled())
805       log.debug("generateSuspect(): view = "+ view);
806 
807     /* Send SYN request to all newly reachable hosts. */
808     MssHost[] newReachable = view.getNewReachableHosts();
809     for (int i=0; i < newReachable.length; i++) {
810       if (log.isDebugEnabled())
811         log.debug("newReachableHost: " + newReachable[i]);
812       newReachable[i].scheduleTimeout();
813     }
814 
815     /* Reset the SYN status for the new unreachable hosts. */
816     MssHost[] newUnreachable = view.getNewUnreachableHosts();
817     for (int i=0; i < newUnreachable.length; i++) {
818       if (log.isDebugEnabled())
819         log.debug("newUnreachableHost: " + newUnreachable[i]);
820       newUnreachable[i].abortTimeout();
821     }
822 
823     /* If the reachability set has changed, notify the upper layer. */
824     DSView upperview = mssds.getUpperView();
825     if (upperview.hasChanged()) {
826       if (log.isDebugEnabled()) {
827         log.debug("generateSuspect(): Upper view changed to: " + upperview);
828       }
829 
830       mssuser.remoteSuspect(upperview.getReachableEndPoints(), 
831         upperview.getNewReachableEndPoints(), 
832         upperview.getNewUnreachableEndPoints(), 
833         upperview.getNewIncarnationEndPoints());
834       upperview.clear();
835     }
836 
837   }
838 
839 
840   ////////////////////////////////////////////////////////////////////////////////////////////
841   // Timeout handling
842   ////////////////////////////////////////////////////////////////////////////////////////////
843 
844   /**
845    *  Update the topology information (due to this timeout), and send a
846    *  IAMALIVE message to the local cluster.
847    */
848   private void iamAliveTimeout()
849   {
850     // Update topology info
851     mssds.update();
852 
853     try {
854       MsgIamAlive msg = MsgIamAlive.marshal(mssds.getAllFCEntry());
855       localHost.getCluster().send(msg);
856     } catch (IOException e) {
857       log.error("Cannot marshal IAMALIVE flow control data into the message stream", e);
858       return;
859     }
860 
861     // Set a new timeout
862     ehandler.setTimeout(routingTimeout, new ScheduledEvent(IAMALIVE));
863     if (log.isDebugEnabled())
864       log.debug("Sent IAMALIVE message; set timeout " + routingTimeout);
865 
866     // Generate suspect
867     generateSuspect();
868   }
869 
870 
871   /**
872    *  Send ROUTING topology information (flow control information) to
873    *  all clusters, except the local cluster.  Also, notify the local
874    *  cluster that we sent this information, using the SENT topology
875    *  message.
876    */
877   private void routingTimeout()
878   {
879     log.assertLog(localHost.isReachable(),
880                   "Localhost unreachable - alive value=" + localHost.getReachability() +
881                   ", live cluster members=" + localHost.getCluster().getReachableCounter());
882 
883     /* Create a routing message */
884     MsgRouting msg = null;
885     try {
886       msg = MsgRouting.marshal(mssds);
887     } catch (IOException e) {
888       log.error("Cannot marshal the ROUTING data into the message stream", e);
889       return;
890     }
891 
892     /*
893      * Send the routing and flow control information to all clusters,
894      * except the local cluster.
895      */
896     ClusterTable clustertable = mssds.getClusterTable();
897     for (int i = 0, size = clustertable.size(); i < size; i++) {
898       Cluster cluster = clustertable.get(i);
899       /* Avoid sending to the local cluster */
900       if (cluster.isLocal())
901         continue;
902 
903       EndPoint clusterEP = cluster.getEndPoint();
904       /*
905        * To avoid creating a message Object for each cluster, the
906        * split horizon technique is applied and disapplied (see
907        * below) on the same Object.
908        */
909       msg.splitHorizonOn(clusterEP);
910 
911       /*
912        * Update the flow control information for the topology
913        * message; reusing the same <code>MsgRouting</code> object
914        * for each cluster.
915        */
916       msg.setFCData(clusterEP);
917 
918       cluster.send(msg);
919       if (log.isDebugEnabled())
920         log.debug("Sent FLOW CONTROL and ROUTING info to cluster: " + clusterEP);
921       msg.splitHorizonOff();
922     }
923 
924     // Schedule a new timeout
925     long timeout = RandomGenerator.getRoutingTimeout();
926     lastRoutingScheduled = ehandler.setTimeout(timeout, new ScheduledEvent(ROUTING));
927 
928     /*
929      * Notify the local cluster that a routing message was sent.
930      */
931     localHost.getCluster().forward(sentFrag, sentFrag.length);
932   }
933 
934 
935   /**
936    *  Method to update the congestion information for all clusters in
937    *  the distributed system.
938    */
939   private void congTimeout()
940   {
941     ClusterTable clustertable = mssds.getClusterTable();
942     for (int i=0; i < clustertable.size(); i++) {
943       MsgFlowSndrSide msgFlow = clustertable.get(i).getMsgFlow();
944       if (msgFlow.isCongested())
945         msgFlow.decCongWin();
946       msgFlow.clearNackCount();
947     }
948     ehandler.setTimeout(congestionTimeout, new ScheduledEvent(CONGESTION));
949   }
950 
951 
952   private void nackTimeout(MsgNACK nack) 
953   {
954     MssHost src = nack.getSource();
955     MsgFlowRcvrSide msgFlow = src.getMsgFlow();
956 
957     if (NACKSUPPRESSION) {
958       if (msgFlow.isDelivered(nack.mid)) {
959         log.warn("NACK timeout for already delivered msg (" + nack.mid + ")");
960         msgFlow.removeScheduledEvent(nack.mid);
961       } else {
962         if (src.getCluster().isLocal()) {
963           if (log.isDebugEnabled())
964             log.debug("Sending NACK for msg (" + nack.mid + ") to cluster " + localHost.getCluster());
965           localHost.getCluster().send(nack);
966         } else {
967           if (log.isDebugEnabled())
968             log.debug("Sending NACK for msg (" + nack.mid + ") to " + src);
969           src.unicastRouteSend(nack);
970           try {
971             MsgNACK sentNack = MsgNACK.marshal(SENTNACK, src, nack.mid);
972             if (log.isDebugEnabled())
973               log.debug("Sending SENTNACK to cluster " + localHost.getCluster());
974             localHost.getCluster().send(sentNack);
975           } catch (IOException e) {
976             log.error("Unable to marshal SENTNACK for local cluster: " + localHost.getCluster(), e);
977             return;
978           }
979         }
980         ScheduledEvent eventNack = msgFlow.getScheduledEvent(nack.mid);
981         if (eventNack == null) {
982           log.warn("No scheduled NACK event for " + nack.mid);
983         } else {
984           /* Delete the scheduled NACK timeout */
985           ehandler.abortTimeout(eventNack);
986           /*
987            * Reschedule a new timeout for the NACK (to allow additional
988            * time for receiving the resent message)
989            */
990           long randomTimeout = RandomGenerator.getRandomTimeout(src.getTimeout());
991           ehandler.setTimeout(randomTimeout, eventNack);
992         }
993       }
994     } else if (nack.mid <= msgFlow.getLastMsgDlvr()) {
995       log.warn("NACK timeout for already delivered msg (" + nack.mid + ")");
996       msgFlow.removeScheduledEvent(nack.mid);
997     } else {
998       if (log.isDebugEnabled())
999         log.debug("Sending NACK for msg (" + nack.mid + ") to " + src);
1000       src.unicastRouteSend(nack);
1001       // remove the old nack from the list
1002       msgFlow.removeScheduledEvent(nack.mid);
1003       // set again the nack timeout
1004       ScheduledEvent eventNack = new ScheduledEvent(nack.getNACKType(), nack);
1005       ehandler.setTimeout(src.getTimeout(), eventNack);
1006       msgFlow.putScheduledEvent(nack.mid, eventNack);
1007     }
1008   }
1009 
1010 
1011   ////////////////////////////////////////////////////////////////////////////////////////////
1012   // Message receiving section
1013   ////////////////////////////////////////////////////////////////////////////////////////////
1014 
1015   /**
1016    *  Handle the receiption of a JG message fragment.  This means to
1017    *  forward it to the local cluster, if the fragment has its broadcast
1018    *  flag set to true.  Otherwise, the message fragment will be passed
1019    *  to the <code>MsgCntrl</code> for delivery or queuing.
1020    *
1021    *  @see jgroup.relacs.mss.MsgCntrl#msgReceive
1022    *  @param fragment
1023    *    The message fragment to add to deliver or add to the message
1024    *    queue.
1025    *  @return
1026    *    False if the message could not be deliverd due to uninitialized
1027    *    datastructures; the message flow.
1028    */
1029   private boolean handleJGInfo(FragmentHeader fragment)
1030   {
1031     if (fragment.isBroadcast()) {
1032       /*
1033        * The fragment has to be broadcast within the local cluster.
1034        * This means that I'm cluster leader for this message, and I must
1035        * forward the message to all members of my cluster.
1036        */
1037       Cluster localCluster = ClusterTable.getLocalCluster();
1038       if (log.isDebugEnabled())
1039         log.debug("Broadcasting msg fragment (" + fragment + ") within local cluster "
1040                   + localCluster.getEndPoint());
1041       fragment.broadcastOff();
1042       localCluster.forward(fragment.getFragment(), fragment.getFragmentLength());
1043       /*
1044        * Note that we don't deliver the message at this point, since we
1045        * will receive it from the above send in the local cluster; note
1046        * that before invoking the forward() on the local cluster, we turn
1047        * off broadcasting.  This causes the next delivery to skip to the
1048        * else part of this if-statement when received again.
1049        */
1050     } else {
1051       if (log.isDebugEnabled())
1052         log.debug("Received msg (" + fragment.fragId + ") from " + fragment.getSender());
1053       return msgCntrl.msgReceive(fragment);
1054     }
1055     return true;
1056   }
1057 
1058 } // END Mss