View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18   
19  package jgroup.arm;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.InputStream;
24  import java.io.InputStreamReader;
25  import java.io.OutputStream;
26  import java.io.PrintWriter;
27  import java.lang.reflect.InvocationTargetException;
28  import java.lang.reflect.Method;
29  import java.lang.reflect.Modifier;
30  import java.net.DatagramSocket;
31  import java.net.SocketException;
32  import java.rmi.AlreadyBoundException;
33  import java.rmi.Remote;
34  import java.rmi.RemoteException;
35  import java.util.Collections;
36  import java.util.HashMap;
37  import java.util.HashSet;
38  import java.util.Map;
39  import java.util.Set;
40  
41  import jgroup.core.ConfigManager;
42  import jgroup.core.ConfigurationException;
43  import jgroup.core.JgroupException;
44  import jgroup.core.arm.ExecException;
45  import jgroup.core.arm.ExecService;
46  import jgroup.core.arm.ReplicationManager;
47  import jgroup.core.registry.BootstrapRegistry;
48  import jgroup.core.registry.DependableRegistry;
49  import jgroup.core.registry.RegistryFactory;
50  import jgroup.relacs.config.AppConfig;
51  import jgroup.relacs.config.ClassData;
52  import jgroup.relacs.config.DistributedSystemConfig;
53  import jgroup.relacs.config.Host;
54  import jgroup.relacs.daemon.DaemonInteraction;
55  import jgroup.util.Abort;
56  import jgroup.util.Network;
57  import jgroup.util.log.Eventlogger;
58  import jgroup.util.log.ReplicaEvent;
59  import static jgroup.util.log.ReplicaEvent.Type.Removed;
60  import static jgroup.util.log.ReplicaEvent.Type.Shutdown;
61  import net.jini.export.Exporter;
62  import net.jini.jeri.BasicILFactory;
63  import net.jini.jeri.BasicJeriExporter;
64  import net.jini.jeri.tcp.TcpServerEndpoint;
65  
66  import org.apache.log4j.LogManager;
67  import org.apache.log4j.Logger;
68  
69  
70  /**
71   *  ExecDaemon implements the ExecService allowing a replica manager or
72   *  other entity to start replicas on hosts running the ExecDaemon.
73   *
74   *  Note that an important property of ExecDaemon is that it does not
75   *  have any state (except the map of local processes).  Thus the rest
76   *  of the distributed system does not depend on its state.
77   *
78   *  @author Hein Meling
79   *  @since Jgroup 1.2
80   */
81  public class ExecDaemon
82    implements ExecService
83  {
84  
85    ////////////////////////////////////////////////////////////////////////////////////////////
86    // Logger
87    ////////////////////////////////////////////////////////////////////////////////////////////
88  
89    /** Obtain logger for this class */
90    private static final Logger log = Logger.getLogger(ExecDaemon.class);
91  
92  
93    ////////////////////////////////////////////////////////////////////////////////////////////
94    // Fields
95    ////////////////////////////////////////////////////////////////////////////////////////////
96  
97    /** Map of replica classes running on this host */
98    private Map<ClassData,ReplicaThread> replicaMap =
99      Collections.synchronizedMap(new HashMap<ClassData,ReplicaThread>());
100 
101 
102   ////////////////////////////////////////////////////////////////////////////////////////////
103   // Static fields and block
104   ////////////////////////////////////////////////////////////////////////////////////////////
105 
106   /** Variable holding JVM executable, classpath and system properties */
107   private static StringBuilder jvmCmd      = new StringBuilder();
108 
109   /** The application starter class name */
110   private static final String  appStarter  = ApplicationStarter.class.getName();
111 
112   /** The Runtime environment for this VM */
113   private static final Runtime rt          = Runtime.getRuntime();
114 
115   /** Determine the local host name */
116   private static final String  thisHost    = Network.getLocalHostName();
117 
118   /** Determine the local host name */
119   private static final String  thisMachine = Network.getLocalMachineName();
120 
121 
122   static {
123     String s = File.separator;
124     jvmCmd.append(System.getProperty("java.home"));
125     jvmCmd.append(s);
126     jvmCmd.append("bin");
127     jvmCmd.append(s);
128     jvmCmd.append("java");
129     /*
130      * Jgroup properties to pass on to JVMs started from the ExecDaemon.
131      */
132     String[] jgroupProperties = new String[] {
133       "java.library.path",
134       "jgroup.simulator",
135       "jgroup.log.dir",
136       "jgroup.log.config",
137       "jgroup.log.msgcontent",
138       "jgroup.log.measurements",
139       "jgroup.system.config",
140       "jgroup.system.services",
141       "jgroup.system.applications",
142       "java.util.logging.config.file",
143     };
144     for (int i = 0; i < jgroupProperties.length; i++) {
145       String property = System.getProperty(jgroupProperties[i]);
146       if (property != null && property.length() > 0) {
147         jvmCmd.append(" -D");
148         jvmCmd.append(jgroupProperties[i]);
149         jvmCmd.append("=");
150         jvmCmd.append(property);
151       }
152     }
153     String classpath = System.getProperty("java.class.path");
154     if (classpath != null && classpath.length() > 0) {
155       jvmCmd.append(" -classpath ");
156       jvmCmd.append(classpath);
157     }
158     // Enable assertions
159     jvmCmd.append(" -ea");
160 //    jvmCmd.append(" -verbose:gc -Xloggc:/tmp/meling-logs/ed-gc.log");
161 //    System.out.println("JVM command line:\n" + jvmCmd.toString());
162   }
163 
164 
165   ////////////////////////////////////////////////////////////////////////////////////////////
166   // Constructor
167   ////////////////////////////////////////////////////////////////////////////////////////////
168 
169   public ExecDaemon()
170     throws RemoteException, JgroupException, ConfigurationException
171   {
172     ConfigManager.init();
173     DistributedSystemConfig dsc = ConfigManager.getDistributedSystem();
174     if (!dsc.containsLocalHost()) {
175       JgroupException e = new JgroupException(
176         "Distributed system configuration does not contain local host: " + Network.getLocalHostName() +
177         "\nThis is required by the execdaemon to start servers.");
178       throw e;
179     }
180     Host host = DistributedSystemConfig.getLocalHost();
181     try {
182       DatagramSocket socket = new DatagramSocket(host.getPort(), host.getAddress());
183       // close the socket to allow the Jgroup/daemon to use it once a replica starts
184       socket.close();
185     } catch (SocketException e1) {
186       JgroupException e = new JgroupException(
187         "The datagram socket required by the Jgroup/daemon is occupied.\n" +
188         "This could be due to an old copy of the Jgroup/daemon that was not properly\n" +
189         "shutdown in a previous run.  Please ensure to kill any remaining java processes,\n" +
190         "or switch to a different host port: " + host.getAddress() + ":" + host.getPort(), e1);
191       throw e;
192     }
193 
194     /*
195      * Set up a shutdown hook for the ExecDaemon to ensure that
196      * replicas are removed when the ExecDaemon is shutdown or crashes.
197      */
198     Runtime.getRuntime().addShutdownHook(new Thread("ED-ShutdownHook") {
199       public void run() {
200         log.info("ED-ShutdownHook: shutdown commenced (removing replicas)...");
201         if (Eventlogger.ENABLED)
202           Eventlogger.logEventFlush("HostShutdown");
203         removeAllReplicas();
204         LogManager.shutdown();
205       }});
206 
207     /*
208      * Ensure that we create a local bootstrap registry for this machine,
209      * and that there is no other registry occupying the registry access port.
210      */
211     try {
212       BootstrapRegistry.createRegistry();
213     } catch (RemoteException e) {
214       log.error("Could not create bootstrap registry", e);
215       throw new Abort("ExecDaemon could not be started on " + thisHost, e);
216     }
217 
218     Exporter exporter =
219       new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory());
220     Remote execProxy = exporter.export(this);
221     try {
222       BootstrapRegistry.bind(EXEC_DAEMON_NAME, execProxy);
223     } catch (AlreadyBoundException e) {
224       throw new Abort("Execution daemon already running on " + thisHost, e);
225     }
226 
227     if (Boolean.getBoolean("jgroup.factory.notify.presence")) {
228       notifyPresence();
229     }
230 
231     DaemonInteraction.initDaemon();
232 //    if (Boolean.getBoolean("jgroup.daemon")) {
233 //      /*
234 //       * Start the Jgroup daemon local to the execdaemon; thus all group managers
235 //       * will use RMI to communicate with the Jgroup daemon.
236 //       */
237 //      DaemonInteraction.initDaemon();
238 //    }
239   }
240 
241   /**
242    * If the ExecDaemon is started after the ReplicaManager, then lookup
243    * the replication manager in the dependable registry and notify our presence.
244    */
245   private void notifyPresence()
246   {
247     /* Obtain the replication manager registry name. */
248     AppConfig rmApp = AppConfig.getApplication(ReplicaManagerImpl.class);
249     String rmRegName = rmApp.getRegistryName();
250 
251     try {
252       /*
253        * Obtain a proxy for the dependable registry running in the
254        * distributed system.
255        */
256       DependableRegistry registry = RegistryFactory.getRegistry();
257       ReplicationManager replicationManager = (ReplicationManager) registry.lookup(rmRegName);
258       /* Notify the replication manager of our presence. */
259       replicationManager.notifyEvent(new EDPresentEvent(Network.getLocalHost()));
260 
261     } catch (Exception e) {
262       if (log.isDebugEnabled())
263         log.debug(rmRegName + " not yet available.");
264     }
265   }
266 
267 
268   ////////////////////////////////////////////////////////////////////////////////////////////
269   // Main method
270   ////////////////////////////////////////////////////////////////////////////////////////////
271 
272   public static void main(String[] argv) 
273     throws Exception
274   {
275     // Start the ExecDaemon
276     try {
277       ExecDaemon exec = new ExecDaemon();
278       System.out.println("Replica object factory ready: " + Network.getLocalHostName());
279     } catch(Exception e) {
280       Abort.exit("Failed to start the execution daemon on the local host", e, 1);
281     }
282   }
283 
284 
285   ////////////////////////////////////////////////////////////////////////////////////////////
286   // Methods from ExecService interface
287   ////////////////////////////////////////////////////////////////////////////////////////////
288 
289   /**
290    *  Comments inherited from jgroup.arm.ExecService
291    */
292   public synchronized boolean createExecReplica(ClassData classData)
293     throws RemoteException, ExecException
294   {
295     if (replicaMap.containsKey(classData)) {
296       // Replica is already running on this host.
297       return false;
298     } else {
299       Class c = classData.getClassObject();
300       String mainError = c.getName() + " does not implement a 'public static void"
301                          + " main(String[] args)' method.";
302 
303       /*
304        * Check for method:  main(String[] args)  in the specified class.
305        */
306       try {
307 
308         Method main = c.getDeclaredMethod("main", new Class[] { String[].class });
309         int m = main.getModifiers();
310         if (!Modifier.isPublic(m) || !Modifier.isStatic(m)) {
311           throw new ExecException(mainError);
312         }
313 
314         ReplicaThread replicaThrd = new ReplicaThread(classData, main);
315         replicaThrd.setDaemon(true);
316         replicaThrd.start();
317 
318         /* Keep the replica thread object in case we want to remove it */
319         replicaMap.put(classData, replicaThrd);
320         if (log.isDebugEnabled())
321           log.debug("Created replica: " + classData.getShortName());
322         return true;
323 
324       } catch (NoSuchMethodException e) {
325         throw new ExecException(mainError, e);
326       }
327     }
328   }
329 
330 
331   /**
332    *  Instantiate and start the given class (replica) in a JVM process
333    *  separate from the execution service JVM process.
334    *
335    *  Creates a new replica in a seperate JVM process.  Only one replica
336    *  for each matching <code>ClassData</code> object (or group) can be
337    *  started on each host.  This makes the operation idempotent,
338    *  allowing several replication managers to invoke the same create
339    *  operation on the same host as another replication manager already
340    *  may have done.  This prevents creating multiple instances of the
341    *  same class with identical arguments.  Thus, different arguments
342    *  are not considered the same instance.
343    *
344    *  @param classData 
345    *    The class data object representing the replica to start.
346    *  @return
347    *    True if the replica was created; false is returned if the
348    *    replica is already running on this host.
349    * 
350    *  @throws RemoteException 
351    *    If remote operation failed.
352    *  @throws ExecException 
353    *    If the specified class (replica) could not be instantiated.
354    */
355   public synchronized boolean createReplica(ClassData classData)
356     throws RemoteException, ExecException
357   {
358     if (replicaMap.containsKey(classData)) {
359       // Replica is already running on this host.
360       return false;
361     } else {
362       ReplicaThread replicaThrd = new ReplicaThread(classData);
363       replicaThrd.setPriority(Thread.NORM_PRIORITY+1);
364       replicaThrd.setDaemon(true);
365       replicaThrd.start();
366 
367       /* Keep the replica thread object in case we want to remove it */
368       replicaMap.put(classData, replicaThrd);
369       if (log.isDebugEnabled())
370         log.debug("Created replica: " + classData.getShortName());
371       return true;
372     }
373   }
374 
375 
376   public synchronized boolean removeReplica(ClassData classData)
377     throws RemoteException
378   {
379     if (log.isDebugEnabled())
380       log.debug("Removing replica: " + classData.getShortName());
381     if (replicaMap.containsKey(classData)) {
382       /*
383        * Remove the local replica thread for the specified class
384        * (group) from the replicaMap.
385        */
386       ReplicaThread replicaThrd = (ReplicaThread) replicaMap.remove(classData);
387       /* Remove the local replica and its thread. */
388       replicaThrd.remove();
389       if (Eventlogger.ENABLED) {
390         int groupId = AppConfig.getApplication(classData.getClassObject()).getGroupId();
391         Eventlogger.logEventFlush(new ReplicaEvent(Removed, groupId));
392       }
393       /* Further clean up is performed by the replica thread. */
394       return true;
395     } else {
396       // Replica not running on this host; may have been removed already
397       return false;
398     }
399   }
400 
401 
402   /**
403    *  Remove all local replicas associated with this ExecDaemon.
404    */
405   private void removeAllReplicas()
406   {
407     try {
408       Set<ClassData> replicas = queryReplicas();
409       for (ClassData cl : replicas) {
410         removeReplica(cl);
411       }
412     } catch (RemoteException e) {
413       // Cannot happen since we are local to this JVM
414       e.printStackTrace();
415     }
416   }
417 
418 
419   /**
420    * @see jgroup.core.arm.ExecService#queryReplicas()
421    */
422   public synchronized Set<ClassData> queryReplicas()
423     throws RemoteException
424   {
425     // clone the set of replicas to avoid concurrent access to the map
426     synchronized (replicaMap) {
427       return new HashSet<ClassData>(replicaMap.keySet());
428     }
429   }
430 
431 
432   /**
433    * @see jgroup.core.arm.ExecService#ping()
434    */
435   public void ping()
436     throws RemoteException
437   {
438     if (log.isDebugEnabled())
439       log.debug("PING");
440     return;
441   }
442 
443   /* (non-Javadoc)
444    * @see jgroup.core.arm.ExecService#shutdown()
445    */
446   public void shutdown(final int delay)
447     throws RemoteException
448   {
449     new Thread() {
450       public void run() {
451         if (delay < 0) {
452           System.out.println("Halting ExecDaemon and all local replicas");
453           removeAllReplicas();
454           try { sleep(2000); } catch (InterruptedException e) { }
455           // ensure that log files are closed
456           if (Eventlogger.ENABLED)
457             Eventlogger.close();
458           LogManager.shutdown();
459           // do not run the shutdown hooks
460           rt.halt(0);
461         } else if (delay > 0) {
462           System.out.println("I'm shuting down in " + delay + " milliseconds.");
463           try { sleep(delay); } catch (InterruptedException e) { }
464         } else {
465           // delay == 0
466           System.out.println("I'm shuting down now");
467         }
468         if (Eventlogger.ENABLED)
469           Eventlogger.logEventFlush("InjectingFailure");
470         // ensure that log files are closed
471         LogManager.shutdown();
472         System.out.println(new jgroup.util.log.Event(Shutdown, "InjectingFailure"));
473         System.exit(0);
474         // The ExecDaemon shutdown hook will deal with removing replicas.
475       }
476     }.start();
477     return;
478   }
479 
480 
481   ////////////////////////////////////////////////////////////////////////////////////////////
482   // Private inner classfor handling replica threads.
483   ////////////////////////////////////////////////////////////////////////////////////////////
484 
485   /**
486    *  Private inner class <code>ReplicaThread</code> is used to start
487    *  replicas, and will keep waiting for the replica to end, vulentarely
488    *
489    *  @author Hein Meling
490    */
491   private class ReplicaThread
492     extends Thread
493   {
494     /**
495      *  The process in which the replica may be running, assuming it is
496      *  running in a JVM external to the execution daemon.
497      */
498     private Process replica;
499 
500     /** Short classname used for logging. */
501     private String shortClass;
502 
503     /** The full class information for this replica. */
504     private ClassData classData;
505 
506     /**
507      *  Indicator used to trigger the removal of a replica.
508      *  Used only for replicas internal to the execution daemon.
509      */
510     private boolean removed;
511 
512 
513     //////////////////////////////////////////////////////////////////////////////////////////
514     // Constructors for the replica thread
515     //////////////////////////////////////////////////////////////////////////////////////////
516 
517     /**
518      *  Creates a new <code>ReplicaThread</code> instance for a replica
519      *  started in the same JVM as the execution daemon.
520      *
521      *  @param classData
522      *    Class and argument information for the replica to be started.
523      *  @exception ExecException
524      *    Raised if the replica could not be started.
525      */
526     ReplicaThread(ClassData classData, Method main)
527       throws ExecException
528     {
529       super("Local-ReplicaThread-" + classData.getShortName());
530 
531       removed = false;
532       this.classData = classData;
533       shortClass = classData.getShortName();
534 
535       try {
536         /*
537          * Invoke the main method to start the replica in the same
538          * JVM as the execution daemon.
539          */
540         main.invoke(null, new Object[] { classData.getArgs() });
541       } catch (InvocationTargetException e) {
542         throw new ExecException(classData.getClassObject()
543           + " has thrown an exception during invocation of the main method.", e.getTargetException());
544       } catch (IllegalAccessException e) {
545         throw new ExecException(shortClass + " failed to start", e);
546       }
547       log.info("Starting replica: " + shortClass);
548     }
549 
550 
551     /**
552      *  Creates a new <code>ReplicaThread</code> instance for a replica
553      *  started external to the execution daemon, i.e., in a separate JVM.
554      *
555      *  @param classData
556      *    Class and argument information for the replica to be started.
557      *  @exception ExecException
558      *    Raised if the replica could not be started.
559      */
560     ReplicaThread(ClassData classData)
561       throws ExecException
562     {
563       super("External-ReplicaThread-" + classData.getShortName());
564 
565       this.classData = classData;
566       shortClass = classData.getShortName();
567       String cmd = classData.getCommand(jvmCmd.toString(), appStarter);
568       try {
569         replica = rt.exec(cmd);
570 
571         /*
572          * Create a new thread to read and print the <i>stdout</i> for
573          * this replica.
574          */
575         Thread outThrd = new Thread(shortClass + "-out") {
576           public void run() {
577             InputStream drOut = replica.getInputStream();
578             BufferedReader out = new BufferedReader(new InputStreamReader(drOut));
579             String stdout;
580             try {
581               while ((stdout = out.readLine()) != null) {
582                 synchronized(this) {
583                   if (log.isDebugEnabled()) {
584                     log.debug(stdout);
585                   }
586                   System.out.println(shortClass + "-" + thisMachine + ": " + stdout);
587                 }
588               }
589               out.close();
590             } catch (java.io.IOException e) {
591               log.warn("Replica " + shortClass + " failed.", e);
592             }
593           }
594         };
595         outThrd.setDaemon(true);
596         outThrd.start();
597 
598         /*
599          * Create a new thread to read and print the <i>stderr</i> for
600          * this replica.
601          */
602         Thread errThrd = new Thread(shortClass + "-err") {
603           public void run() {
604             InputStream drErr = replica.getErrorStream();
605             BufferedReader err = new BufferedReader(new InputStreamReader(drErr));
606             String stderr;
607             try {
608               while ((stderr = err.readLine()) != null) {
609                 synchronized(this) {
610                   log.error(stderr);
611                   System.out.println(shortClass + "-" + thisMachine + "-err: " + stderr);
612                 }
613               }
614               err.close();
615             } catch (java.io.IOException e) {
616               log.warn("Replica " + shortClass + " failed.", e);
617             }
618           }
619         };
620         errThrd.setDaemon(true);
621         errThrd.start();
622 
623         if (log.isDebugEnabled())
624           log.debug("Starting replica: " + cmd);
625         else
626           log.info("Starting replica: " + shortClass);
627 
628       } catch (java.io.IOException ioe) {
629         throw new ExecException("Replica could not be started.", ioe);
630       }
631     }
632 
633 
634     /**
635      *  Invoked by the <code>removeReplica()</code> method to stop the
636      *  thread, causing the replica itself to exit.
637      *
638      *  For the case when the replica is executing in a separate process,
639      *  invoking the <code>destory()</code> method will cause the
640      *  <code>run()</code> method below to pass through the
641      *  <code>waitFor()</code> method and thus it will clean up its
642      *  tables and exit this thread.
643      */
644     synchronized void remove()
645     {
646       if (replica == null) {
647         /* The replica is running in this JVM. */
648         removed = true;
649         notifyAll();
650       } else {
651         /* The replica is running in a separate JVM process. */
652         OutputStream os = replica.getOutputStream();
653         PrintWriter writer = new PrintWriter(os, true);
654         /* Send a shutdown message to the child replica process. */
655         if (log.isDebugEnabled())
656           log.debug("Sending SHUTDOWN message to replica");
657         writer.println(SHUTDOWN_REPLICA);
658         writer.close();
659       }
660     }
661 
662 
663     /**
664      *  This method will also wait until the replica exits and will thus
665      *  clean up the execution service tables.
666      */
667     public void run()
668     {
669       try {
670         if (replica == null) {
671           synchronized (this) {
672             while (!removed) {
673               wait();
674             }
675           }
676         } else {
677           if (replica.waitFor() != 0) {
678             log.warn("Got non-zero return value; replica failed or was removed.");
679           }
680         }
681 
682       } catch (InterruptedException e) {
683         log.warn("Replica " + shortClass + " failed.", e);
684 
685       } finally {
686         /* Replica exit; always clean up. */
687         log.info("Replica exiting: " + shortClass);
688         synchronized (this) {
689           replicaMap.remove(classData);
690           if (replica != null)
691             replica.destroy();
692         }
693 //        try {
694 //          /* Notify the replication manager of this replica failure. */
695 //          replicaManager.notifyEvent(new ReplicaFailureEvent(classData));
696 //        } catch (Exception e) {
697 //          log.warn("Unable to notify Replication Manager", e);
698 //        }
699       }
700     }
701   } // END of private inner class ReplicaThread
702 
703 } // END of public class ExecDaemon