1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.daemon;
20
21 import java.io.IOException;
22 import java.rmi.Remote;
23 import java.rmi.RemoteException;
24 import java.rmi.server.ExportException;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.Set;
28
29 import jgroup.core.ConfigManager;
30 import jgroup.core.ConfigurationException;
31 import jgroup.core.EndPoint;
32 import jgroup.core.JgroupException;
33 import jgroup.core.MemberId;
34 import jgroup.core.View;
35 import jgroup.core.registry.BootstrapRegistry;
36 import jgroup.relacs.config.DistributedSystemConfig;
37 import jgroup.relacs.config.TransportConfig;
38 import jgroup.relacs.events.DeliveryAck;
39 import jgroup.relacs.events.Event;
40 import jgroup.relacs.events.EventTags;
41 import jgroup.relacs.events.InstallAck;
42 import jgroup.relacs.events.InstallEvent;
43 import jgroup.relacs.events.JoinRequest;
44 import jgroup.relacs.events.LeaveRequest;
45 import jgroup.relacs.events.MulticastRequest;
46 import jgroup.relacs.events.PrepareAck;
47 import jgroup.relacs.mss.HostTable;
48 import jgroup.relacs.mss.Mss;
49 import jgroup.relacs.mss.MssConstants;
50 import jgroup.relacs.mss.MssHost;
51 import jgroup.relacs.mss.MssUser;
52 import jgroup.relacs.types.GroupId;
53 import jgroup.relacs.types.MemberIdImpl;
54 import jgroup.relacs.types.ViewId;
55 import jgroup.relacs.types.ViewImpl;
56 import jgroup.util.InMessage;
57 import jgroup.util.Network;
58 import jgroup.util.ThreadMonitor;
59 import jgroup.util.Util;
60 import net.jini.export.Exporter;
61 import net.jini.jeri.BasicILFactory;
62 import net.jini.jeri.BasicJeriExporter;
63 import net.jini.jeri.tcp.TcpServerEndpoint;
64
65 import org.apache.log4j.Logger;
66 import org.apache.log4j.MDC;
67 import org.apache.log4j.NDC;
68
69
70
71
72
73
74
75
76
77
78 public final class Daemon
79 implements DaemonService, MssUser, Tag, EventTags, MssConstants
80 {
81
82
83
84
85
86
87 private static final Logger log = Logger.getLogger(Daemon.class);
88
89
90
91
92
93
94
95 private static int members = 0;
96
97
98 private static final String daemonRegName =
99 DAEMON_NAME + DistributedSystemConfig.getLocalHost().getPort();
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 public static Daemon createDaemon(TransportConfig conf)
123 throws ConfigurationException, IOException
124 {
125 if (log.isDebugEnabled())
126 log.debug("Attempting to create a daemon on the localhost");
127
128 DistributedSystemConfig dsc = ConfigManager.getDistributedSystem();
129
130 Daemon daemon = new Daemon(dsc, conf);
131 if (log.isDebugEnabled())
132 log.debug("Successfully created a daemon on localhost: " + daemon);
133 return daemon;
134 }
135
136
137 public static void bindLocalDaemon(DaemonService daemonService)
138 throws JgroupException
139 {
140 try {
141
142
143
144
145
146
147 Exporter exporter =
148 new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory());
149 Remote daemonProxy = exporter.export(daemonService);
150 if (log.isDebugEnabled())
151 log.debug("DaemonService has been exported");
152 BootstrapRegistry.rebind(daemonRegName, daemonProxy);
153 if (log.isDebugEnabled())
154 log.debug("DaemonService has been bound to the bootstrap registry");
155 } catch (ExportException ee) {
156 if (log.isDebugEnabled())
157 log.warn("Failed to export DaemonService", ee);
158 throw new JgroupException("Failed to export the Daemon", ee);
159 } catch (IOException ioe) {
160 if (log.isDebugEnabled())
161 log.warn("Failed to bind the DaemonService", ioe);
162 throw new JgroupException("Failed to bind the Daemon", ioe);
163 }
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public static DaemonService lookupLocalDaemon()
186 throws JgroupException
187 {
188 Exception ex = null;
189 for (int retryCount = retries; retryCount > 0; retryCount--) {
190 try {
191
192 DaemonService daemonService = (DaemonService) BootstrapRegistry.lookup(daemonRegName);
193 if (log.isDebugEnabled())
194 log.debug("Successfully obtained RMI-based daemon reference for localhost");
195 ex = null;
196 return daemonService;
197
198 } catch (Exception e) {
199 try {
200 BootstrapRegistry.refreshRegistryStub();
201 } catch (RemoteException e1) {
202 log.warn("Problem connecting to the bootstrap registry.", e1);
203 }
204 ex = e;
205 log.warn("Retry (" + (retries-retryCount) + "/" + retries
206 + ") to fetch Daemon named " + daemonRegName + " failed");
207
208
209
210
211
212
213
214
215
216 try {
217 Thread.sleep(retryDelay);
218 } catch (InterruptedException ie) {
219
220 }
221
222 }
223 }
224 String mesg = "Cannot obtain a reference for a remote daemon (on the localhost)";
225 if (log.isDebugEnabled())
226 log.error(mesg);
227 throw new JgroupException(mesg, ex);
228 }
229
230
231
232
233 public static DaemonService quickLookupLocalDaemon()
234 {
235
236 try {
237 DaemonService ds = (DaemonService) BootstrapRegistry.lookup(daemonRegName);
238 if (log.isDebugEnabled())
239 log.debug("Quick lookup successfully obtained RMI-based daemon reference for localhost");
240 return ds;
241 } catch (Exception e) {
242 return null;
243 }
244 }
245
246
247
248
249
250
251
252 private static int retries;
253
254
255 private static int retryDelay;
256
257
258 private final GroupTable grouptable = new GroupTable();
259
260
261 private final Set<MemberId> localMembers = new HashSet<MemberId>(5);
262
263
264 private final MssHost me;
265
266
267 private final Mss mss;
268
269
270 private int nsuspects;
271
272
273 private EndPoint[] trset;
274
275
276 private EndPoint[] nrset;
277
278
279 private EndPoint[] nuset;
280
281
282
283
284
285
286
287
288
289 private Daemon(DistributedSystemConfig ds, TransportConfig conf)
290 throws ConfigurationException, IOException
291 {
292
293 retries = conf.getRetryNo();
294 retryDelay = conf.getRetryDelay();
295
296
297 mss = new Mss(this, ds, conf);
298
299
300 HostTable hosttable = mss.getHostTable();
301 me = HostTable.getLocalHost();
302
303
304 nsuspects = 0;
305 trset = new EndPoint[] { me.getEndPoint() };
306 nrset = new EndPoint[0];
307 nuset = new EndPoint[0];
308
309
310 for (int i=0; i < hosttable.size(); i++) {
311 MssHost scan = hosttable.get(i);
312 EndPoint[] symset = new EndPoint[] { scan.getEndPoint() };
313 grouptable.setSymset(scan.getEndPoint(), symset);
314 }
315
316 }
317
318
319
320
321
322
323
324
325
326
327
328
329 public MemberId getMemberId(EndPoint localEndPoint)
330 throws RemoteException
331 {
332 MemberId member = new MemberIdImpl(me.getEndPoint(), me.getIncarnationId(), localEndPoint, ++members);
333 localMembers.add(member);
334 if (log.isDebugEnabled()) {
335 log.debug("Current set of local members: ");
336 for (Iterator iter = localMembers.iterator(); iter.hasNext();) {
337 log.debug(iter.next());
338 }
339 }
340 return member;
341 }
342
343
344
345
346
347 public int members()
348 throws RemoteException
349 {
350 if (log.isDebugEnabled()) {
351 log.debug("I'm the daemon, and " + members + " members are associated with me");
352 ThreadMonitor.dumpAllThreads();
353 }
354 return members;
355 }
356
357
358
359
360
361 public void addEvent(Event event)
362 throws RemoteException
363 {
364 if (log.isDebugEnabled()) {
365 MDC.put("group", "[Group: " + event.getGid() + "]");
366 log.debug("Received member event " + event);
367 }
368 mss.lsend(event);
369 }
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384 public void remoteReceive(byte tag, InMessage stream, EndPoint src)
385 {
386 if (src.isLocal()) {
387 if (log.isDebugEnabled())
388 log.debug("Local message received; discarded at the daemon layer");
389 return;
390 }
391 if (log.isDebugEnabled()) {
392 NDC.push(Network.getMachineName(src.getAddress().getHostName()));
393 }
394 try {
395
396
397
398
399
400 int gid = GroupId.unmarshal(stream);
401 Group group = grouptable.getGroup(gid);
402 if (log.isDebugEnabled()) {
403 String tagString = Tag.R_TAGS[0];
404 if (tag > 0 && tag <= R_RESULT)
405 tagString = Tag.R_TAGS[tag];
406 else if (tag >= R_NOGID && tag <= R_JOINACK)
407 tagString = Tag.R_ExTAGS[tag-R_NOGID];
408 if (group == null)
409 MDC.put("group", "[Group: " + gid + "]");
410 else
411 MDC.put("group", group.toString());
412 log.debug("remoteReceive: tag=" + tagString + ", gid=" + gid + ", src=" + src);
413 }
414 if (group == null) {
415 if (log.isDebugEnabled())
416 log.debug("Not interested in group: " + gid + "; sending R_LEAVE to: " + src);
417 try {
418 MsgGroup tosend = MsgGroup.marshal(gid, new java.util.ArrayList(0), 1);
419 mss.send(R_LEAVE, tosend, src);
420 } catch (IOException e) {
421 log.error("Unable to marshal group message", e);
422 }
423 } else {
424
425
426
427 switch(tag) {
428 case R_MCAST:
429 group.handleMsgMcast (MsgMcast.unmarshal(stream), src);
430 break;
431 case R_ACK:
432 group.handleMsgAck (MsgAck.unmarshal(stream), src);
433 break;
434 case R_PROP:
435 group.handleMsgProp (MsgProp.unmarshal(stream), src);
436 break;
437 case R_ESTIM:
438 group.handleMsgEstim (MsgEstim.unmarshal(stream), src);
439 break;
440 case R_FORWARD:
441 group.handleMsgDlvrd (MsgMcast.unmarshal(stream), src);
442 break;
443 case R_SYNCH:
444 group.handleMsgSynch (MsgSynch.unmarshal(stream), src);
445 break;
446 case R_SYMM:
447 group.handleMsgSymm (MsgSymm.unmarshal(stream), src);
448 break;
449 case R_VIEW:
450 group.handleMsgView (MsgView.unmarshal(stream), src);
451 break;
452 case R_RESULT:
453 group.handleMsgResult (MsgResult.unmarshal(stream), src);
454 break;
455 case R_JOIN:
456 {
457 boolean memberAdded = group.handleMsgJoin(MsgGroup.unmarshal(stream), src);
458 if (memberAdded && Util.in(trset, src)) {
459 group.handleRemoteSuspect(nsuspects, trset, nrset, nuset);
460 }
461 break;
462 }
463 case R_JOINACK:
464 {
465 boolean memberAdded = group.handleMsgJoinAck(MsgGroup.unmarshal(stream), src);
466 if (memberAdded && Util.in(trset, src)) {
467 group.handleRemoteSuspect(nsuspects, trset, nrset, nuset);
468 }
469 break;
470 }
471 case R_LEAVE:
472 group.leave(src);
473 break;
474 default:
475 log.warn("Ignoring unknown message tag: " + tag);
476 }
477 if (group.isEmpty()) {
478
479 grouptable.removeGroup(gid);
480 group.sendLeave();
481 }
482 }
483 } catch (IOException e) {
484 log.error("Unable to handle mreceived message", e);
485 } catch (ClassNotFoundException e) {
486 log.error("Internal error: Cannot unmarshal mreceived message", e);
487 } finally {
488 NDC.pop();
489 }
490 }
491
492
493
494
495
496
497
498 public void localReceive(Event event)
499 {
500 int gid = event.getGid();
501 Group group = grouptable.getGroup(gid);
502 if (event.getTag() == JOIN_REQUEST && group != null && group.isEmpty()) {
503 grouptable.removeGroup(gid);
504 group = null;
505 }
506 if (group != null) {
507 if (log.isDebugEnabled()) {
508 NDC.push("local");
509 MDC.put("group", group.toString());
510 log.debug("localReceive: " + event);
511 }
512
513
514
515
516 switch (event.getTag()) {
517 case MULTICAST_REQUEST:
518 group.handleMulticastRequest ((MulticastRequest) event);
519 break;
520 case DELIVERY_ACK:
521 group.handleLocalDlvrAck ((DeliveryAck) event);
522 break;
523 case INSTALL_ACK:
524 group.handleLocalViewAck ((InstallAck) event);
525 break;
526 case PREPARE_ACK:
527 group.handleLocalPrepareAck ((PrepareAck) event);
528 break;
529 case JOIN_REQUEST:
530 group.handleLocalJoin ((JoinRequest) event, this);
531 break;
532 case LEAVE_REQUEST:
533 group.handleLeaveRequest ((LeaveRequest) event);
534 break;
535 }
536 } else if (event.getTag() == JOIN_REQUEST) {
537 if (log.isDebugEnabled()) {
538 NDC.push("local");
539 MDC.put("group", "[Group: " + gid + "]");
540 }
541
542
543
544
545 createGroup((JoinRequest) event);
546 } else {
547 log.warn("Illegal local event (unknown group " + gid + ")");
548 }
549 if (log.isDebugEnabled())
550 NDC.pop();
551 }
552
553
554
555
556
557
558
559
560
561
562 public void remoteSuspect(EndPoint[] trset, EndPoint[] nrset, EndPoint[] nuset, EndPoint[] newinc)
563 {
564
565 nsuspects++;
566
567 if (log.isDebugEnabled()) {
568 log.debug("nsuspects=" + nsuspects);
569 StringBuilder buf = new StringBuilder("trset(");
570 buf.append(trset.length);
571 buf.append("): ");
572 for (int i = 0; i < trset.length; i++) {
573 buf.append(trset[i]);
574 buf.append(" ");
575 }
576 log.debug(buf);
577
578 buf = new StringBuilder("nrset(");
579 buf.append(nrset.length);
580 buf.append("): ");
581 for (int i = 0; i < nrset.length; i++) {
582 buf.append(nrset[i]);
583 buf.append(" ");
584 }
585 log.debug(buf);
586
587 buf = new StringBuilder("nuset(");
588 buf.append(nuset.length);
589 buf.append("): ");
590 for (int i = 0; i < nuset.length; i++) {
591 buf.append(nuset[i]);
592 buf.append(" ");
593 }
594 log.debug(buf);
595 }
596
597
598
599
600
601 for(int i=0; i < nrset.length; i++)
602 grouptable.setSymset(nrset[i], this.trset);
603 this.trset = trset;
604 this.nrset = nrset;
605 this.nuset = nuset;
606
607
608 if (nrset.length > 0) {
609 for (Iterator iter = grouptable.iterator(); iter.hasNext(); ) {
610 Group group = (Group) iter.next();
611 MDC.put("group", group.toString());
612 try {
613 MsgGroup tosend = MsgGroup.marshal(group.getGid(), group.getHosts(), nrset.length);
614 mss.msend(R_JOIN, tosend, nrset);
615 } catch (IOException e) {
616 log.error("Unable to marshal group message", e);
617 }
618 if (log.isDebugEnabled())
619 log.debug("Sent JOIN message for group (" + group.getGid() + ") to new reachable hosts");
620 }
621 }
622
623
624 if (newinc != null) {
625 for (int i=0; i < newinc.length; i++) {
626
627 for (Iterator iter = grouptable.getGroups(newinc[i]); iter.hasNext(); ) {
628 Group group = (Group) iter.next();
629 HostData hd = group.getHost(newinc[i]);
630 if (hd != null)
631 hd.newIncarnationId();
632 if (log.isDebugEnabled()) {
633 MDC.put("group", group.toString());
634 log.debug("Newinc: " + newinc[i]);
635 }
636 }
637 }
638 }
639 activateGroups(nrset);
640 activateGroups(nuset);
641 }
642
643
644
645
646
647
648
649
650
651
652 public void localSuspect(MemberId memberId)
653 {
654
655
656
657
658 for (Iterator iter = grouptable.iterator(); iter.hasNext(); ) {
659 Group group = (Group) iter.next();
660 MDC.put("group", group.toString());
661 group.handleLocalSuspect(memberId);
662 }
663 }
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679 private void createGroup(JoinRequest request)
680 {
681 int gid = request.getGid();
682 if (log.isDebugEnabled()) {
683 MDC.put("group", "[Group: " + gid +"]");
684 log.debug("Creating new (local) group: " + gid);
685 }
686 EndPoint endpoint = me.getEndPoint();
687 long vid = ViewId.create(endpoint, 0);
688 MemberId joiningMember = request.getMemberId();
689 View view = new ViewImpl(gid, vid, joiningMember);
690 MemberData md = new MemberData(joiningMember, request.getDispatcher());
691 md.schedulePingTimer(this);
692 HostData hd = new HostData(me);
693 Group group = new Group(grouptable, mss, gid, view, hd, md);
694 if (log.isDebugEnabled())
695 MDC.put("group", group.toString());
696 grouptable.insertGroup(group);
697 grouptable.addEndpointToGroup(endpoint, group);
698 InstallEvent event = new InstallEvent(gid, view, 0, 0);
699 boolean success = DaemonInteraction.addEvent(md, event);
700 if (!success) {
701
702
703
704
705
706
707 grouptable.removeGroup(gid);
708 if (log.isDebugEnabled())
709 log.debug("Failed to create group " + gid + "; new member crashed");
710 return;
711 }
712 try {
713 MsgGroup tosend = MsgGroup.marshal(gid, group.getHosts(), ALL);
714
715
716
717
718 mss.msend(R_JOIN, tosend);
719 } catch (IOException e) {
720 log.error("Unable to marshal group message", e);
721 }
722 if (log.isDebugEnabled())
723 log.debug("Successfully created (local) group: " + gid);
724 }
725
726
727
728
729
730 private void activateGroups(EndPoint[] hosts)
731 {
732 for (int i=0; i < hosts.length; i++) {
733 for (Iterator iter = grouptable.getGroups(hosts[i]); iter.hasNext(); ) {
734 Group group = (Group) iter.next();
735 MDC.put("group", group.toString());
736 group.handleRemoteSuspect(nsuspects, trset, nrset, nuset);
737 }
738 }
739 }
740
741 }