1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.daemon;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30
31 import jgroup.core.EndPoint;
32 import jgroup.core.MemberId;
33 import jgroup.core.View;
34 import jgroup.core.multicast.ChainIdentifier;
35 import jgroup.relacs.events.DeliveryAck;
36 import jgroup.relacs.events.InstallAck;
37 import jgroup.relacs.events.JoinRequest;
38 import jgroup.relacs.events.LeaveRequest;
39 import jgroup.relacs.events.MemberLeftEvent;
40 import jgroup.relacs.events.MulticastRequest;
41 import jgroup.relacs.events.PrepareAck;
42 import jgroup.relacs.events.PrepareEvent;
43 import jgroup.relacs.mss.HostTable;
44 import jgroup.relacs.mss.Mss;
45 import jgroup.relacs.mss.MssConstants;
46 import jgroup.relacs.mss.MssHost;
47 import jgroup.relacs.mss.MssUser;
48 import jgroup.relacs.types.LocalId;
49 import jgroup.relacs.types.MemberIdImpl;
50 import jgroup.relacs.types.ViewId;
51 import jgroup.relacs.types.ViewImpl;
52 import jgroup.util.Queue;
53 import jgroup.util.Util;
54
55 import org.apache.log4j.Logger;
56 import org.apache.log4j.MDC;
57
58
59
60
61
62
63
64
65 final class Group
66 implements Tag, MssConstants
67 {
68
69
70
71
72
73
/** Logger shared by all instances of this class. */
public static final Logger log = Logger.getLogger(Group.class);

/** Group table given to the constructor; used to register endpoints and to obtain symmetry sets. */
private final GroupTable grouptable;

/** Host table obtained from the Mss in the constructor; used to resolve endpoints to MssHost objects. */
private final HostTable hosttable;

/** Messaging layer used to send protocol messages (send/msend). */
private final Mss mss;

/** Identifier of this group. */
private int gid;

/** Current protocol status; toString recognises S_IDLE, S_SYNCH and S_ESTIMATE. */
private int status;

/**
 * Endpoint set; starts as the local address only and is replaced in
 * handleRemoteSuspect. Presumably the set of reachable hosts - TODO confirm.
 */
private EndPoint[] rset;

/** Host data of the local host. */
private final HostData me;

/** Endpoint of the local host. */
private final EndPoint myaddress;

/**
 * Counter initialised to zero; handleMsgView accepts a view message only
 * when the agreed value for this host equals ninstalled+1.
 */
private int ninstalled;

/** Counter incremented in handleMsgProp when this host creates a new view identifier. */
private int ncoordinated;

/** Highest suspect counter accepted so far by handleRemoteSuspect. */
private int nsuspects;

/** Hosts known to this group, keyed by endpoint. */
private final Map<EndPoint,HostData> thosts;

/** Local members of this group, keyed by local identifier. */
private final Map<LocalId,MemberData> tmembers;

/**
 * Member counter; incremented when a local member joins and decremented
 * when a member is marked as leaving or crashed.
 */
private int nmembers;

/** View identifier initialised from the initial view; passed to the proposal when it is marshalled. */
private long cvid;

/**
 * View identifier initialised to cvid; incoming multicast and ack
 * messages are handled only when their vid equals this value.
 */
private long pvid;

/** Description of the view used for delivery, acknowledgement and endpoint bookkeeping. */
private ViewDescription viewDesc;

/** Queue of locally requested multicast messages that handleMulticastRequest delays. */
private Queue delayedSent;

/** Queue of multicast messages received while in S_ESTIMATE status. */
private Queue delayedRcvd;

/** Estimate used by the agreement protocol; created on the first call to synchronizationPhaseInit, null before that. */
private Estimate estimate;

/** Local proposal that is encoded incrementally and marshalled in sendProposal. */
private MsgProp proposal;

/**
 * Set to false whenever a multicast message is delivered and to true
 * when viewDesc.localDeliveryAck reports stability.
 */
private boolean msgsStable;

/** Result of the latest viewDesc.localViewAck call; false until the first such call. */
private boolean installationComplete;

/** Send buffer; receives every MsgResult and is handed to the Estimate. */
private final SendBuffer sbuffer;
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/**
 * Creates a group that initially knows only the local host and one
 * local member, and starts in the S_IDLE status.
 *
 * @param grouptable group table this group registers endpoints with
 * @param mss messaging layer; its host table is also retained
 * @param gid identifier of this group
 * @param view initial view; its view identifier seeds cvid and pvid
 * @param hd data of the local host
 * @param md data of the first local member
 */
Group(GroupTable grouptable, Mss mss, int gid, View view, HostData hd, MemberData md)
{
  /* Collaborating objects */
  this.grouptable = grouptable;
  this.mss = mss;
  this.hosttable = mss.getHostTable();

  /* gid and status must be set before toString() is used for the MDC */
  this.gid = gid;
  this.status = S_IDLE;
  MDC.put("group", toString());

  /* Local host */
  this.me = hd;
  this.myaddress = hd.getEndPoint();
  this.rset = new EndPoint[] { this.myaddress };

  /* Counters */
  this.ncoordinated = 0;
  this.ninstalled = 0;
  this.nsuspects = 0;

  /* Host and member tables start with the local host and the given member */
  this.thosts = new HashMap<EndPoint,HostData>();
  this.tmembers = new HashMap<LocalId,MemberData>();
  this.thosts.put(this.myaddress, hd);
  this.tmembers.put(md.getMemberId().getLocalId(), md);
  this.nmembers = 1;

  /* View related state */
  this.cvid = view.getVid();
  this.pvid = this.cvid;
  this.viewDesc = new ViewDescription(hd, md);
  this.sbuffer = new SendBuffer();
  this.sbuffer.setView(view, 1);

  /* Agreement related state */
  this.delayedSent = new Queue();
  this.delayedRcvd = new Queue();
  this.proposal = new MsgProp();
  this.msgsStable = true;
  this.installationComplete = false;
}
221
222
223
224
225
226
227
228
229
230 int getGid()
231 {
232 return gid;
233 }
234
235
236
237
238
239 Collection<HostData> getHosts()
240 {
241 return thosts.values();
242 }
243
244
245
246
247 Collection<MemberData> getMembers()
248 {
249 return tmembers.values();
250 }
251
252
253
254
255
256 boolean isEmpty()
257 {
258 return tmembers.isEmpty();
259 }
260
261
262
263
264 public String toString()
265 {
266 return toString(false);
267 }
268
269
270
271
272 public String toString(boolean full)
273 {
274 StringBuilder buf = new StringBuilder("[Group: ");
275 buf.append(gid);
276 buf.append(", ");
277 switch (status) {
278 case S_IDLE :
279 buf.append("IDLE");
280 break;
281 case S_SYNCH :
282 buf.append("SYNCH");
283 break;
284 case S_ESTIMATE :
285 buf.append("ESTIMATE");
286 break;
287 default :
288 buf.append("INVALID STATUS");
289 break;
290 }
291 if (full) {
292 buf.append(", ");
293 buf.append(estimate);
294 }
295 buf.append("]");
296 return buf.toString();
297 }
298
299
300
301
302 HostData getHost(EndPoint endpoint)
303 {
304 return (HostData) thosts.get(endpoint);
305 }
306
307
308
309
310
311
312
313
314 boolean join(MssHost src, MssHost[] hosts)
315 {
316 if (log.isDebugEnabled())
317 log.debug("Group.join: START");
318
319 EndPoint endpoint = src.getEndPoint();
320 HostData sender = (HostData) thosts.get(endpoint);
321 if (sender == null) {
322 if (log.isDebugEnabled())
323 log.debug("Group.join: new host");
324
325
326 thosts.put(endpoint, new HostData(src));
327 grouptable.addEndpointToGroup(endpoint, this);
328 for (int i=0; i < hosts.length; i++) {
329 HostData scan = (HostData) thosts.get(hosts[i].getEndPoint());
330 if (scan == null) {
331 thosts.put(hosts[i].getEndPoint(), new HostData(hosts[i]));
332 grouptable.addEndpointToGroup(hosts[i].getEndPoint(), this);
333 }
334 }
335
336
337 return true;
338
339 } else if (sender.isLeaving()) {
340
341
342 sender.setLeaving(false);
343 if (status == S_IDLE && Util.in(rset, endpoint))
344 synchronizationPhaseInit();
345 }
346
347
348 return false;
349 }
350
351
352
353
354
355 void leave(EndPoint src)
356 {
357 HostData sender = thosts.get(src);
358 if (sender != null) {
359 sender.setLeaving(true);
360 switch (status) {
361
362 case S_IDLE:
363 synchronizationPhaseInit();
364 if (log.isDebugEnabled())
365 log.debug("Leaving in S_IDLE: " );
366 break;
367
368 case S_SYNCH:
369 estimate.remove(src);
370 if (log.isDebugEnabled())
371 log.debug("Leaving in S_SYNCH: EST= " + estimate);
372 checkSynchronization();
373 break;
374
375 case S_ESTIMATE:
376 estimate.remove(src);
377 if (log.isDebugEnabled())
378 log.debug("Leaving in S_ESTIMATE: EST= " + estimate);
379 sendEstimate();
380 break;
381 }
382 }
383 }
384
385
386
387
388
389
390
391
392
393 void handleLocalSuspect(MemberId id)
394 {
395 MemberData md = (MemberData) tmembers.get(id.getLocalId());
396 if (md != null) {
397
398 if (log.isDebugEnabled())
399 log.debug("Group:handleLocalSuspect: " + id);
400 if (!md.isLeaving() && !md.isCrashed()) {
401 md.setCrashed();
402 nmembers--;
403 runAgreementProtocol();
404 }
405 }
406 }
407
408
409
410
411 void handleRemoteSuspect(int nsuspects, EndPoint[] trset, EndPoint[] nrset, EndPoint[] nuset)
412 {
413 if (nsuspects > this.nsuspects) {
414 if (log.isDebugEnabled())
415 log.debug("nsuspects=" + nsuspects);
416 this.nsuspects = nsuspects;
417
418
419
420
421
422
423 try {
424 MsgSymm tosend = MsgSymm.marshal(gid, me.getVersion(),
425 HostData.extract(thosts, nrset), rset, nrset.length);
426 mss.msend(R_SYMM, tosend, nrset);
427 } catch (IOException e) {
428 log.error("Unable to marshal symmetrical reachability message", e);
429 return;
430 }
431
432
433 rset = trset;
434
435 switch (status) {
436
437 case S_IDLE:
438 synchronizationPhaseInit();
439 break;
440
441 case S_SYNCH:
442 estimate.remove(nuset);
443 if (log.isDebugEnabled())
444 log.debug(estimate);
445 checkSynchronization();
446 break;
447
448 case S_ESTIMATE:
449 estimate.remove(nuset);
450 if (log.isDebugEnabled())
451 log.debug(estimate);
452 sendEstimate();
453 break;
454 }
455 }
456 }
457
458
459
460
461
462
463 void handleLocalDlvrAck(DeliveryAck ack)
464 {
465 if (log.isDebugEnabled()) {
466 log.debug("host.pos=" + ack.getHostIndex() + ", mem.pos=" + ack.getMemberIndex());
467 }
468 LocalId ackSender = ack.getAckSender().getLocalId();
469 MemberData md = (MemberData) tmembers.get(ackSender);
470 md.removeAck(ack.getMessageId());
471
472 log.assertLog(viewDesc.size() > ack.getHostIndex(),
473 "Host index of DeliveryAck is greater than the current view size ("
474 + viewDesc.size() + "): " + ack + "; msgStable: " + msgsStable);
475
476
477
478
479
480
481
482 MsgResult result = new MsgResult(gid, pvid, ack.getMessageId(),
483 ack.getSender(), viewDesc.getDelivered(ack.getHostIndex()),
484 ack.getHostIndex(), ack.getViewIndex(), ack.getResult());
485 try {
486 EndPoint[] receivers = viewDesc.getEndPoints();
487 MsgResult tosend = result.marshal(receivers.length);
488 sbuffer.insertMsgResult(result);
489 if (log.isDebugEnabled())
490 log.debug("Sending R_RESULT: " + tosend);
491 mss.msend(R_RESULT, tosend, receivers);
492 } catch (IOException e) {
493 log.error("Unable to marshal result message", e);
494 return;
495 }
496
497 boolean stable = viewDesc.localDeliveryAck(ack);
498 if (log.isDebugEnabled()) {
499 log.debug("Handling deliveryACK (stable=" + stable + ")");
500 log.debug(viewDesc);
501 }
502 if (stable) {
503 msgsStable = true;
504 if (status == S_ESTIMATE) {
505 try {
506 int[] delivered = viewDesc.getDelivered(estimate.getHosts());
507 proposal.encodeMsgsProp(delivered);
508 sendProposal();
509 } catch (IOException e) {
510 log.error("Unable to encode messages proposal", e);
511 return;
512 }
513 }
514 }
515 }
516
517
518
519
520
521
522
523 void handleMulticastRequest(MulticastRequest request)
524 {
525 if (log.isDebugEnabled())
526 log.debug(request);
527
528
529 MsgMcast msg = null;
530 try {
531 msg = new MsgMcast(request);
532 } catch (IOException e) {
533 log.error("Cannot marshal multicast message", e);
534 return;
535 }
536
537 if (status == S_IDLE) {
538 acceptMessage(msg, false);
539 } else if (!installationComplete) {
540 if (log.isDebugEnabled())
541 log.debug("View installation for local members not complete");
542 acceptMessage(msg, true);
543 } else {
544 if (log.isDebugEnabled())
545 log.debug(viewDesc);
546 ChainIdentifier chainId = request.getChainId();
547 if (chainId != null) {
548 EndPoint orgSender = chainId.getOriginalSender().getEndPoint();
549 HostData hd = (HostData) thosts.get(orgSender);
550 assert hd != null : "Unknown chain sender";
551 assert hd.hasValidViewIndex() : "Uninitialized chain sender";
552 if (log.isDebugEnabled())
553 log.debug("Received chained message: " + chainId);
554 if (!viewDesc.isNew(hd.getViewIndex(), chainId.getMsgId())) {
555 if (log.isDebugEnabled())
556 log.debug("Accepting chained message: " + chainId);
557
558
559
560
561
562
563
564
565
566 acceptMessage(msg, true);
567 } else {
568 assert false : "Chain message cannot be new";
569 }
570 } else {
571 delayedSent.insert(msg);
572 if (log.isDebugEnabled())
573 log.debug("Message (" + msg.mid + ") put in the delayedSent queue: " + delayedSent);
574 }
575 }
576 }
577
578
579 void handleLocalPrepareAck(PrepareAck event)
580 {
581 if (log.isDebugEnabled())
582 log.debug(event);
583
584 MemberData md = (MemberData) tmembers.get(event.getMemberId().getLocalId());
585 if (!md.isPrepared() && !md.isCrashed()) {
586 md.setPrepared();
587 if (log.isDebugEnabled())
588 log.debug(md + " is prepared");
589 }
590 }
591
592
593 void handleLocalViewAck(InstallAck upcall)
594 {
595 installationComplete = viewDesc.localViewAck(upcall);
596 if (log.isDebugEnabled()) {
597 log.debug("Local view is " + (installationComplete ? "stable" : "not stable"));
598 }
599 if (status == S_ESTIMATE)
600 sendProposal();
601 }
602
603
604 void handleLocalJoin(JoinRequest event, MssUser mssuser)
605 {
606 if (log.isDebugEnabled())
607 log.debug(event);
608
609 MemberId id = event.getMemberId();
610 MemberData md = new MemberData(id, event.getDispatcher());
611 md.schedulePingTimer(mssuser);
612 if (tmembers.put(id.getLocalId(), md) == null) {
613 nmembers++;
614 runAgreementProtocol();
615 }
616 }
617
618
619
620
621
622 void handleLeaveRequest(LeaveRequest event)
623 {
624 if (log.isDebugEnabled())
625 log.debug(event);
626
627
628 MemberId id = event.getMemberId();
629 MemberData md = (MemberData) tmembers.get(id.getLocalId());
630 md.cancelPingTimer();
631
632
633
634
635
636
637 if (!md.isLeaving() && !md.isCrashed()) {
638 md.setLeaving();
639 if (log.isDebugEnabled())
640 log.debug(md + " is leaving");
641 nmembers--;
642 runAgreementProtocol();
643 }
644 }
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660 void handleMsgMcast(MsgMcast msg, EndPoint src)
661 {
662 if (log.isDebugEnabled()) {
663 log.debug("Handle mcast msg (" + msg.mid + ") received from " + src);
664 }
665
666 if (status == S_ESTIMATE) {
667 delayedRcvd.insert(msg);
668 if (log.isDebugEnabled()) {
669 log.debug("Message buffered in receive queue: " + delayedRcvd);
670 log.debug(viewDesc);
671 }
672 } else if (pvid == msg.vid) {
673 boolean newCrashed = viewDesc.notifyMessage(gid, msg);
674 if (log.isDebugEnabled())
675 log.debug("Multicast message being delivered (stable=false)");
676 msgsStable = false;
677
678
679
680
681 if (viewDesc.size() > 1) {
682 try {
683 MsgAck tosend = MsgAck.marshal(gid, pvid, viewDesc.getLocalIndex(),
684 viewDesc.getAcks(), viewDesc.size());
685 mss.msend(R_ACK, tosend, viewDesc.getEndPoints());
686 } catch (IOException e) {
687 log.error("Cannot marshal ACK message", e);
688 }
689 }
690
691
692
693
694
695 if (newCrashed) {
696 if (log.isDebugEnabled()) {
697 log.debug("Detected new crashed host; running agreement protocol");
698 }
699 runAgreementProtocol();
700 }
701 } else {
702 log.warn("Received a multicast message with incorrect view id: "
703 + msg.vid + " (" + pvid + ")");
704 }
705 }
706
707
708
709
710
711
712
713 void handleMsgAck(MsgAck msg, EndPoint src)
714 {
715 if (log.isDebugEnabled())
716 log.debug("handleMsgAck: " + msg+ " from src = " + src);
717
718
719 if (pvid == msg.vid) {
720 viewDesc.remoteMessageAck(msg);
721 } else {
722 log.warn("Received an ack message with incorrect view id: " + msg.vid + " (" + pvid + ")");
723 }
724 }
725
726
727
728
729
730
731
732
733
734
735
736
737
738 void handleMsgDlvrd(MsgMcast msg, EndPoint src)
739 {
740 if (log.isDebugEnabled())
741 log.debug("handleMsgDlvrd: " + msg + " from src = " + src);
742
743 if ( pvid == msg.vid && estimate.contains(src) && viewDesc.isNew(msg.hpos, msg.mid) ) {
744 boolean newCrashed = viewDesc.notifyMessage(gid, msg);
745 if (log.isDebugEnabled())
746 log.debug("Delivered multicast message (stable=false)");
747 msgsStable = false;
748
749
750
751
752
753 if (viewDesc.size() > 1) {
754
755 try {
756 MsgAck tosend = MsgAck.marshal(gid, pvid, viewDesc.getLocalIndex(),
757 viewDesc.getAcks(), viewDesc.size());
758 mss.msend(R_ACK, tosend, viewDesc.getEndPoints());
759 } catch (IOException e) {
760 log.error("Cannot marshal ACK message", e);
761 }
762 }
763
764
765
766
767
768
769 if (status == S_ESTIMATE && estimate.size() > 1)
770 mss.msend(R_FORWARD, msg, estimate.getEndPoints());
771
772
773
774
775
776
777 if (newCrashed) {
778 if (log.isDebugEnabled()) {
779 log.debug("Detected new crashed host; running agreement protocol");
780 }
781 runAgreementProtocol();
782 }
783 }
784 }
785
786
787
788
789
790
791 void handleMsgSynch(MsgSynch msg, EndPoint src)
792 {
793 HostData sender = thosts.get(src);
794 if (sender != null) {
795 if (log.isDebugEnabled()) {
796 log.debug(msg);
797 log.debug(" Sender: " + sender);
798 log.debug("Receiver: " + me);
799 }
800
801 switch(status) {
802
803
804 case S_IDLE:
805 if (log.isDebugEnabled())
806 log.debug("Group: Idle phase");
807 if (sender.getVersion() < msg.vsend) {
808 sender.setVersion(msg.vsend);
809 if (Util.in(rset, src))
810 synchronizationPhaseInit();
811 }
812 break;
813
814
815 case S_SYNCH:
816 if (log.isDebugEnabled()) {
817 log.debug("Group: S phase sender version: " + sender.getVersion() + " <? " + msg.vsend);
818 log.debug("Group: S phase local version: " + me.getVersion() + " ==? " + msg.vdest);
819 }
820 if (me.getVersion() == msg.vdest) {
821 estimate.setSynchronized(src);
822 if (log.isDebugEnabled())
823 log.debug(estimate);
824
825
826
827
828
829
830 sender.setAgreed(msg.vsend);
831 }
832 if (sender.getVersion() < msg.vsend) {
833
834
835
836 sender.setVersion(msg.vsend);
837 EndPoint[] symset = grouptable.getSymset(src);
838 try {
839 MsgSynch msynch = MsgSynch.marshal(gid, me.getVersion(), sender.getVersion(), symset);
840 if (log.isDebugEnabled())
841 log.debug("Sending R_SYNCH to " + src);
842 mss.send(R_SYNCH, msynch, src);
843 } catch (IOException e) {
844 log.error("Unable to marshal synchronization message", e);
845 return;
846 }
847 }
848 checkSynchronization();
849 break;
850
851
852 case S_ESTIMATE:
853 if (log.isDebugEnabled())
854 log.debug("Group: EE phase ");
855 if (sender.getVersion() < msg.vsend)
856 sender.setVersion(msg.vsend);
857 if (log.isDebugEnabled())
858 log.debug("Group: sender.agreed " + sender.getAgreed() + " msg.vsend " + msg.vsend);
859 if (sender.getAgreed() < msg.vsend && estimate.contains(src)) {
860 estimate.remove(msg.rset);
861 if (log.isDebugEnabled())
862 log.debug(estimate);
863 sendEstimate();
864 }
865
866 }
867 }
868 }
869
870
871
872
873
874
875
876 void handleMsgSymm(MsgSymm msg, EndPoint src)
877 {
878 if (log.isDebugEnabled())
879 log.debug(msg + " from src=" + src);
880
881
882
883 if (status == S_IDLE)
884 return;
885
886 int pos = Util.valueAt(msg.hosts, myaddress);
887 if (pos >= 0) {
888 int vdest = msg.version[pos];
889
890 if (status == S_SYNCH) {
891 if (log.isDebugEnabled())
892 log.debug("Group:MsgSymm: S PHASE");
893 if (estimate.contains(src) && me.getVersion() == vdest) {
894 if (log.isDebugEnabled())
895 log.debug("Group:MsgSymm: accepted");
896 estimate.remove(msg.rset);
897 if (log.isDebugEnabled())
898 log.debug(estimate);
899 checkSynchronization();
900 }
901
902 } else if (status == S_ESTIMATE) {
903 if (log.isDebugEnabled())
904 log.debug("Group:MsgSymm: EE PHASE");
905 HostData sender = estimate.get(src);
906
907
908
909 if (sender != null && me.getAgreed() == vdest && sender.getAgreed() == msg.vsend) {
910 if (log.isDebugEnabled()) {
911 log.debug("Group:MsgSymm: accepted " + estimate);
912 for (EndPoint endp : msg.rset) {
913 log.debug("removing from estimate: " + endp);
914 }
915 }
916 estimate.remove(msg.rset);
917 if (log.isDebugEnabled())
918 log.debug(estimate);
919 sendEstimate();
920 }
921 }
922 }
923 }
924
925
926
927
928
929
930 void handleMsgEstim(MsgEstim msg, EndPoint src)
931 {
932 if (log.isDebugEnabled())
933 log.debug("handleMsgEstim: " + msg + " from src = " + src);
934
935
936
937
938
939
940 int pos = Util.valueAt(msg.hosts, myaddress);
941 if (pos >= 0) {
942 int vdest = msg.agreed[pos];
943 if (log.isDebugEnabled()) {
944 log.debug("Group:vdest " + vdest);
945 log.debug("Group:me.getVersion() " + me.getVersion());
946 }
947
948 if (status == S_SYNCH) {
949
950 HostData sender = thosts.get(src);
951
952 if (sender != null) {
953 if (log.isDebugEnabled())
954 log.debug(sender + " is valid");
955 if (sender.getVersion() < msg.vsend)
956 sender.setVersion(msg.vsend);
957 if (!estimate.contains(src)) {
958 if (log.isDebugEnabled())
959 log.debug("Sending MsgSymm to " + src);
960 try {
961 MsgSymm tosend = MsgSymm.marshal(gid, me.getVersion(), sender,
962 estimate.getEndPoints(), 1);
963 mss.send(R_SYMM, tosend, src);
964 } catch (IOException e) {
965 log.error("Unable to marshal symmetrical reachability message", e);
966 return;
967 }
968 } else if (me.getVersion() == vdest) {
969 if (log.isDebugEnabled())
970 log.debug("Before intersect: " + estimate);
971 estimate.intersect(msg.hosts);
972 estimate.updateAgreed(msg.hosts, msg.agreed);
973 if (log.isDebugEnabled())
974 log.debug(" After intersect: " + estimate);
975 checkSynchronization();
976 }
977 } else {
978 if (log.isDebugEnabled())
979 log.debug(sender + " is not valid");
980 }
981
982 } else if (status == S_ESTIMATE) {
983
984 if (estimate.contains(src) && estimate.checkAgreed(msg.hosts, msg.agreed)) {
985 estimate.intersect(msg.hosts);
986 if (log.isDebugEnabled())
987 log.debug(estimate);
988 sendEstimate();
989 } else {
990
991 if (log.isDebugEnabled()) {
992 log.debug(estimate);
993 if (!estimate.contains(src))
994 log.debug("Estimate does not contain " + src);
995 }
996 }
997 }
998 } else if (status == S_ESTIMATE && estimate.contains(src)) {
999
1000
1001
1002 if (log.isDebugEnabled())
1003 log.debug("p not in P");
1004 HostData sender = thosts.get(src);
1005
1006 if (sender != null && sender.getAgreed() <= msg.vsend) {
1007 estimate.intersect(msg.hosts);
1008 if (log.isDebugEnabled())
1009 log.debug(estimate);
1010 sendEstimate();
1011 }
1012 }
1013 }
1014
1015
1016
1017
1018
1019
1020
1021 void handleMsgProp(MsgProp msg, EndPoint src)
1022 throws IOException, ClassNotFoundException
1023 {
1024 if (log.isDebugEnabled())
1025 log.debug("handleMsgProp: " + msg + " from src = " + src);
1026
1027
1028 HostData sender = thosts.get(src);
1029
1030
1031 if ( status != S_IDLE && sender != null && estimate.contains(src) ) {
1032 sender.setProposal(msg);
1033 if (log.isDebugEnabled())
1034 log.debug("handleMsgProp: sender is valid");
1035
1036 if (status == S_ESTIMATE && checkAgreement()) {
1037 if (log.isDebugEnabled())
1038 log.debug("handleMsgProp: ESTIMATE PHASE; agreement reached");
1039
1040
1041 long cvid = ViewId.create(myaddress, ++ncoordinated);
1042 EndPoint[] cvcomp = me.getProposal().decodeHostProp_hosts();
1043
1044
1045
1046
1047
1048
1049 int len = cvcomp.length;
1050
1051
1052 MsgProp[] props = new MsgProp[len];
1053
1054
1055 long[] prev_vid = new long[len];
1056
1057
1058 EndPoint[][] prev_vcomp = new EndPoint[len][];
1059
1060
1061 LocalId[][] members = new LocalId[len][];
1062
1063
1064 EndPoint[] dlist = new EndPoint[len];
1065
1066
1067 StringBuilder buf = new StringBuilder("Proposal: ");
1068
1069 int i,j,k;
1070
1071
1072 for (i=0; i < len; i++) {
1073 HostData host = (HostData) thosts.get(cvcomp[i]);
1074 if (host == null)
1075 throw new IllegalStateException("Corrupted proposal; unknown host in proposal");
1076 if (log.isDebugEnabled()) {
1077 buf.append(cvcomp[i]);
1078 buf.append(" ");
1079 }
1080 props[i] = host.getProposal();
1081 dlist[i] = host.getEndPoint();
1082
1083
1084 prev_vid[i] = props[i].cvid;
1085 prev_vcomp[i] = props[i].decodeLastProp();
1086 members[i] = props[i].decodeMembProp();
1087 }
1088 if (log.isDebugEnabled())
1089 log.debug(buf.toString());
1090
1091
1092
1093 boolean partial = false;
1094 for (i=0; i < len && !partial; i++)
1095 for (j=0; j < prev_vcomp[i].length && !partial; j++) {
1096 k = Util.valueAt(cvcomp, prev_vcomp[i][j]);
1097 partial = ( k >= 0 && prev_vid[i] != prev_vid[k] );
1098 }
1099
1100
1101 int[] pvids = new int[len];
1102 int pvid = 0;
1103 if (partial) {
1104 int npartial = 0;
1105 long[] diff_vid = new long[len];
1106 for (i=0; i < len; i++) {
1107 for (j=0; j < npartial && prev_vid[i] != diff_vid[j]; j++);
1108 if (j == npartial) {
1109 diff_vid[j] = prev_vid[i];
1110 npartial++;
1111 }
1112 pvids[i] = j;
1113 if (cvcomp[i].equals(myaddress)) {
1114 pvid = j;
1115 }
1116 }
1117 ncoordinated += npartial;
1118 }
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138 if (dlist.length > 1) {
1139 MsgView tosend = MsgView.marshal(gid, cvid, me.getProposal(),
1140 pvids, props, dlist.length);
1141 mss.msend(R_VIEW, tosend, dlist);
1142 }
1143
1144
1145 int[] incarns = me.getProposal().decodeHostProp_incarns();
1146 installView(cvid, cvcomp, incarns, members, pvids, pvid);
1147 }
1148 }
1149 }
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159 void handleMsgView(MsgView msg, EndPoint src)
1160 {
1161 if (log.isDebugEnabled()) {
1162 log.debug("handleMsgView: " + msg + " from src = " + src);
1163 log.debug(toString(true));
1164 }
1165 if (status == S_ESTIMATE && estimate.contains(src)) {
1166 int pos = Util.valueAt(msg.cvcomp, myaddress);
1167 if (pos >= 0 && msg.agreed[pos] == ninstalled+1) {
1168
1169
1170
1171
1172
1173 mss.msend(R_VIEW, msg.marshal(), msg.cvcomp);
1174
1175 installView(msg.cvid, msg.cvcomp, msg.incarns, msg.members, msg.pvids, msg.pvids[pos]);
1176 }
1177 }
1178 }
1179
1180
1181
1182
1183
1184
1185 void handleMsgResult(MsgResult msg, EndPoint src)
1186 {
1187 if (log.isDebugEnabled())
1188 log.debug("handleMsgResult: " + msg + " from src = " + src);
1189 sbuffer.insertMsgResult(msg);
1190 }
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203 boolean handleMsgJoin(MsgGroup msg, EndPoint src)
1204 {
1205 if (log.isDebugEnabled())
1206 log.debug("JOIN:" + msg);
1207
1208
1209
1210
1211 try {
1212 MsgGroup tosend = MsgGroup.marshal(gid, getHosts(), 1);
1213 mss.send(R_JOINACK, tosend, src);
1214 } catch (IOException e) {
1215 log.error("Unable to marshal group message", e);
1216 }
1217
1218 return handleMsgJoinAck(msg, src);
1219 }
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232 boolean handleMsgJoinAck(MsgGroup msg, EndPoint src)
1233 {
1234 if (log.isDebugEnabled())
1235 log.debug("JOINACK: " + msg);
1236
1237
1238 MssHost[] hosts = new MssHost[msg.hosts.length];
1239 for (int i=0; i < hosts.length; i++)
1240 hosts[i] = hosttable.lookup(msg.hosts[i]);
1241 MssHost srcHost = hosttable.lookup(src);
1242
1243
1244 return join(srcHost, hosts);
1245 }
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257 private void removeLeavingMembers()
1258 {
1259
1260
1261
1262
1263
1264 MemberLeftEvent event = null;
1265
1266
1267 for (Iterator iter = getMembers().iterator(); iter.hasNext(); ) {
1268 MemberData member = (MemberData) iter.next();
1269 if (log.isDebugEnabled()) {
1270 log.debug("Checking member: " + member);
1271 }
1272
1273 if (member.isLeaving()) {
1274
1275 if (event == null)
1276 event = new MemberLeftEvent(gid);
1277 boolean success = DaemonInteraction.addEvent(member, event);
1278 if (!success)
1279 member.setCrashed();
1280
1281 iter.remove();
1282 } else if (member.isCrashed()) {
1283 iter.remove();
1284 } else {
1285
1286 member.resetPrepared();
1287 }
1288 }
1289 }
1290
1291
1292
1293
1294
1295
1296 private void removeLeavingHosts()
1297 {
1298 for (Iterator<HostData> iter = getHosts().iterator(); iter.hasNext(); ) {
1299 HostData host = iter.next();
1300 if (log.isDebugEnabled())
1301 log.debug("Checking host: " + host);
1302 if (host.isLeaving() && !host.getEndPoint().isLocal()) {
1303 if (log.isDebugEnabled())
1304 log.debug("Sending R_LEAVE to: " + host);
1305 try {
1306 MsgGroup tosend = MsgGroup.marshal(gid, getHosts(), 1);
1307 mss.send(R_LEAVE, tosend, host.getEndPoint());
1308 } catch (IOException e) {
1309 log.error("Unable to marshal group message", e);
1310 }
1311
1312 iter.remove();
1313 }
1314 }
1315 }
1316
1317
1318
1319
1320
1321 void sendLeave()
1322 {
1323 for (EndPoint endpoint : thosts.keySet()) {
1324 if (!endpoint.isLocal()) {
1325 if (log.isDebugEnabled())
1326 log.debug("Sending R_LEAVE to: " + endpoint);
1327 try {
1328 MsgGroup tosend = MsgGroup.marshal(gid, getHosts(), 1);
1329 mss.send(R_LEAVE, tosend, endpoint);
1330 } catch (IOException e) {
1331 log.error("Unable to marshal group message", e);
1332 }
1333 }
1334 }
1335
1336
1337
1338
1339 if (log.isDebugEnabled())
1340 log.debug("Leaving: " + myaddress + "; it is the localhost");
1341 leave(myaddress);
1342 }
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352 private void runAgreementProtocol()
1353 {
1354 if (log.isDebugEnabled())
1355 log.debug("Running agreement protocol");
1356 if (status == S_IDLE) {
1357 synchronizationPhaseInit();
1358 } else if (status == S_ESTIMATE) {
1359 try {
1360 proposal.encodeMembProp(tmembers, nmembers);
1361 } catch (IOException e) {
1362 log.error("Unable to encode membership proposal", e);
1363 return;
1364 }
1365 sendProposal();
1366 }
1367 }
1368
1369
1370
1371
1372
1373
1374 private void synchronizationPhaseInit()
1375 {
1376 if (log.isDebugEnabled())
1377 log.debug("Preparing ESTIMATE for SYNCHRONIZATION phase");
1378
1379 if (estimate == null) {
1380 estimate = new Estimate(sbuffer, thosts, rset);
1381 } else {
1382 estimate.reset(thosts, rset);
1383 }
1384
1385
1386 status = S_SYNCH;
1387 if (log.isDebugEnabled()) {
1388 MDC.put("group", toString());
1389 log.debug("Entering SYNCHRONIZATION phase");
1390 }
1391
1392
1393
1394
1395
1396 PrepareEvent prepareEvent = null;
1397 for (Iterator iter = getMembers().iterator(); iter.hasNext();) {
1398 MemberData md = (MemberData) iter.next();
1399 if (!md.isPrepared() && !md.isLeaving() && !md.isCrashed()) {
1400
1401 if (prepareEvent == null)
1402 prepareEvent = new PrepareEvent(gid);
1403 if (log.isDebugEnabled())
1404 log.debug("Sending PrepareEvent to " + md);
1405 boolean success = DaemonInteraction.addEvent(md, prepareEvent);
1406 if (!success)
1407 md.setCrashed();
1408 }
1409 }
1410
1411
1412 me.newVersion();
1413
1414
1415 Iterator iterator = estimate.getHosts().iterator();
1416 while (iterator.hasNext()) {
1417 HostData host = (HostData) iterator.next();
1418 if (!host.equals(me)) {
1419 EndPoint[] symset = grouptable.getSymset(host.getEndPoint());
1420 try {
1421 MsgSynch tosend = MsgSynch.marshal(gid, me.getVersion(), host.getVersion(), symset);
1422 if (log.isDebugEnabled())
1423 log.debug("Sending R_SYNCH to " + host.getEndPoint());
1424 mss.send(R_SYNCH, tosend, host.getEndPoint());
1425 } catch (IOException e) {
1426 log.error("Unable to marshal synchronization message", e);
1427 return;
1428 }
1429 }
1430 }
1431
1432
1433 checkSynchronization();
1434 }
1435
1436
1437
1438
1439
1440
1441
1442 private void checkSynchronization()
1443 {
1444 if (estimate.isSynchronized())
1445 estimateExchangePhaseInit();
1446 }
1447
1448
1449
1450
1451
1452 private void estimateExchangePhaseInit()
1453 {
1454
1455 status = S_ESTIMATE;
1456 if (log.isDebugEnabled()) {
1457 MDC.put("group", toString());
1458 log.debug("Group:estimateExchangePhaseInit: start");
1459 log.debug(estimate);
1460 }
1461
1462 try {
1463
1464
1465
1466
1467
1468
1469 if (estimate.size() > 1) {
1470 MsgEstim tosend = MsgEstim.marshal(gid, me.getVersion(), estimate.getHosts());
1471 mss.msend(R_ESTIM, tosend, estimate.getEndPoints());
1472 }
1473
1474
1475 proposal.encodeLastProp(viewDesc.getEndPoints());
1476 proposal.encodeHostProp(estimate.getHosts());
1477 proposal.encodeMembProp(tmembers, nmembers);
1478 if (msgsStable) {
1479 int[] delivered = viewDesc.getDelivered(estimate.getHosts());
1480 proposal.encodeMsgsProp(delivered);
1481 sendProposal();
1482 } else {
1483 if (log.isDebugEnabled()) {
1484 log.debug("Messages are not stable: ");
1485 log.debug(viewDesc);
1486 }
1487 }
1488 } catch (IOException e) {
1489 log.error("Unable to encode proposal for the EE-phase", e);
1490 return;
1491 }
1492
1493
1494 viewDesc.forwardMessages(mss);
1495
1496 if (log.isDebugEnabled()) {
1497 log.debug("Group:estimateExchangePhaseInit: stop");
1498 }
1499 }
1500
1501
1502
1503
1504
1505 private void sendProposal()
1506 {
1507
1508
1509
1510
1511
1512 for (MemberData md : tmembers.values()) {
1513 log.assertLog(!(md.hasOutstandingAcks() && msgsStable), md + " has outstanding ACKs while msgs are stable");
1514 }
1515 if (log.isDebugEnabled())
1516 log.debug("Messages are " + (msgsStable ? "stable": "not stable"));
1517 if (msgsStable && installationComplete) {
1518
1519 try {
1520 MsgProp msg = proposal.marshal(gid, cvid, pvid);
1521
1522
1523
1524
1525
1526 EndPoint coordinator = estimate.getCoordinator();
1527 if (coordinator.isLocal()) {
1528
1529
1530
1531
1532 if (log.isDebugEnabled())
1533 log.debug("Group:sendProposal: local coordinator");
1534 handleMsgProp(new MsgProp(msg.getOutMessage()), coordinator);
1535
1536 } else {
1537
1538
1539
1540
1541 if (log.isDebugEnabled())
1542 log.debug("Group:sendProposal: remote coordinator: " + coordinator);
1543 mss.send(R_PROP, msg, coordinator);
1544
1545 }
1546 } catch (IOException e) {
1547 log.error("Unable to marshal and send proposal message", e);
1548 } catch (ClassNotFoundException e) {
1549 log.error("Unable to unmarshal (local) proposal message", e);
1550 }
1551 }
1552 if (log.isDebugEnabled())
1553 log.debug("Group:sendProposal: end");
1554 }
1555
1556
1557
1558
1559
1560
1561
1562
1563 private void sendEstimate()
1564 {
1565 if (estimate.hasChanged() && !estimate.isEmpty()) {
1566 try {
1567
1568
1569
1570
1571
1572 if (estimate.size() > 1) {
1573 MsgEstim tosend = MsgEstim.marshal(gid, me.getVersion(), estimate.getHosts());
1574 mss.msend(R_ESTIM, tosend, estimate.getEndPoints());
1575 }
1576 proposal.encodeHostProp(estimate.getHosts());
1577 if (msgsStable) {
1578 int[] delivered = viewDesc.getDelivered(estimate.getHosts());
1579 proposal.encodeMsgsProp(delivered);
1580 sendProposal();
1581 } else {
1582 if (log.isDebugEnabled()) {
1583 log.debug("Messages are not stable: ");
1584 log.debug(viewDesc);
1585 }
1586 }
1587 } catch (IOException e) {
1588 log.error("Unable to marshal estimate proposal message", e);
1589 }
1590 }
1591 }
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605 private boolean checkAgreement()
1606 {
1607 MsgProp proposalp = me.getProposal();
1608 if (proposalp == null) {
1609 if (log.isDebugEnabled())
1610 log.debug("checkAgreement: no local proposal found");
1611 return false;
1612 }
1613 if (log.isDebugEnabled())
1614 log.debug("checkAgreement: found my proposal " + me);
1615 for (HostData hostq : estimate.getHosts()) {
1616 MsgProp proposalq = hostq.getProposal();
1617 if (!proposalp.checkEstimate(proposalq)) {
1618 if (log.isDebugEnabled()) {
1619 log.debug("PROPOSAL DIFFERS FROM LOCAL ESTIMATE");
1620 log.debug("proposalp: " + proposalp);
1621 log.debug("proposalq: " + proposalq);
1622 }
1623 return false;
1624 }
1625 if (log.isDebugEnabled())
1626 log.debug("checkAgreement: ok estimate from " + hostq);
1627 for (HostData hostr : estimate.getHosts()) {
1628
1629 if (hostq == hostr)
1630 continue;
1631 if (log.isDebugEnabled()) {
1632 log.debug("Checking proposals from: ");
1633 log.debug(hostq);
1634 log.debug(hostr);
1635 }
1636 MsgProp proposalr = hostr.getProposal();
1637 if (!proposalq.checkMessages(proposalr)) {
1638 if (log.isDebugEnabled()) {
1639 log.debug("PROPOSALS DIFFERS (MESSAGES)");
1640 log.debug("proposalq: " + proposalq);
1641 log.debug("proposalr: " + proposalr);
1642 }
1643 return false;
1644 }
1645 }
1646 }
1647 if (log.isDebugEnabled())
1648 log.debug("checkAgreement: all ok");
1649 return true;
1650 }
1651
1652
1653
1654
1655
1656 private void acceptMessage(MsgMcast msg, boolean late)
1657 {
1658 if (log.isDebugEnabled())
1659 log.debug("acceptMessage; " + (late ? "late" : ""));
1660 try {
1661 msg.complete(viewDesc.getLocalIndex(), pvid, sbuffer.getMid());
1662 } catch (IOException e) {
1663 log.error("Cannot complete marshalling multicast message", e);
1664 return;
1665 }
1666
1667
1668 sbuffer.insertMsgMcast(msg);
1669
1670
1671 boolean newCrashed = viewDesc.notifyMessage(gid, msg);
1672 if (log.isDebugEnabled())
1673 log.debug("Accept multicast message (stable=false)");
1674
1675
1676 msgsStable = false;
1677
1678
1679
1680
1681
1682 if (viewDesc.size() > 1) {
1683 if (late) {
1684 mss.msend(R_FORWARD, msg, viewDesc.getEndPoints());
1685 } else {
1686 mss.msend(R_MCAST, msg, viewDesc.getEndPoints());
1687 }
1688 }
1689
1690
1691
1692
1693
1694
1695 if (newCrashed) {
1696 if (log.isDebugEnabled()) {
1697 log.debug("Detected new crashed host; running agreement protocol");
1698 }
1699 runAgreementProtocol();
1700 }
1701 }
1702
1703
1704
1705
1706
1707 private void installView(long cvid, EndPoint[] hosts, int[] incarns,
1708 LocalId[][] members, int[] pvids, int pid)
1709 {
1710
1711 this.cvid = cvid;
1712
1713
1714
1715
1716
1717
1718
1719 for (HostData hd : thosts.values()) {
1720 hd.resetViewIndex();
1721 }
1722
1723
1724
1725
1726
1727
1728 List<MemberId> memberSet = new ArrayList<MemberId>(hosts.length);
1729 Set<Integer> uniquePids = new HashSet<Integer>(pvids.length);
1730 LocalId[] localMembers = null;
1731 int hostCount = 0;
1732 int hpos = 0;
1733 for (int i = 0; i < hosts.length; i++) {
1734 uniquePids.add(pvids[i]);
1735 if (pvids[i] == pid) {
1736 if (hosts[i].equals(myaddress)) {
1737 hpos = hostCount;
1738 localMembers = members[i];
1739 }
1740 HostData hd = (HostData) thosts.get(hosts[i]);
1741 if (hd == null)
1742 throw new IllegalStateException("Estimate contains an unknown host");
1743 for (int j=0; j < members[i].length; j++) {
1744 memberSet.add(new MemberIdImpl(hd.getEndPoint(), incarns[i], members[i][j]));
1745 }
1746
1747
1748
1749
1750
1751 hosts[hostCount] = hosts[i];
1752 hd.setViewIndex(hostCount++);
1753 }
1754 }
1755
1756
1757 viewDesc = new ViewDescription(thosts, hosts, rset, tmembers, localMembers, hpos);
1758
1759
1760 msgsStable = true;
1761 installationComplete = false;
1762 status = S_IDLE;
1763 if (log.isDebugEnabled()) {
1764 MDC.put("group", toString());
1765 log.debug("New view installed (stable=true)");
1766 }
1767
1768
1769 pvid = ViewId.createPartial(cvid, pid);
1770 MemberId[] memberIds = (MemberId[]) memberSet.toArray(new MemberId[memberSet.size()]);
1771
1772
1773 View view = new ViewImpl(gid, pvid, uniquePids.size(), memberIds);
1774 ninstalled++;
1775 viewDesc.notifyView(gid, view);
1776
1777
1778 removeLeavingMembers();
1779 removeLeavingHosts();
1780
1781 if (!tmembers.isEmpty()) {
1782
1783
1784 for (Iterator iter = thosts.values().iterator(); iter.hasNext(); ) {
1785 HostData hd = (HostData) iter.next();
1786 hd.setProposal(null);
1787
1788 }
1789
1790
1791 sbuffer.setView(view, hostCount);
1792 MsgMcast scan;
1793 while ((scan = (MsgMcast) delayedSent.removeFirst()) != null)
1794 acceptMessage(scan, false);
1795 while ((scan = (MsgMcast) delayedRcvd.removeFirst()) != null)
1796 if (scan.vid == pvid)
1797 viewDesc.notifyMessage(gid, scan);
1798
1799
1800
1801
1802
1803 if (!sbuffer.isEmpty() && viewDesc.size() > 1) {
1804 try {
1805 MsgAck tosend = MsgAck.marshal(gid, pvid, viewDesc.getLocalIndex(),
1806 viewDesc.getAcks(), viewDesc.size());
1807 mss.msend(R_ACK, tosend, viewDesc.getEndPoints());
1808 } catch (IOException e) {
1809 log.error("Cannot marshal ACK message", e);
1810 return;
1811 }
1812 }
1813
1814
1815 if (!viewDesc.isStable() || viewDesc.containsCrashedMembers())
1816 synchronizationPhaseInit();
1817 }
1818 }
1819
1820 }