1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23
24 import jgroup.core.ConfigurationException;
25 import jgroup.core.EndPoint;
26 import jgroup.relacs.config.DistributedSystemConfig;
27 import jgroup.relacs.config.Domain;
28 import jgroup.relacs.config.DomainSet;
29 import jgroup.relacs.config.HostSet;
30 import jgroup.relacs.config.TransportConfig;
31
32 import org.apache.log4j.Logger;
33
34 /**
35 * The <code>MssDS</code> object contains the description of the
36 * distributed system in which the distributed computation is carried
37 * out. A distributed system is described as a collection of domains
38 * and hosts, and may be constructed in two ways: by adding a
 * collection of hosts to an initially empty distributed system, or by
40 * reading a configuration file specifying the list of hosts and
41 * domains.
42 *
43 * @author Salvatore Cammarata
44 * @since Jgroup 1.2
45 */
46 final class MssDS
47 implements MssConstants, MssTag
48 {
49
50 ////////////////////////////////////////////////////////////////////////////////////////////
51 // Logger
52 ////////////////////////////////////////////////////////////////////////////////////////////
53
54 /** Obtain logger for this class */
55 private static final Logger log = Logger.getLogger(MssDS.class);
56
57
58 ////////////////////////////////////////////////////////////////////////////////////////////
59 // Fields
60 ////////////////////////////////////////////////////////////////////////////////////////////
61
62 /** Configuration parameters */
63 private TransportConfig tconf;
64
65 /** Topology information: host table */
66 private HostTable hosttable;
67
68 /** Topology information: cluster table */
69 private ClusterTable clustertable;
70
71 /** Topology information: routing table */
72 private RoutingTable routingtable;
73
74 /** reachability view of the Distributed System */
75 private DSView view;
76
77 /** reachability view of the Distributed System */
78 private DSView upperview;
79
80 /** Network interface */
81 private NI ni;
82
83
84 ////////////////////////////////////////////////////////////////////////////////////////////
85 // Constructors
86 ////////////////////////////////////////////////////////////////////////////////////////////
87
88 /**
89 * Constructs a distributed system whose description is contained in
90 *
91 */
92 MssDS(EventHandler ehandler, DistributedSystemConfig dsc, TransportConfig tconf)
93 throws IOException, ConfigurationException
94 {
95 /* Copy references to other data structures */
96 this.tconf = tconf;
97 ni = new NI(ehandler, tconf.getPayload() + OVERHEAD_SIZE, tconf.getMulticastTTL());
98
99 /*
100 * Initialize the tables
101 */
102 hosttable = new HostTable();
103 clustertable = new ClusterTable(this, tconf);
104
105 /*
106 * Extracts the hosts from the distributed system configuration and
107 * creates mss tables for hosts and domains (clusters).
108 */
109 DomainSet domains = dsc.getDomainSet();
110 for (Iterator i = domains.iterator(); i.hasNext(); ) {
111 Domain domain = (Domain) i.next();
112
113 /*
114 * Construct entries for the cluster table, based on the domains
115 * parsed from the configuration file.
116 */
117 Cluster cluster = new Cluster(this, tconf, domain, ni);
118 clustertable.insert(cluster);
119
120 HostSet hosts = domain.getHostSet();
121 for (Iterator j = hosts.iterator(); j.hasNext(); ) {
122 EndPoint hostEndPoint = (EndPoint) j.next();
123 /*
124 * Creates a <code>MssHost</code> object and insert it in the
125 * hosttable and cluster.
126 */
127 MssHost host = new MssHost(tconf, hostEndPoint, ni, cluster, ehandler);
128 cluster.insertMember(host);
129 hosttable.insert(host);
130 }
131 }
132
133 /* Get the local host */
134 MssHost me = HostTable.getLocalHost();
135
136 /* Reset message flows of all clusters and hosts */
137 clustertable.resetMsgFlow();
138 hosttable.resetMsgFlow();
139
140 /* Initialize the routing table */
141 routingtable = new RoutingTable(clustertable, tconf.getMaxTTL() - 1);
142
143 /* Creates a new view */
144 view = new DSView(hosttable, me);
145 upperview = new DSView(hosttable, me);
146 }
147
148
149 ////////////////////////////////////////////////////////////////////////////////////////////
150 // Getter methods
151 ////////////////////////////////////////////////////////////////////////////////////////////
152
153 /**
154 *
155 */
156 DSView getView()
157 {
158 return view;
159 }
160
161 /**
162 *
163 */
164 DSView getUpperView()
165 {
166 return upperview;
167 }
168
169 /**
170 *
171 */
172 void setControl(MsgCntrl control)
173 {
174 for (Iterator iter = clustertable.iterator(); iter.hasNext(); ) {
175 Cluster cluster = (Cluster) iter.next();
176 cluster.setControl(control);
177 }
178 }
179
180 /**
181 *
182 */
183 ClusterTable getClusterTable()
184 {
185 return clustertable;
186 }
187
188
189 /**
190 *
191 */
192 HostTable getHostTable()
193 {
194 return hosttable;
195 }
196
197
198 /**
199 *
200 */
201 MssHost hostLookup(EndPoint endpoint)
202 {
203 return hosttable.lookup(endpoint);
204 }
205
206 /**
207 *
208 */
209 Cluster clusterLookup(EndPoint endpoint)
210 {
211 return clustertable.lookup(endpoint);
212 }
213
214 /**
215 *
216 */
217 void doStart()
218 {
219 ni.doStart();
220 }
221
222
223 /**
224 * Returns the size of the distributed system; that is the total
225 * number of hosts in the system. This includes all clusters.
226 */
227 int size()
228 {
229 return hosttable.size();
230 }
231
232
233 /**
234 * Returns the number of clusters (domains) in the distributed
235 * system.
236 */
237 int numOfClusters()
238 {
239 return clustertable.size();
240 }
241
242
243 /**
244 * Returns a RoutingTable object with references to the clusters
245 * contained in the distributed systems; note that modification
246 * in the RoutingTable object are reflected in this data structure
247 */
248 RoutingTable getRoutingTable()
249 {
250 boolean changed = routingtable.updateReachability();
251 if (log.isDebugEnabled()) {
252 if (changed)
253 log.debug("Reachability has changed");
254 log.debug(routingtable);
255 }
256 return routingtable;
257 }
258
259
260 /**
261 * Update the routing table with information obtained from the given
262 * routing message.
263 */
264 void updateRoutingTable(MsgRouting msg)
265 {
266 if (log.isDebugEnabled()) {
267 log.debug(msg);
268 /* Print the old routing table */
269 log.debug(routingtable);
270 }
271 view.clear();
272 clustertable.updateRoutingTable(msg);
273 /*
274 * Note that we don't update the local routing table, since this is
275 * done in the <code>getRoutingTable()</code> method above. This
276 * saves the cost of updating it, until we need it for a MsgRouting
277 * message.
278 */
279 }
280
281
282 /**
283 * Update the reachability information of the sender of given
284 * IAMALIVE message.
285 */
286 void updateReachability(MsgIamAlive msg)
287 {
288 if (log.isDebugEnabled()) {
289 log.debug(msg);
290 /* Print the old routing table */
291 log.debug(routingtable);
292 }
293 view.clear();
294 MssHost sender = msg.getSender();
295
296 if (!sender.isLocal() && !sender.pingOK()) {
297 // if the sender was not reachable before
298 setAsReachable(sender, msg.getIncarnationId());
299 }
300 /*
301 * Note that we don't update the local routing table, since this is
302 * done in the <code>getRoutingTable()</code> method above. This
303 * saves the cost of updating it, until we need it for a MsgRouting
304 * message.
305 */
306 }
307
308 void update()
309 {
310 if (log.isDebugEnabled()) {
311 /* Print the old routing table */
312 log.debug(routingtable);
313 }
314 view.clear();
315 clustertable.update();
316 /*
317 * Note that we don't update the local routing table, since this is
318 * done in the <code>getRoutingTable()</code> method above. This
319 * saves the cost of updating it, until we need it for a MsgRouting
320 * message.
321 */
322 }
323
324
325 /**
326 * Check if the sender has the same incarnation as previously known.
327 *
328 * @param header
329 * The <code>FragmentHeader</code> object to check for same
330 * incarnation identifier.
331 * @return
332 * True if the message fragment has a new incarnation identifier
333 * for the sender; false otherwise.
334 */
335 boolean hasNewIncarnation(FragmentHeader header)
336 {
337 view.clear();
338 MssHost sender = header.getSender();
339 if (sender.isReachable() && sender.getIncarnationId() != header.getIncarnationId()) {
340 setAsUnreachable(sender);
341 return true;
342 }
343 return false;
344 }
345
346
347 /**
348 * Check the <code>FWDROUTING</code> routing message, to see if the
349 * incarnations are equal to the locally known incarnations.
350 *
351 * @param msg
352 * The <code>MsgRouting</code> object to check for incarnation id.
353 * @return
354 * True if there are some new incarnation identifiers that were
355 * previously unknown; false otherwise.
356 */
357 boolean checkIncarnation(MsgRouting msg)
358 {
359 /* Flag to indicate if there are new incarnations */
360 boolean newInc = false;
361
362 view.clear();
363 TopologyEntry[] table = msg.getTopologyTable();
364 for (int i = 0; i < table.length; i++) {
365 for (int j = 0; j < table[i].reachable.length; j++) {
366 MssHost member = hostLookup(table[i].reachable[j]);
367 if (!member.isLocal() && member.isReachable() &&
368 member.getIncarnationId() != table[i].incarnationId[j]) {
369 setAsUnreachable(member);
370 newInc = true;
371 }
372 }
373 }
374 return newInc;
375 }
376
377
378 /**
379 * Mark the given host as unreachable. Also update the message flow
380 * data of both sender and receiver side, and notify the upper layer
381 * of a view change, excluding this partiuclar member.
382 *
383 * @param member
384 * The host member to mark as unreachable.
385 */
386 void setAsUnreachable(MssHost member)
387 {
388 if (log.isDebugEnabled())
389 log.debug("setAsUnreachable: start " + member);
390
391 member.setAsUnreachable();
392 Cluster cluster = member.getCluster();
393 cluster.decrementReachableCounter();
394 view.setAsUnreachable(member);
395 upperview.setAsUnreachable(member);
396
397 /*
398 * If none of the hosts included in the cluster containing this
399 * member is reachable, resets the flow control information for the
400 * sender side (cluster).
401 */
402 if (!cluster.isReachable()) {
403 if (!cluster.isLocal()) {
404 cluster.resetMsgFlow(UNDEF);
405 if (log.isDebugEnabled()) {
406 log.debug("setAsUnreachable: reset Msg Flow of " + cluster);
407 }
408 } else {
409 throw new IllegalStateException("Local cluster unreachable!");
410 }
411 }
412
413 /*
414 * If the new unreachable member is in the local cluster, update the
415 * control flow information for all hosts.
416 */
417 if (cluster.isLocal()) {
418 for (Iterator iter = hosttable.iterator(); iter.hasNext(); ) {
419 MssHost host = (MssHost) iter.next();
420 host.getMsgFlow().clusterWindow.set(member.getClusterIndex(), UNDEF);
421 }
422 }
423
424 /* Reset the receiver side message flow of the unreachable member */
425 member.resetMsgFlow();
426 if (log.isDebugEnabled()) {
427 log.debug("setAsUnreachable: reset Msg Flow of " + member);
428 }
429
430 if (log.isDebugEnabled())
431 log.debug("setAsUnreachable: end");
432 }
433
434
435 void setAsReachable(MssHost member, int inc)
436 {
437 if (log.isDebugEnabled())
438 log.debug("setAsReachable: start " + member + " " + inc);
439
440 Cluster cluster = member.getCluster();
441 cluster.incrementReachableCounter();
442 view.setAsReachable(member);
443
444 /*
445 * Note that we don't need to reinitialize the message flows for the
446 * cluster and member at this point since that is done when
447 * receiving an ASYN message from the member; see Mss.handleSYNMsg()
448 * for details.
449 */
450
451 /*
452 * If the incarnation number is changed, we must notify the upper
453 * layer of this fact.
454 */
455 if (member.getIncarnationId() != inc) {
456 view.setNewIncarnation(member);
457 upperview.setNewIncarnation(member);
458 member.setIncarnationId(inc);
459 }
460 if (log.isDebugEnabled())
461 log.debug("setAsReachable: end");
462 }
463
464
465 /**
466 * This flow control method updates the last message delivered
467 * by the remote cluster in each of the members of the cluster.
468 * The new value is equal to <CODE>lastMsgRcvd</CODE> of the
469 * cluster window.
470 */
471 FCEntry[] getClusterFCEntry(EndPoint key)
472 {
473 Cluster cluster = clustertable.lookup(key);
474 if (cluster == null) {
475 log.debug("clustertable: "+ clustertable);
476 log.warn("No cluster exists in cluster table for endpoint: "+ key);
477 }
478
479 FCEntry[] fc = new FCEntry[cluster.size()];
480 int i = 0;
481 for (Iterator iter = cluster.iterator(); iter.hasNext(); ) {
482 MssHost host = (MssHost) iter.next();
483 fc[i++] = new FCEntry(host);
484 }
485 return fc;
486 }
487
488
489 /**
490 * This flow control method returns an array of <CODE>FCEntry</CODE>
491 * objects containing the last delivered message of all members
492 * (local or remote). Clearly, for <CODE>member[i] == me</CODE>
493 * this value is equal to <CODE>lastMsgSent</CODE>.
494 */
495 FCEntry[] getAllFCEntry()
496 {
497 int size = hosttable.size();
498 FCEntry[] ret = new FCEntry[size];
499 for (int i=0; i < size; i++) {
500 MssHost host = hosttable.get(i);
501 ret[i] = new FCEntry(host.getEndPoint(), host.getMsgFlow().getLastMsgDlvr());
502 }
503 return ret;
504 }
505
506
507 ////////////////////////////////////////////////////////////////////////////////////////////
508 // Methods from Object
509 ////////////////////////////////////////////////////////////////////////////////////////////
510
511 /**
512 * Returns a string representation of this object
513 */
514 public String toString()
515 {
516 return "[MssDS:" + hosttable + ", "+ clustertable + "]";
517 }
518
519 } // END MssDS