1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22 import java.net.DatagramPacket;
23
24 import jgroup.core.ConfigurationException;
25 import jgroup.core.EndPoint;
26 import jgroup.core.JgroupException;
27 import jgroup.relacs.config.DistributedSystemConfig;
28 import jgroup.relacs.config.TransportConfig;
29 import jgroup.relacs.daemon.DaemonMsg;
30 import jgroup.relacs.events.Event;
31 import jgroup.util.InMessage;
32
33 import org.apache.log4j.Logger;
34
35
36
37
38
39
40
41
42
43
44 public final class Mss
45 implements EhandlerUser, MssConstants, MssTag
46 {
47
48
49
50
51
52
53 private static final Logger log = Logger.getLogger(Mss.class);
54
55
56
57
58
59
60
61
62
63
64
65
66 private static byte[] sentFrag;
67
68
69
70
71
72
73
74 private MssHost localHost;
75
76
77 private MssUser mssuser;
78
79
80 private MssDS mssds;
81
82
83 private EventHandler ehandler;
84
85
86 private MsgCntrl msgCntrl;
87
88
89
90
91 private long routingTimeout;
92
93
94 private long congestionTimeout;
95
96
97 private ScheduledEvent lastRoutingScheduled;
98
99
100
101
102 private int payload;
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 public Mss(MssUser mssuser, DistributedSystemConfig dsc, TransportConfig tconf)
125 throws ConfigurationException, IOException
126 {
127 this.mssuser = mssuser;
128
129
130 routingTimeout = tconf.getRoutingTimeout();
131 congestionTimeout = tconf.getCongestionTimeout();
132
133
134 payload = tconf.getPayload();
135
136
137 ehandler = new EventHandler(this);
138 mssds = new MssDS(ehandler, dsc, tconf);
139 localHost = HostTable.getLocalHost();
140
141
142
143
144
145
146
147 sentFrag = FragmentHeader.marshal(SENT, false, UNDEF);
148
149
150 msgCntrl = new MsgCntrl(ehandler, mssuser, mssds);
151 mssds.setControl(msgCntrl);
152
153
154
155
156
157
158 RandomGenerator.setRoutingTimeout(routingTimeout);
159
160
161
162
163 iamAliveTimeout();
164 routingTimeout();
165
166 congTimeout();
167
168 mssds.doStart();
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182 public void send(byte tag, DaemonMsg msg, EndPoint receiver)
183 {
184 if (receiver.isLocal()) {
185 throw new IllegalArgumentException("Local message sending not allowed");
186 }
187
188
189 MssHost host = mssds.hostLookup(receiver);
190 Cluster cluster = host.getCluster();
191 cluster.addReceiver(receiver);
192
193
194
195
196 MsgJG msgjg = null;
197 try {
198 msgjg = MsgJG.marshal(msg, tag, new EndPoint[] { receiver }, mssds);
199 } catch (IOException e) {
200 log.error("Cannot marshal the mss level data into the message stream", e);
201 return;
202 }
203
204
205
206
207
208 if (cluster.isReachable()) {
209
210
211
212 cluster.send(msgjg);
213
214 } else if (log.isDebugEnabled()) {
215 log.debug("msend failed: no reachable member for cluster " + cluster.getEndPoint());
216 }
217 cluster.clearReceivers();
218 }
219
220
221
222
223
224
225
226 public void msend(byte tag, DaemonMsg msg)
227 {
228
229
230
231 MsgJG msgjg = null;
232 try {
233 msgjg = MsgJG.marshal(msg, tag, mssds);
234 } catch (IOException e) {
235 log.error("Cannot marshal the mss level data into the message stream", e);
236 return;
237 }
238
239
240
241
242 ClusterTable clustertable = mssds.getClusterTable();
243 for (int i = 0, size = clustertable.size(); i < size; i++) {
244 Cluster cluster = clustertable.get(i);
245 if (cluster.isReachable()) {
246
247 cluster.send(msgjg);
248
249 } else if (log.isDebugEnabled()) {
250 log.debug("msend failed: no reachable member for cluster " + cluster.getEndPoint());
251 }
252 }
253 }
254
255
256
257
258
259
260 public void msend(byte tag, DaemonMsg msg, EndPoint[] receivers)
261 {
262
263 for (int i = 0; i < receivers.length; i++) {
264 if (!receivers[i].isLocal()) {
265 MssHost host = mssds.hostLookup(receivers[i]);
266 host.getCluster().addReceiver(receivers[i]);
267 } else if (log.isDebugEnabled()) {
268 log.debug("Local host has been removed from the destination list");
269 }
270 }
271
272
273
274
275 MsgJG msgjg = null;
276 try {
277 msgjg = MsgJG.marshal(msg, tag, receivers, mssds);
278 } catch (IOException e) {
279 log.error("Cannot marshal the mss level data into the message stream", e);
280 return;
281 }
282
283
284
285
286 ClusterTable clustertable = mssds.getClusterTable();
287 for (int i = 0, size = clustertable.size(); i < size; i++) {
288 Cluster cluster = clustertable.get(i);
289
290
291
292
293 if (cluster.hasReceivers()) {
294 if (cluster.isReachable()) {
295
296 cluster.send(msgjg);
297
298 } else if (log.isDebugEnabled()) {
299 log.debug("msend failed: no reachable member for cluster " + cluster.getEndPoint());
300 }
301 }
302 cluster.clearReceivers();
303 }
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317
318 public String toString()
319 {
320 return ehandler.toString();
321 }
322
323
324
325
326
327
328
329
330 public void rreceive(DatagramPacket packet)
331 {
332 byte[] buf = packet.getData();
333 int bufLen = packet.getLength();
334
335 log.assertLog(buf.length <= payload + OVERHEAD_SIZE,
336 "Buffer length (" + buf.length + ") should not exceed the payload plus overhead ("
337 + (payload + OVERHEAD_SIZE) + ")");
338
339 try {
340 FragmentHeader header = FragmentHeader.unmarshal(mssds, buf, bufLen);
341 if (log.isDebugEnabled())
342 log.debug("packet header=" + header);
343
344
345
346
347
348
349 if (mssds.hasNewIncarnation(header))
350 generateSuspect();
351
352 switch(header.getTag()) {
353
354 case SENT:
355 {
356 if (header.isLocal()) {
357 if (log.isDebugEnabled())
358 log.debug("Discarding locally sent SENT message");
359 } else {
360 log.assertLog(lastRoutingScheduled != null, "Last scheduled ROUTING event is null");
361
362 if (log.isDebugEnabled())
363 log.debug("rreceived packet SENT header= " + header);
364
365 ehandler.abortTimeout(lastRoutingScheduled);
366
367
368 long timeout = RandomGenerator.getRoutingTimeout();
369 lastRoutingScheduled = ehandler.setTimeout(timeout, new ScheduledEvent(ROUTING));
370 }
371 }
372 break;
373
374 case IAMALIVE:
375 {
376
377
378
379
380
381
382
383
384
385
386
387
388 InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
389 inmsg.insert(buf, bufLen);
390 handleIamAliveMsg(MsgIamAlive.unmarshal(inmsg, header));
391 }
392 break;
393
394 case ROUTING:
395 {
396 log.assertLog(!header.isLocal(),
397 "Received ROUTING msg sent by me; shouldn't happen.");
398
399
400
401
402
403
404
405
406 header.setTag(FWDROUTING);
407 localHost.getCluster().forward(header.getFragment(), header.getFragmentLength());
408 }
409 break;
410
411 case FWDROUTING:
412 {
413
414
415
416
417 log.assertLog(!header.isLocal(),
418 "Received FWDROUTING msg sent by me; should this happen?");
419
420
421
422
423
424
425
426
427
428
429
430 InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
431 inmsg.insert(buf, bufLen);
432 MsgRouting msg = MsgRouting.unmarshal(inmsg, header);
433 handleRoutingMsg(msg);
434 }
435 break;
436
437 case NACK:
438 {
439 if (header.isLocal()) {
440 if (log.isDebugEnabled())
441 log.debug("Discarding locally sent NACK message");
442 } else {
443
444
445
446
447 InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
448 inmsg.insert(buf, bufLen);
449 MsgNACK nack = MsgNACK.unmarshal(inmsg, header, mssds);
450 handleNACKMsg(nack);
451 }
452 }
453 break;
454
455 case SYN:
456 {
457
458
459
460
461 InMessage inmsg = new InMessage(bufLen, HEADER_SIZE, TRAILER_SIZE, bufLen);
462 inmsg.insert(buf, bufLen);
463 MsgSYN syn = MsgSYN.unmarshal(inmsg, header);
464 handleSYNMsg(syn);
465 }
466 break;
467
468 case NOTLASTFRAGMENT:
469 case JG:
470 {
471
472 if (header.isLocal()) {
473 if (log.isDebugEnabled()) {
474 log.debug("Discarding local upper layer message (" + header + ")");
475 }
476 return;
477 }
478
479
480 if (!handleJGInfo(header)) {
481
482
483
484
485
486
487
488
489
490 ehandler.rnotify(packet);
491 }
492 }
493 break;
494
495 default:
496 log.warn("Invalid packet tag: " + header.getTag());
497 }
498 } catch (IOException e) {
499
500 log.error("rreceive: malformed packet", e);
501 } catch (ClassCastException e) {
502 log.error("rreceive: malformed packet", e);
503 } catch (ClassNotFoundException e) {
504 log.error("rreceive: malformed packet", e);
505 } catch (JgroupException e) {
506 log.error("rreceive: packet from unknown host; all hosts must be in the config.xml", e);
507 }
508 }
509
510
511
512
513
514 public void lreceive(Event event)
515 {
516 mssuser.localReceive(event);
517 }
518
519
520
521
522
523 public void treceive(ScheduledEvent event)
524 {
525 switch (event.getTag()) {
526
527 case IAMALIVE:
528 iamAliveTimeout();
529 break;
530
531 case ROUTING:
532 routingTimeout();
533 break;
534
535 case REMOTENACK:
536 nackTimeout((MsgNACK) event.getData());
537 break;
538
539 case CONGESTION:
540 congTimeout();
541 break;
542
543 default:
544 log.warn("Unknown timeout event received");
545 }
546 }
547
548
549 public void lsend(Event msg)
550 {
551 ehandler.lnotify(msg);
552 }
553
554
555
556
557
558 public HostTable getHostTable()
559 {
560 return mssds.getHostTable();
561 }
562
563
564
565
566
567
568
569
570
571 private void handleSYNMsg(MsgSYN msg)
572 {
573 if (log.isDebugEnabled())
574 log.debug("handleSYNMsg: " + msg);
575
576
577
578 MssHost sender = msg.getSender();
579 switch (msg.getSYNType()) {
580
581 case QSYN:
582
583
584
585
586 try {
587
588
589
590
591
592
593
594 MsgFlowSndrSide msgFlow = msg.getMsgFlow();
595 if (!msgFlow.isInitialized()) {
596 Cluster cluster = msg.getSender().getCluster();
597 cluster.resetMsgFlow();
598 if (log.isDebugEnabled())
599 log.debug("cluster.resetMsgFlow()");
600 }
601 MsgSYN syn = MsgSYN.marshal(ASYN, msg, msgFlow.getLastMsgSent());
602 sender.unicastRouteSend(syn);
603 if (log.isDebugEnabled())
604 log.debug("Sent " + syn);
605 } catch (IOException e) {
606 log.error("Unable to marshal ASYN message; nothing sent", e);
607 }
608 break;
609
610 case ASYN:
611
612
613
614
615 if (sender.checkSynId(msg.getMid())) {
616
617
618 sender.abortTimeout();
619
620
621 sender.resetMsgFlow(msg.getLastMsgSent());
622
623
624
625
626
627
628
629 Cluster cluster = sender.getCluster();
630 if (!cluster.getMsgFlow().isInitialized()) {
631 cluster.resetMsgFlow(msg.getLastMsgSent());
632 }
633
634
635
636
637
638 mssds.getUpperView().setAsReachable(sender);
639 if (log.isDebugEnabled())
640 log.debug("Synchronized to " + sender + " nonce (" + msg.getMid()
641 + ") - Msg flow reset to lastMsgSent=" + msg.getLastMsgSent());
642
643
644 }
645 break;
646
647 default:
648 log.warn("Invalid synchronization message received");
649 }
650 if (log.isDebugEnabled())
651 log.debug("handleSYNMsg end");
652 }
653
654
655
656
657
658 private void handleNACKMsg(MsgNACK nack)
659 {
660 if (log.isDebugEnabled())
661 log.debug(nack);
662
663
664
665 if (NACKSUPPRESSION) {
666
667
668
669
670 MssHost src = nack.getSource();
671 if (src.isLocal()) {
672
673
674
675
676 resendMsg(nack);
677 nack.getMsgFlow().incNackCount();
678
679 } else if (!nack.isRemoteNACK()) {
680
681
682
683
684
685
686
687
688 ScheduledEvent eventNack = src.getMsgFlow().getScheduledEvent(nack.mid);
689 if (eventNack == null) {
690 log.warn("No scheduled NACK event for " + nack.mid);
691 } else {
692
693 ehandler.abortTimeout(eventNack);
694
695
696
697
698 long randomTimeout = RandomGenerator.getRandomTimeout(src.getTimeout());
699 ehandler.setTimeout(randomTimeout, eventNack);
700 }
701 }
702
703 } else if (nack.isRemoteNACK()) {
704
705
706
707
708
709
710 resendMsg(nack);
711
712 }
713 if (log.isDebugEnabled())
714 log.debug("handleNACKMsg end");
715 }
716
717
718
719
720
721
722 private void resendMsg(MsgNACK nack)
723 {
724 if (log.isDebugEnabled())
725 log.debug(nack);
726 Cluster destCluster = nack.getCluster();
727 MsgFlowSndrSide msgFlow = destCluster.getMsgFlow();
728
729 FragmentIterator fragIter = msgFlow.getSentMsgFrag(nack);
730 if (fragIter != null) {
731 if (log.isDebugEnabled()) {
732 log.debug("fragIter=" + fragIter);
733 }
734 destCluster.resend(fragIter, nack.mid);
735 destCluster.clearReceivers();
736
737 if (log.isDebugEnabled()) {
738 log.debug("Resent msg fragment (" + nack.mid + ") to cluster " + destCluster.getEndPoint());
739 }
740 } else {
741 log.warn("Cannot resend msg fragment (" + nack.mid + "); already removed from cluster queue "
742 + destCluster.getEndPoint());
743 }
744 }
745
746
747
748
749
750 private void handleRoutingMsg(MsgRouting msg)
751 {
752 if (log.isDebugEnabled())
753 log.debug("handleRoutingMsg start");
754
755
756
757
758 if (mssds.checkIncarnation(msg))
759 generateSuspect();
760
761
762 mssds.updateRoutingTable(msg);
763
764
765 msg.updateFC();
766 generateSuspect();
767
768 if (log.isDebugEnabled())
769 log.debug("handleRoutingMsg end");
770 }
771
772
773
774
775
776 private void handleIamAliveMsg(MsgIamAlive msg)
777 {
778 if (log.isDebugEnabled())
779 log.debug("handleIamAliveMsg start");
780
781
782 if (!msg.isLocal())
783 mssds.updateReachability(msg);
784
785 msg.updateFC(mssds);
786 generateSuspect();
787
788 if (log.isDebugEnabled())
789 log.debug("handleIamAliveMsg end");
790 }
791
792
793
794
795
796
797
798 private void generateSuspect()
799 {
800
801
802 DSView view = mssds.getView();
803
804 if (log.isDebugEnabled())
805 log.debug("generateSuspect(): view = "+ view);
806
807
808 MssHost[] newReachable = view.getNewReachableHosts();
809 for (int i=0; i < newReachable.length; i++) {
810 if (log.isDebugEnabled())
811 log.debug("newReachableHost: " + newReachable[i]);
812 newReachable[i].scheduleTimeout();
813 }
814
815
816 MssHost[] newUnreachable = view.getNewUnreachableHosts();
817 for (int i=0; i < newUnreachable.length; i++) {
818 if (log.isDebugEnabled())
819 log.debug("newUnreachableHost: " + newUnreachable[i]);
820 newUnreachable[i].abortTimeout();
821 }
822
823
824 DSView upperview = mssds.getUpperView();
825 if (upperview.hasChanged()) {
826 if (log.isDebugEnabled()) {
827 log.debug("generateSuspect(): Upper view changed to: " + upperview);
828 }
829
830 mssuser.remoteSuspect(upperview.getReachableEndPoints(),
831 upperview.getNewReachableEndPoints(),
832 upperview.getNewUnreachableEndPoints(),
833 upperview.getNewIncarnationEndPoints());
834 upperview.clear();
835 }
836
837 }
838
839
840
841
842
843
844
845
846
847
848 private void iamAliveTimeout()
849 {
850
851 mssds.update();
852
853 try {
854 MsgIamAlive msg = MsgIamAlive.marshal(mssds.getAllFCEntry());
855 localHost.getCluster().send(msg);
856 } catch (IOException e) {
857 log.error("Cannot marshal IAMALIVE flow control data into the message stream", e);
858 return;
859 }
860
861
862 ehandler.setTimeout(routingTimeout, new ScheduledEvent(IAMALIVE));
863 if (log.isDebugEnabled())
864 log.debug("Sent IAMALIVE message; set timeout " + routingTimeout);
865
866
867 generateSuspect();
868 }
869
870
871
872
873
874
875
876
877 private void routingTimeout()
878 {
879 log.assertLog(localHost.isReachable(),
880 "Localhost unreachable - alive value=" + localHost.getReachability() +
881 ", live cluster members=" + localHost.getCluster().getReachableCounter());
882
883
884 MsgRouting msg = null;
885 try {
886 msg = MsgRouting.marshal(mssds);
887 } catch (IOException e) {
888 log.error("Cannot marshal the ROUTING data into the message stream", e);
889 return;
890 }
891
892
893
894
895
896 ClusterTable clustertable = mssds.getClusterTable();
897 for (int i = 0, size = clustertable.size(); i < size; i++) {
898 Cluster cluster = clustertable.get(i);
899
900 if (cluster.isLocal())
901 continue;
902
903 EndPoint clusterEP = cluster.getEndPoint();
904
905
906
907
908
909 msg.splitHorizonOn(clusterEP);
910
911
912
913
914
915
916 msg.setFCData(clusterEP);
917
918 cluster.send(msg);
919 if (log.isDebugEnabled())
920 log.debug("Sent FLOW CONTROL and ROUTING info to cluster: " + clusterEP);
921 msg.splitHorizonOff();
922 }
923
924
925 long timeout = RandomGenerator.getRoutingTimeout();
926 lastRoutingScheduled = ehandler.setTimeout(timeout, new ScheduledEvent(ROUTING));
927
928
929
930
931 localHost.getCluster().forward(sentFrag, sentFrag.length);
932 }
933
934
935
936
937
938
939 private void congTimeout()
940 {
941 ClusterTable clustertable = mssds.getClusterTable();
942 for (int i=0; i < clustertable.size(); i++) {
943 MsgFlowSndrSide msgFlow = clustertable.get(i).getMsgFlow();
944 if (msgFlow.isCongested())
945 msgFlow.decCongWin();
946 msgFlow.clearNackCount();
947 }
948 ehandler.setTimeout(congestionTimeout, new ScheduledEvent(CONGESTION));
949 }
950
951
952 private void nackTimeout(MsgNACK nack)
953 {
954 MssHost src = nack.getSource();
955 MsgFlowRcvrSide msgFlow = src.getMsgFlow();
956
957 if (NACKSUPPRESSION) {
958 if (msgFlow.isDelivered(nack.mid)) {
959 log.warn("NACK timeout for already delivered msg (" + nack.mid + ")");
960 msgFlow.removeScheduledEvent(nack.mid);
961 } else {
962 if (src.getCluster().isLocal()) {
963 if (log.isDebugEnabled())
964 log.debug("Sending NACK for msg (" + nack.mid + ") to cluster " + localHost.getCluster());
965 localHost.getCluster().send(nack);
966 } else {
967 if (log.isDebugEnabled())
968 log.debug("Sending NACK for msg (" + nack.mid + ") to " + src);
969 src.unicastRouteSend(nack);
970 try {
971 MsgNACK sentNack = MsgNACK.marshal(SENTNACK, src, nack.mid);
972 if (log.isDebugEnabled())
973 log.debug("Sending SENTNACK to cluster " + localHost.getCluster());
974 localHost.getCluster().send(sentNack);
975 } catch (IOException e) {
976 log.error("Unable to marshal SENTNACK for local cluster: " + localHost.getCluster(), e);
977 return;
978 }
979 }
980 ScheduledEvent eventNack = msgFlow.getScheduledEvent(nack.mid);
981 if (eventNack == null) {
982 log.warn("No scheduled NACK event for " + nack.mid);
983 } else {
984
985 ehandler.abortTimeout(eventNack);
986
987
988
989
990 long randomTimeout = RandomGenerator.getRandomTimeout(src.getTimeout());
991 ehandler.setTimeout(randomTimeout, eventNack);
992 }
993 }
994 } else if (nack.mid <= msgFlow.getLastMsgDlvr()) {
995 log.warn("NACK timeout for already delivered msg (" + nack.mid + ")");
996 msgFlow.removeScheduledEvent(nack.mid);
997 } else {
998 if (log.isDebugEnabled())
999 log.debug("Sending NACK for msg (" + nack.mid + ") to " + src);
1000 src.unicastRouteSend(nack);
1001
1002 msgFlow.removeScheduledEvent(nack.mid);
1003
1004 ScheduledEvent eventNack = new ScheduledEvent(nack.getNACKType(), nack);
1005 ehandler.setTimeout(src.getTimeout(), eventNack);
1006 msgFlow.putScheduledEvent(nack.mid, eventNack);
1007 }
1008 }
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029 private boolean handleJGInfo(FragmentHeader fragment)
1030 {
1031 if (fragment.isBroadcast()) {
1032
1033
1034
1035
1036
1037 Cluster localCluster = ClusterTable.getLocalCluster();
1038 if (log.isDebugEnabled())
1039 log.debug("Broadcasting msg fragment (" + fragment + ") within local cluster "
1040 + localCluster.getEndPoint());
1041 fragment.broadcastOff();
1042 localCluster.forward(fragment.getFragment(), fragment.getFragmentLength());
1043
1044
1045
1046
1047
1048
1049
1050 } else {
1051 if (log.isDebugEnabled())
1052 log.debug("Received msg (" + fragment.fragId + ") from " + fragment.getSender());
1053 return msgCntrl.msgReceive(fragment);
1054 }
1055 return true;
1056 }
1057
1058 }