View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.util.HashMap;
22  import java.util.Iterator;
23  
24  import jgroup.core.ConfigurationException;
25  import jgroup.core.EndPoint;
26  import jgroup.relacs.config.TransportConfig;
27  
28  import org.apache.log4j.Logger;
29  
30  /**
31   *  The <code> ClusterTable </code> class
32   *
33   *  @author Salvatore Cammarata
34   *  @author Hein Meling
35   *  @since Jgroup 1.2
36   */
37  final class ClusterTable
38    implements MssConstants, MssTag
39  {
40  
41    ////////////////////////////////////////////////////////////////////////////////////////////
42    // Logger
43    ////////////////////////////////////////////////////////////////////////////////////////////
44  
45    /** Obtain logger for this class */
46    private static Logger log = Logger.getLogger(ClusterTable.class);
47  
48    
49    ////////////////////////////////////////////////////////////////////////////////////////////
50    // Constants
51    ////////////////////////////////////////////////////////////////////////////////////////////
52  
53    /** Initial capacity of the table */
54    private final static int CAPACITY = 32;
55  
56  
57    ////////////////////////////////////////////////////////////////////////////////////////////
58    // Fields
59    ////////////////////////////////////////////////////////////////////////////////////////////
60  
61    /** Mss level distributed system configuration */
62    private MssDS mssds;
63  
64    /** Configuration parameters */
65    private TransportConfig config;
66  
67    /** Hash map containing the clusters */
68    private HashMap map;
69    
70    /** Table of clusters */
71    private Cluster[] table;
72  
73    /** Actual size of the table */
74    private int size;
75  
76    /** This cluster */
77    private static Cluster me;
78  
79  
80    ////////////////////////////////////////////////////////////////////////////////////////////
81    // Constructors
82    ////////////////////////////////////////////////////////////////////////////////////////////
83  
84    /**
85     *  Constructs an empty cluster table with an initial
86     *  capacity equal to <CODE>CAPACITY</CODE>.
87     */
88    ClusterTable(MssDS mssds, TransportConfig config)
89    {
90      this.mssds = mssds;
91      this.config = config;
92      map = new HashMap();
93      table = new Cluster[CAPACITY];
94      size = 0;
95    }
96  
97  
98    ////////////////////////////////////////////////////////////////////////////////////////////
99    // Public methods
100   ////////////////////////////////////////////////////////////////////////////////////////////
101 
102   /**
103    *  Inserts cluster <code>cluster</code> in the table, unless a cluster 
104    *  with the same key is already in the table.
105    *
106    *  @param cluster
107    *    The cluster to be inserted.
108    *  @exception ConfigurationException 
109    *    If the <CODE>cluster</CODE> is already in the table.
110    *  @exception NullPointerException
111    *    If <CODE>cluster</CODE> is null.
112    */
113   void insert(Cluster cluster)
114     throws ConfigurationException
115   {
116     if (cluster == null)
117       throw new NullPointerException();
118     if (map.containsKey(cluster.getEndPoint()))
119       throw new ConfigurationException("Cluster already present in " + cluster.getEndPoint());
120     if (cluster.isLocal()) {
121       if (me == null) {
122         me = cluster;
123       } else {
124         throw new ConfigurationException("Local cluster already defined: " + cluster.getEndPoint());
125       }
126     }
127     /* If space in the table has been exhausted, we double it */
128     if (size >= table.length) {
129       doubleCapacity();
130     }
131     table[size++] = cluster;
132     map.put(cluster.getEndPoint(), cluster);
133   }
134 
135 
136   /**
137    *  Returns the number of cluster contained in the cluster table.
138    */
139   int size()
140   {
141     return size;
142   }
143 
144 
145   /**
146    *  Returns the <CODE>index</CODE>-th cluster maintained in the cluster
147    *  table.
148    *  
149    *  @param index the index of the cluster to be returned
150    *  @exception IndexOutOfBoundException if <CODE>index</CODE> is not a 
151    *  valid index
152    */
153   Cluster get(int index)
154   {
155     if (index < 0 || index >= size)
156       throw new IndexOutOfBoundsException("Index " + index);
157     return table[index];
158   }
159 
160 
161   /**
162    *  Returns the cluster reference to which this endpoint is mapped.
163    *
164    *  @param   EndPoint endpoint
165    *  @return  the cluster reference to which this endpoint is mapped.
166    */
167   Cluster lookup(EndPoint endpoint)
168   {
169     return (Cluster) map.get(endpoint);
170   }
171 
172 
173   Iterator iterator()
174   {
175     return map.values().iterator();
176   }
177 
178 
179   /**
180    *  Returns the local cluster descriptor.
181    *
182    *  @throws IllegalStateException
183    *    Raised if the local cluster has not yet been initialized.
184    */
185   public static Cluster getLocalCluster()
186   {
187     if (me == null)
188       throw new IllegalStateException("Local cluster not yet initialized in the ClusterTable.");
189     return me;
190   }
191 
192 
193   /**
194    *  Reset the message flow of all clusters in this
195    *  <code>ClusterTable</code>.
196    */
197   void resetMsgFlow()
198   {
199     for (Iterator iter = map.values().iterator(); iter.hasNext(); ) {
200       Cluster cluster = (Cluster) iter.next();
201       cluster.resetMsgFlow();
202     }
203   }
204 
205 
206   /**
207    *  Update the routing table, based on the information stored in the
208    *  routing message.
209    */
210   void updateRoutingTable(MsgRouting msg)
211   {
212     TopologyEntry[] topologyTable = msg.getTopologyTable();
213     for (int i=0; i < topologyTable.length; i++) {
214       /*
215        * Note that the sending cluster can be obtained from the message,
216        * even though the ROUTING message has been forwarded through an
217        * intermediate host.  The forwarding mechanism, preserves the
218        * sender field of the message, changing only the message tag to
219        * FWDROUTING.
220        */
221       EndPoint sendingCluster = msg.getCluster().getEndPoint();
222       mergeRoute(topologyTable[i], sendingCluster);
223     }
224   }
225 
226 
227   /**
228    *  Updates the reachability state of the members of each cluster
229    *  according to the route information.  Discards any entry with TTL
230    *  equal to zero.  If the TTL is equal to the TTLwarning threshold
231    *  the cost is increased in order to make is less appetitible.
232    */
233   void update()
234   {
235     if (me == null)
236       throw new IllegalStateException("Local cluster not yet initialized in the ClusterTable.");
237 
238     /*
239      * To ensure that the route for the local cluster is always correct,
240      * we reset it for every update.  This should actually not be needed
241      * for every update (assuming that the route does not change for the
242      * wrong reason), since the route for the local cluster should
243      * never change.  FIXME HEIN: Move this method to insert() above, and
244      * do a complete test to ensure that the local route never changes.
245      */
246     me.resetRoute();
247 
248     /*
249      * Update the route and reachability information for each cluster.
250      */
251     for (int i = 0; i < size; i++) {
252       Cluster cluster = table[i];
253       //FIXME HEIN: move the code below to a new method in Cluster.
254       RoutingTableEntry rt = cluster.getRoutingEntry();
255       if (rt.TTL != 0) {
256         int TTLwarning = config.getTTLWarning();
257 
258         if (--rt.TTL == 0) {
259           /* The cluster became unreachable */
260           rt.route = rt.key;
261           rt.cost = config.getMaxPathLength();
262           
263           /*
264            * Mark reachable members of the cluster as unreachable.
265            */
266           cluster.setClusterAsUnreachable();
267 
268         } else if (rt.TTL == TTLwarning) {
269           /*
270            * If the cluster route is quite old set the cost to a less
271            * attractive value (delta warning value).
272            */
273           rt.cost = config.getPathWarning();
274           /*
275            * Set all reachable members of the cluster to a warning
276            * state, and also check for unreachable members.
277            */
278           cluster.setWarning(TTLwarning);
279 
280         } else {
281 
282           /*
283            * Check for previously reachable members of the cluster, and
284            * mark them as unreachable.
285            */
286           cluster.updateReachability();
287 
288         }
289       } // end if (rt.TTL...)
290     } // end for 
291   }
292 
293 
294   ////////////////////////////////////////////////////////////////////////////////////////////
295   // Routing
296   ///////////////////////////////////////////////////////////////////////////////////////////
297 
298   /**
299    *
300    */
301   private void mergeRoute(TopologyEntry newEntry, EndPoint sendingCluster)
302   {
303     log.assertLog(newEntry.routing.cost >= 0, "mergeRoute: negative cost");
304 
305     if (log.isDebugEnabled())
306       log.debug("mergeRoute: start");
307 
308     Cluster cluster = (Cluster) map.get(newEntry.routing.key);
309     log.assertLog(cluster != null, "Cluster from routing entry not known locally");
310     RoutingTableEntry rt = cluster.getRoutingEntry();
311     if (newEntry.routing.cost + 1 < rt.cost) {
312       if (log.isDebugEnabled()) {
313         log.debug("new.cost=" + (newEntry.routing.cost+1) + " < rt.cost=" + rt.cost);
314         log.debug("Found a better route for " + cluster + " --> " + newEntry);
315       }
316     } else if (sendingCluster.equals(rt.route)) {
317       // FIXME: metric for current next-hop may have changed 
318       if (log.isDebugEnabled())
319         log.debug("Sending cluster same as my route: " + sendingCluster + " --> "+ newEntry);
320     } else {
321       if (log.isDebugEnabled())
322         log.debug("mergeRoute: end");
323       return;
324     }
325 
326     /*
327      * Update routing information and cost data for this route
328      */
329     rt.route = sendingCluster;
330     rt.TTL = config.getMaxTTL();
331     int maxPathLength = config.getMaxPathLength();
332     int pathLengthWarning = config.getPathWarning();
333     if (newEntry.routing.cost + 1 <= maxPathLength)
334       rt.cost = newEntry.routing.cost + 1;
335     else
336       rt.cost = maxPathLength;
337 
338     /*
339      * Update the reachability info; currently, only information leading
340      * to new reachable members are considered, without taking into
341      * consideration problems in reachability; these should be detected
342      * by the periodic refresh.
343      */
344     if (rt.cost == maxPathLength) {
345       /*
346        *  Not reachable; do nothing
347        */
348     } else if (rt.cost >= pathLengthWarning)  {
349       /*
350        * The cluster is badly reachable; update the reachability info.
351        */
352       if (log.isDebugEnabled()) {
353         log.warn(cluster + " is badly reachable - cost=" + rt.cost);
354       }
355       for (int j = 0; j < newEntry.reachable.length; j++) {
356         // check if there are new reachable members
357         MssHost reachable = mssds.hostLookup(newEntry.reachable[j]);
358         if (!reachable.pingOK(pathLengthWarning)) {
359           // if the member was not reachable before
360           mssds.setAsReachable(reachable, newEntry.incarnationId[j]);
361         }
362       }
363 
364       /*
365        * Set all reachable members of the cluster to the warning state
366        * and update the reachability of other cluster members that may be
367        * unreachable.
368        */
369       cluster.setWarning(config.getTTLWarning());
370 
371     } else {
372       /*
373        * The cluster has good reachability
374        */
375       if (log.isDebugEnabled()) {
376         log.debug(cluster + " is well reachable - cost=" + rt.cost);
377       }
378       for (int j = 0; j < newEntry.reachable.length; j++) {
379         MssHost reachable = mssds.hostLookup(newEntry.reachable[j]);
380         // check if there are new reachable members 
381         if (!reachable.isLocal() && !reachable.pingOK()) 
382           // if the member was not reachable before
383           mssds.setAsReachable(reachable, newEntry.incarnationId[j]);
384       }
385     }
386     if (log.isDebugEnabled())
387       log.debug("mergeRoute: end");
388   }
389 
390 
391   ////////////////////////////////////////////////////////////////////////////////////////////
392   // Private methods
393   ////////////////////////////////////////////////////////////////////////////////////////////
394 
395   /**
396    *  Double the capacity of the table
397    */
398   private void doubleCapacity()
399   {
400     Cluster[] newtable = new Cluster[table.length*2];
401     System.arraycopy(table, 0, newtable, 0, table.length);
402     table = newtable;
403   }
404 
405 
406   ////////////////////////////////////////////////////////////////////////////////////////////
407   // Methods from Object
408   ////////////////////////////////////////////////////////////////////////////////////////////
409 
410   /**
411    *  Returns a string representation of this object
412    */
413   public String toString()
414   {
415     StringBuilder buffer = new StringBuilder();
416     buffer.append("[ClusterTable: {");
417     for (int i = 0; i < size; i++) {
418       if (i != 0)
419         buffer.append(",");
420       buffer.append(table[i]);
421     }
422     buffer.append("}]");
423     return buffer.toString();
424   }
425 
426 } // END ClusterTable