1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Random;
25
26 import jgroup.core.ConfigurationException;
27 import jgroup.core.EndPoint;
28 import jgroup.relacs.config.Domain;
29 import jgroup.relacs.config.TransportConfig;
30
31 import org.apache.log4j.Logger;
32
33
/**
 * The <code>Cluster</code> class represents a cluster (domain) of hosts
 * and is responsible for sending and forwarding messages to the members
 * of that cluster, either directly, through a cluster leader, or via
 * another cluster on the route to it.
 *
 * FIXME HEIN: This class should contain a SortedMap of members in the
 * cluster, sorted according to each member's reachability value. In
 * addition, there should be a sub-view of this map, reflecting those
 * deemed as reachable.
 *
 * @author Salvatore Camarrata
 * @author Hein Meling
 * @since Jgroup 0.5
 */
46 final class Cluster
47 implements MssConstants
48 {
49
50 ////////////////////////////////////////////////////////////////////////////////////////////
51 // Logger
52 ////////////////////////////////////////////////////////////////////////////////////////////
53
54 /** Obtain logger for this class */
55 private static final Logger log = Logger.getLogger(Cluster.class);
56
57
58 ////////////////////////////////////////////////////////////////////////////////////////////
59 // Static section
60 ////////////////////////////////////////////////////////////////////////////////////////////
61
62 /** Random generator */
63 private static final Random rnd = new Random();
64
65 /**
66 * Constant null array to be returned for the case of no reachable
67 * hosts (object reuse). See Item 27 in Effective Java.
68 */
69 private final static MssHost[] NULL_MSS_HOST_ARRAY = new MssHost[0];
70
71
72 ///////////////////////////////////////////////////////////////////////
73 // Fields
74 ///////////////////////////////////////////////////////////////////////
75
76 /** The distributed system configuration */
77 private MssDS mssds;
78
79 /** Configuration parameters */
80 private TransportConfig config;
81
82 /** EndPoints */
83 private RoutingTableEntry rt;
84
85 /** Data link layer */
86 private NI ni;
87
88 /** Hosts that are member of the cluster */
89 private final List<MssHost> members;
90
91 /** True if this object is the local cluster */
92 private boolean local;
93
94 /** Number of members in the cluster that are reachable */
95 private int reachableCounter;
96
97 /** Message Flow Layer */
98 private MsgFlowSndrSide msgFlow;
99
100 private MsgCntrl msgCntrl;
101
102 /**
103 * Message receivers associated with this cluster, when the message
104 * is not addressed to all members of the cluster.
105 */
106 private final List<EndPoint> receivers;
107
108
109 ///////////////////////////////////////////////////////////////////////
110 // Constructors
111 ///////////////////////////////////////////////////////////////////////
112
113 /**
114 * Builds a cluster identified by <code>domain</code>.
115 */
116 Cluster(MssDS mssds, TransportConfig config, Domain domain, NI ni)
117 {
118 /* Copy initialization parameters */
119 this.mssds = mssds;
120 this.ni = ni;
121 this.config = config;
122 local = domain.isLocal();
123 reachableCounter = (local ? 1 : 0);
124 int cost = (local ? 0 : config.getMaxPathLength());
125
126 /*
127 * Note that when marshalling the routing table entry, it will use
128 * the marshalling of <code>EndPointImpl</code> since currently the
129 * <code>Domain</code> class do not need any marshalling. Further,
130 * note that EndPointImpl do not detect local clusters correctly.
131 */
132 rt = new RoutingTableEntry(domain.getEndpoint(), domain.getEndpoint(), cost);
133
134 /* Initialize list of members */
135 members = new ArrayList<MssHost>(domain.size());
136 receivers = new ArrayList<EndPoint>(domain.size());
137
138 /* Flow msg management */
139 msgFlow= new MsgFlowSndrSide(this, config);
140
141 if (log.isDebugEnabled())
142 log.debug("Cluster:<init>: " + rt.key);
143 }
144
145
146 ///////////////////////////////////////////////////////////////////////
147 // Public methods
148 ///////////////////////////////////////////////////////////////////////
149
150 /**
151 * Returns the endpoint associated to this cluster.
152 */
153 EndPoint getEndPoint()
154 {
155 return rt.key;
156 }
157
158
159 String getName()
160 {
161 return rt.key.getAddress().getHostName();
162 }
163
164
165 MsgCntrl getControl()
166 {
167 return msgCntrl;
168 }
169
170
171 void setControl(MsgCntrl msgCntrl)
172 {
173 this.msgCntrl = msgCntrl;
174 }
175
176
177 /**
178 *
179 */
180 RoutingTableEntry getRoutingEntry()
181 {
182 return rt;
183 }
184
185
186 /**
187 * Reset the routing entry for this cluster. This should only be used
188 * for the local cluster. <p>
189 *
190 * It is used by the clustertable update method, and is required to
191 * ensure that remote failures are detected.
192 */
193 void resetRoute()
194 {
195 rt.cost = 0;
196 rt.TTL = config.getMaxTTL();
197 rt.route = rt.key;
198 }
199
200
201 /**
202 * Returns true if this is the local cluster.
203 */
204 boolean isLocal()
205 {
206 return local;
207 }
208
209
210 /**
211 * Returns the <CODE>MsgFlowSndrSide</CODE> object associated to
212 * this host.
213 */
214 MsgFlowSndrSide getMsgFlow()
215 {
216 return msgFlow;
217 }
218
219
220 /**
221 * Returns the number of hosts in this cluster.
222 */
223 int size()
224 {
225 return members.size();
226 }
227
228
229 /**
230 * Returns and iterator over the hosts in this cluster.
231 */
232 Iterator<MssHost> iterator()
233 {
234 return members.iterator();
235 }
236
237
238 /**
239 *
240 */
241 void insertMember(MssHost member)
242 throws ConfigurationException
243 {
244 if (members.contains(member))
245 throw new ConfigurationException("Trying to insert a previously inserted member in cluster");
246 else {
247 member.setClusterIndex(members.size());
248 members.add(member);
249 }
250 }
251
252
253 /**
254 * Returns the endpoint of a host included in this cluster whose
255 * reachability value is greater than the given threshold; returns
256 * the cluster endpoint if no hosts in this cluster has a
257 * reachability value above the given threshold.
258 *
259 * The leader is selected randomly among the endpoints in the cluster
260 * whose reachability value is greater than the threshold.
261 *
262 * FIXME POSSIBLE OPTIMIZATION: If the hosts were stored in a
263 * SortedMap (on a more permanent basis), we could use the sorted
264 * order to determine the range from which the random number should
265 * be picked.
266 */
267 EndPoint getLeaderEndPoint(int threshold)
268 {
269 int size = size();
270 List<EndPoint> candidates = new ArrayList<EndPoint>(size);
271 for (MssHost candidate : members) {
272 if (candidate.isReachable(threshold))
273 candidates.add(candidate.getEndPoint());
274 }
275 return (candidates.isEmpty() ? getEndPoint()
276 : (EndPoint) candidates.get(rnd.nextInt(candidates.size())));
277 }
278
279
280 /**
281 * Returns the endpoint of a host included in this cluster whose
282 * reachability value is greater than threshold obtained from the
283 * transport configuration object; returns the cluster endpoint if no
284 * hosts in this cluster has a reachability value above the
285 * associated threshold.
286 */
287 EndPoint getLeaderEndPoint()
288 {
289 return getLeaderEndPoint(config.getReachabilityThreshold());
290 }
291
292
293 /**
294 * Set all reachable members of the cluster to the given reachability
295 * warning level, and check if there are unreachable members.
296 */
297 void setWarning(int TTLwarning)
298 {
299 for (MssHost host : members) {
300 if (host.isReachable(TTLwarning)) {
301 host.pingOK(TTLwarning);
302 } else if (!host.pingKO() && host.wasReachable()) {
303 /* If the host was reachable, but is no longer reachable */
304 mssds.setAsUnreachable(host);
305 }
306 }
307 }
308
309
310 /**
311 * Check for previously reachable members of the cluster, and mark
312 * them as unreachable.
313 */
314 void updateReachability()
315 {
316 for (MssHost host : members) {
317 if (!host.isLocal() && host.isReachable()) {
318 host.printReachParams();
319 }
320 if (!host.isLocal() && !host.pingKO() && host.wasReachable()) {
321 /* If the member was reachable, but is no longer reachable */
322 mssds.setAsUnreachable(host);
323 }
324 }
325 }
326
327
328 /**
329 * Mark all reachable members of the cluster as unreachable, and
330 * check if there are new unreachable members.
331 */
332 void setClusterAsUnreachable()
333 {
334 for (MssHost host : members) {
335 if (host.wasReachable()) {
336 /* If the member was reachable, mark it as unreachable */
337 mssds.setAsUnreachable(host);
338 }
339 }
340 }
341
342
343 MssHost[] getReachable(int threshold)
344 {
345 int size = members.size();
346 List<MssHost> reachable = new ArrayList<MssHost>(size);
347 for (MssHost host : members) {
348 if (host.isReachable(threshold))
349 reachable.add(host);
350 }
351 return (MssHost[]) reachable.toArray(NULL_MSS_HOST_ARRAY);
352 }
353
354
355 MssHost[] getReachable()
356 {
357 return getReachable(MINIMUM_ALIVE_VALUE);
358 }
359
360
361 void incrementReachableCounter()
362 {
363 reachableCounter++;
364 }
365
366
367 void decrementReachableCounter()
368 {
369 reachableCounter--;
370 }
371
372
373 int getReachableCounter()
374 {
375 return reachableCounter;
376 }
377
378
379 boolean isReachable()
380 {
381 return (reachableCounter != 0);
382 }
383
384
385 /**
386 * Returns true if the local cluster is directly connected to this
387 * cluster. False is returned if there is no direct connection, and
388 * messages to this cluster (from the local cluster) must be routed
389 * through another cluster.
390 */
391 boolean directlyConnected()
392 {
393 return (rt.key.equals(rt.route));
394 }
395
396
397 /**
398 * Returns the cluster that we need to route through to get to this
399 * cluster from the local cluster.
400 */
401 private Cluster getRouteCluster()
402 {
403 return mssds.clusterLookup(rt.route);
404 }
405
406
407 /**
408 * Reset the sender side message flow for this cluster. This method
409 * should be invoked only during initialization.
410 */
411 void resetMsgFlow()
412 {
413 msgFlow.reset(rnd.nextInt());
414 }
415
416
417 /**
418 * Reset the sender side message flow for this cluster to the given
419 * value. This is used to indicate that this cluster is unreachable.
420 */
421 void resetMsgFlow(int mid)
422 {
423 msgFlow.reset(mid);
424 }
425
426
427 ////////////////////////////////////////////////////////////////////////////////////////////
428 // Receiver management
429 ////////////////////////////////////////////////////////////////////////////////////////////
430
431 void addReceiver(EndPoint member)
432 {
433 receivers.add(member);
434 }
435
436 void clearReceivers()
437 {
438 receivers.clear();
439 }
440
441 boolean hasReceivers()
442 {
443 return !(receivers.isEmpty());
444 }
445
446
447 ////////////////////////////////////////////////////////////////////////////////////////////
448 // Cluster sending methods
449 ////////////////////////////////////////////////////////////////////////////////////////////
450
451 /**
452 * Forward a raw message fragment to this cluster.
453 */
454 void forward(byte[] buffer, int bufLen)
455 {
456 if (local) {
457
458 /* If it is my local cluster (LAN) */
459
460 ni.send(buffer, bufLen);
461 if (log.isDebugEnabled())
462 log.debug("Forwarded fragment to local cluster");
463
464 } else {
465
466 /*
467 * Sending to a remote cluster, preferably through the cluster
468 * leader; otherwise, we send directly to each member of the
469 * remote cluster.
470 */
471 EndPoint leader = getLeaderEndPoint();
472 if (!leader.isMulticastEndPoint()) {
473
474 /*
475 * If there is a reachable leader member for this cluster, then
476 * send the message directly to the leader. The leader will
477 * broadcast the message within its local cluster to the other
478 * cluster members.
479 */
480 ni.send(leader, buffer, bufLen);
481 if (log.isDebugEnabled())
482 log.debug("Forwarded fragment to remote cluster through leader: " + leader);
483
484 } else {
485
486 /*
487 * Set the receiver list to all members, unless the message is
488 * associated with a set of receivers.
489 */
490 if (receivers.isEmpty()) {
491 /* Sending to all members in cluster */
492 for (MssHost host : members) {
493 ni.send(host.getEndPoint(), buffer, bufLen);
494 if (log.isDebugEnabled())
495 log.debug("Forwarded fragment to receiver " + host);
496 }
497 } else {
498 /* Sending to receivers in cluster */
499 for (EndPoint recv : receivers) {
500 ni.send(recv, buffer, bufLen);
501 if (log.isDebugEnabled())
502 log.debug("Forwarded fragment to receiver " + recv);
503 }
504 }
505 }
506 }
507 }
508
509
510 /**
511 * Method for resending Jgroup related messages. This involves
512 * setting the cluster, receiver set, and flow control information
513 * for this cluster. The actual Jgroup level message remains the
514 * same; we only changed the cluster, flow control data and receiver
515 * set, in addition to the fragment identifier in the case the
516 * original message was sent to multiple clusters (each with
517 * different fragId sequence numbers).
518 */
519 void resend(FragmentIterator fragIter, int fragId)
520 {
521 MsgJG msgjg = (MsgJG) fragIter.getMsg();
522 EndPoint[] rcvrs = msgjg.getReceivers();
523
524 /*
525 * Add receivers in the destination cluster to the receiver set of
526 * the cluster.
527 */
528 for (int i = 0; i < rcvrs.length; i++) {
529 MssHost host = mssds.hostLookup(rcvrs[i]);
530 if (!rcvrs[i].isLocal() && host.isIn(this)) {
531 addReceiver(rcvrs[i]);
532 if (log.isDebugEnabled()) {
533 log.debug("Added host to the destination list: " + host);
534 }
535 } else if (log.isDebugEnabled()) {
536 log.debug("Removed host from the destination list: " + host);
537 }
538 }
539
540 /*
541 * Update the cluster header field and the receiver set of the
542 * message.
543 */
544 msgjg.setCluster(this);
545 msgjg.setReceivers(receivers);
546
547 /*
548 * Reset the fragment identifier, in case it was changed due to
549 * sending the same fragment to different clusters (with different
550 * fragment id sequences).
551 */
552 byte[] msgFragment = fragIter.getFragment(fragId);
553 FragmentHeader.marshal(false, fragId, msgFragment);
554
555 /*
556 * At this point the message should be updated again, allowing us to
557 * send a correct fragment.
558 */
559 forward(msgFragment, fragIter.fragmentLength());
560 }
561
562
563 /**
564 * Method for sending Jgroup related messages. This involves setting
565 * the cluster, receiver set, and flow control information for this
566 * cluster.
567 */
568 void send(MsgJG msgjg)
569 {
570 /*
571 * Update the cluster header field and the receiver set of the
572 * message.
573 */
574 msgjg.setCluster(this);
575 msgjg.setReceivers(receivers);
576
577 if (ROUTING_ENABLED) {
578
579 /*
580 * Check if the message has to be routed (that is, this cluster is
581 * not directly connected to the local cluster)
582 */
583 if (!directlyConnected()) {
584 if (log.isDebugEnabled()) {
585 log.debug("Stack trace", new Exception());
586 log.debug("localCluster=" + ClusterTable.getLocalCluster() + ", Route=" + rt);
587 }
588
589 /*
590 * The message must be forwarded; simply forward it to the next
591 * cluster in route for the destination (this) cluster.
592 */
593 getRouteCluster().send((Msg) msgjg);
594 }
595
596 } else {
597
598 send((Msg) msgjg);
599
600 }
601 }
602
603
604 /**
605 * Send a generic mss level message to this cluster.
606 *
607 * Note that this method used to be synchronzied. However, I believe
608 * that this is unecessary since the fragment iteration loops will have
609 * all their data in the local stack frame of the current invocation.
610 * Only those methods whom modify the object state needs to be synchronized.
611 */
612 void send(Msg msg)
613 {
614 /* Send the msg directly to the cluster */
615
616 if (local) {
617
618 /* If it is my local cluster (LAN) */
619
620 for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
621 /* Generate and send a fragment */
622 byte[] buffer = frag.next(false);
623 int bufLen = frag.fragmentLength();
624
625 if (log.isDebugEnabled())
626 log.debug("Sending msg fragment (" + frag.getFid() + ") to cluster " + rt.key);
627 ni.send(buffer, bufLen);
628 }
629
630 } else {
631
632 /*
633 * Sending to a remote cluster, preferably through the cluster
634 * leader; otherwise, we send directly to each member of the
635 * remote cluster.
636 */
637
638 EndPoint leader = getLeaderEndPoint();
639 if (!leader.isMulticastEndPoint()) {
640
641 /*
642 * If there is a reachable leader member for this cluster, then
643 * send the message directly to the leader. The leader will
644 * broadcast the message within its cluster to the other cluster
645 * members.
646 */
647
648 for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
649 /* Generate and send a fragment */
650 byte[] buffer = frag.next(true);
651 int bufLen = frag.fragmentLength();
652
653 if (log.isDebugEnabled())
654 log.debug("Sending msg fragment (" + frag.getFid() + ") to cluster leader " + leader);
655 ni.send(leader, buffer, bufLen);
656 }
657
658 } else {
659
660 /*
661 * Otherwise send the message to all the members of the
662 * cluster; alternatively, if only a subset of the cluster
663 * members are receivers, only those will receive the message.
664 */
665
666 for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
667 /* Generate and send a fragment */
668 byte[] buffer = frag.next(false);
669 int bufLen = frag.fragmentLength();
670
671 /*
672 * Set the receiver list to all members, unless the message is
673 * associated with a set of receivers.
674 */
675 if (receivers.isEmpty()) {
676 /* Sending to all members in cluster */
677 for (MssHost host : members) {
678 if (log.isDebugEnabled())
679 log.debug("Sending msg fragment (" + frag.getFid() + ") to member " + host);
680 ni.send(host.getEndPoint(), buffer, bufLen);
681 }
682 } else {
683 /* Sending to receivers in cluster */
684 for (EndPoint recv : receivers) {
685 if (log.isDebugEnabled())
686 log.debug("Sending msg fragment (" + frag.getFid() + ") to receiver " + recv);
687 ni.send(recv, buffer, bufLen);
688 }
689 }
690
691 } // end for-loop frag iterator
692 } // end if-else leader mcast endpoint
693 } // end if-else local/remote cluster
694 }
695
696
697 ////////////////////////////////////////////////////////////////////////////////////////////
698 // Methods from Object
699 ////////////////////////////////////////////////////////////////////////////////////////////
700
701 /**
702 * Returns the hash code for this object.
703 */
704 public int hashCode()
705 {
706 return rt.key.hashCode();
707 }
708
709
710 /**
711 * Returns a string representation of this object.
712 */
713 public String toString()
714 {
715 StringBuilder buf = new StringBuilder();
716 buf.append("[Cluster: ");
717 buf.append(rt.key);
718 buf.append("]");
719 return buf.toString();
720 }
721
722
723 /**
724 * Compares two objects for content equality.
725 *
726 * @param obj
727 * The object to compare this object with.
728 * @return
729 * True if these objects are equal; false otherwise.
730 */
731 public boolean equals(Object obj)
732 {
733 if (!(obj instanceof Cluster))
734 return false;
735 return this.rt.equals(((Cluster) obj).rt);
736 }
737
738 } // END Cluster