View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.daemon;
20  
21  import java.util.Collection;
22  import java.util.Iterator;
23  import java.util.Map;
24  
25  import jgroup.core.EndPoint;
26  import jgroup.core.View;
27  import jgroup.relacs.events.DeliveryAck;
28  import jgroup.relacs.events.DeliveryEvent;
29  import jgroup.relacs.events.InstallAck;
30  import jgroup.relacs.events.InstallEvent;
31  import jgroup.relacs.mss.Mss;
32  import jgroup.relacs.types.LocalId;
33  import jgroup.util.InMessage;
34  import jgroup.util.Queue;
35  import jgroup.util.Util;
36  
37  import org.apache.log4j.Logger;
38  
39  /**
40   *  The <code>ViewDescription</code> class describes a view.
41   *
42   *  @author Alberto Montresor
43   *  @since Jgroup 0.3
44   */
45  final class ViewDescription
46    implements Tag
47  {
48  
49    ////////////////////////////////////////////////////////////////////////////////////////////
50    // Logger
51    ////////////////////////////////////////////////////////////////////////////////////////////
52  
53    /** Obtain logger for this class */
54    private static final Logger log = Logger.getLogger(ViewDescription.class);
55  
56  
57    ////////////////////////////////////////////////////////////////////////////////////////////
58    // Fields
59    ////////////////////////////////////////////////////////////////////////////////////////////
60  
61    /** Hosts that are members of this view */
62    private HostData[] hosts;
63  
64    /** Data associated with hosts included in the current view */
65    private HostViewData[] data;
66    
67    /** Endpoints of the hosts in this view */
68    private EndPoint[] endpoints;
69  
70    /** 
71     * True if the view is stable, i.e. if its current host membership
72     * is equal to the current reachability set. If a view is not
73     * stable, a new agreement protocol must be launched.
74     */
75    private boolean stable;
76    
77    /** True if the view contains local members that have crashed. */
78    private boolean containsCrashed;
79  
80    /** Members in this view */
81    private MemberData[] members;
82  
83    /** 
84     * Array of boolean flags indicating whether the view described
85     * in this section has been installed or not.
86     */
87    private boolean[] hasInstalled;
88    
89    /** Position of the local host in the view description */
90    private int localIndex;
91  
92  
93    ////////////////////////////////////////////////////////////////////////////////////////////
94    // Constructors
95    ////////////////////////////////////////////////////////////////////////////////////////////
96  
97    /**
98     *  Builds a <CODE>ViewDescription</CODE> containing a single member
99     *  on a single host.
100    *  
101    *  @param hd the host to be inserted
102    *  @param md the member to be inserted
103    */
104   ViewDescription(HostData hd, MemberData md)
105   {
106     hosts = new HostData[] { hd };
107     data = new HostViewData[] { new HostViewData(1, 1) };
108     endpoints = new EndPoint[] { hd.getEndPoint() };
109     members = new MemberData[] { md };
110     hasInstalled = new boolean[1];
111     localIndex = 0;
112     stable = true;
113     containsCrashed = false;
114   }
115 
116 
117   /**
118    *  Builds a <CODE>ViewDescription</CODE> containing the hosts
119    *  identified by <CODE>endpoints</CODE>.
120    *
121    */
122   ViewDescription(Map thosts, EndPoint[] endpoints, EndPoint[] rset, 
123     Map tmembers, LocalId[] localMembers, int localIndex)
124   {
125     this.stable = true;
126 
127     /*
128      *  Build host composition 
129      */
130     this.hosts = new HostData[endpoints.length];
131     this.endpoints = new EndPoint[endpoints.length];
132     this.data = new HostViewData[endpoints.length];
133     this.hasInstalled = new boolean[tmembers.size()];
134     for (int i=0; i < hosts.length; i++) {
135       this.endpoints[i] = endpoints[i];
136       hosts[i] = (HostData) thosts.get(endpoints[i]);
137       if (hosts[i] == null)
138         throw new IllegalStateException("Error: installing inconsistent view");
139 
140       /*
141        *  FIXED ALB: tmembers contains the current set of local hosts
142        *  while memberIds contains the identifiers of the local
143        *  members that have been included in the view. Their size
144        *  may differ, as new members may join the group when a view
145        *  is installed. Since an HostViewData object contains the
146        *  acknowledgement information for the current view, to
147        *  initialize it we need to use memberIds.
148        *  Previous code:
149        *    data[i] = new HostViewData(endpoints.length, tmembers.size());
150        */
151       data[i] = new HostViewData(endpoints.length, localMembers.length);
152       
153       if (hosts[i].getVersion() > hosts[i].getAgreed()) {
154         if (log.isDebugEnabled())
155           log.debug(endpoints[i] + " version " + hosts[i].getVersion()
156               + " > agreed " + hosts[i].getAgreed() + "(unstable view)");
157         stable = false;
158       }
159     }
160 
161     for (Iterator iterator = thosts.values().iterator(); iterator.hasNext(); ) {
162       HostData host = (HostData) iterator.next();
163       EndPoint endpoint = host.getEndPoint();
164       if (!host.isLeaving() && Util.in(rset, endpoint)) {
165         boolean found = false;
166         for (int i=0; i < endpoints.length && !found; i++) {
167           if (endpoints[i].equals(endpoint)) {
168             found = true;
169           }
170         }
171         if (!found) {
172           if (log.isDebugEnabled())
173             log.debug(endpoint + " excluded, but is in the reachable set (unstable view)");
174           stable = false;
175         }
176       }
177       if (log.isDebugEnabled()) {
178         log.debug(endpoint + ": is " + (host.isLeaving() ? "" : "not ")
179           + "leaving, and is " + (Util.in(rset, endpoint) ? "" : "not ")
180           + "in the reachable set.");
181       }
182     }
183     if (log.isDebugEnabled()) {
184       StringBuilder buf = new StringBuilder("endpoints: ");
185       for (int i = 0; i < endpoints.length; i++) {
186         buf.append(endpoints[i]);
187         buf.append(" ");
188       }
189       log.debug(buf);
190       buf = new StringBuilder(" reachset: ");
191       for (int i = 0; i < rset.length; i++) {
192         buf.append(rset[i]);
193         buf.append(" ");
194       }
195       log.debug(buf);
196     }
197 
198     /*
199      *  Build member composition 
200      */
201     this.members = new MemberData[localMembers.length];
202     for (int i=0; i < localMembers.length; i++) {
203       members[i] = (MemberData) tmembers.get(localMembers[i]);
204       if (members[i] == null)
205         throw new IllegalStateException("Error: installing inconsistent view");
206     }
207 
208     this.localIndex = localIndex;
209   }
210 
211   ////////////////////////////////////////////////////////////////////////////////////////////
212   // Methods
213   ////////////////////////////////////////////////////////////////////////////////////////////
214 
215   /**
216    *  Returns the number of hosts composing this view.
217    */
218   int hostSize()
219   {
220     return hosts.length;
221   }
222   
223   /**
224    *  Returns the number of members composing this view.
225    */
226   int memberSize()
227   {
228     return members.length;
229   }
230   
231   /**
232    *  Returns the i-th host of the view 
233    */
234   HostData getHost(int index)
235   {
236     return hosts[index];
237   }
238   
239   /**
240    *  Returns the i-th member of the view 
241    */
242   MemberData getMember(int index)
243   {
244     return members[index];
245   }
246 
247   /**
248    *  Returns the set of endpoints composing this view.
249    */
250   EndPoint[] getEndPoints()
251   {
252     return endpoints;
253   }
254   
255   /**
256    *  Returns the size of this view.
257    */
258   int size()
259   {
260     return endpoints.length;
261   }
262 
263   /**
264    * Returns true if the provided view composition equals the
265    * view composition represented by this <code>ViewDescription</code>
266    * object.
267    *
268    * @param prev_cvcomp
269    *   The previous view composition to compare against this view
270    * @return
271    *   True if the views are equal, false otherwise.
272    */
273   boolean isDuplicate(EndPoint[] prev_cvcomp)
274   {
275     if (endpoints.length == prev_cvcomp.length) {
276       for (int i = 0; i < prev_cvcomp.length; i++) {
277         if (!prev_cvcomp[i].equals(endpoints[i])) {
278           return false;
279         }
280       }
281       return true;
282     } else {
283       return false;
284     }
285   }
286 
287   /**
288    * Returns true if this view is stable, i.e. if its current host membership
289    * is equal to the current reachability set. If a view is not
290    * stable, a new agreement protocol must be launched.
291    */
292   boolean isStable()
293   {
294     return stable;
295   }
296   
297   /**
298    * Method <code>containsCrashedMembers</code> returns true if some of
299    * the members contained in the current view have crashed. 
300    * @return true if the current view contains crashed members.
301    */
302   boolean containsCrashedMembers()
303   {
304     return containsCrashed;
305   }
306   
307   int getLocalIndex()
308   {
309     return localIndex;
310   }
311   
312   int getDelivered(int index)
313   {
314     log.assertLog(data.length > index, "Provided host index=" + index
315       + " is greater than data.length=" + data.length);
316     return data[index].delivered;
317   }
318   
319   /**
320    *  
321    */
322   int[] getAcks()
323   {
324     int[] acks = new int[data.length];
325     for (int i=0; i < acks.length; i++) {
326       acks[i] = data[i].hack[localIndex];
327     }
328     return acks;
329   }
330 
331   /**
332    *  
333    */
334   int[] getDelivered(Collection estimate)
335   {
336     int[] delivered = new int[estimate.size()];
337     int i = 0;
338     for (Iterator iter = estimate.iterator(); iter.hasNext(); i++) {
339       HostData host = (HostData) iter.next();
340       if (host.hasValidViewIndex())
341         delivered[i] = data[host.getViewIndex()].delivered;
342       else
343         delivered[i] = 0;
344       if (log.isDebugEnabled())
345         log.debug(host + ": delivered=" + delivered[i]);
346     }
347     if (log.isDebugEnabled()) {
348       StringBuilder buffer = new StringBuilder();
349       buffer.append("Delivered: ");
350       for (i=0; i < delivered.length; i++)
351         buffer.append(delivered[i] + " ");
352       log.debug(buffer.toString());
353     }
354     return delivered;
355   }
356     
357 
358   /**
359    *  Returns true if the message has not already been delivered
360    *  locally.
361    *  
362    *  @param index the index of the sender
363    *  @param mid the message identifier
364    */
365   boolean isNew(int index, int mid)
366   {
367     return (mid > data[index].hack[localIndex]);
368   }
369 
370 
371   /**
372    *  
373    */
374   boolean localViewAck(InstallAck ack) 
375   {
376     boolean viewStable = true;
377     hasInstalled[ack.getMemberIndex()] = true;
378     for (int i=0; i < members.length && viewStable; i++)
379       viewStable = (members[i].isLeaving() || 
380         members[i].isCrashed() || hasInstalled[i]);
381     return viewStable;
382   }
383 
384   /**
385    *  
386    */
387   void remoteMessageAck(MsgAck msg)
388   {
389     /* Sanity check */
390     log.assertLog(msg.ack.length == hosts.length, "Received incorrect ACK message");
391 
392     /* Check ack */
393     for (int i=0; i < hosts.length; i++) {
394       if (data[i].hack[msg.hpos] < msg.ack[i]) {
395         data[i].hack[msg.hpos] = msg.ack[i];
396         data[i].checkAck();
397       }
398     }
399   }
400 
401   /**
402    *  
403    */
404   boolean localDeliveryAck(DeliveryAck ack)
405   {
406     int mid = ack.getMessageId();
407     int hostIndex = ack.getHostIndex();
408     HostViewData sender = data[hostIndex];
409     sender.mack[ack.getMemberIndex()] = mid;
410     if (log.isDebugEnabled()) {
411       log.debug("DeliveryAck from " + hosts[hostIndex]
412           + " with mid=" + mid + ", sender.delivered=" + sender.delivered);
413     }
414     boolean carryon = true;
415 
416     /*
417      * Maybe the number of delivered msgs have increased
418      */
419 //FIXME implement buffer instead
420     if (mid >= sender.delivered+1) {
421       carryon = updateDelivered(sender, hostIndex, mid);
422     }
423     if (log.isDebugEnabled()) {
424       log.debug("carryon=" + carryon);
425     }
426     return carryon;
427   }
428 
429   /**
430    * 
431    * @param sender
432    * @param hostIndex
433    * @param mid
434    */
435   private boolean updateDelivered(HostViewData sender, int hostIndex, int mid)
436   {
437     boolean carryon = true;
438     /*
439      * Check that we have received an appropriate ACK from all members.
440      * Note that, members.length must be is equal to sender.mack.length.
441      */
442     for (int i=0; i<members.length && carryon; i++)
443       carryon = (members[i].isCrashed() || members[i].isLeaving() || sender.mack[i] >= mid);
444     if (carryon) {
445       sender.delivered = mid;
446       if (log.isDebugEnabled())
447         log.debug("Updated delivered for " + hosts[hostIndex] + ": " + mid);
448       /*
449        * Check that we have received an appropriate ACK from all hosts
450        */
451       for (int i=0; i < data.length && carryon; i++) {
452         carryon = (data[i].hack[localIndex] == data[i].delivered);
453       }
454     }
455     return carryon;
456   }
457 
458   /**
459    *  Forward the messages contained in the buffer
460    *  to the hosts in the view.
461    */
462   void forwardMessages(Mss mss)
463   {
464     /**
465      *  If the current view size is equal to one, it contains just the
466      *  local host; there is no need to send this message to it
467      */
468     if (endpoints.length > 1) {
469       for (int i = 0; i < data.length; i++) {
470         for (Iterator iter = data[i].buffer.iterator(); iter.hasNext(); ) {
471           MsgMcast msg = (MsgMcast) iter.next();
472           if (log.isDebugEnabled())
473             log.debug("Forwarding msg " + msg.mid + " to group members");
474           mss.msend(R_FORWARD, msg, endpoints);
475         }
476       }
477     }
478   }
479 
480 
481   /**
482    *  This method performs the local installation of a view.
483    *  Returns true if all members have received the InstallEvent;
484    *  return false if some of them have crashed. Having received
485    *  a InstallEvent does not mean that the view has been installed;
486    *  the members will send an InstallAck message when this happens.
487    * 
488    * @param gid
489    *   the identifier of the group for which this view has been generated
490    * @param view
491    *   the view to be installed
492    */
493   void notifyView(int gid, View view)
494   {
495     if (log.isDebugEnabled()) {
496       log.debug(view);
497     }
498     for (int i=0; i < members.length; i++) {
499       if (!members[i].isLeaving() && !members[i].isCrashed()) {
500         InstallEvent event = new InstallEvent(gid, view, localIndex, i);
501         boolean success = DaemonInteraction.addEvent(members[i], event);
502         if (!success) {
503           members[i].setCrashed();
504           containsCrashed = true;
505         }
506       }
507     }
508   }
509 
510 
511   /**
512    *  Performs the local delivery of a multicast message
513    */
514   boolean notifyMessage(int gid, MsgMcast msg)
515   {
516     boolean newCrashed = false;
517     if (log.isDebugEnabled())
518       log.debug("notifyMessage for group: " + gid + ", msg (" + msg.mid + ")" );
519 
520     /* Notify the message to the members */
521     for (int i=0; i < members.length; i++) {
522       if (!members[i].isLeaving() && !members[i].isCrashed()) {
523         InMessage inmsg = msg.getInMessage();
524         boolean success;
525         synchronized (inmsg) {
526           DeliveryEvent event = new DeliveryEvent(gid, msg.isObject, msg.mid, 
527             msg.hpos, msg.sender, msg.ackr, inmsg);
528           success = DaemonInteraction.addEvent(members[i], event);
529         }
530         if (!success) {
531           members[i].setCrashed();
532           newCrashed = true;
533           containsCrashed = true;          
534         } else {
535           members[i].setAck(msg.mid);
536         }
537       }
538     }
539 
540     /* Update ack data */  
541     data[msg.hpos].hack[localIndex] = msg.mid;
542     data[msg.hpos].hack[msg.hpos] = msg.mid;
543     data[msg.hpos].buffer.insert(msg);
544     
545     data[msg.hpos].checkAck();
546     return newCrashed;
547   }
548 
549 
550   public String toString()
551   {
552     StringBuilder b = new StringBuilder("[ViewDescription: ");
553     b.append((stable ? "stable" : "not stable"));
554     b.append(", hosts={");
555     for (int i = 0; i < hosts.length; i++) {
556       if (i != 0)
557         b.append(", ");
558       b.append(hosts[i]);
559     }
560     b.append("} delivered={");
561     for (int i = 0; i < data.length; i++) {
562       if (i != 0)
563         b.append(", ");
564       b.append(data[i].delivered);
565     }
566     b.append("}, acks={");
567     int[] acks = getAcks();
568     for (int i = 0; i < acks.length; i++) {
569       if (i != 0)
570         b.append(", ");
571       b.append(acks[i]);
572     }
573     b.append("}");
574     return b.toString();
575   }
576 
577 
578   private class HostViewData 
579   {
580     
581     /** Hosts acknowledgement */ 
582     int[] hack;
583   
584     /** Members acknowledgement */ 
585     int[] mack;
586   
587     /** Minimal value among host acknowledgement */ 
588     int hmin;
589   
590     /** Messages sent to members */ 
591     int delivered;
592   
593     /** Message buffer */ 
594     Queue buffer;
595 
596     /**
597      *  Builds an HostViewData for a view of size <CODE>size</CODE>.
598      */
599     HostViewData(int hostSize, int memberSize)
600     {
601       hack        = new int[hostSize];
602       mack        = new int[memberSize];
603       hmin        = 0;
604       delivered   = 0;
605       buffer      = new Queue();
606     }
607     
608     /**
609      *
610      */
611     void checkAck()
612     {
613       int min = Integer.MAX_VALUE;
614       for (int i=0; i < hack.length; i++) 
615         if (min > hack[i])
616           min = hack[i];
617       if (min > hmin) {
618         for (int i=hmin; i < min; i++) 
619           buffer.removeFirst();
620         hmin = min;
621       }
622     }
623     
624   } // END HostViewData
625 
626 } // END ViewDescription