1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.mss;
20
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Random;
25
26 import jgroup.core.ConfigurationException;
27 import jgroup.core.EndPoint;
28 import jgroup.relacs.config.Domain;
29 import jgroup.relacs.config.TransportConfig;
30
31 import org.apache.log4j.Logger;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 final class Cluster
47 implements MssConstants
48 {
49
50
51
52
53
54
55 private static final Logger log = Logger.getLogger(Cluster.class);
56
57
58
59
60
61
62
63 private static final Random rnd = new Random();
64
65
66
67
68
69 private final static MssHost[] NULL_MSS_HOST_ARRAY = new MssHost[0];
70
71
72
73
74
75
76
77 private MssDS mssds;
78
79
80 private TransportConfig config;
81
82
83 private RoutingTableEntry rt;
84
85
86 private NI ni;
87
88
89 private final List<MssHost> members;
90
91
92 private boolean local;
93
94
95 private int reachableCounter;
96
97
98 private MsgFlowSndrSide msgFlow;
99
100 private MsgCntrl msgCntrl;
101
102
103
104
105
106 private final List<EndPoint> receivers;
107
108
109
110
111
112
113
114
115
116 Cluster(MssDS mssds, TransportConfig config, Domain domain, NI ni)
117 {
118
119 this.mssds = mssds;
120 this.ni = ni;
121 this.config = config;
122 local = domain.isLocal();
123 reachableCounter = (local ? 1 : 0);
124 int cost = (local ? 0 : config.getMaxPathLength());
125
126
127
128
129
130
131
132 rt = new RoutingTableEntry(domain.getEndpoint(), domain.getEndpoint(), cost);
133
134
135 members = new ArrayList<MssHost>(domain.size());
136 receivers = new ArrayList<EndPoint>(domain.size());
137
138
139 msgFlow= new MsgFlowSndrSide(this, config);
140
141 if (log.isDebugEnabled())
142 log.debug("Cluster:<init>: " + rt.key);
143 }
144
145
146
147
148
149
150
151
152
153 EndPoint getEndPoint()
154 {
155 return rt.key;
156 }
157
158
159 String getName()
160 {
161 return rt.key.getAddress().getHostName();
162 }
163
164
165 MsgCntrl getControl()
166 {
167 return msgCntrl;
168 }
169
170
171 void setControl(MsgCntrl msgCntrl)
172 {
173 this.msgCntrl = msgCntrl;
174 }
175
176
177
178
179
180 RoutingTableEntry getRoutingEntry()
181 {
182 return rt;
183 }
184
185
186
187
188
189
190
191
192
193 void resetRoute()
194 {
195 rt.cost = 0;
196 rt.TTL = config.getMaxTTL();
197 rt.route = rt.key;
198 }
199
200
201
202
203
204 boolean isLocal()
205 {
206 return local;
207 }
208
209
210
211
212
213
214 MsgFlowSndrSide getMsgFlow()
215 {
216 return msgFlow;
217 }
218
219
220
221
222
223 int size()
224 {
225 return members.size();
226 }
227
228
229
230
231
232 Iterator<MssHost> iterator()
233 {
234 return members.iterator();
235 }
236
237
238
239
240
241 void insertMember(MssHost member)
242 throws ConfigurationException
243 {
244 if (members.contains(member))
245 throw new ConfigurationException("Trying to insert a previously inserted member in cluster");
246 else {
247 member.setClusterIndex(members.size());
248 members.add(member);
249 }
250 }
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267 EndPoint getLeaderEndPoint(int threshold)
268 {
269 int size = size();
270 List<EndPoint> candidates = new ArrayList<EndPoint>(size);
271 for (MssHost candidate : members) {
272 if (candidate.isReachable(threshold))
273 candidates.add(candidate.getEndPoint());
274 }
275 return (candidates.isEmpty() ? getEndPoint()
276 : (EndPoint) candidates.get(rnd.nextInt(candidates.size())));
277 }
278
279
280
281
282
283
284
285
286
287 EndPoint getLeaderEndPoint()
288 {
289 return getLeaderEndPoint(config.getReachabilityThreshold());
290 }
291
292
293
294
295
296
297 void setWarning(int TTLwarning)
298 {
299 for (MssHost host : members) {
300 if (host.isReachable(TTLwarning)) {
301 host.pingOK(TTLwarning);
302 } else if (!host.pingKO() && host.wasReachable()) {
303
304 mssds.setAsUnreachable(host);
305 }
306 }
307 }
308
309
310
311
312
313
314 void updateReachability()
315 {
316 for (MssHost host : members) {
317 if (!host.isLocal() && host.isReachable()) {
318 host.printReachParams();
319 }
320 if (!host.isLocal() && !host.pingKO() && host.wasReachable()) {
321
322 mssds.setAsUnreachable(host);
323 }
324 }
325 }
326
327
328
329
330
331
332 void setClusterAsUnreachable()
333 {
334 for (MssHost host : members) {
335 if (host.wasReachable()) {
336
337 mssds.setAsUnreachable(host);
338 }
339 }
340 }
341
342
343 MssHost[] getReachable(int threshold)
344 {
345 int size = members.size();
346 List<MssHost> reachable = new ArrayList<MssHost>(size);
347 for (MssHost host : members) {
348 if (host.isReachable(threshold))
349 reachable.add(host);
350 }
351 return (MssHost[]) reachable.toArray(NULL_MSS_HOST_ARRAY);
352 }
353
354
355 MssHost[] getReachable()
356 {
357 return getReachable(MINIMUM_ALIVE_VALUE);
358 }
359
360
361 void incrementReachableCounter()
362 {
363 reachableCounter++;
364 }
365
366
367 void decrementReachableCounter()
368 {
369 reachableCounter--;
370 }
371
372
373 int getReachableCounter()
374 {
375 return reachableCounter;
376 }
377
378
379 boolean isReachable()
380 {
381 return (reachableCounter != 0);
382 }
383
384
385
386
387
388
389
390
391 boolean directlyConnected()
392 {
393 return (rt.key.equals(rt.route));
394 }
395
396
397
398
399
400
401 private Cluster getRouteCluster()
402 {
403 return mssds.clusterLookup(rt.route);
404 }
405
406
407
408
409
410
411 void resetMsgFlow()
412 {
413 msgFlow.reset(rnd.nextInt());
414 }
415
416
417
418
419
420
421 void resetMsgFlow(int mid)
422 {
423 msgFlow.reset(mid);
424 }
425
426
427
428
429
430
431 void addReceiver(EndPoint member)
432 {
433 receivers.add(member);
434 }
435
436 void clearReceivers()
437 {
438 receivers.clear();
439 }
440
441 boolean hasReceivers()
442 {
443 return !(receivers.isEmpty());
444 }
445
446
447
448
449
450
451
452
453
454 void forward(byte[] buffer, int bufLen)
455 {
456 if (local) {
457
458
459
460 ni.send(buffer, bufLen);
461 if (log.isDebugEnabled())
462 log.debug("Forwarded fragment to local cluster");
463
464 } else {
465
466
467
468
469
470
471 EndPoint leader = getLeaderEndPoint();
472 if (!leader.isMulticastEndPoint()) {
473
474
475
476
477
478
479
480 ni.send(leader, buffer, bufLen);
481 if (log.isDebugEnabled())
482 log.debug("Forwarded fragment to remote cluster through leader: " + leader);
483
484 } else {
485
486
487
488
489
490 if (receivers.isEmpty()) {
491
492 for (MssHost host : members) {
493 ni.send(host.getEndPoint(), buffer, bufLen);
494 if (log.isDebugEnabled())
495 log.debug("Forwarded fragment to receiver " + host);
496 }
497 } else {
498
499 for (EndPoint recv : receivers) {
500 ni.send(recv, buffer, bufLen);
501 if (log.isDebugEnabled())
502 log.debug("Forwarded fragment to receiver " + recv);
503 }
504 }
505 }
506 }
507 }
508
509
510
511
512
513
514
515
516
517
518
519 void resend(FragmentIterator fragIter, int fragId)
520 {
521 MsgJG msgjg = (MsgJG) fragIter.getMsg();
522 EndPoint[] rcvrs = msgjg.getReceivers();
523
524
525
526
527
528 for (int i = 0; i < rcvrs.length; i++) {
529 MssHost host = mssds.hostLookup(rcvrs[i]);
530 if (!rcvrs[i].isLocal() && host.isIn(this)) {
531 addReceiver(rcvrs[i]);
532 if (log.isDebugEnabled()) {
533 log.debug("Added host to the destination list: " + host);
534 }
535 } else if (log.isDebugEnabled()) {
536 log.debug("Removed host from the destination list: " + host);
537 }
538 }
539
540
541
542
543
544 msgjg.setCluster(this);
545 msgjg.setReceivers(receivers);
546
547
548
549
550
551
552 byte[] msgFragment = fragIter.getFragment(fragId);
553 FragmentHeader.marshal(false, fragId, msgFragment);
554
555
556
557
558
559 forward(msgFragment, fragIter.fragmentLength());
560 }
561
562
563
564
565
566
567
568 void send(MsgJG msgjg)
569 {
570
571
572
573
574 msgjg.setCluster(this);
575 msgjg.setReceivers(receivers);
576
577 if (ROUTING_ENABLED) {
578
579
580
581
582
583 if (!directlyConnected()) {
584 if (log.isDebugEnabled()) {
585 log.debug("Stack trace", new Exception());
586 log.debug("localCluster=" + ClusterTable.getLocalCluster() + ", Route=" + rt);
587 }
588
589
590
591
592
593 getRouteCluster().send((Msg) msgjg);
594 }
595
596 } else {
597
598 send((Msg) msgjg);
599
600 }
601 }
602
603
604
605
606
607
608
609
610
611
612 void send(Msg msg)
613 {
614
615
616 if (local) {
617
618
619
620 for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
621
622 byte[] buffer = frag.next(false);
623 int bufLen = frag.fragmentLength();
624
625 if (log.isDebugEnabled())
626 log.debug("Sending msg fragment (" + frag.getFid() + ") to cluster " + rt.key);
627 ni.send(buffer, bufLen);
628 }
629
630 } else {
631
632
633
634
635
636
637
638 EndPoint leader = getLeaderEndPoint();
639 if (!leader.isMulticastEndPoint()) {
640
641
642
643
644
645
646
647
648 for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
649
650 byte[] buffer = frag.next(true);
651 int bufLen = frag.fragmentLength();
652
653 if (log.isDebugEnabled())
654 log.debug("Sending msg fragment (" + frag.getFid() + ") to cluster leader " + leader);
655 ni.send(leader, buffer, bufLen);
656 }
657
658 } else {
659
660
661
662
663
664
665
666 for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
667
668 byte[] buffer = frag.next(false);
669 int bufLen = frag.fragmentLength();
670
671
672
673
674
675 if (receivers.isEmpty()) {
676
677 for (MssHost host : members) {
678 if (log.isDebugEnabled())
679 log.debug("Sending msg fragment (" + frag.getFid() + ") to member " + host);
680 ni.send(host.getEndPoint(), buffer, bufLen);
681 }
682 } else {
683
684 for (EndPoint recv : receivers) {
685 if (log.isDebugEnabled())
686 log.debug("Sending msg fragment (" + frag.getFid() + ") to receiver " + recv);
687 ni.send(recv, buffer, bufLen);
688 }
689 }
690
691 }
692 }
693 }
694 }
695
696
697
698
699
700
701
702
703
704 public int hashCode()
705 {
706 return rt.key.hashCode();
707 }
708
709
710
711
712
713 public String toString()
714 {
715 StringBuilder buf = new StringBuilder();
716 buf.append("[Cluster: ");
717 buf.append(rt.key);
718 buf.append("]");
719 return buf.toString();
720 }
721
722
723
724
725
726
727
728
729
730
731 public boolean equals(Object obj)
732 {
733 if (!(obj instanceof Cluster))
734 return false;
735 return this.rt.equals(((Cluster) obj).rt);
736 }
737
738 }