View Javadoc

1   /*
2    * Copyright (c) 1998-2006 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gmi.protocols;
20  
21  import java.io.Externalizable;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.ObjectInput;
25  import java.io.ObjectOutput;
26  import java.io.OutputStream;
27  import java.rmi.RemoteException;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.NoSuchElementException;
35  import java.util.SortedMap;
36  import java.util.TreeMap;
37  import java.util.concurrent.locks.Lock;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import jgroup.core.MemberId;
41  import jgroup.core.MembershipService;
42  import jgroup.core.multicast.MulticastListener;
43  import jgroup.core.multicast.MulticastService;
44  import jgroup.relacs.gmi.GroupAckListener;
45  import jgroup.relacs.gmi.GroupInvocationDispatcher;
46  import jgroup.relacs.gmi.InvocationResult;
47  import jgroup.relacs.gmi.MethodSemantics;
48  import jgroup.relacs.types.MemberIdImpl;
49  import jgroup.util.InMessage;
50  import jgroup.util.OutMessage;
51  
52  import org.apache.log4j.Logger;
53  import org.apache.log4j.MDC;
54  
55  /**
56   * Atomic represents a protocol that dispatch method invocations
57   * received through the local endpoint to all the other methods of
58   * the group, including the local server.  Each message received is
59   * ordered according to a total order among all members of the group.
60   * The result from all members is returned to the local endpoint whom
61   * first received the method invocation request from some client, and
62   * one of results are selected and passed back to the client.
63   * 
64   * Total ordering is accomplished using distributed agreement protocol similar
65   * to that of [Birman and Joseph 1987].
66   *
67   * @author Hein Meling <hein.meling@uis.no>
68   * @since Jgroup 2.2
69   */
70  public class Atomic
71    extends BasicDispatcher
72    implements MulticastListener
73  {
74  
75    ////////////////////////////////////////////////////////////////////////////////////////////
76    // Logger
77    ////////////////////////////////////////////////////////////////////////////////////////////
78  
79    /** Obtain logger for this class */
80    private static final Logger log = Logger.getLogger(Atomic.class);
81  
82  
83    ////////////////////////////////////////////////////////////////////////////////////////////
84    // Constants
85    ////////////////////////////////////////////////////////////////////////////////////////////
86  
87    /** Protocol name for demux in the multicast layer */
88    private static final String PROTOCOL_NAME = MethodSemantics.ATOMIC.toString();
89  
90    /** Message type identifiers for the distributed agreement protocol */
91    private enum MsgType { REQUEST_SEQ_NO, AGREED_SEQ_NO };
92  
93  
94    ////////////////////////////////////////////////////////////////////////////////////////////
95    // Fields
96    ////////////////////////////////////////////////////////////////////////////////////////////
97  
98    /** The lock preventing concurrent event handling */
99    private final Lock lock = new ReentrantLock();
100 
101   /** The multicast service interface */
102   private final MulticastService mcast;
103 
104   /** The membership service interface */
105   private final MembershipService pgms;
106 
107   /** This member's identifier */
108   private final MemberId me;
109 
110   /** The largest agreed sequence number */
111   private SequenceNo largestAgreedSeqNo;
112 
113   /** The largest proposed sequence number */
114   private SequenceNo largestProposedSeqNo;
115 
116   /** Map of proposals: MessageID -> SequenceNo (Proposal) */
117   private final Map<MessageID, SequenceNo> proposalsMap =
118     new HashMap<MessageID, SequenceNo>();
119 
120   /** The hold-back queue of messages (mapping: SequenceNo -> InputStream) */
121   private final SortedMap<SequenceNo, InputStream> holdBackQueue =
122     new TreeMap<SequenceNo, InputStream>();
123 
124   /** Map of result collectors */
125   private final Map<SequenceNo, ResultCollector> resultCollectors =
126     Collections.synchronizedMap(new HashMap<SequenceNo, ResultCollector>());
127 
128 
129   ////////////////////////////////////////////////////////////////////////////////////////////
130   // Constructor
131   ////////////////////////////////////////////////////////////////////////////////////////////
132 
133   public Atomic(GroupInvocationDispatcher dispatcher,
134       MulticastService mcast, MembershipService pgms)
135   {
136     super(dispatcher);
137     this.mcast = mcast;
138     this.pgms = pgms;
139     this.me = pgms.getMyIdentifier();
140     largestAgreedSeqNo = new SequenceNo(me);
141     largestProposedSeqNo = new SequenceNo(me);
142     /*
143      * We need to listen to multicast events, and since this is
144      * not a layer it will not be handled automatically by the
145      * group manager construction mechanism.
146      */
147     mcast.addListener(this);
148   }
149 
150 
151   ////////////////////////////////////////////////////////////////////////////////////////////
152   // ProtocolDispatcher interface (overriding BasicDispatcher)
153   ////////////////////////////////////////////////////////////////////////////////////////////
154 
155   /**
156    * Handle inbound request to the local endpoint with <i>atomic</i>
157    * invocation semantics.
158    * 
159    * The local endpoint will send a multicast message to all the other group
160    * members for method invocation dispatching at each members.  They each
161    * return a result to the local endpoint, and one of these values are selected
162    * and returned to the client (GroupInvocationHandler).
163    * 
164    * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream)
165    */
166   public InvocationResult dispatch(InputStream in)
167     throws IOException
168   {
169     // Reduce overhead if group has a single member only
170     if (pgms.members() == 1)
171       return super.dispatch(in);
172 
173     SequenceNo agreed = getAgreed(in);
174     if (agreed == null)
175       throw new IOException("Could not obtain agreed sequence number; Proposals not received from all members");
176 
177     OutputStream agreedMsg = prepareAgreedMessage(agreed);
178     ResultCollector rc = new ResultCollector(agreed);
179     resultCollectors.put(agreed, rc);
180 
181     // send out the agreed message (eventually causing delivery of method invocations)
182     try {
183       mcast.mcast(agreedMsg, rc.getRemoteAckListener());
184     } catch (Exception e) {
185       log.warn("Failed multicast agreed sequence numbers to the group", e);
186       IOException ioex = new IOException("Failed multicast agreed sequence numbers to the group");
187       ioex.initCause(e);
188       throw ioex;
189     }
190     // Wait for ack listener
191     return (InvocationResult) rc.getResult();
192   }
193 
194 
195   /**
196    * 
197    * @param agreed
198    * @throws IOException
199    */
200   private OutputStream prepareAgreedMessage(SequenceNo agreed)
201     throws IOException
202   {
203     /* Send the agreed sequence number to all members */
204     OutMessage out = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
205     // write common part: message type
206     out.writeObject(MsgType.AGREED_SEQ_NO);
207     // write the agreed sequence number
208     agreed.writeExternal(out);
209     out.flush();
210     return out;
211   }
212 
213 
214   /**
215    * Multicast the invocation to the group, and wait for proposals for the
216    * sequence number of this invocation; the greatest proposal is returned
217    * as the agreed sequence number.
218    * 
219    * @param in the invocation stream to multicast to the group
220    * @return the agreed sequence number (the greatest proposal)
221    * @throws IOException if the invocation failed due to either a communication
222    *   failure or a new view was installed preventing all members from sending
223    *   their propsal.
224    */
225   private SequenceNo getAgreed(InputStream in)
226     throws IOException
227   {
228     OutputStream reqMsg = prepareReqMessage(in);
229     // Do multicast the invocation; wait for ordering proposals
230     ProposalCollector pc = new ProposalCollector();
231     if (log.isDebugEnabled())
232       log.debug("Atomic: multicasting invocation to group members");
233     try {
234       mcast.mcast(reqMsg, pc.getRemoteAckListener());
235     } catch (Exception e) {
236       log.warn("Invocation failed", e);
237       IOException ioex = new IOException("Failed to send invocation to group");
238       ioex.initCause(e);
239       throw ioex;
240     }
241     // Wait for all proposals and compute the agreed sequence number
242     return pc.getGreatestProposal();
243   }
244 
245 
246   /**
247    * 
248    * @param in
249    * @return
250    * @throws IOException
251    */
252   private OutputStream prepareReqMessage(InputStream in)
253     throws IOException
254   {
255     OutMessage mout = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
256     mout.writeObject(MsgType.REQUEST_SEQ_NO);
257     /* Create a message id for this message and write it to the stream header */
258     MessageID msgId = new MessageID(me);
259     msgId.writeExternal(mout);
260     // Write request into multicast output stream
261     byte[] buf = new byte[1500];
262     int bytesRead;
263     while ((bytesRead = in.read(buf)) != -1) {
264       mout.write(buf, 0, bytesRead);
265     }
266     return mout;
267   }
268 
269 
270   ////////////////////////////////////////////////////////////////////////////////////////////
271   // MulticastListener methods
272   ////////////////////////////////////////////////////////////////////////////////////////////
273 
274   /* (non-Javadoc)
275    * @see jgroup.core.multicast.MulticastListener#getProtocolName()
276    */
277   public String getProtocolName()
278   {
279     return PROTOCOL_NAME;
280   }
281 
282   /**
283    * Received a multicast message from a mediating endpoint for invocation
284    * dispatching at the local server.
285    * 
286    * @param msg
287    *   Atomic multicast message received
288    * @return
289    *   The result of the atomic multicast invocation, or <code>null</code>
290    *   if the invocation failed.
291    * 
292    * @see jgroup.core.multicast.MulticastListener#deliverStream(java.io.InputStream, jgroup.core.MemberId, int)
293    */
294   public Object deliverStream(InputStream msg, MemberId sender, int seqNo)
295   {
296     if (log.isDebugEnabled()) {
297       MDC.put("group", "[Group: " + pgms.getGid() + "]");
298       log.debug("deliverStream() from " + sender);
299     }
300     InMessage inmsg = (InMessage) msg;
301     try {
302       /* Read message type from input stream */
303       MsgType msgType = (MsgType) inmsg.readObject();
304       /* Multiplex on the various message types */
305       switch (msgType) {
306         case REQUEST_SEQ_NO:
307           return computeProposedSeqNo(inmsg);
308 
309         case AGREED_SEQ_NO:
310           return dispatchAgreedInvocations(inmsg);
311 
312         default:
313           log.error("Unknown message type: " + msgType);
314           break;
315       }
316     } catch (IOException e) {
317       log.warn("Failed to read from input stream", e);
318     } catch (ClassNotFoundException e) {
319       log.warn("Class not found", e);
320     }
321 
322     return null;
323   }
324 
325 
326   private Object dispatchAgreedInvocations(InMessage inmsg)
327     throws IOException, ClassNotFoundException
328   {
329     /* Read the agreed sequence number from the input stream */
330     SequenceNo agreed = new SequenceNo();
331     agreed.readExternal(inmsg);
332     if (log.isDebugEnabled())
333       log.debug("Received AGREED: " + agreed + " for msg " + agreed.msgId);
334 
335     // Reorder the holdBackQueue if agreed differs from my proposed sequence number
336     reorderQueue(agreed);
337     if (log.isDebugEnabled()) {
338       log.debug("holdBackQueue=" + holdBackQueue.keySet());
339       log.debug(" proposalsMap=" + proposalsMap);
340     }
341 
342     return dispatchFromQueue(agreed);
343   }
344 
345 
346   /**
347    * 
348    * @param agreed
349    */
350   private Object dispatchFromQueue(SequenceNo agreed)
351   {
352     List<SequenceNo> results = new ArrayList<SequenceNo>(holdBackQueue.size());
353     lock.lock();
354     try {
355       for (Iterator<Map.Entry<SequenceNo, InputStream>> iter = holdBackQueue.entrySet().iterator(); iter.hasNext();) {
356         Map.Entry<SequenceNo, InputStream> entry = iter.next();
357         SequenceNo seqno = entry.getKey();
358         // if this is a newly agreed seqno, tag the message as deliverable
359         if (seqno.equals(agreed))
360           seqno.markAsDeliverable();
361         if (seqno.isDeliverable()) {
362           InputStream stream = entry.getValue();
363           iter.remove();
364           if (log.isDebugEnabled())
365             log.debug("Delivering message: " + seqno.msgId + ", assigned seqno: " + seqno);
366           try {
367             // Perform the actual method invocation on the local server (BasicDispatcher)
368             seqno.result = super.dispatch(stream);
369             /*
370              * Save the sequence number in the results list (the result object returned
371              * by the invocation is embedded within the sequence number object)
372              */
373             results.add(seqno);
374           } catch (IOException e) {
375             log.warn("Failed to dispatch atomic invocation", e);
376           }
377         } else {
378           // Stop moving messages to the delivery queue
379           if (log.isDebugEnabled())
380             log.debug("Stopping message delivery. " + seqno.msgId + " is undeliverable: " + seqno);
381           break;
382         }
383       }
384     } finally {
385       lock.unlock();
386     }
387     if (log.isDebugEnabled())
388       log.debug("results.size=" + results.size());
389     return results;
390   }
391 
392 
393   /**
394    * Reorder the hold-back queue in case the given agreed sequence number is
395    * different from the sequence number proposed by this local member.
396    * 
397    * @param agreed the sequence number that all have agreed to use for its associated message
398    */
399   private void reorderQueue(SequenceNo agreed)
400   {
401     lock.lock();
402     try {
403       // Update the largest agreed sequence number
404       largestAgreedSeqNo = SequenceNo.max(largestAgreedSeqNo, agreed);
405       // Get my proposal for the msgId currently being ordered
406       SequenceNo myProposal = proposalsMap.remove(agreed.msgId);
407       if (!agreed.equals(myProposal)) {
408         // Reorder the hold-back queue
409         if (log.isDebugEnabled())
410           log.debug("Reordering hold-back queue: myProposal=" + myProposal
411               + " is different from agreed=" + agreed);
412         InputStream stream = holdBackQueue.remove(myProposal);
413         if (stream == null) {
414           log.error("MESSAGE NOT FOUND IN HOLD-BACK QUEUE: " + myProposal);
415           // abort message handling
416           throw new NoSuchElementException("Message " + myProposal + " not found in hold-back queue");
417         } else {
418           holdBackQueue.put(agreed, stream);
419         }
420       }
421     } finally {
422       lock.unlock();
423     }
424   }
425 
426 
427   /**
428    * 
429    * @param inmsg
430    * @return
431    * @throws IOException
432    * @throws ClassNotFoundException
433    */
434   private SequenceNo computeProposedSeqNo(InMessage inmsg)
435     throws IOException, ClassNotFoundException
436   {
437     /* Read the message identifier from the input stream */
438     MessageID msgId = new MessageID();
439     msgId.readExternal(inmsg);
440     lock.lock();
441     try {
442       /* Compute a new proposed sequence number */
443       largestProposedSeqNo = SequenceNo.max(largestAgreedSeqNo, largestProposedSeqNo);
444       largestProposedSeqNo.updateSeqNo(msgId);
445       /* Associate the msg id and its proposed sequence number */
446       proposalsMap.put(msgId, largestProposedSeqNo);
447       /*
448        * When getting the message back from the holdBackQueue, the method
449        * associated with it can be dispatched in the delivery phase.
450        */
451       holdBackQueue.put(largestProposedSeqNo, inmsg);
452       if (log.isDebugEnabled())
453         log.debug("Computed PROPOSAL: " + largestProposedSeqNo + " for msg " + msgId);
454       return largestProposedSeqNo;
455     } finally {
456       lock.unlock();
457     }
458 
459   }
460 
461 
462   /* (non-Javadoc)
463    * @see jgroup.core.multicast.MulticastListener#deliverObject(java.lang.Object, jgroup.core.MemberId, int)
464    */
465   public Object deliverObject(Object msg, MemberId sender, int seqNo)
466   {
467     throw new UnsupportedOperationException();
468   }
469 
470 
471   ////////////////////////////////////////////////////////////////////////////////////////////
472   // Nested classes extending the GroupAckListener
473   ////////////////////////////////////////////////////////////////////////////////////////////
474 
475   /**
476    * An <code>AckListener</code> for collecting proposals for the
477    * distributed agreement protocol.
478    */
479   private class ProposalCollector
480     extends GroupAckListener
481   {
482 
483     public ProposalCollector()
484     {
485       super(new Object());
486     }
487 
488     private boolean isCompleted(int index)
489     {
490       return (completed[index] && !(results[index] instanceof RemoteException));
491     }
492 
493     /**
494      * Returns the greateset proposal obtained from the members.
495      * This method might return <code>null</code> in case not all
496      * members responded with a <i>proposal</i>.
497      */
498     public SequenceNo getGreatestProposal()
499     {
500       pendingCompletion();
501 
502       // Select the greatest proposal
503       SequenceNo greatestProposal = null;
504       if (isCompleted(0)) {
505         greatestProposal = (SequenceNo) results[0];
506         for (int i = 1; i < results.length; i++) {
507           if (isCompleted(i)) {
508             SequenceNo theProposal = (SequenceNo) results[i];
509             greatestProposal = SequenceNo.max(theProposal, greatestProposal);
510           } else {
511             greatestProposal = null;
512             break;
513           }
514         }
515       }
516       if (greatestProposal == null)
517         log.warn("Not all members have provided a PROPOSAL");
518       return greatestProposal;
519     }
520 
521   } // END ProposalCollector
522 
523 
524   /**
525    * An <code>AckListener</code> for collecting results for the
526    * distributed agreement protocol and delivering to the application.
527    */
528   private class ResultCollector
529     extends GroupAckListener
530   {
531 
532     private SequenceNo agreed;
533 
534     public ResultCollector(SequenceNo agreed)
535     {
536       super(agreed);
537       this.agreed = agreed;
538     }
539 
540     /* (non-javadoc)
541      * @see jgroup.core.multicast.AckListener#ack(MemberId, int, Object)
542      */
543     @SuppressWarnings("unchecked")
544     @Override
545     public void ack(MemberId id, int pos, Object obj)
546       throws RemoteException
547     {
548       if (log.isDebugEnabled())
549         log.debug("Recieved result from " + id + ", pos=" + pos + ": " + obj);
550       if (log.isDebugEnabled())
551         log.debug("resultCollectors: " + resultCollectors.keySet());
552 
553       List<SequenceNo> results = (List) obj;
554       if (results.isEmpty()) {
555         /*
556          * Nothing was dispatched, and hence no result to deliver; may have
557          * been delivered previously when two or more results was included?!?
558          */
559         if (log.isDebugEnabled())
560           log.debug("Empty results list when agreed=" + agreed);
561         ResultCollector rc = resultCollectors.get(agreed);
562         // ack with null to ensure progress
563         rc.doAck(id, pos, null);
564         return;
565       }
566       for (SequenceNo seqno : results) {
567         ResultCollector rc = resultCollectors.get(seqno);
568         if (log.isDebugEnabled())
569           log.debug("Delivering seqno=" + seqno + ", when agreed=" + agreed);
570         rc.doAck(id, pos, seqno.result);
571       }
572     }
573 
574     private void doAck(MemberId id, int pos, Object obj)
575       throws RemoteException
576     {
577       super.ack(id, pos, obj);
578     }
579 
580     @Override
581     public Object getResult()
582     {
583       // blocks until invocation completed
584       Object result = super.getResult();
585       // remove the result collector for this agreed sequence number (it has completed)
586       resultCollectors.remove(agreed);
587       return result;
588     }
589 
590   } // END ResultCollector
591 
592 
593   ////////////////////////////////////////////////////////////////////////////////////////////
594   // Nested static class for total order sequence numbers
595   ////////////////////////////////////////////////////////////////////////////////////////////
596 
597   /**
598    * Sequence number class used to define a total order of messages.
599    */
600   private static final class SequenceNo
601     implements Externalizable, Comparable<SequenceNo>
602   {
603 
604     private static final long serialVersionUID = -6394934923836273234L;
605 
606     /** The sequence number value */
607     private int sequenceNo = 100;
608 
609     /** The member that generated this sequence number */
610     private MemberId member;
611 
612     /**
613      * The message identifier associated with this sequence number.
614      * Note that the message identifier is not used when comparing
615      * sequence numbers or when computing the hash code.
616      */
617     private MessageID msgId = null;
618 
619     /**
620      * Result object associated with this sequence number.
621      */
622     private Object result = null;
623 
624     /** Set to true when the message can be delivered */
625     private transient boolean deliverable = false;
626 
627 
628     /** Default constructor for externalization */
629     public SequenceNo() {}
630 
631     public SequenceNo(MemberId me)
632     {
633       member = me;
634     }
635 
636     public SequenceNo(SequenceNo oldSeq)
637     {
638       this.sequenceNo = oldSeq.sequenceNo;
639       this.member = oldSeq.member;
640       this.msgId = oldSeq.msgId;
641     }
642 
643     public static SequenceNo max(SequenceNo s1, SequenceNo s2)
644     {
645       if (s1.compareTo(s2) > 0) {
646         return new SequenceNo(s1);
647       } else {
648         return new SequenceNo(s2);
649       }
650     }
651 
652     /**
653      * Update the sequence number counter and set the message identifier associated
654      * with this sequence number.
655      * 
656      * @param msgId the message identifier to which this sequence number should be associated.
657      */
658     public void updateSeqNo(MessageID msgId)
659     {
660       sequenceNo++;
661       this.msgId = msgId;
662     }
663 
664     public boolean isDeliverable()
665     {
666       return deliverable;
667     }
668 
669     public void markAsDeliverable()
670     {
671       deliverable = true;
672     }
673 
674     public int hashCode()
675     {
676       return sequenceNo ^ member.hashCode();
677     }
678 
679     public boolean equals(Object obj)
680     {
681       if (obj instanceof SequenceNo) {
682         SequenceNo sno = (SequenceNo) obj;
683         return (sequenceNo == sno.sequenceNo && member.equals(sno.member));
684       } else {
685         return false;
686       }
687     }
688 
689     public String toString()
690     {
691       StringBuilder buf = new StringBuilder();
692       buf.append("[");
693       String a = member.getEndPoint().getAddress().getHostAddress().toString();
694       String[] b = a.split("\\.");
695       if (b.length > 0)
696         buf.append(b[b.length-1]);
697       buf.append(".");
698       buf.append(sequenceNo);
699       buf.append("]");
700       return buf.toString();
701     }
702 
703     /* (non-Javadoc)
704      * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
705      */
706     public void readExternal(ObjectInput in)
707       throws IOException, ClassNotFoundException
708     {
709       sequenceNo = in.readInt();
710       member = new MemberIdImpl();
711       member.readExternal(in);
712       msgId = new MessageID();
713       msgId.readExternal(in);
714       result = in.readObject();
715     }
716 
717     /* (non-Javadoc)
718      * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
719      */
720     public void writeExternal(ObjectOutput out)
721       throws IOException
722     {
723       out.writeInt(sequenceNo);
724       member.writeExternal(out);
725       msgId.writeExternal(out);
726       out.writeObject(result);
727     }
728 
729     /**
730      * Compare two <code>SequenceNo</code> instances.
731      *
732      * For equal sequence number values we use the <code>MemberId</code>
733      * to distinguish them.  Otherwise, we use only the sequence
734      * number.  That is, the sequence number has precedence when
735      * comparing two <code>SequenceNo</code> instances.
736      *
737      * @see java.lang.Comparable#compareTo(java.lang.Object)
738      */
739     public int compareTo(SequenceNo seqno)
740     {
741       if (this == seqno)
742         return 0;
743       if (this.sequenceNo == seqno.sequenceNo)
744         return this.member.compareTo(seqno.member);
745       else if (this.sequenceNo > seqno.sequenceNo)
746         return 1;
747       else
748         return -1;
749     }
750 
751   } // END SequenceNo
752 
753 
754   ////////////////////////////////////////////////////////////////////////////////////////////
755   // Nested static class for message identifiers
756   ////////////////////////////////////////////////////////////////////////////////////////////
757 
758   /**
759    * A message identifier is a unique identifier for messages
760    * generated within the system.  This is implement by a locally
761    * unique sequence number and a member identifier.
762    */
763   private static final class MessageID
764     implements Externalizable
765   {
766 
767     private static final long serialVersionUID = -6073546790683304835L;
768 
769     private static long nextSequenceNo = 0;
770     private long sequenceNo;
771     private MemberId memberId;
772 
773     /** Default constructor for externalization */
774     public MessageID() {}
775 
776     public MessageID(MemberId id)
777     {
778       this.sequenceNo = nextSequenceNo++;
779       this.memberId = id;
780     }
781 
782     public boolean equals(Object obj)
783     {
784       if (obj instanceof MessageID) {
785         MessageID msgId = (MessageID) obj;
786         return (sequenceNo == msgId.sequenceNo && memberId.equals(msgId.memberId));
787       } else {
788         return false;
789       }
790     }
791 
792     public int hashCode()
793     {
794       return (int)(sequenceNo ^ (sequenceNo >>> 32)) ^ memberId.hashCode();
795     }
796 
797     public String toString()
798     {
799       StringBuilder buf = new StringBuilder();
800       buf.append("[");
801       String a = memberId.getEndPoint().getAddress().getHostAddress().toString();
802       String[] b = a.split("\\.");
803       if (b.length > 0)
804         buf.append(b[b.length-1]);
805       buf.append("*");
806       buf.append(sequenceNo);
807       buf.append("]");
808       return buf.toString();
809     }
810 
811     /* (non-Javadoc)
812      * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
813      */
814     public void readExternal(ObjectInput in)
815       throws IOException, ClassNotFoundException
816     {
817       sequenceNo = in.readLong();
818       memberId = new MemberIdImpl();
819       memberId.readExternal(in);
820     }
821 
822     /* (non-Javadoc)
823      * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
824      */
825     public void writeExternal(ObjectOutput out)
826       throws IOException
827     {
828       out.writeLong(sequenceNo);
829       memberId.writeExternal(out);
830     }
831 
832   } // END MessageID
833 
834 } // END Atomic