View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Random;
25  
26  import jgroup.core.ConfigurationException;
27  import jgroup.core.EndPoint;
28  import jgroup.relacs.config.Domain;
29  import jgroup.relacs.config.TransportConfig;
30  
31  import org.apache.log4j.Logger;
32  
33  
34  /**
35   *  The <code> Cluster </code> class
36   *
37   *  FIXME HEIN: This class should contain a SortedMap of members in the
38   *  cluster, sorted according to each members reachability value.  in
39   *  addition there should be a sub-view of this map, reflecting those
40   *  deemed as reachable.
41   *
42   *  @author Salvatore Camarrata
43   *  @author Hein Meling
44   *  @since Jgroup 0.5
45   */
46  final class Cluster
47    implements MssConstants
48  {
49  
50    ////////////////////////////////////////////////////////////////////////////////////////////
51    // Logger
52    ////////////////////////////////////////////////////////////////////////////////////////////
53  
54    /** Obtain logger for this class */
55    private static final Logger log = Logger.getLogger(Cluster.class);
56  
57  
58    ////////////////////////////////////////////////////////////////////////////////////////////
59    // Static section
60    ////////////////////////////////////////////////////////////////////////////////////////////
61  
62    /** Random generator */
63    private static final Random rnd = new Random();
64  
65    /**
66     *  Constant null array to be returned for the case of no reachable
67     *  hosts (object reuse).  See Item 27 in Effective Java.
68     */
69    private final static MssHost[] NULL_MSS_HOST_ARRAY = new MssHost[0];
70  
71  
72    ///////////////////////////////////////////////////////////////////////
73    // Fields
74    ///////////////////////////////////////////////////////////////////////
75  
76    /** The distributed system configuration */
77    private MssDS mssds;
78  
79    /** Configuration parameters */
80    private TransportConfig config;
81  
82    /** EndPoints */
83    private RoutingTableEntry rt;
84  
85    /** Data link layer */
86    private NI ni;
87  
88    /** Hosts that are member of the cluster */
89    private final List<MssHost> members;
90  
91    /** True if this object is the local cluster */
92    private boolean local;
93  
94    /** Number of members in the cluster that are reachable */
95    private int reachableCounter;
96  
97    /** Message Flow Layer */
98    private MsgFlowSndrSide msgFlow;
99  
100   private MsgCntrl msgCntrl;
101 
102   /**
103    *  Message receivers associated with this cluster, when the message
104    *  is not addressed to all members of the cluster.
105    */
106   private final List<EndPoint> receivers;
107 
108 
109   ///////////////////////////////////////////////////////////////////////
110   // Constructors
111   ///////////////////////////////////////////////////////////////////////
112 
113   /**
114    *  Builds a cluster identified by <code>domain</code>.
115    */
116   Cluster(MssDS mssds, TransportConfig config, Domain domain, NI ni)
117   {
118     /* Copy initialization parameters */
119     this.mssds = mssds;
120     this.ni = ni;
121     this.config = config;
122     local = domain.isLocal();
123     reachableCounter = (local ? 1 : 0);
124     int cost = (local ? 0 : config.getMaxPathLength());
125 
126     /*
127      * Note that when marshalling the routing table entry, it will use
128      * the marshalling of <code>EndPointImpl</code> since currently the
129      * <code>Domain</code> class do not need any marshalling.  Further,
130      * note that EndPointImpl do not detect local clusters correctly.
131      */
132     rt = new RoutingTableEntry(domain.getEndpoint(), domain.getEndpoint(), cost);
133 
134     /* Initialize list of members */
135     members = new ArrayList<MssHost>(domain.size());
136     receivers = new ArrayList<EndPoint>(domain.size());
137 
138     /* Flow msg management */
139     msgFlow= new MsgFlowSndrSide(this, config);
140 
141     if (log.isDebugEnabled())
142       log.debug("Cluster:<init>: " + rt.key);
143   }
144 
145 
146   ///////////////////////////////////////////////////////////////////////
147   // Public methods
148   ///////////////////////////////////////////////////////////////////////
149 
150   /**
151    *  Returns the endpoint associated to this cluster.
152    */
153   EndPoint getEndPoint()
154   {
155     return rt.key;
156   }
157 
158 
159   String getName()
160   {
161     return rt.key.getAddress().getHostName();
162   }
163 
164 
165   MsgCntrl getControl()
166   {
167     return msgCntrl;
168   }
169 
170 
171   void setControl(MsgCntrl msgCntrl)
172   {
173     this.msgCntrl = msgCntrl;
174   }
175 
176 
177   /**
178    *  
179    */
180   RoutingTableEntry getRoutingEntry()
181   {
182     return rt;
183   }
184 
185 
186   /**
187    *  Reset the routing entry for this cluster.  This should only be used
188    *  for the local cluster. <p>
189    *
190    *  It is used by the clustertable update method, and is required to
191    *  ensure that remote failures are detected.
192    */
193   void resetRoute()
194   {
195     rt.cost = 0;
196     rt.TTL = config.getMaxTTL();
197     rt.route = rt.key;
198   }
199 
200 
201   /**
202    *  Returns true if this is the local cluster.
203    */
204   boolean isLocal()
205   {
206     return local;
207   }
208 
209 
210   /**
211    *  Returns the <CODE>MsgFlowSndrSide</CODE> object associated to
212    *  this host.
213    */
214   MsgFlowSndrSide getMsgFlow()
215   {
216     return msgFlow;
217   }
218 
219 
220   /**
221    *  Returns the number of hosts in this cluster.
222    */
223   int size()
224   {
225     return members.size();
226   }
227 
228 
229   /**
230    *  Returns and iterator over the hosts in this cluster.
231    */
232   Iterator<MssHost> iterator()
233   {
234     return members.iterator();
235   }
236 
237 
238   /**
239    *
240    */
241   void insertMember(MssHost member)
242     throws ConfigurationException
243   {
244     if (members.contains(member))
245       throw new ConfigurationException("Trying to insert a previously inserted member in cluster");
246     else {
247       member.setClusterIndex(members.size());
248       members.add(member);
249     }
250   }
251 
252 
253   /**
254    *  Returns the endpoint of a host included in this cluster whose
255    *  reachability value is greater than the given threshold; returns
256    *  the cluster endpoint if no hosts in this cluster has a
257    *  reachability value above the given threshold.
258    *
259    *  The leader is selected randomly among the endpoints in the cluster
260    *  whose reachability value is greater than the threshold.
261    *
262    *  FIXME POSSIBLE OPTIMIZATION: If the hosts were stored in a
263    *  SortedMap (on a more permanent basis), we could use the sorted
264    *  order to determine the range from which the random number should
265    *  be picked.
266    */
267   EndPoint getLeaderEndPoint(int threshold)
268   {
269     int size = size();
270     List<EndPoint> candidates = new ArrayList<EndPoint>(size);
271     for (MssHost candidate : members) {
272       if (candidate.isReachable(threshold))
273         candidates.add(candidate.getEndPoint());
274     }
275     return (candidates.isEmpty() ? getEndPoint()
276             : (EndPoint) candidates.get(rnd.nextInt(candidates.size())));
277   }
278 
279 
280   /**
281    *  Returns the endpoint of a host included in this cluster whose
282    *  reachability value is greater than threshold obtained from the
283    *  transport configuration object; returns the cluster endpoint if no
284    *  hosts in this cluster has a reachability value above the
285    *  associated threshold.
286    */
287   EndPoint getLeaderEndPoint()
288   {
289     return getLeaderEndPoint(config.getReachabilityThreshold());
290   }
291 
292 
293   /**
294    *  Set all reachable members of the cluster to the given reachability
295    *  warning level, and check if there are unreachable members.
296    */
297   void setWarning(int TTLwarning)
298   {
299     for (MssHost host : members) {
300       if (host.isReachable(TTLwarning)) {
301         host.pingOK(TTLwarning);
302       } else if (!host.pingKO() && host.wasReachable()) {
303         /* If the host was reachable, but is no longer reachable */
304         mssds.setAsUnreachable(host);
305       }
306     }
307   }
308 
309 
310   /**
311    *  Check for previously reachable members of the cluster, and mark
312    *  them as unreachable.
313    */
314   void updateReachability()
315   {
316     for (MssHost host : members) {
317       if (!host.isLocal() && host.isReachable()) {
318         host.printReachParams();
319       }
320       if (!host.isLocal() && !host.pingKO() && host.wasReachable()) {
321         /* If the member was reachable, but is no longer reachable */
322         mssds.setAsUnreachable(host);
323       }
324     }
325   }
326 
327 
328   /**
329    *  Mark all reachable members of the cluster as unreachable, and
330    *  check if there are new unreachable members.
331    */
332   void setClusterAsUnreachable()
333   {
334     for (MssHost host : members) {
335       if (host.wasReachable()) {
336         /* If the member was reachable, mark it as unreachable */
337         mssds.setAsUnreachable(host);
338       }
339     }
340   }
341 
342 
343   MssHost[] getReachable(int threshold)
344   {
345     int size = members.size();
346     List<MssHost> reachable = new ArrayList<MssHost>(size);
347     for (MssHost host : members) {
348       if (host.isReachable(threshold))
349         reachable.add(host);
350     }
351     return (MssHost[]) reachable.toArray(NULL_MSS_HOST_ARRAY);
352   }
353 
354 
355   MssHost[] getReachable()
356   {
357     return getReachable(MINIMUM_ALIVE_VALUE);   
358   }
359 
360 
361   void incrementReachableCounter()
362   {
363     reachableCounter++;
364   }
365 
366 
367   void decrementReachableCounter()
368   {
369     reachableCounter--;
370   }
371   
372 
373   int getReachableCounter()
374   {
375     return reachableCounter;
376   }
377   
378   
379   boolean isReachable()
380   {
381     return (reachableCounter != 0);
382   }
383 
384 
385   /**
386    *  Returns true if the local cluster is directly connected to this
387    *  cluster.  False is returned if there is no direct connection, and
388    *  messages to this cluster (from the local cluster) must be routed
389    *  through another cluster.
390    */
391   boolean directlyConnected()
392   {
393     return (rt.key.equals(rt.route));
394   }
395 
396 
397   /**
398    *  Returns the cluster that we need to route through to get to this
399    *  cluster from the local cluster.
400    */
401   private Cluster getRouteCluster()
402   {
403     return mssds.clusterLookup(rt.route);
404   }
405 
406 
407   /**
408    *  Reset the sender side message flow for this cluster.  This method
409    *  should be invoked only during initialization.
410    */
411   void resetMsgFlow()
412   {
413     msgFlow.reset(rnd.nextInt());
414   }
415 
416 
417   /**
418    *  Reset the sender side message flow for this cluster to the given
419    *  value.  This is used to indicate that this cluster is unreachable.
420    */
421   void resetMsgFlow(int mid)
422   {
423     msgFlow.reset(mid);
424   }
425 
426 
427   ////////////////////////////////////////////////////////////////////////////////////////////
428   // Receiver management
429   ////////////////////////////////////////////////////////////////////////////////////////////
430 
431   void addReceiver(EndPoint member)
432   {
433     receivers.add(member);
434   }
435 
436   void clearReceivers()
437   {
438     receivers.clear();
439   }
440 
441   boolean hasReceivers()
442   {
443     return !(receivers.isEmpty());
444   }
445 
446 
447   ////////////////////////////////////////////////////////////////////////////////////////////
448   // Cluster sending methods
449   ////////////////////////////////////////////////////////////////////////////////////////////
450 
451   /**
452    *  Forward a raw message fragment to this cluster.
453    */
454   void forward(byte[] buffer, int bufLen)
455   {
456     if (local) {
457 
458       /* If it is my local cluster (LAN) */
459 
460       ni.send(buffer, bufLen);
461       if (log.isDebugEnabled())
462         log.debug("Forwarded fragment to local cluster");
463 
464     } else {
465 
466       /*
467        * Sending to a remote cluster, preferably through the cluster
468        * leader; otherwise, we send directly to each member of the
469        * remote cluster.
470        */
471       EndPoint leader = getLeaderEndPoint();
472       if (!leader.isMulticastEndPoint()) {
473 
474         /*
475          * If there is a reachable leader member for this cluster, then
476          * send the message directly to the leader.  The leader will
477          * broadcast the message within its local cluster to the other
478          * cluster members.
479          */
480         ni.send(leader, buffer, bufLen);
481         if (log.isDebugEnabled())
482           log.debug("Forwarded fragment to remote cluster through leader: " + leader);
483 
484       } else {
485 
486         /*
487          * Set the receiver list to all members, unless the message is
488          * associated with a set of receivers.
489          */
490         if (receivers.isEmpty()) {
491           /* Sending to all members in cluster */
492           for (MssHost host : members) {
493             ni.send(host.getEndPoint(), buffer, bufLen);
494             if (log.isDebugEnabled())
495               log.debug("Forwarded fragment to receiver " + host);
496           }
497         } else {
498           /* Sending to receivers in cluster */
499           for (EndPoint recv : receivers) {
500             ni.send(recv, buffer, bufLen);
501             if (log.isDebugEnabled())
502               log.debug("Forwarded fragment to receiver " + recv);
503           }
504         }
505       }
506     }
507   }
508 
509 
510   /**
511    *  Method for resending Jgroup related messages.  This involves
512    *  setting the cluster, receiver set, and flow control information
513    *  for this cluster.  The actual Jgroup level message remains the
514    *  same; we only changed the cluster, flow control data and receiver
515    *  set, in addition to the fragment identifier in the case the
516    *  original message was sent to multiple clusters (each with
517    *  different fragId sequence numbers).
518    */
519   void resend(FragmentIterator fragIter, int fragId)
520   {
521     MsgJG msgjg = (MsgJG) fragIter.getMsg();
522     EndPoint[] rcvrs = msgjg.getReceivers();
523 
524     /*
525      * Add receivers in the destination cluster to the receiver set of
526      * the cluster.
527      */
528     for (int i = 0; i < rcvrs.length; i++) {
529       MssHost host = mssds.hostLookup(rcvrs[i]);
530       if (!rcvrs[i].isLocal() && host.isIn(this)) {
531         addReceiver(rcvrs[i]);
532         if (log.isDebugEnabled()) {
533           log.debug("Added host to the destination list: " + host);
534         }
535       } else if (log.isDebugEnabled()) {
536         log.debug("Removed host from the destination list: " + host);
537       }
538     }
539 
540     /*
541      * Update the cluster header field and the receiver set of the
542      * message.
543      */
544     msgjg.setCluster(this);
545     msgjg.setReceivers(receivers);
546 
547     /*
548      * Reset the fragment identifier, in case it was changed due to
549      * sending the same fragment to different clusters (with different
550      * fragment id sequences).
551      */
552     byte[] msgFragment = fragIter.getFragment(fragId);
553     FragmentHeader.marshal(false, fragId, msgFragment);
554 
555     /*
556      * At this point the message should be updated again, allowing us to
557      * send a correct fragment.
558      */
559     forward(msgFragment, fragIter.fragmentLength());
560   }
561 
562 
563   /**
564    *  Method for sending Jgroup related messages.  This involves setting
565    *  the cluster, receiver set, and flow control information for this
566    *  cluster.
567    */
568   void send(MsgJG msgjg)
569   {
570     /*
571      * Update the cluster header field and the receiver set of the
572      * message.
573      */
574     msgjg.setCluster(this);
575     msgjg.setReceivers(receivers);
576 
577     if (ROUTING_ENABLED) {
578 
579       /*
580        * Check if the message has to be routed (that is, this cluster is
581        * not directly connected to the local cluster)
582        */
583       if (!directlyConnected()) {
584         if (log.isDebugEnabled()) {
585           log.debug("Stack trace", new Exception());
586           log.debug("localCluster=" + ClusterTable.getLocalCluster() + ", Route=" + rt);
587         }
588 
589         /*
590          * The message must be forwarded; simply forward it to the next
591          * cluster in route for the destination (this) cluster.
592          */
593         getRouteCluster().send((Msg) msgjg);
594       }
595 
596     } else {
597 
598       send((Msg) msgjg);
599 
600     }
601   }
602 
603 
604   /**
605    *  Send a generic mss level message to this cluster.
606    *
607    *  Note that this method used to be synchronzied.  However, I believe
608    *  that this is unecessary since the fragment iteration loops will have
609    *  all their data in the local stack frame of the current invocation.
610    *  Only those methods whom modify the object state needs to be synchronized.
611    */
612   void send(Msg msg)
613   {
614     /* Send the msg directly to the cluster */
615 
616     if (local) {
617 
618       /* If it is my local cluster (LAN) */
619 
620       for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
621         /* Generate and send a fragment */
622         byte[] buffer = frag.next(false);
623         int bufLen = frag.fragmentLength();
624 
625         if (log.isDebugEnabled())
626           log.debug("Sending msg fragment (" + frag.getFid() + ") to cluster " + rt.key);
627         ni.send(buffer, bufLen);
628       }
629 
630     } else {
631 
632       /*
633        * Sending to a remote cluster, preferably through the cluster
634        * leader; otherwise, we send directly to each member of the
635        * remote cluster.
636        */
637 
638       EndPoint leader = getLeaderEndPoint();
639       if (!leader.isMulticastEndPoint()) {
640 
641         /*
642          * If there is a reachable leader member for this cluster, then
643          * send the message directly to the leader.  The leader will
644          * broadcast the message within its cluster to the other cluster
645          * members.
646          */
647 
648         for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
649           /* Generate and send a fragment */
650           byte[] buffer = frag.next(true);
651           int bufLen = frag.fragmentLength();
652 
653           if (log.isDebugEnabled())
654             log.debug("Sending msg fragment (" + frag.getFid() + ") to cluster leader " + leader);
655           ni.send(leader, buffer, bufLen);
656         }
657 
658       } else {
659 
660         /*
661          * Otherwise send the message to all the members of the
662          * cluster; alternatively, if only a subset of the cluster
663          * members are receivers, only those will receive the message.
664          */
665 
666         for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
667           /* Generate and send a fragment */
668           byte[] buffer = frag.next(false);
669           int bufLen = frag.fragmentLength();
670 
671           /*
672            * Set the receiver list to all members, unless the message is
673            * associated with a set of receivers.
674            */
675           if (receivers.isEmpty()) {
676             /* Sending to all members in cluster */
677             for (MssHost host : members) {
678               if (log.isDebugEnabled())
679                 log.debug("Sending msg fragment (" + frag.getFid() + ") to member " + host);
680               ni.send(host.getEndPoint(), buffer, bufLen);
681             }
682           } else {
683             /* Sending to receivers in cluster */
684             for (EndPoint recv : receivers) {
685               if (log.isDebugEnabled())
686                 log.debug("Sending msg fragment (" + frag.getFid() + ") to receiver " + recv);
687               ni.send(recv, buffer, bufLen);
688             }
689           }
690 
691         } // end for-loop frag iterator
692       } // end if-else leader mcast endpoint
693     } // end if-else local/remote cluster
694   }
695 
696 
697   ////////////////////////////////////////////////////////////////////////////////////////////
698   // Methods from Object
699   ////////////////////////////////////////////////////////////////////////////////////////////
700 
701   /**
702    *  Returns the hash code for this object.
703    */
704   public int hashCode()
705   {
706     return rt.key.hashCode();
707   }
708 
709 
710   /**
711    *  Returns a string representation of this object.
712    */
713   public String toString()
714   {
715     StringBuilder buf = new StringBuilder();
716     buf.append("[Cluster: ");
717     buf.append(rt.key);
718     buf.append("]");
719     return buf.toString();
720   }
721 
722 
723   /**
724    *  Compares two objects for content equality.
725    *
726    *  @param obj
727    *    The object to compare this object with.
728    *  @return 
729    *    True if these objects are equal; false otherwise.
730    */
731   public boolean equals(Object obj)
732   {
733     if (!(obj instanceof Cluster))
734       return false;
735     return this.rt.equals(((Cluster) obj).rt);
736   }
737 
738 } // END Cluster