View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gm;
20  
21  import java.io.Externalizable;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.ObjectInput;
25  import java.io.ObjectInputStream;
26  import java.io.ObjectOutput;
27  import java.io.ObjectOutputStream;
28  import java.io.OutputStream;
29  import java.rmi.RemoteException;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.SortedMap;
37  import java.util.TreeMap;
38  
39  import jgroup.core.JgroupException;
40  import jgroup.core.MemberId;
41  import jgroup.core.MembershipListener;
42  import jgroup.core.MembershipService;
43  import jgroup.core.View;
44  import jgroup.core.multicast.AckListener;
45  import jgroup.core.multicast.ChainIdentifier;
46  import jgroup.core.multicast.MulticastListener;
47  import jgroup.core.multicast.MulticastService;
48  import jgroup.relacs.types.EndPointImpl;
49  import jgroup.relacs.types.MemberIdImpl;
50  import jgroup.util.InMessage;
51  import jgroup.util.OutMessage;
52  
53  import org.apache.log4j.Logger;
54  
55  /**
56   *  The <code>TotalOrderLayer</code> class implements the multicast service,
57   *  and uses the underlying reliable multicast service.
58   *
59   *  Total ordering is accomplished using distributed agreement protocol similar
60   *  to that of [Birman and Joseph 1987].
61   *
62   *  @author Hein Meling
63   *  @since Jgroup 2.2
64   */
65  public final class TotalOrderLayer
66    implements MulticastService, MulticastListener, MembershipListener
67  {
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Logger
71    ////////////////////////////////////////////////////////////////////////////////////////////
72  
73    /** Obtain logger for this class */
74    private static final Logger log = Logger.getLogger(TotalOrderLayer.class);
75  
76  
77    ////////////////////////////////////////////////////////////////////////////////////////////
78    // Constants
79    ////////////////////////////////////////////////////////////////////////////////////////////
80  
81    /** Protocol name used to distinguish messages from other protocols. */
82    private static final String PROTOCOL_NAME = "Total";
83  
84    /** Message identifiers for the distributed agreement protocol */
85    private static final int REQUEST_SEQ_NO  = 1;
86    private static final int AGREED_SEQ_NO   = 2;
87  
88    /** Indicator used to indicate that the stream contains a single object */
89    private static final boolean OBJECT_TYPE = true;
90  
91    /** Indicator used to indicate that the stream contains a stream */
92    private static final boolean STREAM_TYPE = false;
93  
94  
95    ////////////////////////////////////////////////////////////////////////////////////////////
96    // Fields
97    ////////////////////////////////////////////////////////////////////////////////////////////
98  
99    /** Dynamic map of multicast listeners; protocol name as key. */
100   private Map<String,MulticastListener> multicastListeners = new HashMap<String,MulticastListener>();
101 
102   /** The multicast service */
103   private MulticastService mcast;
104 
105   /** This member's identifier */
106   private MemberId me;
107 
108   /** The largest agreed sequence number */
109   private SequenceNo largestAgreedSeqNo;
110 
111   /** The largest proposed sequence number */
112   private SequenceNo largestProposedSeqNo;
113 
114   /** Map of proposals: MessageID -> SequenceNo (Proposal) */
115   private Map<MessageID,SequenceNo> proposalsMap =
116     Collections.synchronizedMap(new HashMap<MessageID,SequenceNo>());
117 
118   /** The hold-back queue of messages (mapping: SequenceNo -> InputStream) */
119   private SortedMap<SequenceNo,InputStream> holdBackQueue =
120     Collections.synchronizedSortedMap(new TreeMap<SequenceNo,InputStream>());
121 
122   /** The delivery queue of messages to be delivered */
123 //  private List deliveryQueue = Collections.synchronizedList(new LinkedList());
124 
125 
126   ////////////////////////////////////////////////////////////////////////////////////////////
127   // Constructor
128   ////////////////////////////////////////////////////////////////////////////////////////////
129 
130   /**
131    *  Initialize a new <code>TotalOrderLayer</code> object.
132    *
133    *  @param pgms
134    *    The membership service.
135    *  @param mcast
136    *    The multicast service.
137    *  @exception JgroupException
138    *    Raised if the distributed system configuration was not available.
139    */
140   private TotalOrderLayer(MembershipService pgms, MulticastService mcast)
141     throws JgroupException
142   {
143     me = pgms.getMyIdentifier();
144     largestAgreedSeqNo = new SequenceNo(me);
145     largestProposedSeqNo = new SequenceNo(me);
146     this.mcast = mcast;
147   }
148 
149 
150   ////////////////////////////////////////////////////////////////////////////////////////////
151   // Static factory
152   ////////////////////////////////////////////////////////////////////////////////////////////
153 
154   public static TotalOrderLayer getLayer(MembershipService pgms, MulticastService mcast)
155     throws JgroupException
156   {
157     return new TotalOrderLayer(pgms, mcast);
158   }
159 
160 
161   ////////////////////////////////////////////////////////////////////////////////////////////
162   // Layer interface methods
163   ////////////////////////////////////////////////////////////////////////////////////////////
164 
165   /**
166    *  Add a listener object for this layer to provide upcalls to in
167    *  response to multicast events.
168    */
169   public void addListener(Object listener)
170   {
171     if (listener instanceof MulticastListener) {
172       MulticastListener mListener = (MulticastListener) listener;
173       String protocol = mListener.getProtocolName();
174       Object currentListener = multicastListeners.get(protocol);
175       if (currentListener == null)
176         multicastListeners.put(protocol, mListener);
177       else if (!currentListener.equals(listener))
178         throw new IllegalArgumentException("Multiple multicast listeners for the same protocol is illegal: " + protocol);
179     } else {
180       throw new IllegalArgumentException("Specified listener does not implement a MulticastListener");
181     }
182   }
183 
184 
185   ////////////////////////////////////////////////////////////////////////////////////////////
186   // Methods from MembershipListener
187   ////////////////////////////////////////////////////////////////////////////////////////////
188 //FIXME remove this interface.
189   public void viewChange(View view)
190   {
191 //    log.debug("View: " + view);
192   }
193 
194   public void prepareChange()
195   {
196     log.debug("prepareChange");
197   }
198 
199   public void hasLeft() { }
200 
201 
202   ////////////////////////////////////////////////////////////////////////////////////////////
203   // Methods from MulticastService
204   ////////////////////////////////////////////////////////////////////////////////////////////
205 
206   /* (non-Javadoc)
207    * @see jgroup.core.multicast.MulticastService#mcast(java.io.OutputStream, jgroup.core.multicast.AckListener, jgroup.core.multicast.ChainIdentifier)
208    */
209   public void mcast(OutputStream out, AckListener ackl, ChainIdentifier chId)
210     throws JgroupException
211   {
212     throw new UnsupportedOperationException();
213   }
214 
215 
216   /* (non-Javadoc)
217    * @see jgroup.core.multicast.MulticastService#mcast(java.io.OutputStream, jgroup.core.multicast.AckListener)
218    */
219   public void mcast(OutputStream stream, AckListener ackl)
220     throws JgroupException
221   {
222     ProposalCollector pc = new ProposalCollector();
223     mcast.mcast(stream, pc);
224     // Wait for all proposals and compute the agreed sequence number
225     SequenceNo agreed = pc.getGreatestProposal();
226 
227     try {
228       /* Send the agreed sequence number to all members */
229       OutMessage out = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
230       // write common part: message type
231       out.writeInt(AGREED_SEQ_NO);
232       // write the agreed sequence number
233       agreed.writeExternal(out);
234       out.flush();
235       ResultCollector rc = new ResultCollector(agreed, ackl);
236       acklMap.put(agreed, rc);
237       // send out the agreed message
238       mcast.mcast(out, rc);
239     } catch (IOException e) {
240       log.warn("Could not send out agreed sequence numbers.", e);
241     }
242   }
243 
244   /* (non-Javadoc)
245    * @see jgroup.core.multicast.MulticastService#mcast(java.lang.String, java.lang.Object, jgroup.core.multicast.AckListener)
246    */
247   public void mcast(String protocol, Object obj, AckListener ackl)
248     throws JgroupException, IOException
249   {
250     OutMessage bout = getMessage(protocol, OBJECT_TYPE);
251     ObjectOutputStream out = new ObjectOutputStream(bout);
252     out.writeObject(obj);
253     out.close();
254     mcast(bout, ackl);
255   }
256 
257   /* (non-Javadoc)
258    * @see jgroup.core.multicast.MulticastService#getMessage(java.lang.String)
259    */
260   public OutputStream getMessage(String protocol)
261     throws IOException
262   {
263     return getMessage(protocol, STREAM_TYPE);
264   }
265 
266   /**
267    * Obtain a message for the given protocol and message type.
268    *
269    * @see jgroup.core.multicast.MulticastService#getMessage(java.lang.String)
270    */
271   private OutMessage getMessage(String protocol, boolean msgType)
272     throws IOException
273   {
274     OutMessage msg = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
275     msg.writeInt(REQUEST_SEQ_NO);
276     /* Create a message id for this message and write it to the stream header */
277     MessageID msgId = new MessageID(me);
278     msgId.writeExternal(msg);
279     msg.writeUTF(protocol);
280     msg.writeBoolean(msgType);
281     return msg;
282   }
283 
284 
285   ////////////////////////////////////////////////////////////////////////////////////////////
286   // Methods from MulticastListener
287   ////////////////////////////////////////////////////////////////////////////////////////////
288 
289   /* (non-Javadoc)
290    * @see jgroup.core.multicast.MulticastListener#getProtocolName()
291    */
292   public String getProtocolName()
293   {
294     return PROTOCOL_NAME;
295   }
296 
297 
298   /* (non-Javadoc)
299    * @see jgroup.core.multicast.MulticastListener#deliverStream(java.io.InputStream)
300    */
301   public synchronized Object deliverStream(InputStream msg, MemberId sender, int seq) 
302   {
303     InMessage inmsg = (InMessage) msg;
304     try {
305       /* Read message type from input stream */
306       int messageType = inmsg.readInt();
307 
308       /* Multiplex on the various message types */
309       switch (messageType) {
310 
311         case REQUEST_SEQ_NO: 
312         {
313           /* Read the message identifier from the input stream */
314           MessageID msgId = new MessageID();
315           msgId.readExternal(inmsg);
316           /* Compute a new proposed sequence number */
317           largestProposedSeqNo = SequenceNo.max(largestAgreedSeqNo, largestProposedSeqNo);
318           largestProposedSeqNo.sequenceNo++;
319           largestProposedSeqNo.setMsgId(msgId);
320           proposalsMap.put(msgId, largestProposedSeqNo);
321           /*
322            * Note that the current position of the inmsg stream is just
323            * before the position of the protocol name, allowing the protocol
324            * name to be extracted when getting the message back from the
325            * holdBackQueue in the delivery phase.
326            */
327           holdBackQueue.put(largestProposedSeqNo, inmsg);
328 //          if (log.isDebugEnabled())
329 //            log.debug("Sending PROPOSAL: " + largestProposedSeqNo + " for msg " + msgId);
330           /* Respond with a proposal */
331           return largestProposedSeqNo;
332         } // case REQUEST_SEQ_NO
333 
334         case AGREED_SEQ_NO:
335         {
336           SequenceNo agreed = new SequenceNo();
337           agreed.readExternal(inmsg);
338 //          if (log.isDebugEnabled())
339 //            log.debug("Received AGREED: " + agreed + " for msg " + agreed.msgId + " from " + sender);
340           largestAgreedSeqNo = SequenceNo.max(largestAgreedSeqNo, agreed);
341 
342           // Get my proposal for the msgId currently being ordered
343           SequenceNo myProposal = (SequenceNo) proposalsMap.remove(agreed.msgId);
344           if (!agreed.equals(myProposal)) {
345             // Reorder the hold-back queue
346             log.debug("reordering hold-back queue: myProposal=" + myProposal
347               + " is different from agreed=" + agreed);
348             InputStream stream = holdBackQueue.remove(myProposal);
349             if (stream == null) {
350               log.warn("MESSAGE NOT FOUND IN HOLD-BACK QUEUE: " + myProposal);
351               break;
352             }
353             holdBackQueue.put(agreed, stream);
354           }
355           log.debug("holdBackQueue=" + holdBackQueue.keySet());
356           log.debug(" proposalsMap=" + proposalsMap);
357 
358           List<SequenceNo> resultValues = new ArrayList<SequenceNo>(holdBackQueue.size());
359           for (Iterator iter = holdBackQueue.entrySet().iterator(); iter.hasNext();) {
360             Map.Entry entry = (Map.Entry) iter.next();
361             SequenceNo seqno = (SequenceNo) entry.getKey();
362             // if this is a newly agreed seqno, tag the message as deliverable
363             if (seqno.equals(agreed))
364               seqno.msgId.markAsDeliverable();
365             resultValues.add(seqno);
366             if (seqno.isDeliverable()) {
367 //              if (log.isDebugEnabled())
368 //                log.debug("Delivering message: " + seqno.msgId + ", assigned seqno: " + seqno);
369               InMessage stream = (InMessage) entry.getValue();
370               iter.remove();
371               String protocol = stream.readUTF();
372               boolean msgType = stream.readBoolean();
373               MulticastListener listener =
374                 (MulticastListener) multicastListeners.get(protocol);
375               MemberId theSender = seqno.msgId.getSender();
376               if (msgType == STREAM_TYPE) {
377                 seqno.result = listener.deliverStream(stream, theSender, seqno.sequenceNo);
378               } else {
379                 seqno.result = listener.deliverObject(stream, theSender, seqno.sequenceNo);
380               }
381               if (log.isDebugEnabled())
382                 log.debug("Result of message: " + seqno.msgId + ", assigned seqno: " + seqno);
383             } else {
384               // Stop moving messages to the delivery queue
385               if (log.isDebugEnabled())
386                 log.debug("Stopping message delivery. " + seqno.msgId + " is undeliverable: " + seqno);
387               break;
388             }
389           }
390           log.debug("resultValues.size=" + resultValues.size());
391           return resultValues;
392         } // case AGREED_SEQ_NO
393 
394       } // END of switch (messageType)
395 
396     } catch (Exception e) {
397       log.warn("Could not read data from stream.", e);
398     }
399     return null;
400   }
401 
402 
403   /* (non-Javadoc)
404    * @see jgroup.core.multicast.MulticastListener#deliverObject(java.lang.Object)
405    */
406   public Object deliverObject(Object msg, MemberId id, int seq)
407   {
408     /*
409      * This should never happen since we use the stream method also
410      * when dealing with multicasting simple objects.
411      */
412     throw new UnsupportedOperationException("Should never happen");
413   }
414 
415 
416   ////////////////////////////////////////////////////////////////////////////////////////////
417   // Nested class implementing the AckListener interface
418   ////////////////////////////////////////////////////////////////////////////////////////////
419 
420   /**
421    * An <code>AckListener</code> for collecting proposals for the
422    * distributed agreement protocol.
423    */
424   private class ProposalCollector
425     implements AckListener
426   {
427 
428     /** States */
429     private static final int STARTING  = 1;
430     private static final int WAITING   = 2;
431     private static final int COMPLETED = 3;
432 
433     /** Current state */
434     private volatile int state = STARTING;
435 
436     /** Number of unacknowledge messages */
437     private int missing;
438 
439     /** Proposals from servers */
440     private SequenceNo[] proposals;
441 
442     /** Flag completed servers */
443     private boolean[] completed;
444 
445     /** Update internal state */
446     private synchronized void updateState()
447     {
448       if (missing==0) {
449         state = COMPLETED;
450         notifyAll();
451       }
452     }
453 
454     /* (non-javadoc)
455      * @see jgroup.core.multicast.AckListener#ack(MemberId, int, Object)
456      */
457     public void ack(MemberId id, int pos, Object obj)
458       throws RemoteException
459     {
460 //      if (log.isDebugEnabled())
461 //        log.debug("Recieved proposal from " + id + ", pos=" + pos + ": " + obj);
462 
463       proposals[pos] = (SequenceNo) obj;
464       completed[pos] = true;
465       missing--;
466       updateState();
467     }
468 
469     /* (non-javadoc)
470      * @see jgroup.core.multicast.AckListener#notifyView(View)
471      */
472     public void notifyView(View view)
473       throws RemoteException
474     {
475       missing = view.size();
476       proposals = new SequenceNo[missing];
477       completed = new boolean[missing];
478       state = WAITING;
479     }
480 
481     /* (non-javadoc)
482      * @see jgroup.core.multicast.AckListener#viewChange()
483      */
484     public void viewChange()
485       throws RemoteException
486     {
487       log.warn("ProposalCollector: ViewChange during invocation handling");
488       //FIXME we should handle this differently.
489       missing = 0;
490       updateState();
491     }
492 
493     /**
494      * Returns the greateset proposal obtained from the members.
495      * This method might return <code>null</code> in case not all
496      * members responded with a <i>proposal</i>.
497      */
498     public synchronized SequenceNo getGreatestProposal()
499     {
500       while (state != COMPLETED) {
501         try { wait(); } catch (InterruptedException iex) {}
502       }
503 
504       // Select the greatest proposal
505       SequenceNo greatestProposal = proposals[0];
506       for (int i = 0; i < proposals.length; ++i) {
507         if (completed[i]) {
508           greatestProposal = SequenceNo.max(proposals[i], greatestProposal);
509         } else {
510           log.warn("Not all members have provided a PROPOSAL");
511           break;
512         }
513       }
514       return greatestProposal;
515     }
516 
517   } // END ProposalCollector
518 
519 
520   ////////////////////////////////////////////////////////////////////////////////////////////
521   // Nested class implementing the AckListener interface
522   ////////////////////////////////////////////////////////////////////////////////////////////
523 
524   private static final Map<SequenceNo,ResultCollector> acklMap =
525     new HashMap<SequenceNo,ResultCollector>();
526 
527   /**
528    * An <code>AckListener</code> for collecting results from multiple
529    * totally ordered invocations.
530    */
531   private class ResultCollector
532     implements AckListener
533   {
534 
535     /** Number of unacknowledge messages */
536     private int missing;
537 
538     /** Result values from servers */
539     private List[] results;
540 
541     /** The agreed sequence number associated with this AckListener instance */
542     private SequenceNo agreed;
543 
544     /** The application level AckListener that is being wrapped */
545     private AckListener ackl;
546 
547 
548     public ResultCollector(SequenceNo agreed, AckListener ackl)
549     {
550       this.agreed = agreed;
551       this.ackl = ackl;
552     }
553 
554     /* (non-javadoc)
555      * @see jgroup.core.multicast.AckListener#ack(MemberId, int, Object)
556      */
557     public void ack(MemberId id, int pos, Object obj)
558       throws RemoteException
559     {
560       if (log.isDebugEnabled())
561         log.debug("Recieved result from " + id + ", pos=" + pos + ": " + obj);
562 
563 //      ackl.ack(id, pos, obj);
564       List res = (List) obj;
565       for (Iterator iter = res.iterator(); iter.hasNext();) {
566         SequenceNo seqno = (SequenceNo) iter.next();
567         if (seqno.hasResult()) {
568           ResultCollector rc = (ResultCollector) acklMap.get(seqno);
569           log.debug("seqno=" + seqno + ", agreed=" + agreed);
570           rc.missing--;
571           if (rc.missing == 0)
572             acklMap.remove(seqno);
573           log.debug("delivering: " + seqno);
574           rc.ackl.ack(id, pos, seqno.result);
575         } else {
576           log.debug("no result for seqno: " + seqno);
577         }
578       }
579     }
580 
581     /* (non-javadoc)
582      * @see jgroup.core.multicast.AckListener#notifyView(View)
583      */
584     public void notifyView(View view)
585       throws RemoteException
586     {
587 //      for (Iterator iter = acklMap.values().iterator(); iter.hasNext();) {
588 //        ResultCollector rc = (ResultCollector) iter.next();
589 //        rc.ackl.notifyView(view);
590 //      }
591       ackl.notifyView(view);
592       missing = view.size();
593       results = new List[missing];
594     }
595 
596     /* (non-javadoc)
597      * @see jgroup.core.multicast.AckListener#viewChange()
598      */
599     public void viewChange()
600       throws RemoteException
601     {
602       if (log.isDebugEnabled())
603         log.debug("ResultCollector: ViewChange during invocation handling: ackl="
604           + ackl + ", agreed=" + agreed);
605       for (Iterator iter = acklMap.values().iterator(); iter.hasNext();) {
606         ResultCollector rc = (ResultCollector) iter.next();
607         log.debug("rc.agreed=" + rc.agreed);
608         rc.ackl.viewChange();
609         rc.missing = 0;
610       }
611 //      ackl.viewChange();
612 //      missing = 0;
613     }
614 
615   } // END ResultCollector
616 
617 
618   ////////////////////////////////////////////////////////////////////////////////////////////
619   // Nested static class for total order sequence numbers
620   ////////////////////////////////////////////////////////////////////////////////////////////
621 
622   /**
623    * Sequence number class used to define a total order of messages.
624    */
625   private static final class SequenceNo
626     implements Externalizable, Comparable
627   {
628 
629     private static final long serialVersionUID = -6394934923836273234L;
630 
631     /** The sequence number value */
632     private int sequenceNo = 100;
633 
634     /** The member that generated this sequence number */
635     private MemberId member;
636 
637     /**
638      * The message identifier associated with this sequence number.
639      * Note that the message identifier is not used when comparing
640      * sequence numbers or when computing the hash code.
641      */
642     private MessageID msgId = null;
643 
644     /**
645      * Result object associated with this sequence number.
646      */
647     private Object result = null;
648 
649 
650     /** Default constructor for externalization */
651     public SequenceNo() {}
652 
653     public SequenceNo(MemberId me)
654     {
655       member = me;
656     }
657 
658     public SequenceNo(SequenceNo oldSeq)
659     {
660       this.sequenceNo = oldSeq.sequenceNo;
661       this.member = oldSeq.member;
662       this.msgId = oldSeq.msgId;
663     }
664 
665     public static SequenceNo max(SequenceNo s1, SequenceNo s2)
666     {
667       if (s1.compareTo(s2) > 0) {
668         return new SequenceNo(s1);
669       } else {
670         return new SequenceNo(s2);
671       }
672     }
673 
674     /**
675      * Set the message identifier associated with this sequence number.
676      */
677     public void setMsgId(MessageID msgId)
678     {
679       this.msgId = msgId;
680     }
681 
682     public boolean hasResult()
683     {
684       return result != null;
685     }
686 
687     public boolean isDeliverable()
688     {
689       return msgId.deliverable;
690     }
691 
692     public int hashCode()
693     {
694       return sequenceNo ^ member.hashCode();
695     }
696 
697     public boolean equals(Object obj)
698     {
699       if (obj instanceof SequenceNo) {
700         SequenceNo sno = (SequenceNo) obj;
701         return (sequenceNo == sno.sequenceNo && member.equals(sno.member));
702       } else {
703         return false;
704       }
705     }
706 
707     public String toString()
708     {
709       StringBuilder buf = new StringBuilder();
710       buf.append("[");
711       String a = member.getEndPoint().getAddress().getHostAddress().toString();
712       String[] b = a.split("\\.");
713       if (b.length > 0)
714         buf.append(b[b.length-1]);
715       buf.append(".");
716       buf.append(sequenceNo);
717       if (result == null) {
718         buf.append("]");
719       } else {
720         buf.append("]: ");
721         buf.append(result);
722       }
723       return buf.toString();
724     }
725 
726     /* (non-Javadoc)
727      * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
728      */
729     public void readExternal(ObjectInput in)
730       throws IOException, ClassNotFoundException
731     {
732       sequenceNo = in.readInt();
733       member = new MemberIdImpl();
734       member.readExternal(in);
735       msgId = new MessageID();
736       msgId.readExternal(in);
737       ObjectInputStream ois = new ObjectInputStream((InputStream) in);
738       result = ois.readObject();
739     }
740 
741     /* (non-Javadoc)
742      * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
743      */
744     public void writeExternal(ObjectOutput out)
745       throws IOException
746     {
747       out.writeInt(sequenceNo);
748       member.writeExternal(out);
749       msgId.writeExternal(out);
750       // HACK required since OutMessage does not support writeObject()
751       ObjectOutputStream oos = new ObjectOutputStream((OutputStream) out);
752       oos.writeObject(result);
753     }
754 
755     /**
756      * Compare two <code>SequenceNo</code> instances.
757      *
758      * For equal sequence numbers we use the <code>MemberId</code>
759      * to distinguish them.  Otherwise, we use only the sequence
760      * number.  That is, the sequence number has precedence when
761      * comparing two <code>SequenceNo</code> instances.
762      *
763      * @see java.lang.Comparable#compareTo(java.lang.Object)
764      */
765     public int compareTo(Object o)
766     {
767       if (this == o)
768         return 0;
769       SequenceNo seqno = (SequenceNo) o;
770       if (this.sequenceNo == seqno.sequenceNo)
771         return this.member.compareTo(seqno.member);
772       else if (this.sequenceNo > seqno.sequenceNo)
773         return 1;
774       else
775         return -1;
776     }
777 
778   } // END SequenceNo
779 
780 
781   ////////////////////////////////////////////////////////////////////////////////////////////
782   // Nested static class for message identifiers
783   ////////////////////////////////////////////////////////////////////////////////////////////
784 
785   /**
786    * A message identifier is a unique identifier for messages
787    * generated within the system.  This is implement by a locally
788    * unique sequence number and a member identifier.
789    */
790   private static final class MessageID
791     implements Externalizable
792   {
793 
794     private static final long serialVersionUID = -6073546790683304835L;
795 
796     private static long nextSequenceNo = 0;
797     private long sequenceNo;
798     private MemberId memberId;
799 
800     /** Set to true when the message with this message can be delivered */
801     private transient boolean deliverable = false;
802 
803 
804     /** Default constructor for externalization */
805     public MessageID() {}
806 
807     public MessageID(MemberId id)
808     {
809       this.sequenceNo = nextSequenceNo++;
810       this.memberId = id;
811     }
812 
813     public void markAsDeliverable()
814     {
815       deliverable = true;
816     }
817 
818     public boolean isDeliverable()
819     {
820       return deliverable;
821     }
822 
823     public MemberId getSender()
824     {
825       return memberId;
826     }
827 
828     public boolean equals(Object obj)
829     {
830       if (obj instanceof MessageID) {
831         MessageID msgId = (MessageID) obj;
832         return (sequenceNo == msgId.sequenceNo && memberId.equals(msgId.memberId));
833       } else {
834         return false;
835       }
836     }
837 
838     public int hashCode()
839     {
840       return (int)(sequenceNo ^ (sequenceNo >>> 32)) ^ memberId.hashCode();
841     }
842 
843     public String toString()
844     {
845       StringBuilder buf = new StringBuilder();
846       buf.append("[");
847       String a = memberId.getEndPoint().getAddress().getHostAddress().toString();
848       String[] b = a.split("\\.");
849       if (b.length > 0)
850         buf.append(b[b.length-1]);
851       buf.append("*");
852       buf.append(sequenceNo);
853       buf.append("]");
854       return buf.toString();
855     }
856 
857     /* (non-Javadoc)
858      * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
859      */
860     public void readExternal(ObjectInput in)
861       throws IOException, ClassNotFoundException
862     {
863       sequenceNo = in.readLong();
864       memberId = new MemberIdImpl();
865       memberId.readExternal(in);
866     }
867 
868     /* (non-Javadoc)
869      * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
870      */
871     public void writeExternal(ObjectOutput out)
872       throws IOException
873     {
874       out.writeLong(sequenceNo);
875       memberId.writeExternal(out);
876     }
877 
878   } // END MessageID
879 
880 
881   public static void main(String[] args)
882   {
883     MemberId m1 = new MemberIdImpl(new EndPointImpl(239239, 1239), 12, new EndPointImpl(122323, 2323), 0);
884     MemberId m2 = new MemberIdImpl(new EndPointImpl(339239, 1239), 12, new EndPointImpl(132323, 2323), 0);
885     MemberId m3 = new MemberIdImpl(new EndPointImpl(439239, 1239), 12, new EndPointImpl(142323, 2323), 0);
886     SequenceNo s1 = new SequenceNo(m1);
887     SequenceNo s2 = new SequenceNo(m2);
888     SequenceNo s3 = new SequenceNo(m3);
889     s1.sequenceNo++; s2.sequenceNo++;
890     System.out.println("s1.compareTo(s1)=" + s1.compareTo(s1));
891     System.out.println("s2.compareTo(s1)=" + s2.compareTo(s1));
892     System.out.println("s3.compareTo(s1)=" + s3.compareTo(s1));
893     System.out.println("s1.compareTo(s2)=" + s1.compareTo(s2));
894     System.out.println("s1.compareTo(s3)=" + s1.compareTo(s3));
895     System.out.println("s2.compareTo(s3)=" + s2.compareTo(s3));
896     System.out.println("s1=" + s1);
897     System.out.println("s2=" + s2);
898     System.out.println("s3=" + s3);
899     System.out.println("max(s1, s2)=s2=" + SequenceNo.max(s1, s2));
900     System.out.println("max(s1, s3)=s1=" + SequenceNo.max(s1, s3));
901     System.out.println("max(s2, s3)=s2=" + SequenceNo.max(s2, s3));
902   }
903 
904 } // END TotalOrderLayer