1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.gm;
20
21 import java.io.Externalizable;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.ObjectInput;
25 import java.io.ObjectInputStream;
26 import java.io.ObjectOutput;
27 import java.io.ObjectOutputStream;
28 import java.io.OutputStream;
29 import java.rmi.RemoteException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.SortedMap;
37 import java.util.TreeMap;
38
39 import jgroup.core.JgroupException;
40 import jgroup.core.MemberId;
41 import jgroup.core.MembershipListener;
42 import jgroup.core.MembershipService;
43 import jgroup.core.View;
44 import jgroup.core.multicast.AckListener;
45 import jgroup.core.multicast.ChainIdentifier;
46 import jgroup.core.multicast.MulticastListener;
47 import jgroup.core.multicast.MulticastService;
48 import jgroup.relacs.types.EndPointImpl;
49 import jgroup.relacs.types.MemberIdImpl;
50 import jgroup.util.InMessage;
51 import jgroup.util.OutMessage;
52
53 import org.apache.log4j.Logger;
54
55
56
57
58
59
60
61
62
63
64
65 public final class TotalOrderLayer
66 implements MulticastService, MulticastListener, MembershipListener
67 {
68
69
70
71
72
73
74 private static final Logger log = Logger.getLogger(TotalOrderLayer.class);
75
76
77
78
79
80
81
82 private static final String PROTOCOL_NAME = "Total";
83
84
85 private static final int REQUEST_SEQ_NO = 1;
86 private static final int AGREED_SEQ_NO = 2;
87
88
89 private static final boolean OBJECT_TYPE = true;
90
91
92 private static final boolean STREAM_TYPE = false;
93
94
95
96
97
98
99
100 private Map<String,MulticastListener> multicastListeners = new HashMap<String,MulticastListener>();
101
102
103 private MulticastService mcast;
104
105
106 private MemberId me;
107
108
109 private SequenceNo largestAgreedSeqNo;
110
111
112 private SequenceNo largestProposedSeqNo;
113
114
115 private Map<MessageID,SequenceNo> proposalsMap =
116 Collections.synchronizedMap(new HashMap<MessageID,SequenceNo>());
117
118
119 private SortedMap<SequenceNo,InputStream> holdBackQueue =
120 Collections.synchronizedSortedMap(new TreeMap<SequenceNo,InputStream>());
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 private TotalOrderLayer(MembershipService pgms, MulticastService mcast)
141 throws JgroupException
142 {
143 me = pgms.getMyIdentifier();
144 largestAgreedSeqNo = new SequenceNo(me);
145 largestProposedSeqNo = new SequenceNo(me);
146 this.mcast = mcast;
147 }
148
149
150
151
152
153
154 public static TotalOrderLayer getLayer(MembershipService pgms, MulticastService mcast)
155 throws JgroupException
156 {
157 return new TotalOrderLayer(pgms, mcast);
158 }
159
160
161
162
163
164
165
166
167
168
169 public void addListener(Object listener)
170 {
171 if (listener instanceof MulticastListener) {
172 MulticastListener mListener = (MulticastListener) listener;
173 String protocol = mListener.getProtocolName();
174 Object currentListener = multicastListeners.get(protocol);
175 if (currentListener == null)
176 multicastListeners.put(protocol, mListener);
177 else if (!currentListener.equals(listener))
178 throw new IllegalArgumentException("Multiple multicast listeners for the same protocol is illegal: " + protocol);
179 } else {
180 throw new IllegalArgumentException("Specified listener does not implement a MulticastListener");
181 }
182 }
183
184
185
186
187
188
189 public void viewChange(View view)
190 {
191
192 }
193
194 public void prepareChange()
195 {
196 log.debug("prepareChange");
197 }
198
199 public void hasLeft() { }
200
201
202
203
204
205
206
207
208
209 public void mcast(OutputStream out, AckListener ackl, ChainIdentifier chId)
210 throws JgroupException
211 {
212 throw new UnsupportedOperationException();
213 }
214
215
216
217
218
219 public void mcast(OutputStream stream, AckListener ackl)
220 throws JgroupException
221 {
222 ProposalCollector pc = new ProposalCollector();
223 mcast.mcast(stream, pc);
224
225 SequenceNo agreed = pc.getGreatestProposal();
226
227 try {
228
229 OutMessage out = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
230
231 out.writeInt(AGREED_SEQ_NO);
232
233 agreed.writeExternal(out);
234 out.flush();
235 ResultCollector rc = new ResultCollector(agreed, ackl);
236 acklMap.put(agreed, rc);
237
238 mcast.mcast(out, rc);
239 } catch (IOException e) {
240 log.warn("Could not send out agreed sequence numbers.", e);
241 }
242 }
243
244
245
246
247 public void mcast(String protocol, Object obj, AckListener ackl)
248 throws JgroupException, IOException
249 {
250 OutMessage bout = getMessage(protocol, OBJECT_TYPE);
251 ObjectOutputStream out = new ObjectOutputStream(bout);
252 out.writeObject(obj);
253 out.close();
254 mcast(bout, ackl);
255 }
256
257
258
259
260 public OutputStream getMessage(String protocol)
261 throws IOException
262 {
263 return getMessage(protocol, STREAM_TYPE);
264 }
265
266
267
268
269
270
271 private OutMessage getMessage(String protocol, boolean msgType)
272 throws IOException
273 {
274 OutMessage msg = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
275 msg.writeInt(REQUEST_SEQ_NO);
276
277 MessageID msgId = new MessageID(me);
278 msgId.writeExternal(msg);
279 msg.writeUTF(protocol);
280 msg.writeBoolean(msgType);
281 return msg;
282 }
283
284
285
286
287
288
289
290
291
292 public String getProtocolName()
293 {
294 return PROTOCOL_NAME;
295 }
296
297
298
299
300
301 public synchronized Object deliverStream(InputStream msg, MemberId sender, int seq)
302 {
303 InMessage inmsg = (InMessage) msg;
304 try {
305
306 int messageType = inmsg.readInt();
307
308
309 switch (messageType) {
310
311 case REQUEST_SEQ_NO:
312 {
313
314 MessageID msgId = new MessageID();
315 msgId.readExternal(inmsg);
316
317 largestProposedSeqNo = SequenceNo.max(largestAgreedSeqNo, largestProposedSeqNo);
318 largestProposedSeqNo.sequenceNo++;
319 largestProposedSeqNo.setMsgId(msgId);
320 proposalsMap.put(msgId, largestProposedSeqNo);
321
322
323
324
325
326
327 holdBackQueue.put(largestProposedSeqNo, inmsg);
328
329
330
331 return largestProposedSeqNo;
332 }
333
334 case AGREED_SEQ_NO:
335 {
336 SequenceNo agreed = new SequenceNo();
337 agreed.readExternal(inmsg);
338
339
340 largestAgreedSeqNo = SequenceNo.max(largestAgreedSeqNo, agreed);
341
342
343 SequenceNo myProposal = (SequenceNo) proposalsMap.remove(agreed.msgId);
344 if (!agreed.equals(myProposal)) {
345
346 log.debug("reordering hold-back queue: myProposal=" + myProposal
347 + " is different from agreed=" + agreed);
348 InputStream stream = holdBackQueue.remove(myProposal);
349 if (stream == null) {
350 log.warn("MESSAGE NOT FOUND IN HOLD-BACK QUEUE: " + myProposal);
351 break;
352 }
353 holdBackQueue.put(agreed, stream);
354 }
355 log.debug("holdBackQueue=" + holdBackQueue.keySet());
356 log.debug(" proposalsMap=" + proposalsMap);
357
358 List<SequenceNo> resultValues = new ArrayList<SequenceNo>(holdBackQueue.size());
359 for (Iterator iter = holdBackQueue.entrySet().iterator(); iter.hasNext();) {
360 Map.Entry entry = (Map.Entry) iter.next();
361 SequenceNo seqno = (SequenceNo) entry.getKey();
362
363 if (seqno.equals(agreed))
364 seqno.msgId.markAsDeliverable();
365 resultValues.add(seqno);
366 if (seqno.isDeliverable()) {
367
368
369 InMessage stream = (InMessage) entry.getValue();
370 iter.remove();
371 String protocol = stream.readUTF();
372 boolean msgType = stream.readBoolean();
373 MulticastListener listener =
374 (MulticastListener) multicastListeners.get(protocol);
375 MemberId theSender = seqno.msgId.getSender();
376 if (msgType == STREAM_TYPE) {
377 seqno.result = listener.deliverStream(stream, theSender, seqno.sequenceNo);
378 } else {
379 seqno.result = listener.deliverObject(stream, theSender, seqno.sequenceNo);
380 }
381 if (log.isDebugEnabled())
382 log.debug("Result of message: " + seqno.msgId + ", assigned seqno: " + seqno);
383 } else {
384
385 if (log.isDebugEnabled())
386 log.debug("Stopping message delivery. " + seqno.msgId + " is undeliverable: " + seqno);
387 break;
388 }
389 }
390 log.debug("resultValues.size=" + resultValues.size());
391 return resultValues;
392 }
393
394 }
395
396 } catch (Exception e) {
397 log.warn("Could not read data from stream.", e);
398 }
399 return null;
400 }
401
402
403
404
405
406 public Object deliverObject(Object msg, MemberId id, int seq)
407 {
408
409
410
411
412 throw new UnsupportedOperationException("Should never happen");
413 }
414
415
416
417
418
419
420
421
422
423
424 private class ProposalCollector
425 implements AckListener
426 {
427
428
429 private static final int STARTING = 1;
430 private static final int WAITING = 2;
431 private static final int COMPLETED = 3;
432
433
434 private volatile int state = STARTING;
435
436
437 private int missing;
438
439
440 private SequenceNo[] proposals;
441
442
443 private boolean[] completed;
444
445
446 private synchronized void updateState()
447 {
448 if (missing==0) {
449 state = COMPLETED;
450 notifyAll();
451 }
452 }
453
454
455
456
457 public void ack(MemberId id, int pos, Object obj)
458 throws RemoteException
459 {
460
461
462
463 proposals[pos] = (SequenceNo) obj;
464 completed[pos] = true;
465 missing--;
466 updateState();
467 }
468
469
470
471
472 public void notifyView(View view)
473 throws RemoteException
474 {
475 missing = view.size();
476 proposals = new SequenceNo[missing];
477 completed = new boolean[missing];
478 state = WAITING;
479 }
480
481
482
483
484 public void viewChange()
485 throws RemoteException
486 {
487 log.warn("ProposalCollector: ViewChange during invocation handling");
488
489 missing = 0;
490 updateState();
491 }
492
493
494
495
496
497
498 public synchronized SequenceNo getGreatestProposal()
499 {
500 while (state != COMPLETED) {
501 try { wait(); } catch (InterruptedException iex) {}
502 }
503
504
505 SequenceNo greatestProposal = proposals[0];
506 for (int i = 0; i < proposals.length; ++i) {
507 if (completed[i]) {
508 greatestProposal = SequenceNo.max(proposals[i], greatestProposal);
509 } else {
510 log.warn("Not all members have provided a PROPOSAL");
511 break;
512 }
513 }
514 return greatestProposal;
515 }
516
517 }
518
519
520
521
522
523
524 private static final Map<SequenceNo,ResultCollector> acklMap =
525 new HashMap<SequenceNo,ResultCollector>();
526
527
528
529
530
531 private class ResultCollector
532 implements AckListener
533 {
534
535
536 private int missing;
537
538
539 private List[] results;
540
541
542 private SequenceNo agreed;
543
544
545 private AckListener ackl;
546
547
548 public ResultCollector(SequenceNo agreed, AckListener ackl)
549 {
550 this.agreed = agreed;
551 this.ackl = ackl;
552 }
553
554
555
556
557 public void ack(MemberId id, int pos, Object obj)
558 throws RemoteException
559 {
560 if (log.isDebugEnabled())
561 log.debug("Recieved result from " + id + ", pos=" + pos + ": " + obj);
562
563
564 List res = (List) obj;
565 for (Iterator iter = res.iterator(); iter.hasNext();) {
566 SequenceNo seqno = (SequenceNo) iter.next();
567 if (seqno.hasResult()) {
568 ResultCollector rc = (ResultCollector) acklMap.get(seqno);
569 log.debug("seqno=" + seqno + ", agreed=" + agreed);
570 rc.missing--;
571 if (rc.missing == 0)
572 acklMap.remove(seqno);
573 log.debug("delivering: " + seqno);
574 rc.ackl.ack(id, pos, seqno.result);
575 } else {
576 log.debug("no result for seqno: " + seqno);
577 }
578 }
579 }
580
581
582
583
584 public void notifyView(View view)
585 throws RemoteException
586 {
587
588
589
590
591 ackl.notifyView(view);
592 missing = view.size();
593 results = new List[missing];
594 }
595
596
597
598
599 public void viewChange()
600 throws RemoteException
601 {
602 if (log.isDebugEnabled())
603 log.debug("ResultCollector: ViewChange during invocation handling: ackl="
604 + ackl + ", agreed=" + agreed);
605 for (Iterator iter = acklMap.values().iterator(); iter.hasNext();) {
606 ResultCollector rc = (ResultCollector) iter.next();
607 log.debug("rc.agreed=" + rc.agreed);
608 rc.ackl.viewChange();
609 rc.missing = 0;
610 }
611
612
613 }
614
615 }
616
617
618
619
620
621
622
623
624
625 private static final class SequenceNo
626 implements Externalizable, Comparable
627 {
628
629 private static final long serialVersionUID = -6394934923836273234L;
630
631
632 private int sequenceNo = 100;
633
634
635 private MemberId member;
636
637
638
639
640
641
642 private MessageID msgId = null;
643
644
645
646
647 private Object result = null;
648
649
650
651 public SequenceNo() {}
652
653 public SequenceNo(MemberId me)
654 {
655 member = me;
656 }
657
658 public SequenceNo(SequenceNo oldSeq)
659 {
660 this.sequenceNo = oldSeq.sequenceNo;
661 this.member = oldSeq.member;
662 this.msgId = oldSeq.msgId;
663 }
664
665 public static SequenceNo max(SequenceNo s1, SequenceNo s2)
666 {
667 if (s1.compareTo(s2) > 0) {
668 return new SequenceNo(s1);
669 } else {
670 return new SequenceNo(s2);
671 }
672 }
673
674
675
676
677 public void setMsgId(MessageID msgId)
678 {
679 this.msgId = msgId;
680 }
681
682 public boolean hasResult()
683 {
684 return result != null;
685 }
686
687 public boolean isDeliverable()
688 {
689 return msgId.deliverable;
690 }
691
692 public int hashCode()
693 {
694 return sequenceNo ^ member.hashCode();
695 }
696
697 public boolean equals(Object obj)
698 {
699 if (obj instanceof SequenceNo) {
700 SequenceNo sno = (SequenceNo) obj;
701 return (sequenceNo == sno.sequenceNo && member.equals(sno.member));
702 } else {
703 return false;
704 }
705 }
706
707 public String toString()
708 {
709 StringBuilder buf = new StringBuilder();
710 buf.append("[");
711 String a = member.getEndPoint().getAddress().getHostAddress().toString();
712 String[] b = a.split("\\.");
713 if (b.length > 0)
714 buf.append(b[b.length-1]);
715 buf.append(".");
716 buf.append(sequenceNo);
717 if (result == null) {
718 buf.append("]");
719 } else {
720 buf.append("]: ");
721 buf.append(result);
722 }
723 return buf.toString();
724 }
725
726
727
728
729 public void readExternal(ObjectInput in)
730 throws IOException, ClassNotFoundException
731 {
732 sequenceNo = in.readInt();
733 member = new MemberIdImpl();
734 member.readExternal(in);
735 msgId = new MessageID();
736 msgId.readExternal(in);
737 ObjectInputStream ois = new ObjectInputStream((InputStream) in);
738 result = ois.readObject();
739 }
740
741
742
743
744 public void writeExternal(ObjectOutput out)
745 throws IOException
746 {
747 out.writeInt(sequenceNo);
748 member.writeExternal(out);
749 msgId.writeExternal(out);
750
751 ObjectOutputStream oos = new ObjectOutputStream((OutputStream) out);
752 oos.writeObject(result);
753 }
754
755
756
757
758
759
760
761
762
763
764
765 public int compareTo(Object o)
766 {
767 if (this == o)
768 return 0;
769 SequenceNo seqno = (SequenceNo) o;
770 if (this.sequenceNo == seqno.sequenceNo)
771 return this.member.compareTo(seqno.member);
772 else if (this.sequenceNo > seqno.sequenceNo)
773 return 1;
774 else
775 return -1;
776 }
777
778 }
779
780
781
782
783
784
785
786
787
788
789
790 private static final class MessageID
791 implements Externalizable
792 {
793
794 private static final long serialVersionUID = -6073546790683304835L;
795
796 private static long nextSequenceNo = 0;
797 private long sequenceNo;
798 private MemberId memberId;
799
800
801 private transient boolean deliverable = false;
802
803
804
805 public MessageID() {}
806
807 public MessageID(MemberId id)
808 {
809 this.sequenceNo = nextSequenceNo++;
810 this.memberId = id;
811 }
812
813 public void markAsDeliverable()
814 {
815 deliverable = true;
816 }
817
818 public boolean isDeliverable()
819 {
820 return deliverable;
821 }
822
823 public MemberId getSender()
824 {
825 return memberId;
826 }
827
828 public boolean equals(Object obj)
829 {
830 if (obj instanceof MessageID) {
831 MessageID msgId = (MessageID) obj;
832 return (sequenceNo == msgId.sequenceNo && memberId.equals(msgId.memberId));
833 } else {
834 return false;
835 }
836 }
837
838 public int hashCode()
839 {
840 return (int)(sequenceNo ^ (sequenceNo >>> 32)) ^ memberId.hashCode();
841 }
842
843 public String toString()
844 {
845 StringBuilder buf = new StringBuilder();
846 buf.append("[");
847 String a = memberId.getEndPoint().getAddress().getHostAddress().toString();
848 String[] b = a.split("\\.");
849 if (b.length > 0)
850 buf.append(b[b.length-1]);
851 buf.append("*");
852 buf.append(sequenceNo);
853 buf.append("]");
854 return buf.toString();
855 }
856
857
858
859
860 public void readExternal(ObjectInput in)
861 throws IOException, ClassNotFoundException
862 {
863 sequenceNo = in.readLong();
864 memberId = new MemberIdImpl();
865 memberId.readExternal(in);
866 }
867
868
869
870
871 public void writeExternal(ObjectOutput out)
872 throws IOException
873 {
874 out.writeLong(sequenceNo);
875 memberId.writeExternal(out);
876 }
877
878 }
879
880
881 public static void main(String[] args)
882 {
883 MemberId m1 = new MemberIdImpl(new EndPointImpl(239239, 1239), 12, new EndPointImpl(122323, 2323), 0);
884 MemberId m2 = new MemberIdImpl(new EndPointImpl(339239, 1239), 12, new EndPointImpl(132323, 2323), 0);
885 MemberId m3 = new MemberIdImpl(new EndPointImpl(439239, 1239), 12, new EndPointImpl(142323, 2323), 0);
886 SequenceNo s1 = new SequenceNo(m1);
887 SequenceNo s2 = new SequenceNo(m2);
888 SequenceNo s3 = new SequenceNo(m3);
889 s1.sequenceNo++; s2.sequenceNo++;
890 System.out.println("s1.compareTo(s1)=" + s1.compareTo(s1));
891 System.out.println("s2.compareTo(s1)=" + s2.compareTo(s1));
892 System.out.println("s3.compareTo(s1)=" + s3.compareTo(s1));
893 System.out.println("s1.compareTo(s2)=" + s1.compareTo(s2));
894 System.out.println("s1.compareTo(s3)=" + s1.compareTo(s3));
895 System.out.println("s2.compareTo(s3)=" + s2.compareTo(s3));
896 System.out.println("s1=" + s1);
897 System.out.println("s2=" + s2);
898 System.out.println("s3=" + s3);
899 System.out.println("max(s1, s2)=s2=" + SequenceNo.max(s1, s2));
900 System.out.println("max(s1, s3)=s1=" + SequenceNo.max(s1, s3));
901 System.out.println("max(s2, s3)=s2=" + SequenceNo.max(s2, s3));
902 }
903
904 }