1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.util.HashMap;
22 import java.util.Iterator;
23
24 import jgroup.core.ConfigurationException;
25 import jgroup.core.EndPoint;
26 import jgroup.relacs.config.TransportConfig;
27
28 import org.apache.log4j.Logger;
29
30 /**
31 * The <code> ClusterTable </code> class
32 *
33 * @author Salvatore Cammarata
34 * @author Hein Meling
35 * @since Jgroup 1.2
36 */
37 final class ClusterTable
38 implements MssConstants, MssTag
39 {
40
41 ////////////////////////////////////////////////////////////////////////////////////////////
42 // Logger
43 ////////////////////////////////////////////////////////////////////////////////////////////
44
45 /** Obtain logger for this class */
46 private static Logger log = Logger.getLogger(ClusterTable.class);
47
48
49 ////////////////////////////////////////////////////////////////////////////////////////////
50 // Constants
51 ////////////////////////////////////////////////////////////////////////////////////////////
52
53 /** Initial capacity of the table */
54 private final static int CAPACITY = 32;
55
56
57 ////////////////////////////////////////////////////////////////////////////////////////////
58 // Fields
59 ////////////////////////////////////////////////////////////////////////////////////////////
60
61 /** Mss level distributed system configuration */
62 private MssDS mssds;
63
64 /** Configuration parameters */
65 private TransportConfig config;
66
67 /** Hash map containing the clusters */
68 private HashMap map;
69
70 /** Table of clusters */
71 private Cluster[] table;
72
73 /** Actual size of the table */
74 private int size;
75
76 /** This cluster */
77 private static Cluster me;
78
79
80 ////////////////////////////////////////////////////////////////////////////////////////////
81 // Constructors
82 ////////////////////////////////////////////////////////////////////////////////////////////
83
84 /**
85 * Constructs an empty cluster table with an initial
86 * capacity equal to <CODE>CAPACITY</CODE>.
87 */
88 ClusterTable(MssDS mssds, TransportConfig config)
89 {
90 this.mssds = mssds;
91 this.config = config;
92 map = new HashMap();
93 table = new Cluster[CAPACITY];
94 size = 0;
95 }
96
97
98 ////////////////////////////////////////////////////////////////////////////////////////////
99 // Public methods
100 ////////////////////////////////////////////////////////////////////////////////////////////
101
102 /**
103 * Inserts cluster <code>cluster</code> in the table, unless a cluster
104 * with the same key is already in the table.
105 *
106 * @param cluster
107 * The cluster to be inserted.
108 * @exception ConfigurationException
109 * If the <CODE>cluster</CODE> is already in the table.
110 * @exception NullPointerException
111 * If <CODE>cluster</CODE> is null.
112 */
113 void insert(Cluster cluster)
114 throws ConfigurationException
115 {
116 if (cluster == null)
117 throw new NullPointerException();
118 if (map.containsKey(cluster.getEndPoint()))
119 throw new ConfigurationException("Cluster already present in " + cluster.getEndPoint());
120 if (cluster.isLocal()) {
121 if (me == null) {
122 me = cluster;
123 } else {
124 throw new ConfigurationException("Local cluster already defined: " + cluster.getEndPoint());
125 }
126 }
127 /* If space in the table has been exhausted, we double it */
128 if (size >= table.length) {
129 doubleCapacity();
130 }
131 table[size++] = cluster;
132 map.put(cluster.getEndPoint(), cluster);
133 }
134
135
136 /**
137 * Returns the number of cluster contained in the cluster table.
138 */
139 int size()
140 {
141 return size;
142 }
143
144
145 /**
146 * Returns the <CODE>index</CODE>-th cluster maintained in the cluster
147 * table.
148 *
149 * @param index the index of the cluster to be returned
150 * @exception IndexOutOfBoundException if <CODE>index</CODE> is not a
151 * valid index
152 */
153 Cluster get(int index)
154 {
155 if (index < 0 || index >= size)
156 throw new IndexOutOfBoundsException("Index " + index);
157 return table[index];
158 }
159
160
161 /**
162 * Returns the cluster reference to which this endpoint is mapped.
163 *
164 * @param EndPoint endpoint
165 * @return the cluster reference to which this endpoint is mapped.
166 */
167 Cluster lookup(EndPoint endpoint)
168 {
169 return (Cluster) map.get(endpoint);
170 }
171
172
173 Iterator iterator()
174 {
175 return map.values().iterator();
176 }
177
178
179 /**
180 * Returns the local cluster descriptor.
181 *
182 * @throws IllegalStateException
183 * Raised if the local cluster has not yet been initialized.
184 */
185 public static Cluster getLocalCluster()
186 {
187 if (me == null)
188 throw new IllegalStateException("Local cluster not yet initialized in the ClusterTable.");
189 return me;
190 }
191
192
193 /**
194 * Reset the message flow of all clusters in this
195 * <code>ClusterTable</code>.
196 */
197 void resetMsgFlow()
198 {
199 for (Iterator iter = map.values().iterator(); iter.hasNext(); ) {
200 Cluster cluster = (Cluster) iter.next();
201 cluster.resetMsgFlow();
202 }
203 }
204
205
206 /**
207 * Update the routing table, based on the information stored in the
208 * routing message.
209 */
210 void updateRoutingTable(MsgRouting msg)
211 {
212 TopologyEntry[] topologyTable = msg.getTopologyTable();
213 for (int i=0; i < topologyTable.length; i++) {
214 /*
215 * Note that the sending cluster can be obtained from the message,
216 * even though the ROUTING message has been forwarded through an
217 * intermediate host. The forwarding mechanism, preserves the
218 * sender field of the message, changing only the message tag to
219 * FWDROUTING.
220 */
221 EndPoint sendingCluster = msg.getCluster().getEndPoint();
222 mergeRoute(topologyTable[i], sendingCluster);
223 }
224 }
225
226
227 /**
228 * Updates the reachability state of the members of each cluster
229 * according to the route information. Discards any entry with TTL
230 * equal to zero. If the TTL is equal to the TTLwarning threshold
231 * the cost is increased in order to make is less appetitible.
232 */
233 void update()
234 {
235 if (me == null)
236 throw new IllegalStateException("Local cluster not yet initialized in the ClusterTable.");
237
238 /*
239 * To ensure that the route for the local cluster is always correct,
240 * we reset it for every update. This should actually not be needed
241 * for every update (assuming that the route does not change for the
242 * wrong reason), since the route for the local cluster should
243 * never change. FIXME HEIN: Move this method to insert() above, and
244 * do a complete test to ensure that the local route never changes.
245 */
246 me.resetRoute();
247
248 /*
249 * Update the route and reachability information for each cluster.
250 */
251 for (int i = 0; i < size; i++) {
252 Cluster cluster = table[i];
253 //FIXME HEIN: move the code below to a new method in Cluster.
254 RoutingTableEntry rt = cluster.getRoutingEntry();
255 if (rt.TTL != 0) {
256 int TTLwarning = config.getTTLWarning();
257
258 if (--rt.TTL == 0) {
259 /* The cluster became unreachable */
260 rt.route = rt.key;
261 rt.cost = config.getMaxPathLength();
262
263 /*
264 * Mark reachable members of the cluster as unreachable.
265 */
266 cluster.setClusterAsUnreachable();
267
268 } else if (rt.TTL == TTLwarning) {
269 /*
270 * If the cluster route is quite old set the cost to a less
271 * attractive value (delta warning value).
272 */
273 rt.cost = config.getPathWarning();
274 /*
275 * Set all reachable members of the cluster to a warning
276 * state, and also check for unreachable members.
277 */
278 cluster.setWarning(TTLwarning);
279
280 } else {
281
282 /*
283 * Check for previously reachable members of the cluster, and
284 * mark them as unreachable.
285 */
286 cluster.updateReachability();
287
288 }
289 } // end if (rt.TTL...)
290 } // end for
291 }
292
293
294 ////////////////////////////////////////////////////////////////////////////////////////////
295 // Routing
296 ///////////////////////////////////////////////////////////////////////////////////////////
297
298 /**
299 *
300 */
301 private void mergeRoute(TopologyEntry newEntry, EndPoint sendingCluster)
302 {
303 log.assertLog(newEntry.routing.cost >= 0, "mergeRoute: negative cost");
304
305 if (log.isDebugEnabled())
306 log.debug("mergeRoute: start");
307
308 Cluster cluster = (Cluster) map.get(newEntry.routing.key);
309 log.assertLog(cluster != null, "Cluster from routing entry not known locally");
310 RoutingTableEntry rt = cluster.getRoutingEntry();
311 if (newEntry.routing.cost + 1 < rt.cost) {
312 if (log.isDebugEnabled()) {
313 log.debug("new.cost=" + (newEntry.routing.cost+1) + " < rt.cost=" + rt.cost);
314 log.debug("Found a better route for " + cluster + " --> " + newEntry);
315 }
316 } else if (sendingCluster.equals(rt.route)) {
317 // FIXME: metric for current next-hop may have changed
318 if (log.isDebugEnabled())
319 log.debug("Sending cluster same as my route: " + sendingCluster + " --> "+ newEntry);
320 } else {
321 if (log.isDebugEnabled())
322 log.debug("mergeRoute: end");
323 return;
324 }
325
326 /*
327 * Update routing information and cost data for this route
328 */
329 rt.route = sendingCluster;
330 rt.TTL = config.getMaxTTL();
331 int maxPathLength = config.getMaxPathLength();
332 int pathLengthWarning = config.getPathWarning();
333 if (newEntry.routing.cost + 1 <= maxPathLength)
334 rt.cost = newEntry.routing.cost + 1;
335 else
336 rt.cost = maxPathLength;
337
338 /*
339 * Update the reachability info; currently, only information leading
340 * to new reachable members are considered, without taking into
341 * consideration problems in reachability; these should be detected
342 * by the periodic refresh.
343 */
344 if (rt.cost == maxPathLength) {
345 /*
346 * Not reachable; do nothing
347 */
348 } else if (rt.cost >= pathLengthWarning) {
349 /*
350 * The cluster is badly reachable; update the reachability info.
351 */
352 if (log.isDebugEnabled()) {
353 log.warn(cluster + " is badly reachable - cost=" + rt.cost);
354 }
355 for (int j = 0; j < newEntry.reachable.length; j++) {
356 // check if there are new reachable members
357 MssHost reachable = mssds.hostLookup(newEntry.reachable[j]);
358 if (!reachable.pingOK(pathLengthWarning)) {
359 // if the member was not reachable before
360 mssds.setAsReachable(reachable, newEntry.incarnationId[j]);
361 }
362 }
363
364 /*
365 * Set all reachable members of the cluster to the warning state
366 * and update the reachability of other cluster members that may be
367 * unreachable.
368 */
369 cluster.setWarning(config.getTTLWarning());
370
371 } else {
372 /*
373 * The cluster has good reachability
374 */
375 if (log.isDebugEnabled()) {
376 log.debug(cluster + " is well reachable - cost=" + rt.cost);
377 }
378 for (int j = 0; j < newEntry.reachable.length; j++) {
379 MssHost reachable = mssds.hostLookup(newEntry.reachable[j]);
380 // check if there are new reachable members
381 if (!reachable.isLocal() && !reachable.pingOK())
382 // if the member was not reachable before
383 mssds.setAsReachable(reachable, newEntry.incarnationId[j]);
384 }
385 }
386 if (log.isDebugEnabled())
387 log.debug("mergeRoute: end");
388 }
389
390
391 ////////////////////////////////////////////////////////////////////////////////////////////
392 // Private methods
393 ////////////////////////////////////////////////////////////////////////////////////////////
394
395 /**
396 * Double the capacity of the table
397 */
398 private void doubleCapacity()
399 {
400 Cluster[] newtable = new Cluster[table.length*2];
401 System.arraycopy(table, 0, newtable, 0, table.length);
402 table = newtable;
403 }
404
405
406 ////////////////////////////////////////////////////////////////////////////////////////////
407 // Methods from Object
408 ////////////////////////////////////////////////////////////////////////////////////////////
409
410 /**
411 * Returns a string representation of this object
412 */
413 public String toString()
414 {
415 StringBuilder buffer = new StringBuilder();
416 buffer.append("[ClusterTable: {");
417 for (int i = 0; i < size; i++) {
418 if (i != 0)
419 buffer.append(",");
420 buffer.append(table[i]);
421 }
422 buffer.append("}]");
423 return buffer.toString();
424 }
425
426 } // END ClusterTable