View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  
24  import jgroup.core.ConfigurationException;
25  import jgroup.core.EndPoint;
26  import jgroup.relacs.config.DistributedSystemConfig;
27  import jgroup.relacs.config.Domain;
28  import jgroup.relacs.config.DomainSet;
29  import jgroup.relacs.config.HostSet;
30  import jgroup.relacs.config.TransportConfig;
31  
32  import org.apache.log4j.Logger;
33  
34  /**
35   *  The <code>MssDS</code> object contains the description of the
36   *  distributed system in which the distributed computation is carried
37   *  out.  A distributed system is described as a collection of domains
38   *  and hosts, and may be constructed in two ways: by adding a
39   *  collection of hosts to a initially empty distributed system, or by
40   *  reading a configuration file specifying the list of hosts and
41   *  domains.
42   *
43   *  @author Salvatore Cammarata
44   *  @since Jgroup 1.2
45   */
46  final class MssDS
47    implements MssConstants, MssTag
48  {
49  
50    ////////////////////////////////////////////////////////////////////////////////////////////
51    // Logger
52    ////////////////////////////////////////////////////////////////////////////////////////////
53  
54    /** Obtain logger for this class */
55    private static final Logger log = Logger.getLogger(MssDS.class);
56  
57  
58    ////////////////////////////////////////////////////////////////////////////////////////////
59    // Fields
60    ////////////////////////////////////////////////////////////////////////////////////////////
61  
62    /** Configuration parameters */
63    private TransportConfig tconf;
64  
65    /** Topology information: host table */
66    private HostTable hosttable;
67  
68    /** Topology information: cluster table */
69    private ClusterTable clustertable;
70  
71    /** Topology information: routing table */
72    private RoutingTable routingtable;
73  
74    /** reachability view of the Distributed System */
75    private DSView view;        
76  
77    /** reachability view of the Distributed System */
78    private DSView upperview;       
79  
80    /** Network interface */
81    private NI ni;
82  
83  
84    ////////////////////////////////////////////////////////////////////////////////////////////
85    // Constructors
86    ////////////////////////////////////////////////////////////////////////////////////////////
87  
88    /**
89     *  Constructs a distributed system whose description is contained in
90     *
91     */
92    MssDS(EventHandler ehandler, DistributedSystemConfig dsc, TransportConfig tconf)
93      throws IOException, ConfigurationException
94    {
95      /* Copy references to other data structures */
96      this.tconf = tconf;
97      ni = new NI(ehandler, tconf.getPayload() + OVERHEAD_SIZE, tconf.getMulticastTTL());
98  
99      /*
100      *  Initialize the tables
101      */
102     hosttable = new HostTable();
103     clustertable = new ClusterTable(this, tconf);
104 
105     /*
106      *  Extracts the hosts from the distributed system configuration and
107      *  creates mss tables for hosts and domains (clusters).
108      */
109     DomainSet domains = dsc.getDomainSet();
110     for (Iterator i = domains.iterator(); i.hasNext(); ) {
111       Domain domain = (Domain) i.next();
112 
113       /*
114        * Construct entries for the cluster table, based on the domains
115        * parsed from the configuration file.
116        */
117       Cluster cluster = new Cluster(this, tconf, domain, ni);
118       clustertable.insert(cluster);
119 
120       HostSet hosts = domain.getHostSet();
121       for (Iterator j = hosts.iterator(); j.hasNext(); ) {
122         EndPoint hostEndPoint = (EndPoint) j.next();
123         /*
124          * Creates a <code>MssHost</code> object and insert it in the
125          * hosttable and cluster.
126          */
127         MssHost host = new MssHost(tconf, hostEndPoint, ni, cluster, ehandler);
128         cluster.insertMember(host);
129         hosttable.insert(host);
130       }
131     }
132 
133     /* Get the local host */
134     MssHost me = HostTable.getLocalHost();
135 
136     /* Reset message flows of all clusters and hosts */
137     clustertable.resetMsgFlow();
138     hosttable.resetMsgFlow();
139 
140     /* Initialize the routing table */
141     routingtable = new RoutingTable(clustertable, tconf.getMaxTTL() - 1);
142 
143     /* Creates a new view */
144     view = new DSView(hosttable, me);
145     upperview = new DSView(hosttable, me);
146   }
147 
148 
149   ////////////////////////////////////////////////////////////////////////////////////////////
150   // Getter methods
151   ////////////////////////////////////////////////////////////////////////////////////////////
152 
153   /**
154    *  
155    */
156   DSView getView()
157   {
158     return view;
159   }
160  
161   /**
162    *  
163    */
164   DSView getUpperView()
165   {
166     return upperview;
167   }
168  
169   /**
170    *  
171    */
172   void setControl(MsgCntrl control)
173   {
174     for (Iterator iter = clustertable.iterator(); iter.hasNext(); ) {
175       Cluster cluster = (Cluster) iter.next();
176       cluster.setControl(control);
177     }
178   }
179 
180   /**
181    *  
182    */
183   ClusterTable getClusterTable() 
184   {
185     return clustertable;
186   }
187 
188 
189   /**
190    *  
191    */
192   HostTable getHostTable() 
193   {
194     return hosttable;
195   }
196 
197 
198   /**
199    *  
200    */
201   MssHost hostLookup(EndPoint endpoint)
202   {
203     return hosttable.lookup(endpoint);
204   }
205 
206   /**
207    *  
208    */
209   Cluster clusterLookup(EndPoint endpoint)
210   {
211     return clustertable.lookup(endpoint);
212   }
213 
214   /**
215    *  
216    */
217   void doStart()
218   {
219     ni.doStart();
220   }
221 
222 
223   /**
224    *  Returns the size of the distributed system; that is the total
225    *  number of hosts in the system.  This includes all clusters.
226    */
227   int size()
228   {
229     return hosttable.size();
230   }
231 
232 
233   /**
234    *  Returns the number of clusters (domains) in the distributed
235    *  system.
236    */
237   int numOfClusters()
238   {
239     return clustertable.size();
240   }
241 
242 
243   /**
244    *  Returns a RoutingTable object with references to the clusters
245    *  contained in the distributed systems; note that modification
246    *  in the RoutingTable object are reflected in this data structure
247    */
248   RoutingTable getRoutingTable()
249   {
250     boolean changed = routingtable.updateReachability();
251     if (log.isDebugEnabled()) {
252       if (changed)
253         log.debug("Reachability has changed");
254       log.debug(routingtable);
255     }
256     return routingtable;
257   }
258 
259 
260   /**
261    *  Update the routing table with information obtained from the given
262    *  routing message.
263    */
264   void updateRoutingTable(MsgRouting msg)
265   {
266     if (log.isDebugEnabled()) {
267       log.debug(msg);
268       /* Print the old routing table */
269       log.debug(routingtable);
270     }
271     view.clear();
272     clustertable.updateRoutingTable(msg);
273     /*
274      * Note that we don't update the local routing table, since this is
275      * done in the <code>getRoutingTable()</code> method above.  This
276      * saves the cost of updating it, until we need it for a MsgRouting
277      * message.
278      */
279   }
280 
281 
282   /**
283    *  Update the reachability information of the sender of given
284    *  IAMALIVE message.
285    */
286   void updateReachability(MsgIamAlive msg)
287   {
288     if (log.isDebugEnabled()) {
289       log.debug(msg);
290       /* Print the old routing table */
291       log.debug(routingtable);
292     }
293     view.clear();
294     MssHost sender = msg.getSender();
295 
296     if (!sender.isLocal() && !sender.pingOK()) {
297       // if the sender was not reachable before
298       setAsReachable(sender, msg.getIncarnationId());
299     }
300     /*
301      * Note that we don't update the local routing table, since this is
302      * done in the <code>getRoutingTable()</code> method above.  This
303      * saves the cost of updating it, until we need it for a MsgRouting
304      * message.
305      */
306   }
307 
308   void update()
309   {
310     if (log.isDebugEnabled()) {
311       /* Print the old routing table */
312       log.debug(routingtable);
313     }
314     view.clear();
315     clustertable.update();
316     /*
317      * Note that we don't update the local routing table, since this is
318      * done in the <code>getRoutingTable()</code> method above.  This
319      * saves the cost of updating it, until we need it for a MsgRouting
320      * message.
321      */
322   }
323 
324 
325   /**
326    *  Check if the sender has the same incarnation as previously known.
327    *
328    *  @param header
329    *    The <code>FragmentHeader</code> object to check for same
330    *    incarnation identifier.
331    *  @return 
332    *    True if the message fragment has a new incarnation identifier
333    *    for the sender; false otherwise.
334    */
335   boolean hasNewIncarnation(FragmentHeader header)
336   {
337     view.clear();
338     MssHost sender = header.getSender();
339     if (sender.isReachable() && sender.getIncarnationId() != header.getIncarnationId()) {
340       setAsUnreachable(sender);
341       return true;
342     }
343     return false;
344   }
345 
346 
347   /**
348    *  Check the <code>FWDROUTING</code> routing message, to see if the
349    *  incarnations are equal to the locally known incarnations.
350    *
351    *  @param msg
352    *    The <code>MsgRouting</code> object to check for incarnation id.
353    *  @return 
354    *    True if there are some new incarnation identifiers that were
355    *    previously unknown; false otherwise.
356    */
357   boolean checkIncarnation(MsgRouting msg)
358   {
359     /* Flag to indicate if there are new incarnations */
360     boolean newInc = false;
361 
362     view.clear();
363     TopologyEntry[] table = msg.getTopologyTable();
364     for (int i = 0; i < table.length; i++) {
365       for (int j = 0; j < table[i].reachable.length; j++) {
366         MssHost member = hostLookup(table[i].reachable[j]);
367         if (!member.isLocal() && member.isReachable() && 
368             member.getIncarnationId() != table[i].incarnationId[j]) {
369           setAsUnreachable(member);
370           newInc = true;
371         }
372       }
373     }
374     return newInc;
375   }
376 
377 
378   /**
379    *  Mark the given host as unreachable.  Also update the message flow
380    *  data of both sender and receiver side, and notify the upper layer
381    *  of a view change, excluding this partiuclar member.
382    *
383    *  @param member
384    *    The host member to mark as unreachable.
385    */
386   void setAsUnreachable(MssHost member) 
387   {
388     if (log.isDebugEnabled())
389       log.debug("setAsUnreachable: start " + member);
390 
391     member.setAsUnreachable();
392     Cluster cluster = member.getCluster();
393     cluster.decrementReachableCounter();
394     view.setAsUnreachable(member);
395     upperview.setAsUnreachable(member);
396 
397     /*
398      * If none of the hosts included in the cluster containing this
399      * member is reachable, resets the flow control information for the
400      * sender side (cluster).
401      */
402     if (!cluster.isReachable()) {
403       if (!cluster.isLocal()) {
404         cluster.resetMsgFlow(UNDEF);
405         if (log.isDebugEnabled()) {
406           log.debug("setAsUnreachable: reset Msg Flow of " + cluster);
407         }
408       } else {
409         throw new IllegalStateException("Local cluster unreachable!");
410       }
411     }
412 
413     /*
414      * If the new unreachable member is in the local cluster, update the
415      * control flow information for all hosts.
416      */
417     if (cluster.isLocal()) {
418       for (Iterator iter = hosttable.iterator(); iter.hasNext(); ) {
419         MssHost host = (MssHost) iter.next();
420         host.getMsgFlow().clusterWindow.set(member.getClusterIndex(), UNDEF);
421       }
422     }
423 
424     /* Reset the receiver side message flow of the unreachable member */
425     member.resetMsgFlow();
426     if (log.isDebugEnabled()) {
427       log.debug("setAsUnreachable: reset Msg Flow of " + member);
428     }
429 
430     if (log.isDebugEnabled())
431       log.debug("setAsUnreachable: end");
432   }
433 
434 
435   void setAsReachable(MssHost member, int inc) 
436   {
437     if (log.isDebugEnabled())
438       log.debug("setAsReachable: start " + member + " " + inc);
439 
440     Cluster cluster = member.getCluster();
441     cluster.incrementReachableCounter();
442     view.setAsReachable(member);
443 
444     /*
445      * Note that we don't need to reinitialize the message flows for the
446      * cluster and member at this point since that is done when
447      * receiving an ASYN message from the member; see Mss.handleSYNMsg()
448      * for details.
449      */
450 
451     /*
452      * If the incarnation number is changed, we must notify the upper
453      * layer of this fact.
454      */
455     if (member.getIncarnationId() != inc) {
456       view.setNewIncarnation(member);
457       upperview.setNewIncarnation(member);
458       member.setIncarnationId(inc);
459     }
460     if (log.isDebugEnabled())
461       log.debug("setAsReachable: end");
462   }
463 
464 
465   /**
466    *  This flow control method updates the last message delivered
467    *  by the remote cluster in each of the members of the cluster.
468    *  The new value is equal to <CODE>lastMsgRcvd</CODE> of the
469    *  cluster window.
470    */
471   FCEntry[] getClusterFCEntry(EndPoint key)
472   {
473     Cluster cluster = clustertable.lookup(key);
474     if (cluster == null) {
475       log.debug("clustertable: "+ clustertable);
476       log.warn("No cluster exists in cluster table for endpoint: "+ key);
477     }
478 
479     FCEntry[] fc = new FCEntry[cluster.size()];
480     int i = 0;
481     for (Iterator iter = cluster.iterator(); iter.hasNext(); ) {
482       MssHost host = (MssHost) iter.next();
483       fc[i++] = new FCEntry(host);
484     }
485     return fc;
486   }
487 
488 
489   /**
490    *  This flow control method returns an array of <CODE>FCEntry</CODE>
491    *  objects containing the last delivered message of all members
492    *  (local or remote). Clearly, for <CODE>member[i] == me</CODE>
493    *  this value is equal to <CODE>lastMsgSent</CODE>.
494    */
495   FCEntry[] getAllFCEntry() 
496   {
497     int size = hosttable.size();
498     FCEntry[] ret = new FCEntry[size];
499     for (int i=0; i < size; i++) {
500       MssHost host = hosttable.get(i);
501       ret[i] = new FCEntry(host.getEndPoint(), host.getMsgFlow().getLastMsgDlvr());
502     }
503     return ret; 
504   }
505 
506 
507   ////////////////////////////////////////////////////////////////////////////////////////////
508   // Methods from Object
509   ////////////////////////////////////////////////////////////////////////////////////////////
510 
511   /**
512    *  Returns a string representation of this object
513    */
514   public String toString()
515   {
516     return "[MssDS:" + hosttable + ", "+ clustertable + "]";
517   }
518 
519 } // END MssDS