View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.daemon;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import jgroup.core.EndPoint;
32  import jgroup.core.MemberId;
33  import jgroup.core.View;
34  import jgroup.core.multicast.ChainIdentifier;
35  import jgroup.relacs.events.DeliveryAck;
36  import jgroup.relacs.events.InstallAck;
37  import jgroup.relacs.events.JoinRequest;
38  import jgroup.relacs.events.LeaveRequest;
39  import jgroup.relacs.events.MemberLeftEvent;
40  import jgroup.relacs.events.MulticastRequest;
41  import jgroup.relacs.events.PrepareAck;
42  import jgroup.relacs.events.PrepareEvent;
43  import jgroup.relacs.mss.HostTable;
44  import jgroup.relacs.mss.Mss;
45  import jgroup.relacs.mss.MssConstants;
46  import jgroup.relacs.mss.MssHost;
47  import jgroup.relacs.mss.MssUser;
48  import jgroup.relacs.types.LocalId;
49  import jgroup.relacs.types.MemberIdImpl;
50  import jgroup.relacs.types.ViewId;
51  import jgroup.relacs.types.ViewImpl;
52  import jgroup.util.Queue;
53  import jgroup.util.Util;
54  
55  import org.apache.log4j.Logger;
56  import org.apache.log4j.MDC;
57  
58  
59  /**
60   * The <code>Group</code> class
61   *
62   * @author  Alberto Montresor
63   * @since   Jgroup 0.1
64   */
65  final class Group
66    implements Tag, MssConstants
67  {
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Logger
71    ////////////////////////////////////////////////////////////////////////////////////////////
72  
73    /** Obtain logger for this class */
74    public static final Logger log = Logger.getLogger(Group.class);
75  
76  
77    ////////////////////////////////////////////////////////////////////////////////////////////
78    // Fields
79    ////////////////////////////////////////////////////////////////////////////////////////////
80  
81    // References to other data structures of this layer
82    
83    /** Group table */
84    private final GroupTable grouptable;
85    
86    /** Table containing the hosts handled by this daemon */
87    private final HostTable hosttable;
88  
89    /** Mss used to send messages */
90    private final Mss mss;
91    
92    // General fields
93    
94    /** Group identifier */
95    private int gid;
96    
97    /** Group status (idle, synch, estimate) */
98    private int status;
99    
100   /** Current reachable set */
101   private EndPoint[] rset;
102   
103   /** Information related to this host */
104   private final HostData me; 
105   
106   /** My identifier (32 bit IP address + port number) */
107   private final EndPoint myaddress;
108   
109   /** Number of views installed so far */
110   private int ninstalled;
111   
112   /** Number of views coordinated so far */
113   private int ncoordinated;
114   
115   /** Number of suspect events */
116   private int nsuspects;
117 
118   /** Complete list of hosts interested in the group */
119   private final Map<EndPoint,HostData> thosts;
120   
121   /** Complete list of local members interested in the group */
122   private final Map<LocalId,MemberData> tmembers;
123 
124   /** Number of members contained in the system, excluding the leaving members */
125   private int nmembers;
126   
127   
128   // Current view information
129   
130   /** Complete view identifier */
131   private long cvid;
132   
133   /** View identifier */
134   private long pvid;
135   
136   /** View composition */
137   private ViewDescription viewDesc;
138 
139   
140   // Agreement phase data
141   
142   /** Messages whose sending is postponed */
143   private Queue delayedSent; 
144   
145   /** Messages whose delivery is postponed */
146   private Queue delayedRcvd;
147   
148   /** Hosts in the current estimate */
149   private Estimate estimate;
150   
151   /** Current proposal */
152   private MsgProp proposal;
153   
154   /** True if message buffer is consistent */
155   private boolean msgsStable;
156   
157   /** 
158    * True if the installation of the view has been completed 
159    * by every local member included in the view. When false, 
160    * the local host cannot participate in the agreement protocol
161    * by sending proposal, as the current view has not been
162    * installed by one of the local members. Once true, it will
163    * stay true until the next view installation.
164    */
165   private boolean installationComplete;
166 
167   /** Send buffer */
168   private final SendBuffer sbuffer;
169 
170   
171   ////////////////////////////////////////////////////////////////////////////////////////////
172   // Constructors
173   ////////////////////////////////////////////////////////////////////////////////////////////
174 
/**
 *  Creates a group identified by the group identifier <code>gid</code>.
 *  Initially the group knows only the local host <code>hd</code> and the
 *  single local member <code>md</code>, and it starts in the idle phase.
 *
 *  @param grouptable      table of groups this group belongs to
 *  @param mss             Mss used to send messages; also supplies the host table
 *  @param gid             group identifier
 *  @param view            initial view; its identifier is used as both the
 *                         complete and the partial view identifier
 *  @param hd              this host additional info
 *  @param md              joining member additional info
 */
Group(GroupTable grouptable, Mss mss, int gid, View view, HostData hd, MemberData md)
{
  // References to other data structures
  this.grouptable = grouptable;
  this.mss = mss;
  this.hosttable = mss.getHostTable();

  // General fields
  this.gid = gid;
  // Initial status of group
  this.status = S_IDLE;
  // toString() reads only gid and status, both of which are already set
  MDC.put("group", toString());
  myaddress = hd.getEndPoint();
  rset = new EndPoint[] { myaddress };
  me = hd;
  ncoordinated = 0;
  ninstalled = 0;
  nsuspects = 0;
  thosts = new HashMap<EndPoint,HostData>();
  tmembers = new HashMap<LocalId,MemberData>();
  thosts.put(myaddress, hd);
  tmembers.put(md.getMemberId().getLocalId(), md);
  nmembers = 1;

  // Initialization of the first view
  cvid = view.getVid();
  pvid = cvid;
  viewDesc = new ViewDescription(hd, md);
  sbuffer = new SendBuffer();
  sbuffer.setView(view, 1);

  // Initialization of agreement phase info
  delayedSent = new Queue();
  delayedRcvd = new Queue();
  proposal    = new MsgProp();
  msgsStable  = true;
  installationComplete  = false;
}
221 
222 
223   ////////////////////////////////////////////////////////////////////////////////////////////
224   // Group management
225   ////////////////////////////////////////////////////////////////////////////////////////////
226 
227   /**
228    *  Return the identifier of this group. 
229    */
230   int getGid()
231   {
232     return gid;
233   }
234   
235   
236   /**
237    *  Returns a collection containing the hosts of the group
238    */
239   Collection<HostData> getHosts()
240   {
241     return thosts.values();
242   }
243   
244   /**
245    *  Returns a collection containing the local members of the group
246    */
247   Collection<MemberData> getMembers()
248   {
249     return tmembers.values();
250   }
251 
252   /**
253    *  Returns true if this group contains no local members;
254    *  otherwise false is returned.
255    */
256   boolean isEmpty()
257   {
258     return tmembers.isEmpty();
259   }
260 
261   /**
262    * Returns a string representation of this Group object.
263    */
264   public String toString()
265   {
266     return toString(false);
267   }
268 
269   /**
270    * Returns a string representation of this Group object.
271    */
272   public String toString(boolean full)
273   {
274     StringBuilder buf = new StringBuilder("[Group: ");
275     buf.append(gid);
276     buf.append(", ");
277     switch (status) {
278       case S_IDLE :
279         buf.append("IDLE");
280         break;
281       case S_SYNCH :
282         buf.append("SYNCH");
283         break;
284       case S_ESTIMATE :
285         buf.append("ESTIMATE");
286         break;
287       default :
288         buf.append("INVALID STATUS");
289         break;
290     }
291     if (full) {
292       buf.append(", ");
293       buf.append(estimate);
294     }
295     buf.append("]");
296     return buf.toString();
297   }
298 
299   /**
300    * 
301    */
302   HostData getHost(EndPoint endpoint)
303   {
304     return (HostData) thosts.get(endpoint);
305   }
306   
307   /**
308    *  Add the host <CODE>src</CODE> to the group. 
309    *  
310    *  @param src the host to be added
311    *  @param hosts the hosts known to src, that we need to add as well
312    *  @return true if <CODE>src</CODE> was added to the group
313    */
314   boolean join(MssHost src, MssHost[] hosts)
315   {
316     if (log.isDebugEnabled())
317       log.debug("Group.join: START");
318     
319     EndPoint endpoint = src.getEndPoint();
320     HostData sender = (HostData) thosts.get(endpoint);
321     if (sender == null) {
322       if (log.isDebugEnabled())
323         log.debug("Group.join: new host");
324 
325       /* Add the host in src and the hosts in hosts to the group */   
326       thosts.put(endpoint, new HostData(src));
327       grouptable.addEndpointToGroup(endpoint, this);
328       for (int i=0; i < hosts.length; i++) {
329         HostData scan = (HostData) thosts.get(hosts[i].getEndPoint());
330         if (scan == null) {
331           thosts.put(hosts[i].getEndPoint(), new HostData(hosts[i]));
332           grouptable.addEndpointToGroup(hosts[i].getEndPoint(), this);
333         }
334       }
335 
336       /* We have added src to the group */
337       return true;
338       
339     } else if (sender.isLeaving()) {
340       
341       /* The sender is not leaving any more */
342       sender.setLeaving(false);
343       if (status == S_IDLE && Util.in(rset, endpoint))
344         synchronizationPhaseInit();
345     }
346     
347     /* We have not added anything */
348     return false;
349   }
350 
351   
352   /**
353    *  Remove the host described in src from the group.
354    */
355   void leave(EndPoint src)
356   {
357     HostData sender = thosts.get(src);
358     if (sender != null) {
359       sender.setLeaving(true);
360       switch (status) {
361         /* IDLE PHASE */
362       case S_IDLE:
363         synchronizationPhaseInit();
364         if (log.isDebugEnabled())
365           log.debug("Leaving in S_IDLE: " );
366         break;
367         /* SYNCHRONIZATION PHASE */
368       case S_SYNCH:
369         estimate.remove(src);
370         if (log.isDebugEnabled())
371           log.debug("Leaving in S_SYNCH: EST= " + estimate);
372         checkSynchronization();
373         break;
374         /* ESTIMATE EXCHANGE PHASE */
375       case S_ESTIMATE:
376         estimate.remove(src);
377         if (log.isDebugEnabled())
378           log.debug("Leaving in S_ESTIMATE: EST= " + estimate);
379         sendEstimate();
380         break;
381       }
382     }
383   }
384 
385   /**
386    *  Handles the crash of the local member identified by the given
387    *  local identifier. This method is invoked by <CODE>Daemon</CODE>
388    *  on all groups included in the group table. It is up to the
389    *  <CODE>Group</CODE> objects to react to the crash.
390    *  
391    *  @param id the identifier of the crashed member
392    */
393   void handleLocalSuspect(MemberId id)
394   {
395     MemberData md = (MemberData) tmembers.get(id.getLocalId());
396     if (md != null) {
397       /* The member belongs to this group */
398       if (log.isDebugEnabled())
399         log.debug("Group:handleLocalSuspect: " + id);
400       if (!md.isLeaving() && !md.isCrashed()) {
401         md.setCrashed();
402         nmembers--;
403         runAgreementProtocol();
404       }
405     }
406   }
407   
408   /**
409    *  Invoked by activateGroups
410    */
411   void handleRemoteSuspect(int nsuspects, EndPoint[] trset, EndPoint[] nrset, EndPoint[] nuset)
412   {
413     if (nsuspects > this.nsuspects) {
414       if (log.isDebugEnabled())
415         log.debug("nsuspects=" + nsuspects);
416       this.nsuspects = nsuspects;
417 
418       /*
419        * Construct symmetrical reachability information and send it to
420        * the newly reachable members (nrset) that were previously
421        * unreachable.
422        */
423       try {
424         MsgSymm tosend = MsgSymm.marshal(gid, me.getVersion(),
425           HostData.extract(thosts, nrset), rset, nrset.length);
426         mss.msend(R_SYMM, tosend, nrset);
427       } catch (IOException e) {
428         log.error("Unable to marshal symmetrical reachability message", e);
429         return;
430       }
431 
432       /* Set the reachability set */
433       rset = trset;
434 
435       switch (status) {
436     
437       case S_IDLE:
438         synchronizationPhaseInit();
439         break;
440     
441       case S_SYNCH:
442         estimate.remove(nuset);
443         if (log.isDebugEnabled())
444           log.debug(estimate);
445         checkSynchronization();
446         break;
447     
448       case S_ESTIMATE:
449         estimate.remove(nuset);
450         if (log.isDebugEnabled())
451           log.debug(estimate);
452         sendEstimate();
453         break;
454       }
455     }
456   }
457 
458 
459   ////////////////////////////////////////////////////////////////////////////////////////////
460   // Interactions with members
461   ////////////////////////////////////////////////////////////////////////////////////////////
462 
463   void handleLocalDlvrAck(DeliveryAck ack)
464   {
465     if (log.isDebugEnabled()) {
466       log.debug("host.pos=" + ack.getHostIndex() + ", mem.pos=" + ack.getMemberIndex());
467     }
468     LocalId ackSender = ack.getAckSender().getLocalId();
469     MemberData md = (MemberData) tmembers.get(ackSender);
470     md.removeAck(ack.getMessageId());
471 
472     log.assertLog(viewDesc.size() > ack.getHostIndex(),
473       "Host index of DeliveryAck is greater than the current view size ("
474       + viewDesc.size() + "): " + ack + "; msgStable: " + msgsStable);
475 
476     /*
477      *  Send the RESULT message (ack + info) to all hosts; in the
478      *  previous versions of the protocol, result info were sent using
479      *  the mss (by msending the message also to the sender).  The new
480      *  version takes care of locally delivering the message.
481      */
482     MsgResult result = new MsgResult(gid, pvid, ack.getMessageId(),
483       ack.getSender(), viewDesc.getDelivered(ack.getHostIndex()), 
484       ack.getHostIndex(), ack.getViewIndex(), ack.getResult());
485     try {
486       EndPoint[] receivers = viewDesc.getEndPoints();
487       MsgResult tosend = result.marshal(receivers.length);
488       sbuffer.insertMsgResult(result);
489       if (log.isDebugEnabled())
490         log.debug("Sending R_RESULT: " + tosend);
491       mss.msend(R_RESULT, tosend, receivers);
492     } catch (IOException e) {
493       log.error("Unable to marshal result message", e);
494       return;
495     }
496 
497     boolean stable = viewDesc.localDeliveryAck(ack);
498     if (log.isDebugEnabled()) {
499       log.debug("Handling deliveryACK (stable=" + stable + ")");
500       log.debug(viewDesc);
501     }
502     if (stable) {
503       msgsStable = true;
504       if (status == S_ESTIMATE) {
505         try {
506           int[] delivered = viewDesc.getDelivered(estimate.getHosts());
507           proposal.encodeMsgsProp(delivered);
508           sendProposal();
509         } catch (IOException e) {
510           log.error("Unable to encode messages proposal", e);
511           return;
512         }
513       }
514     }
515   }
516 
517 
518   /**
519    *  Handles a multicast request generated by a member.
520    * 
521    *  @param request the multicast request
522    */
523   void handleMulticastRequest(MulticastRequest request)
524   {
525     if (log.isDebugEnabled())
526       log.debug(request);
527 
528     // Message to send
529     MsgMcast msg = null;
530     try {
531       msg = new MsgMcast(request);
532     } catch (IOException e) {
533       log.error("Cannot marshal multicast message", e);
534       return;
535     }
536 
537     if (status == S_IDLE) {
538       acceptMessage(msg, false);
539     } else if (!installationComplete) {
540       if (log.isDebugEnabled())
541         log.debug("View installation for local members not complete");
542       acceptMessage(msg, true);
543     } else {
544       if (log.isDebugEnabled())
545         log.debug(viewDesc);
546       ChainIdentifier chainId = request.getChainId();
547       if (chainId != null) {
548         EndPoint orgSender = chainId.getOriginalSender().getEndPoint();
549         HostData hd = (HostData) thosts.get(orgSender);
550         assert hd != null : "Unknown chain sender";
551         assert hd.hasValidViewIndex() : "Uninitialized chain sender";
552         if (log.isDebugEnabled())
553           log.debug("Received chained message: " + chainId);
554         if (!viewDesc.isNew(hd.getViewIndex(), chainId.getMsgId())) {
555           if (log.isDebugEnabled())
556             log.debug("Accepting chained message: " + chainId);
557           /*
558            * Since this multicast is chained with another multicast
559            * being processed by the Dispatcher thread, and since we
560            * are not in the IDLE state, this message should be
561            * considered 'late' and thus R_FORWARD will be used to
562            * pass this chained message.  This forwarded message will
563            * be processed in the handleMsgDlvrd() method at the receiver
564            * side.
565            */
566           acceptMessage(msg, true);
567         } else {
568           assert false : "Chain message cannot be new";
569         }
570       } else {
571         delayedSent.insert(msg);
572         if (log.isDebugEnabled())
573           log.debug("Message (" + msg.mid + ") put in the delayedSent queue: " + delayedSent);
574       }
575     }
576   }
577 
578 
579   void handleLocalPrepareAck(PrepareAck event)
580   {
581     if (log.isDebugEnabled())
582       log.debug(event);
583     // Get sender of the PrepareAck event
584     MemberData md = (MemberData) tmembers.get(event.getMemberId().getLocalId());
585     if (!md.isPrepared() && !md.isCrashed()) {
586       md.setPrepared();
587       if (log.isDebugEnabled())
588         log.debug(md + " is prepared");
589     }
590   }
591 
592 
593   void handleLocalViewAck(InstallAck upcall)
594   {
595     installationComplete = viewDesc.localViewAck(upcall);
596     if (log.isDebugEnabled()) {
597       log.debug("Local view is " + (installationComplete ? "stable" : "not stable"));
598     }
599     if (status == S_ESTIMATE)
600       sendProposal();
601   }
602 
603 
604   void handleLocalJoin(JoinRequest event, MssUser mssuser)
605   {
606     if (log.isDebugEnabled())                    
607       log.debug(event);
608 
609     MemberId id = event.getMemberId();
610     MemberData md = new MemberData(id, event.getDispatcher());
611     md.schedulePingTimer(mssuser);
612     if (tmembers.put(id.getLocalId(), md) == null) {
613       nmembers++;
614       runAgreementProtocol();
615     }
616   }
617 
618 
619   /**
620    *  This method handle a leave request coming from a member.
621    */
622   void handleLeaveRequest(LeaveRequest event)
623   {
624     if (log.isDebugEnabled())
625       log.debug(event);
626 
627     // Sender member and additional data
628     MemberId id = event.getMemberId();
629     MemberData md = (MemberData) tmembers.get(id.getLocalId());
630     md.cancelPingTimer();
631     /* 
632      *  Note: I don't expect that a crashed member can send a leave
633      *  request ;-) But members request and crash notifications may
634      *  end up in the Mss queue in unpredictable order, so it is
635      *  safer to perform the check in this way
636      */
637     if (!md.isLeaving() && !md.isCrashed()) {
638       md.setLeaving();
639       if (log.isDebugEnabled())
640         log.debug(md + " is leaving");
641       nmembers--;
642       runAgreementProtocol();
643     }
644   }
645 
646 
647   ////////////////////////////////////////////////////////////////////////////////////////////
648   // Interactions with remote hosts (group-related messages)
649   ////////////////////////////////////////////////////////////////////////////////////////////
650 
651   /**
652    *  Handles a mcast invocation received from a remote host; if the host
653    *  is in Estimate Exchange Phase, the message is buffered; otherwise,
654    *  if the view of the message corresponds to the current one, the
655    *  message is locally delivered and an ack message is m-sent.
656    *
657    *  @param     msg     the m-received message.
658    *  @param     src     the sender of the message.
659    */
660   void handleMsgMcast(MsgMcast msg, EndPoint src)
661   {
662     if (log.isDebugEnabled()) {
663       log.debug("Handle mcast msg (" + msg.mid + ") received from " + src);
664     }
665 
666     if (status == S_ESTIMATE) {
667       delayedRcvd.insert(msg);
668       if (log.isDebugEnabled()) {
669         log.debug("Message buffered in receive queue: " + delayedRcvd);
670         log.debug(viewDesc);
671       }
672     } else if (pvid == msg.vid) {
673       boolean newCrashed = viewDesc.notifyMessage(gid, msg);
674       if (log.isDebugEnabled())
675         log.debug("Multicast message being delivered (stable=false)");
676       msgsStable = false;
677       /*
678        * If the current view size is equal to one, it contains just the
679        * local host; there is no need to send this message to it
680        */
681       if (viewDesc.size() > 1) {
682         try {
683           MsgAck tosend = MsgAck.marshal(gid, pvid, viewDesc.getLocalIndex(),
684             viewDesc.getAcks(), viewDesc.size());
685           mss.msend(R_ACK, tosend, viewDesc.getEndPoints());
686         } catch (IOException e) {
687           log.error("Cannot marshal ACK message", e);
688         }
689       }
690       /*
691        *  If the message has not been delivered because of a new
692        *  crashed hosts, start a new agreement protocol (if needed),
693        *  or complete the member list and send a proposal, otherwise.
694        */
695       if (newCrashed) {
696         if (log.isDebugEnabled()) {
697           log.debug("Detected new crashed host; running agreement protocol");
698         }
699         runAgreementProtocol();
700       }
701     } else {
702       log.warn("Received a multicast message with incorrect view id: "
703         + msg.vid + " (" + pvid + ")");
704     }
705   }
706 
707   /**
708    *
709    *
710    *  @param     msg     the m-received message.
711    *  @param     src     the sender of the message.
712    */
713   void handleMsgAck(MsgAck msg, EndPoint src)
714   {
715     if (log.isDebugEnabled())
716       log.debug("handleMsgAck: " + msg+ " from src = " + src);
717 
718     /* Set the ack */
719     if (pvid == msg.vid) {
720       viewDesc.remoteMessageAck(msg);
721     } else {
722       log.warn("Received an ack message with incorrect view id: " + msg.vid + " (" + pvid + ")");
723     }
724   }
725 
726 
727   /**
728    *  First of all, the method checks whether (i) the group is known;
729    *  (ii) the view associated to the message is equal to the current
730    *  one; (iii) the messge is unknown; (iv) the forwarding sender is
731    *  still in the estimate. If so, the message is delivered and an ack
732    *  message is sent. The message is forwarded again to the remaining
733    *  members.
734    *
735    *  @param     msg     the m-received message.
736    *  @param     src     the sender of the message.
737    */
738   void handleMsgDlvrd(MsgMcast msg, EndPoint src)
739   {
740     if (log.isDebugEnabled())
741       log.debug("handleMsgDlvrd: " + msg + " from src = " + src);
742 
743     if ( pvid == msg.vid && estimate.contains(src) && viewDesc.isNew(msg.hpos, msg.mid) ) {
744       boolean newCrashed = viewDesc.notifyMessage(gid, msg);
745       if (log.isDebugEnabled())
746         log.debug("Delivered multicast message (stable=false)");
747       msgsStable = false;
748 
749       /*
750        * If the current view size is equal to one, it contains just the
751        * local host; there is no need to send this message to it
752        */
753       if (viewDesc.size() > 1) {
754         /* Sends an ack message */
755         try {
756           MsgAck tosend = MsgAck.marshal(gid, pvid, viewDesc.getLocalIndex(),
757               viewDesc.getAcks(), viewDesc.size());
758           mss.msend(R_ACK, tosend, viewDesc.getEndPoints());
759         } catch (IOException e) {
760           log.error("Cannot marshal ACK message", e);
761         }
762       }
763       
764       /*  
765        *  During ESTIMATE EXCHANGE phase, forwards the message. If the 
766        *  current estimate size is equal to one, it contains just the
767        *  local host; there is no need to send this message to it
768        */
769       if (status == S_ESTIMATE && estimate.size() > 1)
770         mss.msend(R_FORWARD, msg, estimate.getEndPoints());
771       
772       /*
773        *  If the message has not been delivered because of a new
774        *  crashed hosts, start a new agreement protocol (if needed),
775        *  or complete the member list and send a proposal, otherwise.
776        */
777       if (newCrashed) {
778         if (log.isDebugEnabled()) {
779           log.debug("Detected new crashed host; running agreement protocol");
780         }
781         runAgreementProtocol();
782       }
783     }
784   }
785 
786   /**
787    *
788    *  @param     msg     the m-received message.
789    *  @param     src     the sender of the message.
790    */
791   void handleMsgSynch(MsgSynch msg, EndPoint src)
792   {
793     HostData sender = thosts.get(src);
794     if (sender != null) {
795       if (log.isDebugEnabled()) {
796         log.debug(msg);
797         log.debug("  Sender: " + sender);
798         log.debug("Receiver: " + me);
799       }
800 
801       switch(status) {
802 
803       /*  IDLE PHASE */
804       case S_IDLE:
805         if (log.isDebugEnabled())
806           log.debug("Group: Idle phase");
807         if (sender.getVersion() < msg.vsend) {
808           sender.setVersion(msg.vsend);
809           if (Util.in(rset, src))
810             synchronizationPhaseInit();
811         }
812         break;
813 
814       /* SYNCHRONIZATION PHASE */
815       case S_SYNCH:
816         if (log.isDebugEnabled()) {
817           log.debug("Group: S phase sender version: " + sender.getVersion() + " <? " + msg.vsend);
818           log.debug("Group: S phase local version:  " + me.getVersion() + " ==? " + msg.vdest);
819         }
820         if (me.getVersion() == msg.vdest) {
821           estimate.setSynchronized(src);
822           if (log.isDebugEnabled())
823             log.debug(estimate);
824           /*
825            * Note that the algorithm in the thesis (fig. 5.7 page 91) is slightly different
826            * than this; here the agreed for sender is only set if the sender is synchronized, while
827            * the thesis version sets it as long as the local version number of the sender is less than
828            * that of the sender (msg.vsend).  It is likely that this version is correct.
829            */
830           sender.setAgreed(msg.vsend);
831         }
832         if (sender.getVersion() < msg.vsend) {
833           //FIXME The thesis claims that this updating should be done here rather than above:
834 //          sender.setAgreed(msg.vsend);
835 
836           sender.setVersion(msg.vsend);
837           EndPoint[] symset = grouptable.getSymset(src);
838           try {
839             MsgSynch msynch = MsgSynch.marshal(gid, me.getVersion(), sender.getVersion(), symset);
840             if (log.isDebugEnabled())
841               log.debug("Sending R_SYNCH to " + src);
842             mss.send(R_SYNCH, msynch, src);
843           } catch (IOException e) {
844             log.error("Unable to marshal synchronization message", e);
845             return;
846           }
847         }
848         checkSynchronization();
849         break;
850         
851       /* ESTIMATE EXCHANGE PHASE */
852       case S_ESTIMATE:
853         if (log.isDebugEnabled())
854           log.debug("Group: EE phase ");
855         if (sender.getVersion() < msg.vsend)
856           sender.setVersion(msg.vsend);
857         if (log.isDebugEnabled())
858           log.debug("Group: sender.agreed " + sender.getAgreed() + " msg.vsend " + msg.vsend);
859         if (sender.getAgreed() < msg.vsend && estimate.contains(src)) {
860           estimate.remove(msg.rset);
861           if (log.isDebugEnabled())
862             log.debug(estimate);
863           sendEstimate();
864         }
865 
866       } // END switch
867     }
868   }
869 
870 
871   /**
872    *
873    *  @param     msg     the m-received message.
874    *  @param     src     the sender of the message.
875    */
876   void handleMsgSymm(MsgSymm msg, EndPoint src)
877   {
878     if (log.isDebugEnabled())
879       log.debug(msg + " from src=" + src);
880 
881     // In the original algorithm, MsgSymm messages are simply discard 
882     // when received in the IDLE phase.
883     if (status == S_IDLE)
884       return;
885 
886     int pos = Util.valueAt(msg.hosts, myaddress);
887     if (pos >= 0) {
888       int vdest = msg.version[pos];
889 
890       if (status == S_SYNCH) {             // SYNCHRONIZATION PHASE
891         if (log.isDebugEnabled())
892           log.debug("Group:MsgSymm: S PHASE");
893         if (estimate.contains(src) && me.getVersion() == vdest) {
894           if (log.isDebugEnabled())
895             log.debug("Group:MsgSymm: accepted");
896           estimate.remove(msg.rset);
897           if (log.isDebugEnabled())
898             log.debug(estimate);
899           checkSynchronization();
900         }
901 
902       } else if (status == S_ESTIMATE) {   // ESTIMATE EXCHANGE PHASE
903         if (log.isDebugEnabled())
904           log.debug("Group:MsgSymm: EE PHASE");
905         HostData sender = estimate.get(src);
906         // FIXME the commented line below is the thesis version;
907         // which appears to be wrong, but needs to be verified
908 //        if (sender != null && (me.getAgreed() == vdest || sender.getAgreed() <= msg.vsend)) { // Thesis version
909         if (sender != null && me.getAgreed() == vdest && sender.getAgreed() == msg.vsend) {
910           if (log.isDebugEnabled()) {
911             log.debug("Group:MsgSymm: accepted " + estimate);
912             for (EndPoint endp : msg.rset) {
913               log.debug("removing from estimate: " + endp);
914             }
915           }
916           estimate.remove(msg.rset);
917           if (log.isDebugEnabled())
918             log.debug(estimate);
919           sendEstimate();
920         }
921       }
922     }
923   }
924 
925   /**
926    *
927    * @param     msg     the m-received message.
928    * @param     src     the sender of the message.
929    */
930   void handleMsgEstim(MsgEstim msg, EndPoint src)
931   {
932     if (log.isDebugEnabled())
933       log.debug("handleMsgEstim: " + msg + " from src = " + src);
934 
935     /*
936      * Locate my position in the estimate in order to determine the
937      * estimate agreed value for me.  If I'm not in the estimate, I
938      * discard it; do nothing.
939      */
940     int pos = Util.valueAt(msg.hosts, myaddress);
941     if (pos >= 0) {
942       int vdest = msg.agreed[pos];
943       if (log.isDebugEnabled()) {
944         log.debug("Group:vdest " + vdest);
945         log.debug("Group:me.getVersion() " + me.getVersion());
946       }
947 
948       if (status == S_SYNCH) {           // SYNCHRONIZATION PHASE
949 
950         HostData sender = thosts.get(src);
951         /* If the sender is valid */
952         if (sender != null) {
953           if (log.isDebugEnabled())
954             log.debug(sender + " is valid");
955           if (sender.getVersion() < msg.vsend)
956             sender.setVersion(msg.vsend);
957           if (!estimate.contains(src)) {
958             if (log.isDebugEnabled())
959               log.debug("Sending MsgSymm to " + src);
960             try {
961               MsgSymm tosend = MsgSymm.marshal(gid, me.getVersion(), sender,
962                   estimate.getEndPoints(), 1);
963               mss.send(R_SYMM, tosend, src);
964             } catch (IOException e) {
965               log.error("Unable to marshal symmetrical reachability message", e);
966               return;
967             }
968           } else if (me.getVersion() == vdest) {
969             if (log.isDebugEnabled())
970               log.debug("Before intersect: " + estimate);
971             estimate.intersect(msg.hosts);
972             estimate.updateAgreed(msg.hosts, msg.agreed);
973             if (log.isDebugEnabled())
974               log.debug(" After intersect: " + estimate);
975             checkSynchronization();
976           }
977         } else {
978           if (log.isDebugEnabled())
979             log.debug(sender + " is not valid");
980         }
981 
982       } else if (status == S_ESTIMATE) {         // ESTIMATE EXCHANGE PHASE
983 
984         if (estimate.contains(src) && estimate.checkAgreed(msg.hosts, msg.agreed)) {
985           estimate.intersect(msg.hosts);
986           if (log.isDebugEnabled())
987             log.debug(estimate);
988           sendEstimate();
989         } else {
990           // Only for debugging
991           if (log.isDebugEnabled()) {
992             log.debug(estimate);
993             if (!estimate.contains(src))
994               log.debug("Estimate does not contain " + src);
995           }
996         }
997       }
998     } else if (status == S_ESTIMATE && estimate.contains(src)) {
999       /*
1000        * Line 24-25 of Fig. 5.8 pg 94 of Alberto's thesis.
1001        */
1002       if (log.isDebugEnabled())
1003         log.debug("p not in P");
1004       HostData sender = thosts.get(src);
1005       /* If the sender is valid */
1006       if (sender != null && sender.getAgreed() <= msg.vsend) {
1007         estimate.intersect(msg.hosts);
1008         if (log.isDebugEnabled())
1009           log.debug(estimate);
1010         sendEstimate();
1011       }
1012     }
1013   }
1014 
1015 
1016   /**
1017    *
1018    *  @param     msg     the m-received message.
1019    *  @param     src     the sender of the proposal message.
1020    */
1021   void handleMsgProp(MsgProp msg, EndPoint src)
1022     throws IOException, ClassNotFoundException
1023   {
1024     if (log.isDebugEnabled())
1025       log.debug("handleMsgProp: " + msg + " from src = " + src);
1026 
1027     /* Identify the sender */
1028     HostData sender = thosts.get(src);
1029     
1030     /* If the sender is valid */
1031     if ( status != S_IDLE && sender != null && estimate.contains(src) ) {
1032       sender.setProposal(msg);
1033       if (log.isDebugEnabled())
1034         log.debug("handleMsgProp: sender is valid");
1035 
1036       if (status == S_ESTIMATE && checkAgreement()) {
1037         if (log.isDebugEnabled())
1038           log.debug("handleMsgProp: ESTIMATE PHASE; agreement reached");
1039 
1040         // Creates a new complete view identifier and decodes the next view composition
1041         long cvid = ViewId.create(myaddress, ++ncoordinated);
1042         EndPoint[] cvcomp = me.getProposal().decodeHostProp_hosts();
1043 
1044         /*
1045          * Allocates arrays
1046          */
1047         
1048         // View composition length
1049         int len = cvcomp.length;
1050         
1051         // Proposals
1052         MsgProp[] props = new MsgProp[len];
1053         
1054         // Previous view identifiers
1055         long[] prev_vid = new long[len];
1056         
1057         // Previous view compositions
1058         EndPoint[][] prev_vcomp  = new EndPoint[len][];
1059 
1060         // Decoded local members
1061         LocalId[][] members = new LocalId[len][];
1062 
1063         // Destination list
1064         EndPoint[] dlist = new EndPoint[len];
1065 
1066         // Debug related StringBuilder
1067         StringBuilder buf = new StringBuilder("Proposal: ");
1068 
1069         int i,j,k;
1070 
1071         // Initializes props and dlist
1072         for (i=0; i < len; i++) {
1073           HostData host = (HostData) thosts.get(cvcomp[i]);
1074           if (host == null)
1075             throw new IllegalStateException("Corrupted proposal; unknown host in proposal");
1076           if (log.isDebugEnabled()) {
1077             buf.append(cvcomp[i]);
1078             buf.append(" ");
1079           }
1080           props[i] = host.getProposal();
1081           dlist[i] = host.getEndPoint();
1082 
1083           // Decodes proposals
1084           prev_vid[i]   = props[i].cvid;
1085           prev_vcomp[i] = props[i].decodeLastProp();
1086           members[i]    = props[i].decodeMembProp();
1087         }
1088         if (log.isDebugEnabled()) 
1089           log.debug(buf.toString());
1090 
1091         // Checks whether partial views are needed
1092         // FIXME OPTIMIZATION: Note that this check could be performed in O(n^2)
1093         boolean partial = false;
1094         for (i=0; i < len && !partial; i++)
1095           for (j=0; j < prev_vcomp[i].length && !partial; j++) {
1096             k = Util.valueAt(cvcomp, prev_vcomp[i][j]);
1097             partial = ( k >= 0 && prev_vid[i] != prev_vid[k] );
1098           }
1099 
1100         // Constructs partial view index
1101         int[] pvids = new int[len];
1102         int pvid = 0;
1103         if (partial) {
1104           int npartial = 0;
1105           long[] diff_vid = new long[len];
1106           for (i=0; i < len; i++) {
1107             for (j=0; j < npartial && prev_vid[i] != diff_vid[j]; j++);
1108             if (j == npartial) {
1109               diff_vid[j] = prev_vid[i];
1110               npartial++;
1111             }
1112             pvids[i] = j;
1113             if (cvcomp[i].equals(myaddress)) {
1114               pvid = j;
1115             }
1116           }
1117           ncoordinated += npartial;
1118         }
1119 
1120         /* 
1121          * M-send the view message containing the information
1122          * Note: in the previous versions, mss.rmsend was used here. rmsend
1123          * has been removed; instead, daemons receiving and installing a
1124          * view must msend it to the other members in the view itself.
1125          * This costs n multicast messages in a cluster of n daemons, but
1126          * is needed to avoid rare scenarios in which only part of the
1127          * group members receive a view. 
1128          * FIXME OPTIMIZATION: instead of forwarding the view, store it
1129          * and sends only when the coordinator is suspected. In this way
1130          * we optimize the common case.
1131          */
1132         /*
1133          * If dlist contains just one host, this is the local host.
1134          * There is no need to send the view to it, as the next lines
1135          * will perform a local installation without passing through the
1136          * mss.
1137          */
1138         if (dlist.length > 1) {
1139           MsgView tosend = MsgView.marshal(gid, cvid, me.getProposal(),
1140             pvids, props, dlist.length);
1141           mss.msend(R_VIEW, tosend, dlist);
1142         }
1143 
1144         // Locally installs the view
1145         int[] incarns = me.getProposal().decodeHostProp_incarns();
1146         installView(cvid, cvcomp, incarns, members, pvids, pvid);
1147       }
1148     }
1149   }
1150 
1151   /**
1152    *  Locates the position of this host in the complete view and checks
1153    *  whether the version number in the message is equal to the current
1154    *  one.
1155    *
1156    *  @param     msg     the m-received message.
1157    *  @param     src     the sender of the message.
1158    */
1159   void handleMsgView(MsgView msg, EndPoint src)
1160   {
1161     if (log.isDebugEnabled()) {
1162       log.debug("handleMsgView: " + msg + " from src = " + src);
1163       log.debug(toString(true));
1164     }
1165     if (status == S_ESTIMATE && estimate.contains(src)) {
1166       int pos = Util.valueAt(msg.cvcomp, myaddress);
1167       if (pos >= 0 && msg.agreed[pos] == ninstalled+1) {
1168         /* 
1169          * Re-send the view to all members, to guarantee reliability.
1170          * FIXME OPTIMIZATION: should be optimized by postponing the
1171          * send when the coordinator is suspected.
1172          */
1173         mss.msend(R_VIEW, msg.marshal(), msg.cvcomp);
1174         /* Install the view. */
1175         installView(msg.cvid, msg.cvcomp, msg.incarns, msg.members, msg.pvids, msg.pvids[pos]);
1176       }
1177     }
1178   }
1179 
1180   /**
1181    *
1182    *  @param     msg     the m-received message.
1183    *  @param     src     the sender of the message.
1184    */
1185   void handleMsgResult(MsgResult msg, EndPoint src)
1186   {
1187     if (log.isDebugEnabled())
1188       log.debug("handleMsgResult: " + msg + " from src = " + src);
1189     sbuffer.insertMsgResult(msg);
1190   }
1191 
1192 
1193   /**
1194    *  Method invoked in response to receiving a JOIN message from a
1195    *  member that wants to join the specified group.
1196    *
1197    *  @param msg
1198    *    The m-received <code>MsgGroup</code> message.
1199    *  @param src
1200    *    The sender <code>EndPoint</code> of the JOIN message; that is
1201    *    the joining member endpoint.
1202    */
1203   boolean handleMsgJoin(MsgGroup msg, EndPoint src)
1204   {
1205     if (log.isDebugEnabled())
1206       log.debug("JOIN:" + msg);
1207     /*
1208      * Received a JOIN message, notify membership service to prepare for
1209      * view change and reply with a JOINACK to the joining member.
1210      */
1211     try {
1212       MsgGroup tosend = MsgGroup.marshal(gid, getHosts(), 1);
1213       mss.send(R_JOINACK, tosend, src);
1214     } catch (IOException e) {
1215       log.error("Unable to marshal group message", e);
1216     }
1217     /* Invoke code in common with receiving a JOINACK message */
1218     return handleMsgJoinAck(msg, src);
1219   }
1220 
1221 
1222   /**
1223    *  Method invoked in response to receiving a JOINACK message from the
1224    *  joining member.
1225    *
1226    *  @param msg
1227    *    The m-received <code>MsgGroup</code> message.
1228    *  @param src
1229    *    The sender <code>EndPoint</code> of the JOINACK message; that is
1230    *    the joining member endpoint.
1231    */
1232   boolean handleMsgJoinAck(MsgGroup msg, EndPoint src)
1233   {
1234     if (log.isDebugEnabled())
1235       log.debug("JOINACK: " + msg);
1236 
1237     /* Search the hosts in the hosttable */
1238     MssHost[] hosts = new MssHost[msg.hosts.length];
1239     for (int i=0; i < hosts.length; i++)
1240       hosts[i] = hosttable.lookup(msg.hosts[i]);
1241     MssHost srcHost = hosttable.lookup(src);
1242 
1243     /* Join the group */
1244     return join(srcHost, hosts);
1245   }
1246 
1247 
1248   ////////////////////////////////////////////////////////////////////////////////////////////
1249   // Methods related to the total composition of the group
1250   ////////////////////////////////////////////////////////////////////////////////////////////
1251 
1252   /**
1253    *  Removes <code>MemberData</code> items of members that have
1254    *  requested to leave or have been excluded from the current view.
1255    *  It also updates the prepared status for any remaining members.
1256    */
1257   private void removeLeavingMembers()
1258   {
1259     /* 
1260      * Declares an ACK event for the leave operation; it will be
1261      * instantiated only when needed.  This is used to notify the member
1262      * that it has now left the group.
1263      */
1264     MemberLeftEvent event = null;
1265     
1266     /* Search local leaving members */
1267     for (Iterator iter = getMembers().iterator(); iter.hasNext(); ) {
1268       MemberData member = (MemberData) iter.next();
1269       if (log.isDebugEnabled()) {
1270         log.debug("Checking member: " + member);
1271       }
1272       /* Remove member because it has requested to leave */
1273       if (member.isLeaving()) {
1274         /* Creates an ACK event, only when needed */
1275         if (event == null)
1276           event = new MemberLeftEvent(gid);
1277         boolean success = DaemonInteraction.addEvent(member, event);
1278         if (!success)
1279           member.setCrashed();
1280         /* Removes 'member' from the underlying tmembers HashMap. */
1281         iter.remove();
1282       } else if (member.isCrashed()) {
1283         iter.remove();
1284       } else {
1285         /* Reset the prepared status for this remaining member. */
1286         member.resetPrepared();
1287       }
1288     }
1289   }
1290 
1291 
1292   /**
1293    *  Removes <code>HostData</code> items that have requested to leave
1294    *  or have been excluded from the current view.
1295    */
1296   private void removeLeavingHosts()
1297   {
1298     for (Iterator<HostData> iter = getHosts().iterator(); iter.hasNext(); ) {
1299       HostData host = iter.next();
1300       if (log.isDebugEnabled())
1301         log.debug("Checking host: " + host);
1302       if (host.isLeaving() && !host.getEndPoint().isLocal()) {
1303         if (log.isDebugEnabled())
1304           log.debug("Sending R_LEAVE to: " + host);
1305         try {
1306           MsgGroup tosend = MsgGroup.marshal(gid, getHosts(), 1);
1307           mss.send(R_LEAVE, tosend, host.getEndPoint());
1308         } catch (IOException e) {
1309           log.error("Unable to marshal group message", e);
1310         }
1311         /* Removes 'host' from the underlying thosts HashMap. */
1312         iter.remove();
1313       }
1314     }
1315   }
1316 
1317 
1318   /**
1319    *  Send leave notification to all other hosts known to this group.
1320    */
1321   void sendLeave()
1322   {
1323     for (EndPoint endpoint : thosts.keySet()) {
1324       if (!endpoint.isLocal()) {
1325         if (log.isDebugEnabled())
1326           log.debug("Sending R_LEAVE to: " + endpoint);
1327         try {
1328           MsgGroup tosend = MsgGroup.marshal(gid, getHosts(), 1);
1329           mss.send(R_LEAVE, tosend, endpoint);
1330         } catch (IOException e) {
1331           log.error("Unable to marshal group message", e);
1332         }
1333       }
1334     }
1335     /*
1336      * Perform leave on the local member last to avoid starting the agreement
1337      * protocol before having sent all the leave requests.
1338      */
1339     if (log.isDebugEnabled())
1340       log.debug("Leaving: " + myaddress + "; it is the localhost");
1341     leave(myaddress);
1342   }
1343 
1344 
1345   ////////////////////////////////////////////////////////////////////////////////////////////
1346   // Methods related to the current view estimate
1347   ////////////////////////////////////////////////////////////////////////////////////////////
1348 
1349   /**
1350    *  Used to initialize agreement protocol.
1351    */
1352   private void runAgreementProtocol()
1353   {
1354     if (log.isDebugEnabled())
1355       log.debug("Running agreement protocol");
1356     if (status == S_IDLE) {
1357       synchronizationPhaseInit();
1358     } else if (status == S_ESTIMATE) {
1359       try {
1360         proposal.encodeMembProp(tmembers, nmembers);
1361       } catch (IOException e) {
1362         log.error("Unable to encode membership proposal", e);
1363         return;
1364       }
1365       sendProposal();
1366     }
1367   }
1368 
1369 
1370   /**
1371    *  Initialize the synchronization phase; we are in the process of running
1372    *  a new agreement protocol to agree on a new view.
1373    */
1374   private void synchronizationPhaseInit()
1375   {
1376     if (log.isDebugEnabled())
1377       log.debug("Preparing ESTIMATE for SYNCHRONIZATION phase");
1378       /* Initialize a new estimate */
1379       if (estimate == null) {
1380         estimate = new Estimate(sbuffer, thosts, rset);
1381       } else {
1382         estimate.reset(thosts, rset);
1383       }
1384  
1385       /* Enters SYNCHRONIZATION phase */
1386       status = S_SYNCH;
1387       if (log.isDebugEnabled()) {
1388         MDC.put("group", toString());
1389       log.debug("Entering SYNCHRONIZATION phase");
1390     }
1391 
1392     /* 
1393      * We are in the process of running a new agreement protocol;
1394      * notify local members that the current view has become invalid.
1395      */
1396     PrepareEvent prepareEvent = null;
1397     for (Iterator iter = getMembers().iterator(); iter.hasNext();) {
1398       MemberData md = (MemberData) iter.next();
1399       if (!md.isPrepared() && !md.isLeaving() && !md.isCrashed()) {
1400         /* Creates the PrepareEvent, only when needed */
1401         if (prepareEvent == null)
1402           prepareEvent = new PrepareEvent(gid);
1403         if (log.isDebugEnabled())
1404           log.debug("Sending PrepareEvent to " + md);
1405         boolean success = DaemonInteraction.addEvent(md, prepareEvent);
1406         if (!success)
1407           md.setCrashed();
1408       }
1409     }
1410 
1411     /* Increment the version number */
1412     me.newVersion();
1413     
1414     /* Send a SYNCH message to the hosts in the estimate */
1415     Iterator iterator = estimate.getHosts().iterator();
1416     while (iterator.hasNext()) {
1417       HostData host = (HostData) iterator.next();
1418       if (!host.equals(me)) {
1419         EndPoint[] symset = grouptable.getSymset(host.getEndPoint());
1420         try {
1421           MsgSynch tosend = MsgSynch.marshal(gid, me.getVersion(), host.getVersion(), symset);
1422           if (log.isDebugEnabled())
1423             log.debug("Sending R_SYNCH to " + host.getEndPoint());
1424           mss.send(R_SYNCH, tosend, host.getEndPoint());
1425         } catch (IOException e) {
1426           log.error("Unable to marshal synchronization message", e);
1427           return;
1428         }
1429       }
1430     }
1431     
1432     /* If synchronization has ended, go to ESTIMATE EXCHANGE phase */
1433     checkSynchronization();
1434   }
1435 
1436 
1437   /**
1438    * Check if all in estimate has been synchronized; if so enter the
1439    * ESTIMATE EXCHANGE phase.  Otherwise, do nothing and await further
1440    * R_SYNCH messages.
1441    */
1442   private void checkSynchronization()
1443   {
1444     if (estimate.isSynchronized())
1445       estimateExchangePhaseInit();
1446   }
1447 
1448 
1449   /**
1450    *
1451    */
1452   private void estimateExchangePhaseInit()
1453   {
1454     /* Enter the estimate exchange phase */
1455     status = S_ESTIMATE;
1456     if (log.isDebugEnabled()) {
1457       MDC.put("group", toString());
1458       log.debug("Group:estimateExchangePhaseInit: start");
1459       log.debug(estimate);
1460     }
1461 
1462     try {
1463 
1464       /*
1465        * Send a new estimate.  If the current estimate size is equal to
1466        * one, it contains just the local host; there is no need to send
1467        * this message to it.
1468        */
1469       if (estimate.size() > 1) {
1470         MsgEstim tosend = MsgEstim.marshal(gid, me.getVersion(), estimate.getHosts());
1471         mss.msend(R_ESTIM, tosend, estimate.getEndPoints());
1472       }
1473 
1474       /* Builds a new proposal and (possibily) sends it */
1475       proposal.encodeLastProp(viewDesc.getEndPoints());
1476       proposal.encodeHostProp(estimate.getHosts());
1477       proposal.encodeMembProp(tmembers, nmembers);
1478       if (msgsStable) {
1479         int[] delivered = viewDesc.getDelivered(estimate.getHosts());
1480         proposal.encodeMsgsProp(delivered);
1481         sendProposal();
1482       } else {
1483         if (log.isDebugEnabled()) {
1484           log.debug("Messages are not stable: ");
1485           log.debug(viewDesc);
1486         }
1487       }
1488     } catch (IOException e) {
1489       log.error("Unable to encode proposal for the EE-phase", e);
1490       return;
1491     }
1492 
1493     /* Forward unacknowledged messages to the other members */
1494     viewDesc.forwardMessages(mss);
1495     
1496     if (log.isDebugEnabled()) {
1497       log.debug("Group:estimateExchangePhaseInit: stop");
1498     }
1499   }
1500 
1501 
1502   /**
1503    *  Sends a new view proposal to the coordinator.
1504    */
1505   private void sendProposal()
1506   {
1507     /*
1508      * This loop only logs problems if a member has outstanding ACKs while
1509      * it is deemed that messages are stable.  This happens rarely, therefore
1510      * we need to collect data for these cases.
1511      */
1512     for (MemberData md : tmembers.values()) {
1513       log.assertLog(!(md.hasOutstandingAcks() && msgsStable), md + " has outstanding ACKs while msgs are stable");
1514     }
1515     if (log.isDebugEnabled())
1516       log.debug("Messages are " + (msgsStable ? "stable": "not stable")); 
1517     if (msgsStable && installationComplete) {
1518 
1519       try {
1520         MsgProp msg = proposal.marshal(gid, cvid, pvid);
1521 
1522         /*
1523          * Obtain the coordinator; note that we should never get here if
1524          * the estimate is empty and no coordinator can be selected.
1525          */
1526         EndPoint coordinator = estimate.getCoordinator();
1527         if (coordinator.isLocal()) {
1528           /*
1529            * This deamon is the coordinator; so, locally deliver the
1530            * proposal message without passing it through the mss.
1531            */
1532           if (log.isDebugEnabled())
1533             log.debug("Group:sendProposal: local coordinator");
1534           handleMsgProp(new MsgProp(msg.getOutMessage()), coordinator);
1535 
1536         } else {
1537           /*
1538            * Send a proposal message to the coordinator (distinct from
1539            * this daemon).
1540            */
1541           if (log.isDebugEnabled())
1542             log.debug("Group:sendProposal: remote coordinator: " + coordinator);
1543           mss.send(R_PROP, msg, coordinator);
1544 
1545         }
1546       } catch (IOException e) {
1547         log.error("Unable to marshal and send proposal message", e);
1548       } catch (ClassNotFoundException e) {
1549         log.error("Unable to unmarshal (local) proposal message", e);
1550       }
1551     }
1552     if (log.isDebugEnabled())
1553       log.debug("Group:sendProposal: end");
1554   }
1555 
1556 
1557   /**
1558    *  If the estimate has changed (and is not empty), send it
1559    *  all members in the estimate.  If the message buffer is
1560    *  consistent between members of the previous view, send also
1561    *  a proposal to the coordinator.
1562    */
1563   private void sendEstimate()
1564   {
1565     if (estimate.hasChanged() && !estimate.isEmpty()) {
1566       try {
1567         /*
1568          * If estimate contains just one host, this is the local host.
1569          * There is no need to send the estimate to it, as the local
1570          * host is already aware of its estimate.
1571          */
1572         if (estimate.size() > 1) {
1573           MsgEstim tosend = MsgEstim.marshal(gid, me.getVersion(), estimate.getHosts());
1574           mss.msend(R_ESTIM, tosend, estimate.getEndPoints());
1575         }
1576         proposal.encodeHostProp(estimate.getHosts());
1577         if (msgsStable) {
1578           int[] delivered = viewDesc.getDelivered(estimate.getHosts());
1579           proposal.encodeMsgsProp(delivered);
1580           sendProposal();
1581         } else {
1582           if (log.isDebugEnabled()) {
1583             log.debug("Messages are not stable: ");
1584             log.debug(viewDesc);
1585           }
1586         }
1587       } catch (IOException e) {
1588         log.error("Unable to marshal estimate proposal message", e);
1589       }
1590     }
1591   }
1592 
1593   /**
1594    * Checks if an agreement has been established. A set of proposals are
1595    * in agreement if and only if: (i) all proposals contains the same estimate
1596    * for the next view, and (ii) if two proposals in the set have been generated
1597    * in the same view, the sending hosts must have delivered the same set
1598    * of messages. <br>
1599    * Point (i) is checked by verifying that all hosts have the same estimate
1600    * as the coordinator; method checkEstimate() compares two estimates.
1601    * Point (ii) is checked by verifying that given any pair of hosts in the
1602    * set, if they have the same view they must have delivered the same set
1603    * of messages. This check is done by checkMessages().
1604    */
1605   private boolean checkAgreement()
1606   {
1607     MsgProp proposalp = me.getProposal();
1608     if (proposalp == null) {
1609       if (log.isDebugEnabled())
1610         log.debug("checkAgreement: no local proposal found");
1611       return false;
1612     }
1613     if (log.isDebugEnabled())
1614       log.debug("checkAgreement: found my proposal " + me);
1615     for (HostData hostq : estimate.getHosts()) {
1616       MsgProp proposalq = hostq.getProposal();
1617       if (!proposalp.checkEstimate(proposalq)) {
1618         if (log.isDebugEnabled()) {
1619           log.debug("PROPOSAL DIFFERS FROM LOCAL ESTIMATE");
1620           log.debug("proposalp: " + proposalp);
1621           log.debug("proposalq: " + proposalq);
1622         }
1623         return false;
1624       }
1625       if (log.isDebugEnabled())
1626         log.debug("checkAgreement: ok estimate from " + hostq);
1627       for (HostData hostr : estimate.getHosts()) {
1628         // No need to compare the proposals of the same host
1629         if (hostq == hostr)
1630           continue;
1631         if (log.isDebugEnabled()) {
1632           log.debug("Checking proposals from: ");
1633           log.debug(hostq);
1634           log.debug(hostr);
1635         }
1636         MsgProp proposalr = hostr.getProposal();
1637         if (!proposalq.checkMessages(proposalr)) {
1638           if (log.isDebugEnabled()) {
1639             log.debug("PROPOSALS DIFFERS (MESSAGES)");
1640             log.debug("proposalq: " + proposalq);
1641             log.debug("proposalr: " + proposalr);
1642           }
1643           return false;
1644         }
1645       }
1646     }
1647     if (log.isDebugEnabled())
1648       log.debug("checkAgreement: all ok");
1649     return true;
1650   }
1651 
1652 
1653   /**
1654    *
1655    */
1656   private void acceptMessage(MsgMcast msg, boolean late)
1657   {
1658     if (log.isDebugEnabled())
1659       log.debug("acceptMessage; " + (late ? "late" : ""));
1660     try {
1661       msg.complete(viewDesc.getLocalIndex(), pvid, sbuffer.getMid());
1662     } catch (IOException e) {
1663       log.error("Cannot complete marshalling multicast message", e);
1664       return;
1665     }
1666 
1667     /* Add message to local buffer */
1668     sbuffer.insertMsgMcast(msg);
1669     
1670     /* Local delivery of the message */
1671     boolean newCrashed = viewDesc.notifyMessage(gid, msg);
1672     if (log.isDebugEnabled())
1673       log.debug("Accept multicast message (stable=false)");
1674 
1675     /* Unstable until message is delivered */
1676     msgsStable = false;
1677 
1678     /**
1679      *  If the current view size is equal to one, it contains just the
1680      *  local host; there is no need to send this message to it
1681      */
1682     if (viewDesc.size() > 1) {   
1683       if (late) {
1684         mss.msend(R_FORWARD, msg, viewDesc.getEndPoints());
1685       } else {
1686         mss.msend(R_MCAST, msg, viewDesc.getEndPoints());
1687       }
1688     }
1689   
1690     /*
1691      *  If the message has not been delivered because of a new
1692      *  crashed hosts, start a new agreement protocol (if needed),
1693      *  or complete the member list and send a proposal, otherwise.
1694      */
1695     if (newCrashed) {
1696       if (log.isDebugEnabled()) {
1697         log.debug("Detected new crashed host; running agreement protocol");
1698       }
1699       runAgreementProtocol();
1700     }
1701   }
1702 
1703 
1704   /**
1705    *
1706    */
1707   private void installView(long cvid, EndPoint[] hosts, int[] incarns,
1708                            LocalId[][] members, int[] pvids, int pid)
1709   {
1710     // Stores complete view information
1711     this.cvid = cvid;
1712 
1713     /*
1714      * Reset the view index for all hosts, indicating that there isn't
1715      * any hosts is in the current view.  This will be followed by
1716      * setting the view index to the correct value for hosts that are
1717      * included in the current view.
1718      */
1719     for (HostData hd : thosts.values()) {
1720       hd.resetViewIndex();
1721     }
1722 
1723     /*
1724      * Computes the host positions in the current view and removes
1725      * information related to hosts and members not participating in the
1726      * next (partial) view.
1727      */
1728     List<MemberId> memberSet = new ArrayList<MemberId>(hosts.length);
1729     Set<Integer> uniquePids = new HashSet<Integer>(pvids.length);
1730     LocalId[] localMembers = null;
1731     int hostCount = 0;
1732     int hpos = 0;
1733     for (int i = 0; i < hosts.length; i++) {
1734       uniquePids.add(pvids[i]);
1735       if (pvids[i] == pid) {
1736         if (hosts[i].equals(myaddress)) {
1737           hpos = hostCount;
1738           localMembers = members[i];
1739         }
1740         HostData hd = (HostData) thosts.get(hosts[i]);
1741         if (hd == null)
1742           throw new IllegalStateException("Estimate contains an unknown host");
1743         for (int j=0; j < members[i].length; j++) {
1744           memberSet.add(new MemberIdImpl(hd.getEndPoint(), incarns[i], members[i][j]));
1745         }
1746         /*
1747          * Reorder the hosts[] array; note that the 'hostCount' variable will always
1748          * be less than or equal to 'i' and hence it should not cause problems for
1749          * the iteration over the hosts[] array.
1750          */
1751         hosts[hostCount] = hosts[i];
1752         hd.setViewIndex(hostCount++);
1753       }
1754     }
1755 
1756     // Stores current view information
1757     viewDesc = new ViewDescription(thosts, hosts, rset, tmembers, localMembers, hpos);
1758 
1759     // Updates group flags
1760     msgsStable = true;
1761     installationComplete = false;
1762     status = S_IDLE;
1763     if (log.isDebugEnabled()) {
1764       MDC.put("group", toString());
1765       log.debug("New view installed (stable=true)");
1766     }
1767 
1768     /* Create the new view and delivers it to non-leaving members */
1769     pvid = ViewId.createPartial(cvid, pid);
1770     MemberId[] memberIds = (MemberId[]) memberSet.toArray(new MemberId[memberSet.size()]);
1771     //FIXME this may cause problems for the merging layer since the uniquePids appears to not reflect
1772     // the new view and the number of partitions that the members come from; implement test case for this??
1773     View view = new ViewImpl(gid, pvid, uniquePids.size(), memberIds);
1774     ninstalled++;
1775     viewDesc.notifyView(gid, view);
1776 
1777     // Remove hosts and members that have request to leave
1778     removeLeavingMembers();
1779     removeLeavingHosts();
1780 
1781     if (!tmembers.isEmpty()) {            // The group has not to be removed
1782 
1783       // Updates HostData information
1784       for (Iterator iter = thosts.values().iterator(); iter.hasNext(); ) {
1785         HostData hd = (HostData) iter.next();
1786         hd.setProposal(null);
1787         // PVIEW Should we move also ctbl to ViewDescription
1788       }
1789 
1790       // Updates information about messages
1791       sbuffer.setView(view, hostCount);
1792       MsgMcast scan;
1793       while ((scan = (MsgMcast) delayedSent.removeFirst()) != null)
1794         acceptMessage(scan, false);
1795       while ((scan = (MsgMcast) delayedRcvd.removeFirst()) != null)
1796         if (scan.vid == pvid)
1797           viewDesc.notifyMessage(gid, scan);
1798 
1799       /*
1800        * If the current view size is equal to one, it contains just the
1801        * local host; there is no need to send this message to it.
1802        */
1803       if (!sbuffer.isEmpty() && viewDesc.size() > 1) {
1804         try {
1805           MsgAck tosend = MsgAck.marshal(gid, pvid, viewDesc.getLocalIndex(),
1806             viewDesc.getAcks(), viewDesc.size());
1807           mss.msend(R_ACK, tosend, viewDesc.getEndPoints());
1808         } catch (IOException e) {
1809           log.error("Cannot marshal ACK message", e);
1810           return;
1811         }
1812       }
1813 
1814       // Checks if the view is stable
1815       if (!viewDesc.isStable() || viewDesc.containsCrashedMembers())
1816         synchronizationPhaseInit();
1817     }
1818   }
1819 
1820 } // END Group