1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.gmi.protocols;
20
21 import java.io.Externalizable;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.ObjectInput;
25 import java.io.ObjectOutput;
26 import java.io.OutputStream;
27 import java.rmi.RemoteException;
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.NoSuchElementException;
35 import java.util.SortedMap;
36 import java.util.TreeMap;
37 import java.util.concurrent.locks.Lock;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import jgroup.core.MemberId;
41 import jgroup.core.MembershipService;
42 import jgroup.core.multicast.MulticastListener;
43 import jgroup.core.multicast.MulticastService;
44 import jgroup.relacs.gmi.GroupAckListener;
45 import jgroup.relacs.gmi.GroupInvocationDispatcher;
46 import jgroup.relacs.gmi.InvocationResult;
47 import jgroup.relacs.gmi.MethodSemantics;
48 import jgroup.relacs.types.MemberIdImpl;
49 import jgroup.util.InMessage;
50 import jgroup.util.OutMessage;
51
52 import org.apache.log4j.Logger;
53 import org.apache.log4j.MDC;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class Atomic
71 extends BasicDispatcher
72 implements MulticastListener
73 {
74
75
76
77
78
79
80 private static final Logger log = Logger.getLogger(Atomic.class);
81
82
83
84
85
86
87
88 private static final String PROTOCOL_NAME = MethodSemantics.ATOMIC.toString();
89
90
91 private enum MsgType { REQUEST_SEQ_NO, AGREED_SEQ_NO };
92
93
94
95
96
97
98
99 private final Lock lock = new ReentrantLock();
100
101
102 private final MulticastService mcast;
103
104
105 private final MembershipService pgms;
106
107
108 private final MemberId me;
109
110
111 private SequenceNo largestAgreedSeqNo;
112
113
114 private SequenceNo largestProposedSeqNo;
115
116
117 private final Map<MessageID, SequenceNo> proposalsMap =
118 new HashMap<MessageID, SequenceNo>();
119
120
121 private final SortedMap<SequenceNo, InputStream> holdBackQueue =
122 new TreeMap<SequenceNo, InputStream>();
123
124
125 private final Map<SequenceNo, ResultCollector> resultCollectors =
126 Collections.synchronizedMap(new HashMap<SequenceNo, ResultCollector>());
127
128
129
130
131
132
133 public Atomic(GroupInvocationDispatcher dispatcher,
134 MulticastService mcast, MembershipService pgms)
135 {
136 super(dispatcher);
137 this.mcast = mcast;
138 this.pgms = pgms;
139 this.me = pgms.getMyIdentifier();
140 largestAgreedSeqNo = new SequenceNo(me);
141 largestProposedSeqNo = new SequenceNo(me);
142
143
144
145
146
147 mcast.addListener(this);
148 }
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 public InvocationResult dispatch(InputStream in)
167 throws IOException
168 {
169
170 if (pgms.members() == 1)
171 return super.dispatch(in);
172
173 SequenceNo agreed = getAgreed(in);
174 if (agreed == null)
175 throw new IOException("Could not obtain agreed sequence number; Proposals not received from all members");
176
177 OutputStream agreedMsg = prepareAgreedMessage(agreed);
178 ResultCollector rc = new ResultCollector(agreed);
179 resultCollectors.put(agreed, rc);
180
181
182 try {
183 mcast.mcast(agreedMsg, rc.getRemoteAckListener());
184 } catch (Exception e) {
185 log.warn("Failed multicast agreed sequence numbers to the group", e);
186 IOException ioex = new IOException("Failed multicast agreed sequence numbers to the group");
187 ioex.initCause(e);
188 throw ioex;
189 }
190
191 return (InvocationResult) rc.getResult();
192 }
193
194
195
196
197
198
199
200 private OutputStream prepareAgreedMessage(SequenceNo agreed)
201 throws IOException
202 {
203
204 OutMessage out = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
205
206 out.writeObject(MsgType.AGREED_SEQ_NO);
207
208 agreed.writeExternal(out);
209 out.flush();
210 return out;
211 }
212
213
214
215
216
217
218
219
220
221
222
223
224
225 private SequenceNo getAgreed(InputStream in)
226 throws IOException
227 {
228 OutputStream reqMsg = prepareReqMessage(in);
229
230 ProposalCollector pc = new ProposalCollector();
231 if (log.isDebugEnabled())
232 log.debug("Atomic: multicasting invocation to group members");
233 try {
234 mcast.mcast(reqMsg, pc.getRemoteAckListener());
235 } catch (Exception e) {
236 log.warn("Invocation failed", e);
237 IOException ioex = new IOException("Failed to send invocation to group");
238 ioex.initCause(e);
239 throw ioex;
240 }
241
242 return pc.getGreatestProposal();
243 }
244
245
246
247
248
249
250
251
252 private OutputStream prepareReqMessage(InputStream in)
253 throws IOException
254 {
255 OutMessage mout = (OutMessage) mcast.getMessage(PROTOCOL_NAME);
256 mout.writeObject(MsgType.REQUEST_SEQ_NO);
257
258 MessageID msgId = new MessageID(me);
259 msgId.writeExternal(mout);
260
261 byte[] buf = new byte[1500];
262 int bytesRead;
263 while ((bytesRead = in.read(buf)) != -1) {
264 mout.write(buf, 0, bytesRead);
265 }
266 return mout;
267 }
268
269
270
271
272
273
274
275
276
277 public String getProtocolName()
278 {
279 return PROTOCOL_NAME;
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293
294 public Object deliverStream(InputStream msg, MemberId sender, int seqNo)
295 {
296 if (log.isDebugEnabled()) {
297 MDC.put("group", "[Group: " + pgms.getGid() + "]");
298 log.debug("deliverStream() from " + sender);
299 }
300 InMessage inmsg = (InMessage) msg;
301 try {
302
303 MsgType msgType = (MsgType) inmsg.readObject();
304
305 switch (msgType) {
306 case REQUEST_SEQ_NO:
307 return computeProposedSeqNo(inmsg);
308
309 case AGREED_SEQ_NO:
310 return dispatchAgreedInvocations(inmsg);
311
312 default:
313 log.error("Unknown message type: " + msgType);
314 break;
315 }
316 } catch (IOException e) {
317 log.warn("Failed to read from input stream", e);
318 } catch (ClassNotFoundException e) {
319 log.warn("Class not found", e);
320 }
321
322 return null;
323 }
324
325
326 private Object dispatchAgreedInvocations(InMessage inmsg)
327 throws IOException, ClassNotFoundException
328 {
329
330 SequenceNo agreed = new SequenceNo();
331 agreed.readExternal(inmsg);
332 if (log.isDebugEnabled())
333 log.debug("Received AGREED: " + agreed + " for msg " + agreed.msgId);
334
335
336 reorderQueue(agreed);
337 if (log.isDebugEnabled()) {
338 log.debug("holdBackQueue=" + holdBackQueue.keySet());
339 log.debug(" proposalsMap=" + proposalsMap);
340 }
341
342 return dispatchFromQueue(agreed);
343 }
344
345
346
347
348
349
350 private Object dispatchFromQueue(SequenceNo agreed)
351 {
352 List<SequenceNo> results = new ArrayList<SequenceNo>(holdBackQueue.size());
353 lock.lock();
354 try {
355 for (Iterator<Map.Entry<SequenceNo, InputStream>> iter = holdBackQueue.entrySet().iterator(); iter.hasNext();) {
356 Map.Entry<SequenceNo, InputStream> entry = iter.next();
357 SequenceNo seqno = entry.getKey();
358
359 if (seqno.equals(agreed))
360 seqno.markAsDeliverable();
361 if (seqno.isDeliverable()) {
362 InputStream stream = entry.getValue();
363 iter.remove();
364 if (log.isDebugEnabled())
365 log.debug("Delivering message: " + seqno.msgId + ", assigned seqno: " + seqno);
366 try {
367
368 seqno.result = super.dispatch(stream);
369
370
371
372
373 results.add(seqno);
374 } catch (IOException e) {
375 log.warn("Failed to dispatch atomic invocation", e);
376 }
377 } else {
378
379 if (log.isDebugEnabled())
380 log.debug("Stopping message delivery. " + seqno.msgId + " is undeliverable: " + seqno);
381 break;
382 }
383 }
384 } finally {
385 lock.unlock();
386 }
387 if (log.isDebugEnabled())
388 log.debug("results.size=" + results.size());
389 return results;
390 }
391
392
393
394
395
396
397
398
399 private void reorderQueue(SequenceNo agreed)
400 {
401 lock.lock();
402 try {
403
404 largestAgreedSeqNo = SequenceNo.max(largestAgreedSeqNo, agreed);
405
406 SequenceNo myProposal = proposalsMap.remove(agreed.msgId);
407 if (!agreed.equals(myProposal)) {
408
409 if (log.isDebugEnabled())
410 log.debug("Reordering hold-back queue: myProposal=" + myProposal
411 + " is different from agreed=" + agreed);
412 InputStream stream = holdBackQueue.remove(myProposal);
413 if (stream == null) {
414 log.error("MESSAGE NOT FOUND IN HOLD-BACK QUEUE: " + myProposal);
415
416 throw new NoSuchElementException("Message " + myProposal + " not found in hold-back queue");
417 } else {
418 holdBackQueue.put(agreed, stream);
419 }
420 }
421 } finally {
422 lock.unlock();
423 }
424 }
425
426
427
428
429
430
431
432
433
434 private SequenceNo computeProposedSeqNo(InMessage inmsg)
435 throws IOException, ClassNotFoundException
436 {
437
438 MessageID msgId = new MessageID();
439 msgId.readExternal(inmsg);
440 lock.lock();
441 try {
442
443 largestProposedSeqNo = SequenceNo.max(largestAgreedSeqNo, largestProposedSeqNo);
444 largestProposedSeqNo.updateSeqNo(msgId);
445
446 proposalsMap.put(msgId, largestProposedSeqNo);
447
448
449
450
451 holdBackQueue.put(largestProposedSeqNo, inmsg);
452 if (log.isDebugEnabled())
453 log.debug("Computed PROPOSAL: " + largestProposedSeqNo + " for msg " + msgId);
454 return largestProposedSeqNo;
455 } finally {
456 lock.unlock();
457 }
458
459 }
460
461
462
463
464
465 public Object deliverObject(Object msg, MemberId sender, int seqNo)
466 {
467 throw new UnsupportedOperationException();
468 }
469
470
471
472
473
474
475
476
477
478
479 private class ProposalCollector
480 extends GroupAckListener
481 {
482
483 public ProposalCollector()
484 {
485 super(new Object());
486 }
487
488 private boolean isCompleted(int index)
489 {
490 return (completed[index] && !(results[index] instanceof RemoteException));
491 }
492
493
494
495
496
497
498 public SequenceNo getGreatestProposal()
499 {
500 pendingCompletion();
501
502
503 SequenceNo greatestProposal = null;
504 if (isCompleted(0)) {
505 greatestProposal = (SequenceNo) results[0];
506 for (int i = 1; i < results.length; i++) {
507 if (isCompleted(i)) {
508 SequenceNo theProposal = (SequenceNo) results[i];
509 greatestProposal = SequenceNo.max(theProposal, greatestProposal);
510 } else {
511 greatestProposal = null;
512 break;
513 }
514 }
515 }
516 if (greatestProposal == null)
517 log.warn("Not all members have provided a PROPOSAL");
518 return greatestProposal;
519 }
520
521 }
522
523
524
525
526
527
528 private class ResultCollector
529 extends GroupAckListener
530 {
531
532 private SequenceNo agreed;
533
534 public ResultCollector(SequenceNo agreed)
535 {
536 super(agreed);
537 this.agreed = agreed;
538 }
539
540
541
542
543 @SuppressWarnings("unchecked")
544 @Override
545 public void ack(MemberId id, int pos, Object obj)
546 throws RemoteException
547 {
548 if (log.isDebugEnabled())
549 log.debug("Recieved result from " + id + ", pos=" + pos + ": " + obj);
550 if (log.isDebugEnabled())
551 log.debug("resultCollectors: " + resultCollectors.keySet());
552
553 List<SequenceNo> results = (List) obj;
554 if (results.isEmpty()) {
555
556
557
558
559 if (log.isDebugEnabled())
560 log.debug("Empty results list when agreed=" + agreed);
561 ResultCollector rc = resultCollectors.get(agreed);
562
563 rc.doAck(id, pos, null);
564 return;
565 }
566 for (SequenceNo seqno : results) {
567 ResultCollector rc = resultCollectors.get(seqno);
568 if (log.isDebugEnabled())
569 log.debug("Delivering seqno=" + seqno + ", when agreed=" + agreed);
570 rc.doAck(id, pos, seqno.result);
571 }
572 }
573
574 private void doAck(MemberId id, int pos, Object obj)
575 throws RemoteException
576 {
577 super.ack(id, pos, obj);
578 }
579
580 @Override
581 public Object getResult()
582 {
583
584 Object result = super.getResult();
585
586 resultCollectors.remove(agreed);
587 return result;
588 }
589
590 }
591
592
593
594
595
596
597
598
599
600 private static final class SequenceNo
601 implements Externalizable, Comparable<SequenceNo>
602 {
603
604 private static final long serialVersionUID = -6394934923836273234L;
605
606
607 private int sequenceNo = 100;
608
609
610 private MemberId member;
611
612
613
614
615
616
617 private MessageID msgId = null;
618
619
620
621
622 private Object result = null;
623
624
625 private transient boolean deliverable = false;
626
627
628
629 public SequenceNo() {}
630
631 public SequenceNo(MemberId me)
632 {
633 member = me;
634 }
635
636 public SequenceNo(SequenceNo oldSeq)
637 {
638 this.sequenceNo = oldSeq.sequenceNo;
639 this.member = oldSeq.member;
640 this.msgId = oldSeq.msgId;
641 }
642
643 public static SequenceNo max(SequenceNo s1, SequenceNo s2)
644 {
645 if (s1.compareTo(s2) > 0) {
646 return new SequenceNo(s1);
647 } else {
648 return new SequenceNo(s2);
649 }
650 }
651
652
653
654
655
656
657
658 public void updateSeqNo(MessageID msgId)
659 {
660 sequenceNo++;
661 this.msgId = msgId;
662 }
663
664 public boolean isDeliverable()
665 {
666 return deliverable;
667 }
668
669 public void markAsDeliverable()
670 {
671 deliverable = true;
672 }
673
674 public int hashCode()
675 {
676 return sequenceNo ^ member.hashCode();
677 }
678
679 public boolean equals(Object obj)
680 {
681 if (obj instanceof SequenceNo) {
682 SequenceNo sno = (SequenceNo) obj;
683 return (sequenceNo == sno.sequenceNo && member.equals(sno.member));
684 } else {
685 return false;
686 }
687 }
688
689 public String toString()
690 {
691 StringBuilder buf = new StringBuilder();
692 buf.append("[");
693 String a = member.getEndPoint().getAddress().getHostAddress().toString();
694 String[] b = a.split("\\.");
695 if (b.length > 0)
696 buf.append(b[b.length-1]);
697 buf.append(".");
698 buf.append(sequenceNo);
699 buf.append("]");
700 return buf.toString();
701 }
702
703
704
705
706 public void readExternal(ObjectInput in)
707 throws IOException, ClassNotFoundException
708 {
709 sequenceNo = in.readInt();
710 member = new MemberIdImpl();
711 member.readExternal(in);
712 msgId = new MessageID();
713 msgId.readExternal(in);
714 result = in.readObject();
715 }
716
717
718
719
720 public void writeExternal(ObjectOutput out)
721 throws IOException
722 {
723 out.writeInt(sequenceNo);
724 member.writeExternal(out);
725 msgId.writeExternal(out);
726 out.writeObject(result);
727 }
728
729
730
731
732
733
734
735
736
737
738
739 public int compareTo(SequenceNo seqno)
740 {
741 if (this == seqno)
742 return 0;
743 if (this.sequenceNo == seqno.sequenceNo)
744 return this.member.compareTo(seqno.member);
745 else if (this.sequenceNo > seqno.sequenceNo)
746 return 1;
747 else
748 return -1;
749 }
750
751 }
752
753
754
755
756
757
758
759
760
761
762
763 private static final class MessageID
764 implements Externalizable
765 {
766
767 private static final long serialVersionUID = -6073546790683304835L;
768
769 private static long nextSequenceNo = 0;
770 private long sequenceNo;
771 private MemberId memberId;
772
773
774 public MessageID() {}
775
776 public MessageID(MemberId id)
777 {
778 this.sequenceNo = nextSequenceNo++;
779 this.memberId = id;
780 }
781
782 public boolean equals(Object obj)
783 {
784 if (obj instanceof MessageID) {
785 MessageID msgId = (MessageID) obj;
786 return (sequenceNo == msgId.sequenceNo && memberId.equals(msgId.memberId));
787 } else {
788 return false;
789 }
790 }
791
792 public int hashCode()
793 {
794 return (int)(sequenceNo ^ (sequenceNo >>> 32)) ^ memberId.hashCode();
795 }
796
797 public String toString()
798 {
799 StringBuilder buf = new StringBuilder();
800 buf.append("[");
801 String a = memberId.getEndPoint().getAddress().getHostAddress().toString();
802 String[] b = a.split("\\.");
803 if (b.length > 0)
804 buf.append(b[b.length-1]);
805 buf.append("*");
806 buf.append(sequenceNo);
807 buf.append("]");
808 return buf.toString();
809 }
810
811
812
813
814 public void readExternal(ObjectInput in)
815 throws IOException, ClassNotFoundException
816 {
817 sequenceNo = in.readLong();
818 memberId = new MemberIdImpl();
819 memberId.readExternal(in);
820 }
821
822
823
824
825 public void writeExternal(ObjectOutput out)
826 throws IOException
827 {
828 out.writeLong(sequenceNo);
829 memberId.writeExternal(out);
830 }
831
832 }
833
834 }