1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.arm;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.InputStream;
24 import java.io.InputStreamReader;
25 import java.io.OutputStream;
26 import java.io.PrintWriter;
27 import java.lang.reflect.InvocationTargetException;
28 import java.lang.reflect.Method;
29 import java.lang.reflect.Modifier;
30 import java.net.DatagramSocket;
31 import java.net.SocketException;
32 import java.rmi.AlreadyBoundException;
33 import java.rmi.Remote;
34 import java.rmi.RemoteException;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.HashSet;
38 import java.util.Map;
39 import java.util.Set;
40
41 import jgroup.core.ConfigManager;
42 import jgroup.core.ConfigurationException;
43 import jgroup.core.JgroupException;
44 import jgroup.core.arm.ExecException;
45 import jgroup.core.arm.ExecService;
46 import jgroup.core.arm.ReplicationManager;
47 import jgroup.core.registry.BootstrapRegistry;
48 import jgroup.core.registry.DependableRegistry;
49 import jgroup.core.registry.RegistryFactory;
50 import jgroup.relacs.config.AppConfig;
51 import jgroup.relacs.config.ClassData;
52 import jgroup.relacs.config.DistributedSystemConfig;
53 import jgroup.relacs.config.Host;
54 import jgroup.relacs.daemon.DaemonInteraction;
55 import jgroup.util.Abort;
56 import jgroup.util.Network;
57 import jgroup.util.log.Eventlogger;
58 import jgroup.util.log.ReplicaEvent;
59 import static jgroup.util.log.ReplicaEvent.Type.Removed;
60 import static jgroup.util.log.ReplicaEvent.Type.Shutdown;
61 import net.jini.export.Exporter;
62 import net.jini.jeri.BasicILFactory;
63 import net.jini.jeri.BasicJeriExporter;
64 import net.jini.jeri.tcp.TcpServerEndpoint;
65
66 import org.apache.log4j.LogManager;
67 import org.apache.log4j.Logger;
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public class ExecDaemon
82 implements ExecService
83 {
84
85
86
87
88
89
90 private static final Logger log = Logger.getLogger(ExecDaemon.class);
91
92
93
94
95
96
97
98 private Map<ClassData,ReplicaThread> replicaMap =
99 Collections.synchronizedMap(new HashMap<ClassData,ReplicaThread>());
100
101
102
103
104
105
106
107 private static StringBuilder jvmCmd = new StringBuilder();
108
109
110 private static final String appStarter = ApplicationStarter.class.getName();
111
112
113 private static final Runtime rt = Runtime.getRuntime();
114
115
116 private static final String thisHost = Network.getLocalHostName();
117
118
119 private static final String thisMachine = Network.getLocalMachineName();
120
121
122 static {
123 String s = File.separator;
124 jvmCmd.append(System.getProperty("java.home"));
125 jvmCmd.append(s);
126 jvmCmd.append("bin");
127 jvmCmd.append(s);
128 jvmCmd.append("java");
129
130
131
132 String[] jgroupProperties = new String[] {
133 "java.library.path",
134 "jgroup.simulator",
135 "jgroup.log.dir",
136 "jgroup.log.config",
137 "jgroup.log.msgcontent",
138 "jgroup.log.measurements",
139 "jgroup.system.config",
140 "jgroup.system.services",
141 "jgroup.system.applications",
142 "java.util.logging.config.file",
143 };
144 for (int i = 0; i < jgroupProperties.length; i++) {
145 String property = System.getProperty(jgroupProperties[i]);
146 if (property != null && property.length() > 0) {
147 jvmCmd.append(" -D");
148 jvmCmd.append(jgroupProperties[i]);
149 jvmCmd.append("=");
150 jvmCmd.append(property);
151 }
152 }
153 String classpath = System.getProperty("java.class.path");
154 if (classpath != null && classpath.length() > 0) {
155 jvmCmd.append(" -classpath ");
156 jvmCmd.append(classpath);
157 }
158
159 jvmCmd.append(" -ea");
160
161
162 }
163
164
165
166
167
168
/**
 * Starts the execution daemon on the local host.  In order, the constructor:
 * initializes the configuration and checks that it contains the local host;
 * probes that the local host's datagram port can be bound; installs a JVM
 * shutdown hook that removes all replicas; creates the bootstrap registry;
 * exports this object and binds it under <code>EXEC_DAEMON_NAME</code>;
 * optionally notifies the replication manager (when the boolean system
 * property <code>jgroup.factory.notify.presence</code> is set); and finally
 * calls <code>DaemonInteraction.initDaemon()</code>.
 *
 * @throws RemoteException declared in the signature; a RemoteException
 *   raised while creating the bootstrap registry is rethrown as an
 *   <code>Abort</code> instead
 * @throws JgroupException if the configuration lacks the local host or the
 *   datagram socket for the local host cannot be opened
 * @throws ConfigurationException declared in the signature
 */
public ExecDaemon()
  throws RemoteException, JgroupException, ConfigurationException
{
  ConfigManager.init();
  final DistributedSystemConfig systemConfig = ConfigManager.getDistributedSystem();
  if (!systemConfig.containsLocalHost()) {
    throw new JgroupException(
        "Distributed system configuration does not contain local host: " + Network.getLocalHostName() +
        "\nThis is required by the execdaemon to start servers.");
  }

  // Probe the daemon's datagram port: open it and release it again at once.
  final Host localHost = DistributedSystemConfig.getLocalHost();
  try {
    new DatagramSocket(localHost.getPort(), localHost.getAddress()).close();
  } catch (SocketException occupied) {
    throw new JgroupException(
        "The datagram socket required by the Jgroup/daemon is occupied.\n" +
        "This could be due to an old copy of the Jgroup/daemon that was not properly\n" +
        "shutdown in a previous run. Please ensure to kill any remaining java processes,\n" +
        "or switch to a different host port: " + localHost.getAddress() + ":" + localHost.getPort(),
        occupied);
  }

  // Remove the replicas and close the logging system when the JVM exits.
  final Runnable cleanup = new Runnable() {
    public void run() {
      log.info("ED-ShutdownHook: shutdown commenced (removing replicas)...");
      if (Eventlogger.ENABLED) {
        Eventlogger.logEventFlush("HostShutdown");
      }
      removeAllReplicas();
      LogManager.shutdown();
    }
  };
  Runtime.getRuntime().addShutdownHook(new Thread(cleanup, "ED-ShutdownHook"));

  try {
    BootstrapRegistry.createRegistry();
  } catch (RemoteException e) {
    log.error("Could not create bootstrap registry", e);
    throw new Abort("ExecDaemon could not be started on " + thisHost, e);
  }

  // Export this daemon and make it reachable through the bootstrap registry.
  final Exporter exporter =
    new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory());
  final Remote execProxy = exporter.export(this);
  try {
    BootstrapRegistry.bind(EXEC_DAEMON_NAME, execProxy);
  } catch (AlreadyBoundException e) {
    throw new Abort("Execution daemon already running on " + thisHost, e);
  }

  if (Boolean.getBoolean("jgroup.factory.notify.presence")) {
    notifyPresence();
  }

  DaemonInteraction.initDaemon();
}
240
241
242
243
244
245 private void notifyPresence()
246 {
247
248 AppConfig rmApp = AppConfig.getApplication(ReplicaManagerImpl.class);
249 String rmRegName = rmApp.getRegistryName();
250
251 try {
252
253
254
255
256 DependableRegistry registry = RegistryFactory.getRegistry();
257 ReplicationManager replicationManager = (ReplicationManager) registry.lookup(rmRegName);
258
259 replicationManager.notifyEvent(new EDPresentEvent(Network.getLocalHost()));
260
261 } catch (Exception e) {
262 if (log.isDebugEnabled())
263 log.debug(rmRegName + " not yet available.");
264 }
265 }
266
267
268
269
270
271
272 public static void main(String[] argv)
273 throws Exception
274 {
275
276 try {
277 ExecDaemon exec = new ExecDaemon();
278 System.out.println("Replica object factory ready: " + Network.getLocalHostName());
279 } catch(Exception e) {
280 Abort.exit("Failed to start the execution daemon on the local host", e, 1);
281 }
282 }
283
284
285
286
287
288
289
290
291
292 public synchronized boolean createExecReplica(ClassData classData)
293 throws RemoteException, ExecException
294 {
295 if (replicaMap.containsKey(classData)) {
296
297 return false;
298 } else {
299 Class c = classData.getClassObject();
300 String mainError = c.getName() + " does not implement a 'public static void"
301 + " main(String[] args)' method.";
302
303
304
305
306 try {
307
308 Method main = c.getDeclaredMethod("main", new Class[] { String[].class });
309 int m = main.getModifiers();
310 if (!Modifier.isPublic(m) || !Modifier.isStatic(m)) {
311 throw new ExecException(mainError);
312 }
313
314 ReplicaThread replicaThrd = new ReplicaThread(classData, main);
315 replicaThrd.setDaemon(true);
316 replicaThrd.start();
317
318
319 replicaMap.put(classData, replicaThrd);
320 if (log.isDebugEnabled())
321 log.debug("Created replica: " + classData.getShortName());
322 return true;
323
324 } catch (NoSuchMethodException e) {
325 throw new ExecException(mainError, e);
326 }
327 }
328 }
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355 public synchronized boolean createReplica(ClassData classData)
356 throws RemoteException, ExecException
357 {
358 if (replicaMap.containsKey(classData)) {
359
360 return false;
361 } else {
362 ReplicaThread replicaThrd = new ReplicaThread(classData);
363 replicaThrd.setPriority(Thread.NORM_PRIORITY+1);
364 replicaThrd.setDaemon(true);
365 replicaThrd.start();
366
367
368 replicaMap.put(classData, replicaThrd);
369 if (log.isDebugEnabled())
370 log.debug("Created replica: " + classData.getShortName());
371 return true;
372 }
373 }
374
375
376 public synchronized boolean removeReplica(ClassData classData)
377 throws RemoteException
378 {
379 if (log.isDebugEnabled())
380 log.debug("Removing replica: " + classData.getShortName());
381 if (replicaMap.containsKey(classData)) {
382
383
384
385
386 ReplicaThread replicaThrd = (ReplicaThread) replicaMap.remove(classData);
387
388 replicaThrd.remove();
389 if (Eventlogger.ENABLED) {
390 int groupId = AppConfig.getApplication(classData.getClassObject()).getGroupId();
391 Eventlogger.logEventFlush(new ReplicaEvent(Removed, groupId));
392 }
393
394 return true;
395 } else {
396
397 return false;
398 }
399 }
400
401
402
403
404
405 private void removeAllReplicas()
406 {
407 try {
408 Set<ClassData> replicas = queryReplicas();
409 for (ClassData cl : replicas) {
410 removeReplica(cl);
411 }
412 } catch (RemoteException e) {
413
414 e.printStackTrace();
415 }
416 }
417
418
419
420
421
422 public synchronized Set<ClassData> queryReplicas()
423 throws RemoteException
424 {
425
426 synchronized (replicaMap) {
427 return new HashSet<ClassData>(replicaMap.keySet());
428 }
429 }
430
431
432
433
434
435 public void ping()
436 throws RemoteException
437 {
438 if (log.isDebugEnabled())
439 log.debug("PING");
440 return;
441 }
442
443
444
445
446 public void shutdown(final int delay)
447 throws RemoteException
448 {
449 new Thread() {
450 public void run() {
451 if (delay < 0) {
452 System.out.println("Halting ExecDaemon and all local replicas");
453 removeAllReplicas();
454 try { sleep(2000); } catch (InterruptedException e) { }
455
456 if (Eventlogger.ENABLED)
457 Eventlogger.close();
458 LogManager.shutdown();
459
460 rt.halt(0);
461 } else if (delay > 0) {
462 System.out.println("I'm shuting down in " + delay + " milliseconds.");
463 try { sleep(delay); } catch (InterruptedException e) { }
464 } else {
465
466 System.out.println("I'm shuting down now");
467 }
468 if (Eventlogger.ENABLED)
469 Eventlogger.logEventFlush("InjectingFailure");
470
471 LogManager.shutdown();
472 System.out.println(new jgroup.util.log.Event(Shutdown, "InjectingFailure"));
473 System.exit(0);
474
475 }
476 }.start();
477 return;
478 }
479
480
481
482
483
484
485
486
487
488
489
490
491 private class ReplicaThread
492 extends Thread
493 {
494
495
496
497
498 private Process replica;
499
500
501 private String shortClass;
502
503
504 private ClassData classData;
505
506
507
508
509
510 private boolean removed;
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526 ReplicaThread(ClassData classData, Method main)
527 throws ExecException
528 {
529 super("Local-ReplicaThread-" + classData.getShortName());
530
531 removed = false;
532 this.classData = classData;
533 shortClass = classData.getShortName();
534
535 try {
536
537
538
539
540 main.invoke(null, new Object[] { classData.getArgs() });
541 } catch (InvocationTargetException e) {
542 throw new ExecException(classData.getClassObject()
543 + " has thrown an exception during invocation of the main method.", e.getTargetException());
544 } catch (IllegalAccessException e) {
545 throw new ExecException(shortClass + " failed to start", e);
546 }
547 log.info("Starting replica: " + shortClass);
548 }
549
550
551
552
553
554
555
556
557
558
559
/**
 * Creates the thread for a replica that runs as a separate process.  The
 * process is launched with the command obtained from the class data, and
 * two daemon threads are started that forward the process's standard
 * output and standard error to the log and to this daemon's console.
 *
 * @param classData describes the replica class and its arguments
 * @throws ExecException if the process could not be launched
 */
ReplicaThread(ClassData classData)
  throws ExecException
{
  super("External-ReplicaThread-" + classData.getShortName());

  this.classData = classData;
  shortClass = classData.getShortName();
  String cmd = classData.getCommand(jvmCmd.toString(), appStarter);
  try {
    replica = rt.exec(cmd);
  } catch (java.io.IOException ioe) {
    throw new ExecException("Replica could not be started.", ioe);
  }

  // Both output streams of the child have to be drained continuously;
  // each gets its own forwarding thread.
  String consoleTag = shortClass + "-" + thisMachine;
  forwardOutput(replica.getInputStream(), shortClass + "-out", consoleTag + ": ", false);
  forwardOutput(replica.getErrorStream(), shortClass + "-err", consoleTag + "-err: ", true);

  if (log.isDebugEnabled())
    log.debug("Starting replica: " + cmd);
  else
    log.info("Starting replica: " + shortClass);
}

/**
 * Starts a daemon thread that reads the given stream line by line until
 * end of stream.  Every line is logged (at error level for the error
 * stream, otherwise at debug level when enabled) and echoed to standard
 * output with the given prefix.  The reader is closed when the thread
 * finishes, also when reading fails.
 *
 * @param stream the stream of the replica process to drain
 * @param threadName name given to the forwarding thread
 * @param consolePrefix text printed in front of every echoed line
 * @param isError whether the stream is the replica's error stream
 */
private void forwardOutput(final InputStream stream, String threadName,
    final String consolePrefix, final boolean isError)
{
  Thread forwarder = new Thread(threadName) {
    public void run() {
      BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
      try {
        String line;
        while ((line = reader.readLine()) != null) {
          if (isError) {
            log.error(line);
          } else if (log.isDebugEnabled()) {
            log.debug(line);
          }
          System.out.println(consolePrefix + line);
        }
      } catch (java.io.IOException e) {
        log.warn("Replica " + shortClass + " failed.", e);
      } finally {
        try {
          reader.close();
        } catch (java.io.IOException ignored) {
          // nothing sensible left to do with a stream that fails to close
        }
      }
    }
  };
  forwarder.setDaemon(true);
  forwarder.start();
}
632
633
634
635
636
637
638
639
640
641
642
643
644 synchronized void remove()
645 {
646 if (replica == null) {
647
648 removed = true;
649 notifyAll();
650 } else {
651
652 OutputStream os = replica.getOutputStream();
653 PrintWriter writer = new PrintWriter(os, true);
654
655 if (log.isDebugEnabled())
656 log.debug("Sending SHUTDOWN message to replica");
657 writer.println(SHUTDOWN_REPLICA);
658 writer.close();
659 }
660 }
661
662
663
664
665
666
667 public void run()
668 {
669 try {
670 if (replica == null) {
671 synchronized (this) {
672 while (!removed) {
673 wait();
674 }
675 }
676 } else {
677 if (replica.waitFor() != 0) {
678 log.warn("Got non-zero return value; replica failed or was removed.");
679 }
680 }
681
682 } catch (InterruptedException e) {
683 log.warn("Replica " + shortClass + " failed.", e);
684
685 } finally {
686
687 log.info("Replica exiting: " + shortClass);
688 synchronized (this) {
689 replicaMap.remove(classData);
690 if (replica != null)
691 replica.destroy();
692 }
693
694
695
696
697
698
699 }
700 }
701 }
702
703 }