1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23
24 import jgroup.core.ConfigurationException;
25 import jgroup.core.EndPoint;
26 import jgroup.relacs.config.DistributedSystemConfig;
27 import jgroup.relacs.config.Domain;
28 import jgroup.relacs.config.DomainSet;
29 import jgroup.relacs.config.HostSet;
30 import jgroup.relacs.config.TransportConfig;
31
32 import org.apache.log4j.Logger;
33
34
35
36
37
38
39
40
41
42
43
44
45
46 final class MssDS
47 implements MssConstants, MssTag
48 {
49
50
51
52
53
54
55 private static final Logger log = Logger.getLogger(MssDS.class);
56
57
58
59
60
61
62
63 private TransportConfig tconf;
64
65
66 private HostTable hosttable;
67
68
69 private ClusterTable clustertable;
70
71
72 private RoutingTable routingtable;
73
74
75 private DSView view;
76
77
78 private DSView upperview;
79
80
81 private NI ni;
82
83
84
85
86
87
88
89
90
91
92 MssDS(EventHandler ehandler, DistributedSystemConfig dsc, TransportConfig tconf)
93 throws IOException, ConfigurationException
94 {
95
96 this.tconf = tconf;
97 ni = new NI(ehandler, tconf.getPayload() + OVERHEAD_SIZE, tconf.getMulticastTTL());
98
99
100
101
102 hosttable = new HostTable();
103 clustertable = new ClusterTable(this, tconf);
104
105
106
107
108
109 DomainSet domains = dsc.getDomainSet();
110 for (Iterator i = domains.iterator(); i.hasNext(); ) {
111 Domain domain = (Domain) i.next();
112
113
114
115
116
117 Cluster cluster = new Cluster(this, tconf, domain, ni);
118 clustertable.insert(cluster);
119
120 HostSet hosts = domain.getHostSet();
121 for (Iterator j = hosts.iterator(); j.hasNext(); ) {
122 EndPoint hostEndPoint = (EndPoint) j.next();
123
124
125
126
127 MssHost host = new MssHost(tconf, hostEndPoint, ni, cluster, ehandler);
128 cluster.insertMember(host);
129 hosttable.insert(host);
130 }
131 }
132
133
134 MssHost me = HostTable.getLocalHost();
135
136
137 clustertable.resetMsgFlow();
138 hosttable.resetMsgFlow();
139
140
141 routingtable = new RoutingTable(clustertable, tconf.getMaxTTL() - 1);
142
143
144 view = new DSView(hosttable, me);
145 upperview = new DSView(hosttable, me);
146 }
147
148
149
150
151
152
153
154
155
156 DSView getView()
157 {
158 return view;
159 }
160
161
162
163
164 DSView getUpperView()
165 {
166 return upperview;
167 }
168
169
170
171
172 void setControl(MsgCntrl control)
173 {
174 for (Iterator iter = clustertable.iterator(); iter.hasNext(); ) {
175 Cluster cluster = (Cluster) iter.next();
176 cluster.setControl(control);
177 }
178 }
179
180
181
182
183 ClusterTable getClusterTable()
184 {
185 return clustertable;
186 }
187
188
189
190
191
192 HostTable getHostTable()
193 {
194 return hosttable;
195 }
196
197
198
199
200
201 MssHost hostLookup(EndPoint endpoint)
202 {
203 return hosttable.lookup(endpoint);
204 }
205
206
207
208
209 Cluster clusterLookup(EndPoint endpoint)
210 {
211 return clustertable.lookup(endpoint);
212 }
213
214
215
216
217 void doStart()
218 {
219 ni.doStart();
220 }
221
222
223
224
225
226
227 int size()
228 {
229 return hosttable.size();
230 }
231
232
233
234
235
236
237 int numOfClusters()
238 {
239 return clustertable.size();
240 }
241
242
243
244
245
246
247
248 RoutingTable getRoutingTable()
249 {
250 boolean changed = routingtable.updateReachability();
251 if (log.isDebugEnabled()) {
252 if (changed)
253 log.debug("Reachability has changed");
254 log.debug(routingtable);
255 }
256 return routingtable;
257 }
258
259
260
261
262
263
264 void updateRoutingTable(MsgRouting msg)
265 {
266 if (log.isDebugEnabled()) {
267 log.debug(msg);
268
269 log.debug(routingtable);
270 }
271 view.clear();
272 clustertable.updateRoutingTable(msg);
273
274
275
276
277
278
279 }
280
281
282
283
284
285
286 void updateReachability(MsgIamAlive msg)
287 {
288 if (log.isDebugEnabled()) {
289 log.debug(msg);
290
291 log.debug(routingtable);
292 }
293 view.clear();
294 MssHost sender = msg.getSender();
295
296 if (!sender.isLocal() && !sender.pingOK()) {
297
298 setAsReachable(sender, msg.getIncarnationId());
299 }
300
301
302
303
304
305
306 }
307
308 void update()
309 {
310 if (log.isDebugEnabled()) {
311
312 log.debug(routingtable);
313 }
314 view.clear();
315 clustertable.update();
316
317
318
319
320
321
322 }
323
324
325
326
327
328
329
330
331
332
333
334
335 boolean hasNewIncarnation(FragmentHeader header)
336 {
337 view.clear();
338 MssHost sender = header.getSender();
339 if (sender.isReachable() && sender.getIncarnationId() != header.getIncarnationId()) {
340 setAsUnreachable(sender);
341 return true;
342 }
343 return false;
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357 boolean checkIncarnation(MsgRouting msg)
358 {
359
360 boolean newInc = false;
361
362 view.clear();
363 TopologyEntry[] table = msg.getTopologyTable();
364 for (int i = 0; i < table.length; i++) {
365 for (int j = 0; j < table[i].reachable.length; j++) {
366 MssHost member = hostLookup(table[i].reachable[j]);
367 if (!member.isLocal() && member.isReachable() &&
368 member.getIncarnationId() != table[i].incarnationId[j]) {
369 setAsUnreachable(member);
370 newInc = true;
371 }
372 }
373 }
374 return newInc;
375 }
376
377
378
379
380
381
382
383
384
385
386 void setAsUnreachable(MssHost member)
387 {
388 if (log.isDebugEnabled())
389 log.debug("setAsUnreachable: start " + member);
390
391 member.setAsUnreachable();
392 Cluster cluster = member.getCluster();
393 cluster.decrementReachableCounter();
394 view.setAsUnreachable(member);
395 upperview.setAsUnreachable(member);
396
397
398
399
400
401
402 if (!cluster.isReachable()) {
403 if (!cluster.isLocal()) {
404 cluster.resetMsgFlow(UNDEF);
405 if (log.isDebugEnabled()) {
406 log.debug("setAsUnreachable: reset Msg Flow of " + cluster);
407 }
408 } else {
409 throw new IllegalStateException("Local cluster unreachable!");
410 }
411 }
412
413
414
415
416
417 if (cluster.isLocal()) {
418 for (Iterator iter = hosttable.iterator(); iter.hasNext(); ) {
419 MssHost host = (MssHost) iter.next();
420 host.getMsgFlow().clusterWindow.set(member.getClusterIndex(), UNDEF);
421 }
422 }
423
424
425 member.resetMsgFlow();
426 if (log.isDebugEnabled()) {
427 log.debug("setAsUnreachable: reset Msg Flow of " + member);
428 }
429
430 if (log.isDebugEnabled())
431 log.debug("setAsUnreachable: end");
432 }
433
434
435 void setAsReachable(MssHost member, int inc)
436 {
437 if (log.isDebugEnabled())
438 log.debug("setAsReachable: start " + member + " " + inc);
439
440 Cluster cluster = member.getCluster();
441 cluster.incrementReachableCounter();
442 view.setAsReachable(member);
443
444
445
446
447
448
449
450
451
452
453
454
455 if (member.getIncarnationId() != inc) {
456 view.setNewIncarnation(member);
457 upperview.setNewIncarnation(member);
458 member.setIncarnationId(inc);
459 }
460 if (log.isDebugEnabled())
461 log.debug("setAsReachable: end");
462 }
463
464
465
466
467
468
469
470
471 FCEntry[] getClusterFCEntry(EndPoint key)
472 {
473 Cluster cluster = clustertable.lookup(key);
474 if (cluster == null) {
475 log.debug("clustertable: "+ clustertable);
476 log.warn("No cluster exists in cluster table for endpoint: "+ key);
477 }
478
479 FCEntry[] fc = new FCEntry[cluster.size()];
480 int i = 0;
481 for (Iterator iter = cluster.iterator(); iter.hasNext(); ) {
482 MssHost host = (MssHost) iter.next();
483 fc[i++] = new FCEntry(host);
484 }
485 return fc;
486 }
487
488
489
490
491
492
493
494
495 FCEntry[] getAllFCEntry()
496 {
497 int size = hosttable.size();
498 FCEntry[] ret = new FCEntry[size];
499 for (int i=0; i < size; i++) {
500 MssHost host = hosttable.get(i);
501 ret[i] = new FCEntry(host.getEndPoint(), host.getMsgFlow().getLastMsgDlvr());
502 }
503 return ret;
504 }
505
506
507
508
509
510
511
512
513
514 public String toString()
515 {
516 return "[MssDS:" + hosttable + ", "+ clustertable + "]";
517 }
518
519 }