1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.gm;
20
21 import static jgroup.util.log.ViewEvent.Type.Merge;
22
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import javax.naming.ConfigurationException;
30
31 import jgroup.core.JgroupException;
32 import jgroup.core.MemberId;
33 import jgroup.core.MemberTable;
34 import jgroup.core.MembershipListener;
35 import jgroup.core.MembershipService;
36 import jgroup.core.MergingListener;
37 import jgroup.core.MergingService;
38 import jgroup.core.View;
39 import jgroup.core.multicast.MulticastListener;
40 import jgroup.core.multicast.MulticastService;
41 import jgroup.relacs.rmi.MarshalInputStream;
42 import jgroup.relacs.rmi.MarshalOutputStream;
43 import jgroup.util.log.Eventlogger;
44 import jgroup.util.log.ViewEvent;
45
46 import org.apache.log4j.Logger;
47
48
49
50
51
52
53
54
55
56
57
58 public class MergingLayer
59 implements MergingService, MembershipService, MembershipListener, MulticastListener
60 {
61
62
63
64
65
66
67 private static final Logger log = Logger.getLogger(MergingLayer.class);
68
69
70
71
72
73
74
75 private static final String PROTOCOL_NAME = "SMS";
76
77
78
79
80
81
82
83 private final MembershipService membershipService;
84
85
86 private final MulticastService multicastService;
87
88
89
90
91
92 private final List<MembershipListener> membershipListeners =
93 new ArrayList<MembershipListener>();
94
95
96
97
98
99 private final List<MergingListener> mergingListeners =
100 new ArrayList<MergingListener>();
101
102
103 private final MemberTable table;
104
105
106
107
108
109 private final MemberTable providedTable = new MemberTable();
110
111
112 private final MemberId me;
113
114
115 private View view;
116
117
118 private MemberId[] members;
119
120
121 private int remainingPutStates;
122
123
124 private View previousView;
125
126
127
128
129
130
131
132
133
134
135
136
137
138 private MergingLayer(MembershipService membershipService, MulticastService multicastService)
139 throws JgroupException
140 {
141 this.membershipService = membershipService;
142 this.multicastService = multicastService;
143
144 table = membershipService.getMemberTable();
145 me = membershipService.getMyIdentifier();
146 }
147
148
149
150
151
152
153 public static MergingLayer getLayer(MembershipService membService, MulticastService mcastService)
154 throws JgroupException
155 {
156 return new MergingLayer(membService, mcastService);
157 }
158
159
160
161
162
163
164
165
166
167
168
169 public void addListener(Object listener)
170 {
171 if (listener instanceof MembershipListener && !membershipListeners.contains(listener))
172 membershipListeners.add((MembershipListener) listener);
173 if (listener instanceof MergingListener && !mergingListeners.contains(listener))
174 mergingListeners.add((MergingListener) listener);
175 if (!(listener instanceof MergingListener) && !(listener instanceof MembershipListener))
176 throw new IllegalArgumentException("Specified listener does not implement a MembershipListener or MergingListener");
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190 public void complete(Object server)
191 throws JgroupException
192 {
193 membershipService.complete(server);
194 }
195
196
197
198
199
200
201
202
203
204 public void join(int gid)
205 throws JgroupException
206 {
207 membershipService.join(gid);
208 }
209
210
211
212
213 public void join()
214 throws JgroupException
215 {
216 membershipService.join();
217 }
218
219
220
221
222 public void leave()
223 throws JgroupException
224 {
225 membershipService.leave();
226 }
227
228
229
230
231 public boolean isMember()
232 {
233 return membershipService.isMember();
234 }
235
236
237
238
239 public boolean isJoining()
240 {
241 return membershipService.isJoining();
242 }
243
244
245
246
247 public boolean isMemberOrJoining()
248 {
249 return membershipService.isMemberOrJoining();
250 }
251
252
253
254
255 public boolean isLeader()
256 {
257 return membershipService.isLeader();
258 }
259
260
261
262
263 public MemberId getMyIdentifier()
264 {
265 return me;
266 }
267
268
269
270
271 public int getGid()
272 {
273 return membershipService.getGid();
274 }
275
276
277
278
279 public MemberTable getMemberTable()
280 {
281 return providedTable;
282 }
283
284 public int members()
285 {
286 return membershipService.members();
287 }
288
289
290
291
292 public int getViewIndex()
293 {
294 return membershipService.getViewIndex();
295 }
296
297
298
299
300
301 public int getMemberIndex()
302 {
303 return membershipService.getMemberIndex();
304 }
305
306
307
308
309
310
311
312
313
314 @AllowDuplicateViews public void viewChange(final View view)
315 {
316 if (log.isDebugEnabled())
317 log.debug("MergingLayer: viewChange");
318
319 providedTable.viewChange(view);
320
321 this.view = view;
322 remainingPutStates = view.mergingViews();
323 if (log.isDebugEnabled())
324 log.debug("REMAINING PUTSTATES: " + remainingPutStates);
325
326
327
328
329
330 members = view.getMembers();
331 for (int i=0; i < members.length; i++) {
332 int status = table.getStatus(members[i]);
333 if (status == MemberTable.NEWMEMBER || status == MemberTable.RECOVERING)
334 table.put(members[i], new MemberInfo(members[i], members[i].equals(me)));
335 }
336
337
338
339
340
341 Object[] objs = table.elements();
342 MemberInfo[] mdata = new MemberInfo[objs.length];
343 for (int i=0; i < objs.length; i++)
344 mdata[i] = (MemberInfo) objs[i];
345
346
347
348
349
350 boolean needupdate = false;
351 for (int i=0; i < mdata.length; i++) {
352 if (!table.isMember(mdata[i].id)) {
353 mdata[i].mset = false;
354 mdata[i].lset = false;
355 mdata[i].tset = false;
356 } else {
357 mdata[i].tset = true;
358 if (!mdata[i].mset && !mdata[i].id.equals(me))
359 needupdate = true;
360 }
361 if (log.isDebugEnabled()) {
362 StringBuilder buf = new StringBuilder();
363 buf.append(mdata[i]);
364 buf.append(", needupdate=");
365 buf.append(needupdate);
366 log.debug(buf.toString());
367 }
368 }
369
370 if (needupdate) {
371 if (Eventlogger.ENABLED)
372 Eventlogger.logEventFlush(new ViewEvent(Merge, view));
373 if (hasLocalCoordinator(mdata)) {
374 if (log.isDebugEnabled())
375 log.debug("I'm merging coordinator");
376 try {
377 OutputStream stream = multicastService.getMessage(PROTOCOL_NAME);
378 MarshalOutputStream msg = new MarshalOutputStream(stream);
379 int[] owners = getOwners(mdata);
380 int[] dests = getDests(mdata);
381 MemberId[] destIds = new MemberId[dests.length];
382 for (int j = 0; j < dests.length; j++)
383 destIds[j] = members[dests[j]];
384
385 msg.writeInt(owners.length);
386 for (int j=0; j < owners.length; j++)
387 msg.writeInt(owners[j]);
388
389 msg.writeInt(dests.length);
390 for (int j=0; j < dests.length; j++)
391 msg.writeInt(dests[j]);
392
393 try {
394 int mergingSize = mergingListeners.size();
395 msg.writeInt(mergingSize);
396 if (log.isDebugEnabled())
397 log.debug("Number of states to get (mergingListeners): " + mergingSize);
398 for (int j = 0; j < mergingSize; j++) {
399 MergingListener mergingListener = (MergingListener) mergingListeners.get(j);
400 msg.writeObject(mergingListener.getState(destIds));
401 }
402 } catch (Exception e) {
403 log.warn("Exception caught when getting state from members", e);
404 }
405 msg.flush();
406 multicastService.mcast(stream, null);
407 } catch(Exception e) {
408 log.warn("Exception during view change events of MergingData", e);
409 }
410 }
411 } else {
412
413 notifyView();
414 }
415 }
416
417
418
419
420 public void hasLeft()
421 {
422 for (MembershipListener listener : membershipListeners) {
423 listener.hasLeft();
424 }
425 }
426
427
428
429
430 public void prepareChange()
431 {
432 for (MembershipListener listener : membershipListeners) {
433 listener.prepareChange();
434 }
435 }
436
437
438
439
440
441
442
443
444
445 public String getProtocolName()
446 {
447 return PROTOCOL_NAME;
448 }
449
450
451
452
453 public Object deliverStream(InputStream stream, MemberId sender, int seqNo)
454 {
455 if (log.isDebugEnabled()) {
456 log.debug("MergingLayer: deliverStream from " + sender);
457 log.debug(stream);
458 }
459
460 try {
461 MarshalInputStream in = new MarshalInputStream(stream);
462
463 MemberId[] owners = new MemberId[in.readInt()];
464 for (int i = 0; i < owners.length; i++) {
465 owners[i] = members[in.readInt()];
466 if (log.isDebugEnabled()) {
467 log.debug("owners[" + i + "]: " + owners[i] );
468 }
469
470
471
472
473 if (owners[i].equals(me)) {
474 for (int k = 0; k < members.length; k++) {
475 MemberInfo md = (MemberInfo) table.get(members[k]);
476 md.mset = true;
477 }
478 }
479
480
481 MemberInfo md = (MemberInfo) table.get(owners[i]);
482 md.lset = true;
483 }
484
485 MemberId[] dests = new MemberId[in.readInt()];
486 boolean updateState = false;
487 for (int i = 0; i < dests.length; i++) {
488 dests[i] = members[in.readInt()];
489 if (dests[i].equals(me)) {
490
491 updateState = true;
492 }
493 if (log.isDebugEnabled()) {
494 log.debug("dests[" + i + "]: " + dests[i] );
495 }
496 }
497
498 if (!updateState) {
499 if (log.isDebugEnabled())
500 log.debug("Message not for me; ignoring the putState message");
501 return null;
502 }
503 Object[] objs = new Object[in.readInt()];
504 for (int j = 0; j < objs.length; j++) {
505 objs[j] = in.readObject();
506 if (log.isDebugEnabled())
507 log.debug("Read STATE: " + objs[j]);
508 }
509
510 for (int j = 0, size = mergingListeners.size(); j < size; j++)
511 ((MergingListener) mergingListeners.get(j)).putState(objs[j], dests);
512 } catch (Exception e) {
513
514
515
516
517
518 log.warn("MergingLayer: Unable to unmarshal stream message", e);
519 }
520
521 remainingPutStates--;
522 if (log.isDebugEnabled())
523 log.debug("Remaining put state calls: " + remainingPutStates);
524 if (remainingPutStates == 0) {
525
526 notifyView();
527 }
528 return null;
529 }
530
531
532
533
534
535 public Object deliverObject(Object msg, MemberId sender, int seqNo)
536 {
537 throw new UnsupportedOperationException();
538 }
539
540
541
542
543
544
545
546
547
548 private void notifyView()
549 {
550 MembershipLayer.notifyViewListeners(view, previousView, membershipListeners);
551 previousView = view;
552 }
553
554
555
556
557 private int[] getOwners(MemberInfo[] mdata)
558 {
559
560 int len = 0;
561 for (int i=0; i < mdata.length; i++)
562 if (mdata[i].lset) len++;
563
564
565 int[] owners = new int[len];
566 len = 0;
567 for (int i=0; i < mdata.length; i++)
568 if (mdata[i].lset) {
569 owners[len] = table.getIndex(mdata[i].id);
570 len++;
571 }
572
573 return owners;
574 }
575
576
577
578
579
580 private int[] getDests(MemberInfo[] mdata)
581 {
582
583 int len = 0;
584 for (int i=0; i < mdata.length; i++)
585 if (!mdata[i].mset && mdata[i].tset) len++;
586
587
588 int[] dests = new int[len];
589 len = 0;
590 for (int i=0; i < mdata.length; i++)
591 if (!mdata[i].mset && mdata[i].tset) {
592 dests[len] = table.getIndex(mdata[i].id);
593 len++;
594 }
595
596 return dests;
597 }
598
599
600
601
602
603
604
605
606 private boolean hasLocalCoordinator(MemberInfo[] mdata)
607 {
608
609
610
611 int min = 0;
612 for (int i=0; i < mdata.length; i++) {
613 if (mdata[i].mset && table.getStatus(mdata[i].id) != MemberTable.CRASHED) {
614 min = i;
615 break;
616 }
617 }
618 if (log.isDebugEnabled())
619 log.debug("Elected Coordinator: " + mdata[min].id);
620 return (mdata[min].id.equals(me));
621 }
622
623
624 private class MemberInfo
625 {
626
627 boolean lset;
628
629
630 boolean mset;
631
632
633 boolean tset;
634
635 MemberId id;
636
637 MemberInfo(MemberId id, boolean set)
638 {
639 this.id = id;
640 lset = set;
641 mset = set;
642 tset = set;
643 }
644
645 @Override
646 public String toString()
647 {
648 StringBuilder buf = new StringBuilder("MergeInfo: ");
649 buf.append(table.toString(id));
650 buf.append(": mset=");
651 buf.append(mset);
652 buf.append(", lset=");
653 buf.append(lset);
654 buf.append(", tset=");
655 buf.append(tset);
656 return buf.toString();
657 }
658
659 }
660
661 }