1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.mss;
20
21 import java.util.HashMap;
22 import java.util.Iterator;
23
24 import jgroup.core.ConfigurationException;
25 import jgroup.core.EndPoint;
26 import jgroup.relacs.config.TransportConfig;
27
28 import org.apache.log4j.Logger;
29
30
31
32
33
34
35
36
37 final class ClusterTable
38 implements MssConstants, MssTag
39 {
40
41
42
43
44
45
46 private static Logger log = Logger.getLogger(ClusterTable.class);
47
48
49
50
51
52
53
54 private final static int CAPACITY = 32;
55
56
57
58
59
60
61
62 private MssDS mssds;
63
64
65 private TransportConfig config;
66
67
68 private HashMap map;
69
70
71 private Cluster[] table;
72
73
74 private int size;
75
76
77 private static Cluster me;
78
79
80
81
82
83
84
85
86
87
88 ClusterTable(MssDS mssds, TransportConfig config)
89 {
90 this.mssds = mssds;
91 this.config = config;
92 map = new HashMap();
93 table = new Cluster[CAPACITY];
94 size = 0;
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 void insert(Cluster cluster)
114 throws ConfigurationException
115 {
116 if (cluster == null)
117 throw new NullPointerException();
118 if (map.containsKey(cluster.getEndPoint()))
119 throw new ConfigurationException("Cluster already present in " + cluster.getEndPoint());
120 if (cluster.isLocal()) {
121 if (me == null) {
122 me = cluster;
123 } else {
124 throw new ConfigurationException("Local cluster already defined: " + cluster.getEndPoint());
125 }
126 }
127
128 if (size >= table.length) {
129 doubleCapacity();
130 }
131 table[size++] = cluster;
132 map.put(cluster.getEndPoint(), cluster);
133 }
134
135
136
137
138
139 int size()
140 {
141 return size;
142 }
143
144
145
146
147
148
149
150
151
152
153 Cluster get(int index)
154 {
155 if (index < 0 || index >= size)
156 throw new IndexOutOfBoundsException("Index " + index);
157 return table[index];
158 }
159
160
161
162
163
164
165
166
167 Cluster lookup(EndPoint endpoint)
168 {
169 return (Cluster) map.get(endpoint);
170 }
171
172
173 Iterator iterator()
174 {
175 return map.values().iterator();
176 }
177
178
179
180
181
182
183
184
185 public static Cluster getLocalCluster()
186 {
187 if (me == null)
188 throw new IllegalStateException("Local cluster not yet initialized in the ClusterTable.");
189 return me;
190 }
191
192
193
194
195
196
197 void resetMsgFlow()
198 {
199 for (Iterator iter = map.values().iterator(); iter.hasNext(); ) {
200 Cluster cluster = (Cluster) iter.next();
201 cluster.resetMsgFlow();
202 }
203 }
204
205
206
207
208
209
210 void updateRoutingTable(MsgRouting msg)
211 {
212 TopologyEntry[] topologyTable = msg.getTopologyTable();
213 for (int i=0; i < topologyTable.length; i++) {
214
215
216
217
218
219
220
221 EndPoint sendingCluster = msg.getCluster().getEndPoint();
222 mergeRoute(topologyTable[i], sendingCluster);
223 }
224 }
225
226
227
228
229
230
231
232
233 void update()
234 {
235 if (me == null)
236 throw new IllegalStateException("Local cluster not yet initialized in the ClusterTable.");
237
238
239
240
241
242
243
244
245
246 me.resetRoute();
247
248
249
250
251 for (int i = 0; i < size; i++) {
252 Cluster cluster = table[i];
253
254 RoutingTableEntry rt = cluster.getRoutingEntry();
255 if (rt.TTL != 0) {
256 int TTLwarning = config.getTTLWarning();
257
258 if (--rt.TTL == 0) {
259
260 rt.route = rt.key;
261 rt.cost = config.getMaxPathLength();
262
263
264
265
266 cluster.setClusterAsUnreachable();
267
268 } else if (rt.TTL == TTLwarning) {
269
270
271
272
273 rt.cost = config.getPathWarning();
274
275
276
277
278 cluster.setWarning(TTLwarning);
279
280 } else {
281
282
283
284
285
286 cluster.updateReachability();
287
288 }
289 }
290 }
291 }
292
293
294
295
296
297
298
299
300
301 private void mergeRoute(TopologyEntry newEntry, EndPoint sendingCluster)
302 {
303 log.assertLog(newEntry.routing.cost >= 0, "mergeRoute: negative cost");
304
305 if (log.isDebugEnabled())
306 log.debug("mergeRoute: start");
307
308 Cluster cluster = (Cluster) map.get(newEntry.routing.key);
309 log.assertLog(cluster != null, "Cluster from routing entry not known locally");
310 RoutingTableEntry rt = cluster.getRoutingEntry();
311 if (newEntry.routing.cost + 1 < rt.cost) {
312 if (log.isDebugEnabled()) {
313 log.debug("new.cost=" + (newEntry.routing.cost+1) + " < rt.cost=" + rt.cost);
314 log.debug("Found a better route for " + cluster + " --> " + newEntry);
315 }
316 } else if (sendingCluster.equals(rt.route)) {
317
318 if (log.isDebugEnabled())
319 log.debug("Sending cluster same as my route: " + sendingCluster + " --> "+ newEntry);
320 } else {
321 if (log.isDebugEnabled())
322 log.debug("mergeRoute: end");
323 return;
324 }
325
326
327
328
329 rt.route = sendingCluster;
330 rt.TTL = config.getMaxTTL();
331 int maxPathLength = config.getMaxPathLength();
332 int pathLengthWarning = config.getPathWarning();
333 if (newEntry.routing.cost + 1 <= maxPathLength)
334 rt.cost = newEntry.routing.cost + 1;
335 else
336 rt.cost = maxPathLength;
337
338
339
340
341
342
343
344 if (rt.cost == maxPathLength) {
345
346
347
348 } else if (rt.cost >= pathLengthWarning) {
349
350
351
352 if (log.isDebugEnabled()) {
353 log.warn(cluster + " is badly reachable - cost=" + rt.cost);
354 }
355 for (int j = 0; j < newEntry.reachable.length; j++) {
356
357 MssHost reachable = mssds.hostLookup(newEntry.reachable[j]);
358 if (!reachable.pingOK(pathLengthWarning)) {
359
360 mssds.setAsReachable(reachable, newEntry.incarnationId[j]);
361 }
362 }
363
364
365
366
367
368
369 cluster.setWarning(config.getTTLWarning());
370
371 } else {
372
373
374
375 if (log.isDebugEnabled()) {
376 log.debug(cluster + " is well reachable - cost=" + rt.cost);
377 }
378 for (int j = 0; j < newEntry.reachable.length; j++) {
379 MssHost reachable = mssds.hostLookup(newEntry.reachable[j]);
380
381 if (!reachable.isLocal() && !reachable.pingOK())
382
383 mssds.setAsReachable(reachable, newEntry.incarnationId[j]);
384 }
385 }
386 if (log.isDebugEnabled())
387 log.debug("mergeRoute: end");
388 }
389
390
391
392
393
394
395
396
397
398 private void doubleCapacity()
399 {
400 Cluster[] newtable = new Cluster[table.length*2];
401 System.arraycopy(table, 0, newtable, 0, table.length);
402 table = newtable;
403 }
404
405
406
407
408
409
410
411
412
413 public String toString()
414 {
415 StringBuilder buffer = new StringBuilder();
416 buffer.append("[ClusterTable: {");
417 for (int i = 0; i < size; i++) {
418 if (i != 0)
419 buffer.append(",");
420 buffer.append(table[i]);
421 }
422 buffer.append("}]");
423 return buffer.toString();
424 }
425
426 }