View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.config;
20  
21  import static jgroup.relacs.simulator.SocketStatus.SOCKET_STATUS;
22  import static jgroup.util.log.ConnectionPatternEvent.Type.Merge;
23  import static jgroup.util.log.ConnectionPatternEvent.Type.Partition;
24  
25  import java.io.Externalizable;
26  import java.io.IOException;
27  import java.io.ObjectInput;
28  import java.io.ObjectOutput;
29  import java.net.InetAddress;
30  import java.net.UnknownHostException;
31  import java.rmi.RemoteException;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.Map;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.locks.Condition;
37  import java.util.concurrent.locks.Lock;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import jgroup.core.EndPoint;
41  import jgroup.core.registry.BootstrapRegistry;
42  import jgroup.relacs.simulator.SocketStatus;
43  import jgroup.relacs.types.EndPointImpl;
44  import jgroup.util.Abort;
45  import jgroup.util.Network;
46  import jgroup.util.log.ConnectionPatternEvent;
47  import jgroup.util.log.Eventlogger;
48  
49  import org.apache.log4j.Logger;
50  
51  /**
52   *  Class for holding domain information.
53   *
54   *  @author Hein Meling
55   *  @since Jgroup 1.2
56   */
57  public class Domain
58    extends EndPointImpl
59    implements Cloneable, Comparable, Externalizable
60  {
61  
62    ////////////////////////////////////////////////////////////////////////////////////////////
63    // Logger
64    ////////////////////////////////////////////////////////////////////////////////////////////
65  
66    /** Obtain logger for this class */
67    private static final Logger log = Logger.getLogger(Domain.class);
68  
69  
70    ////////////////////////////////////////////////////////////////////////////////////////////
71    // Static fields
72    ////////////////////////////////////////////////////////////////////////////////////////////
73  
74    private static final long serialVersionUID = -2367179058534386338L;
75  
76    /** In this JVM there can be only one local multicast address. */
77    private static InetAddress localMcastAddress = null;
78  
79    private static final String DEFAULT_MCAST_ADR = "226.7.8.9";
80  
81    private static final int DEFAULT_MCAST_PORT   = 61341;
82  
83  
84    ////////////////////////////////////////////////////////////////////////////////////////////
85    // Fields
86    ////////////////////////////////////////////////////////////////////////////////////////////
87  
88    /** Domain name */
89    private String domainName;
90  
91    /** Map of hosts contained within this domain */
92    private HostSet hosts = new HostSet();
93  
94    /** Map for storing content associated with this <code>Domain</code> object */
95    private Map<String,Object> content = new HashMap<String,Object>();
96  
97    /**
98     *  The number of daemons in this domain.  The default is that all hosts
99     *  should contain a Jgroup daemon; this is indicated by value -1.
100    */
101   private int numOfDaemons = -1;
102 
103   /** The number of replicas in this domain; initially zero. */
104   private int replicaCount = 0;
105 
106 
107   ////////////////////////////////////////////////////////////////////////////////////////////
108   // Constructors
109   ////////////////////////////////////////////////////////////////////////////////////////////
110 
111   /**
112    * Constructor for deserialization.
113    */
114   public Domain()
115   {
116   }
117 
118   /**
119    *  Constructs a domain from another domain object.  If the <code>copyHosts</code>
120    *  parameter is set to false, no hosts will be copied from the domain object.
121    * 
122    *  @param domain the domain to copy
123    *  @param copyHosts if true, the internal host set of the provided domain
124    *     is also copied; otherwise the internal host set will be empty.
125    */
126   public Domain(Domain domain, boolean copyHosts)
127   {
128     this(domain.domainName, domain.address, domain.port, domain.numOfDaemons);
129     if (copyHosts)
130       this.hosts = (HostSet) domain.hosts.clone();
131   }
132 
133   public Domain(String domainName)
134     throws UnknownHostException
135   {
136     this(domainName, InetAddress.getByName(DEFAULT_MCAST_ADR), DEFAULT_MCAST_PORT, -1);
137   }
138 
139   public Domain(String domainName, String mcastAdr, int mcastPort)
140     throws UnknownHostException
141   {
142     this(domainName, InetAddress.getByName(mcastAdr), mcastPort, -1);
143   }
144 
145   public Domain(String domainName, String mcastAdr, int mcastPort, int jdaemons)
146     throws UnknownHostException
147   {
148     this(domainName, InetAddress.getByName(mcastAdr), mcastPort, jdaemons);
149   }
150 
151   private Domain(String domainName, InetAddress mcastAddress, int mcastPort, int jdaemons)
152   {
153     super(mcastAddress, mcastPort);
154     this.numOfDaemons = jdaemons;
155     if (jdaemons < -1) {
156       throw new IllegalArgumentException("Invalid number Jgroup daemons");
157     }
158 
159     this.domainName = domainName;
160     /*
161      * We cannot rely on the local detection in EndPointImpl for
162      * multicast addresses.  Thus, we require that local has to be set
163      * using the <code>setLocal()</code> method.
164      */
165     local = false;
166     if (!mcastAddress.isMulticastAddress())
167       throw new IllegalArgumentException("Domain IP address is not a multicast address.");
168   }
169 
170 
171   ////////////////////////////////////////////////////////////////////////////////////////////
172   // Overridden EndPointImpl methods from the EndPoint interface
173   ////////////////////////////////////////////////////////////////////////////////////////////
174 
175   /**
176    *  Returns true if this is a local domain.
177    */
178   public boolean isLocal()
179   {
180     if (localMcastAddress == null)
181       throw new IllegalStateException("Local domain multicast address not initialized");
182     return local;
183   }
184 
185 
186   /**
187    *  Returns true always, since a domain endpoint must be a multicast
188    *  endpoint.  There is a check for this in the class constructor.
189    */
190   public boolean isMulticastEndPoint()
191   {
192     return true;
193   }
194 
195 
196   ////////////////////////////////////////////////////////////////////////////////////////////
197   // Domain interface methods
198   ////////////////////////////////////////////////////////////////////////////////////////////
199 
200   /**
201    *  Set the local flag for this domain.
202    */
203   public void setLocal(boolean local)
204   {
205     this.local = local;
206     if (local)
207       localMcastAddress = address;
208   }
209 
210 
211   /**
212    *  Returns the domain name.
213    */
214   public String getName() 
215   {
216     return domainName;
217   }
218 
219   public EndPoint getEndpoint()
220   {
221     return new EndPointImpl(super.address, super.port);
222   }
223 
224   /**
225    *  Returns the number of hosts contained in this domain.
226    */
227   public int size() 
228   {
229     return hosts.size();
230   }
231 
232 
233   /**
234    *  Returns true if there are no hosts in this domain; false otherwise.
235    */
236   public boolean isEmpty()
237   {
238     return hosts.isEmpty();
239   }
240 
241 
242   /**
243    *  Returns the <code>HostSet</code> for the hosts in this domain.
244    */
245   public HostSet getHostSet()
246   {
247     return hosts;
248   }
249 
250   
251   /**
252    *  Add a host to this domain.
253    *
254    *  @param host
255    *    The host to add to this domain.
256    *  @return 
257    *    True if the <code>Domain</code> did not already contain this host.
258    */
259   public boolean addHost(Host host)
260   {
261     return hosts.addHost(host);
262   }
263 
264 
265   /**
266    *  Retrieve the value associated with this domain for the given key.
267    */
268   public Object get(String key)
269   {
270     return content.get(key);
271   }
272 
273 
274   /**
275    *  Associate the given value with this domain under the given key.
276    */
277   public Object put(String key, Object value)
278   {
279     return content.put(key, value);
280   }
281 
282 
283   /**
284    *  Returns true if all hosts in this domain should have a daemon.
285    */
286   public boolean allDaemons()
287   {
288     return (numOfDaemons == -1);
289   }
290 
291 
292   /**
293    *  Returns true if this domain should not have any daemons.
294    */
295   public boolean hasNoDaemons()
296   {
297     return (numOfDaemons == 0);
298   }
299 
300 
301   /**
302    *  Returns the number of daemons that should reside in this domain.
303    *  A value of -1 indicates that all hosts in this domain should have
304    *  a Jgroup daemon.  A value of 0 indicates that this domain have no
305    *  daemons, and that we should look in other domains for a daemon.
306    */
307   public int numOfDaemons()
308   {
309     return numOfDaemons;
310   }
311 
312   //FIXME these three methods should probably be part of an aspect??!
313   public void decReplicaCount()
314   {
315     replicaCount--;
316   }
317 
318   public void incReplicaCount()
319   {
320     replicaCount++;
321   }
322 
323   public int getReplicaCount()
324   {
325     return replicaCount;
326   }
327 
328   public void setReplicaCount(int newReplicaCount)
329   {
330     this.replicaCount = newReplicaCount;
331   }
332 
333 
334   ////////////////////////////////////////////////////////////////////////////////////////////
335   // Cloneable interface
336   ////////////////////////////////////////////////////////////////////////////////////////////
337 
338   /**
339    *  Returns a shallow copy of this <code>Domain</code> instance.
340    *  (The elements themselves are not copied.)
341    *
342    *  @return 
343    *    a clone of this <code>Domain</code> instance.
344    */
345   @SuppressWarnings("unchecked")
346   public Object clone()
347   {
348     try {
349       Domain newDomain = (Domain) super.clone();
350       newDomain.content = (HashMap) ((HashMap) content).clone();
351       newDomain.hosts = (HostSet) hosts.clone();
352       return newDomain;
353     } catch (CloneNotSupportedException e) {
354       /* Cannot happen since HashMap and HostSet supports clone. */
355       throw new Abort("Internal error", e);
356     }
357   }
358 
359 
360   ////////////////////////////////////////////////////////////////////////////////////////////
361   // Comparable interface
362   ////////////////////////////////////////////////////////////////////////////////////////////
363 
364   /**
365    * Compares this object with the specified object for ordering the objects.
366    * The replicaCount is the primary ordering value; the size of the domain 
367    * (the number of hosts in the domain) is the secondary ordering value, while
368    * the multicast IP and port number is the third ordering value.  Note that
369    * we rank domains with the largest number of hosts first.
370    */
371   public int compareTo(Object obj)
372   {
373     if (this == obj) {
374       return 0;
375     } else {
376       Domain domain = (Domain) obj;
377       if (replicaCount < domain.replicaCount) {
378         return -1;
379       } else if (replicaCount == domain.replicaCount) {
380         if (size() < domain.size()) {
381           // Note that we do the reverse ordering for domain size;
382           // that is the highest value should be first
383           return 1;
384         } else if (size() == domain.size()) {
385           // If the two domains have the same number of hosts,
386           // use the IP and port number ordering defined by the EndPointImpl
387           return super.compareTo(obj);
388         } else {
389           return -1;
390         }
391       } else {
392         return 1;
393       }
394     }
395   }
396 
397 
398   ////////////////////////////////////////////////////////////////////////////////////////////
399   // Override methods in Object/EndPointImpl
400   ////////////////////////////////////////////////////////////////////////////////////////////
401 
402   /**
403    *  Returns a string representation of this object
404    */
405    public String toString() 
406    {
407      return toString(false);
408    }
409 
410   /**
411    *  Returns a string representation of this object
412    */
413    public String toString(boolean full) 
414    {
415      StringBuilder buf = new StringBuilder();
416 //     buf.append("[");
417 //     buf.append(domainName);
418 //     buf.append(": ");
419      buf.append(super.toString());
420 //     buf.append("]");
421      if (full) {
422        for (Iterator<Host> i = hosts.iterator(); i.hasNext();) {
423          buf.append("\n  ");
424          buf.append((i.next()).toString());
425        }
426      }
427      return buf.toString();
428    }
429 
430 
431    ////////////////////////////////////////////////////////////////////////////////////////////
432    //  Marshaling/unmarshaling methods
433    ////////////////////////////////////////////////////////////////////////////////////////////
434 
435    /**
436     *  Restores the content of this object from the marshalled data contained
437     *  in the specified input stream.
438     * 
439     *  @param in the stream to be read
440     */
441    public void readExternal(ObjectInput in)
442      throws ClassNotFoundException, IOException
443    {
444      super.readExternal(in);
445      replicaCount = in.readInt();
446      hosts = (HostSet) in.readObject();
447      content = (Map) in.readObject();
448    }
449 
450    /**
451     *  Marshals the content of this object to the specified output stream.
452     * 
453     *  @param out the stream to be written
454     */
455    public void writeExternal(ObjectOutput out)
456      throws IOException
457    {
458      super.writeExternal(out);
459      out.writeInt(replicaCount);
460      out.writeObject(hosts);
461      out.writeObject(content);
462    }
463 
464 
465    ////////////////////////////////////////////////////////////////////////////////////////////
466    // Methods for the partition simulator
467    //
468    // //FIXME these methods should be placed in an aspect instead
469    ////////////////////////////////////////////////////////////////////////////////////////////
470 
471    /** List of socket status objects that should be committed */
472    private static final Map<Host, SocketStatus> statusMap = new HashMap<Host, SocketStatus>();
473 
474    /** The connection event to be committed */
475    private static ConnectionPatternEvent connEvent = null;
476 
477    /** The number of remaining commits in the current injection campaign */
478    private static volatile int remainingCommits;
479 
480    /** Lock and condition to exclude concurrent access */
481    private static final Lock lock = new ReentrantLock();
482    private static final Condition completed = lock.newCondition();
483 
484 
485    /**
486     * Inject a partition on this domain so that it becomes unreachable from the given
487     * domain.
488     * 
489     * This method will only configure the reachability pattern, and thus it must
490     * be followed by an invocation of the <code>commit()</code> method below.
491     */
492    public void partition(Domain domain)
493    {
494      if (connEvent == null)
495        connEvent = new ConnectionPatternEvent();
496      connEvent.addEvent(Partition, this, domain);
497      boolean success = setReachability(0, domain);
498      if (!success) {
499        // Clear and retry
500        clearStatus();
501        if (log.isDebugEnabled())
502          log.debug("Retrying partition: " + domain);
503        success = setReachability(0, domain);
504        if (!success)
505          log.error("FAILED TO RETRY PARTITION: " + domain);
506      }
507    }
508 
509    /**
510     * Inject a merge on this domain so that it becomes reachable from the given domain.
511     * 
512     * This method will only configure the reachability pattern, and thus it must
513     * be followed by an invocation of the <code>commit()</code> method below.
514     */
515    public void merge(Domain domain)
516    {
517      if (connEvent == null)
518        connEvent = new ConnectionPatternEvent();
519      connEvent.addEvent(Merge, this, domain);
520      boolean success = setReachability(100, domain);
521      if (!success) {
522        // Clear and retry
523        clearStatus();
524        if (log.isDebugEnabled())
525          log.debug("Retrying merge: " + domain);
526        success = setReachability(100, domain);
527        if (!success)
528          log.error("FAILED TO RETRY MERGE: " + domain);
529      }
530    }
531 
532    /**
533     * Commit the currently configured reachability pattern, essentially activating
534     * the pattern.
535     * 
536     * Calls to this method <i>must</i> only occur after a call to either the
537     * <code>partition()</code> or <code>merge()</code> method, otherwise an
538     * <code>IllegalStateException</code> is thrown.  It cannot be called multiple
539     * times per call to <code>partition()</code> or <code>merge()</code>.
540     */
541    public static void commit(String pattern)
542    {
543      if (connEvent == null)
544        throw new IllegalStateException("Cannot call commit() before partition() or merge()");
545      ConnectionPatternEvent preEvent =
546        new ConnectionPatternEvent("PRE-COMMIT", pattern, connEvent.getConnectionEvents());
547      if (Eventlogger.ENABLED)
548        Eventlogger.logEventFlush(preEvent);
549 
550      // initialize the remaining commits counter
551      lock.lock();
552      try {
553        remainingCommits = statusMap.size();
554      } finally {
555        lock.unlock();
556      }
557 
558      /*
559       * Iterate over the stored socket status objects and perform commit
560       * once for each host.
561       */
562      for (final Map.Entry<Host, SocketStatus> entry : statusMap.entrySet()) {
563        final Host host = entry.getKey();
564        Thread commitThread = new Thread("Commit-" + host) {
565          public void run() {
566            if (log.isDebugEnabled())
567              log.debug("Committing: " + host);
568            try {
569              entry.getValue().commit();
570              if (log.isDebugEnabled())
571                log.debug("Have committed: " + host);
572            } catch (RemoteException e) {
573              log.warn("Failed to commit: " + host, e);
574            } finally {
575              lock.lock();
576              try {
577                // decrement the remaining commits counter
578                remainingCommits--;
579                /*
580                 * Only signal when all commit threads have completed.
581                 * This reduces unecessary context-switching.
582                 */
583                if (remainingCommits == 0)
584                  completed.signal();
585              } finally {
586                lock.unlock();
587              }
588            }
589          }
590        };
591        commitThread.setDaemon(true);
592        commitThread.setPriority(Thread.MAX_PRIORITY);
593        commitThread.start();
594      }
595 
596      // Check if all commit threads have completed
597      lock.lock();
598      try {
599        // We still need a while loop here since the await call can have spurious wakeups.
600        while (remainingCommits > 0) {
601          log.info("Commit threads remaining: " + remainingCommits);
602          try {
603            boolean timeout = !completed.await(5, TimeUnit.SECONDS);
604            if (timeout && remainingCommits > 0)
605              throw new RuntimeException("Timeout during commit; threads remaining: " + remainingCommits);
606          } catch (InterruptedException e) {
607            log.warn("Waiting for commit thread interrupted (should not happen)", e);
608          }
609        }
610      } finally {
611        lock.unlock();
612      }
613 
614      ConnectionPatternEvent postEvent =
615        new ConnectionPatternEvent("POST-COMMIT", pattern, connEvent.getConnectionEvents());
616      if (Eventlogger.ENABLED)
617        Eventlogger.logEventFlush(postEvent);
618      if (log.isDebugEnabled())
619        log.debug("ALL COMMITTED");
620      /*
621       * Reset the connection pattern to ensure that calls to commit() are
622       * always proceeded by calls to either partition() or merge() first.
623       */
624      connEvent = null;
625    }
626 
627   /**
628    * Clear out the status map to avoid that the socket status remote
629    * reference objects are propagated from one experiment to the next.
630    */
631   public void clearStatus()
632   {
633     statusMap.clear();
634   }
635 
636   /**
637    * Set the reachability between this domain and the given domain.
638    * 
639    * @param reachability The reachability percentage to set for this domain
640    * @param domain The domain for which this domain the reachability is to be updated.
641    * @return True if all nodes were successfully updated with the given
642    *   reachability percentage.  False is returned if updating failed on at
643    *   least one of the nodes.
644    */
645   private boolean setReachability(int reachability, Domain domain)
646    {
647     if (!domain.equals(this)) {
648       for (Host h1 : hosts) {
649         StringBuilder buf = new StringBuilder();
650         buf.append(h1.getHostName());
651         buf.append("(");
652         buf.append(reachability);
653         buf.append("): ");
654         try {
655           SocketStatus h1status = getStatus(h1);
656           for (Host h2 : domain.getHostSet()) {
657             h1status.setStatus(h2.getAddress(), reachability);
658             buf.append(h2.getHostName());
659             buf.append(" ");
660             // Ensure to establish also the reverse reachability relation
661             SocketStatus h2status = getStatus(h2);
662             h2status.setStatus(h1.getAddress(), reachability);
663           }
664           if (log.isDebugEnabled())
665             log.debug(buf);
666         } catch (Exception e) {
667           log.warn("Failed to reach host", e);
668           // Abort early if one failed
669           return false;
670         }
671       }
672     } else {
673       log.warn(domain + " is the same as this");
674     }
675      // All successful
676      return true;
677    }
678 
679    private SocketStatus getStatus(Host host)
680      throws Exception
681    {
682      SocketStatus status = statusMap.get(host);
683      if (status == null) {
684        try {
685          status = (SocketStatus) host.lookup(SOCKET_STATUS);
686          if (log.isDebugEnabled())
687            log.debug("SocketStatus for " + host + ": " + status);
688        } catch (Exception e) {
689          log.warn(host.getHostName() + " is not available", e);
690          throw e;
691        }
692        statusMap.put(host, status);
693      }
694      return status;
695    }
696 
697 } // END Domain