1 /*
2 * Copyright (c) 2003 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19
20 package jgroup.relacs.gm;
21
22 import java.lang.reflect.Method;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import jgroup.core.JgroupException;
31 import jgroup.core.MemberId;
32 import jgroup.core.MemberTable;
33 import jgroup.core.MembershipListener;
34 import jgroup.core.MembershipService;
35 import jgroup.core.View;
36 import jgroup.core.MembershipListener.AllowDuplicateViews;
37 import jgroup.relacs.config.AppConfig;
38 import jgroup.relacs.daemon.DaemonInteraction;
39 import jgroup.relacs.events.DaemonFailureEvent;
40 import jgroup.relacs.events.Event;
41 import jgroup.relacs.events.EventTags;
42 import jgroup.relacs.events.InstallAck;
43 import jgroup.relacs.events.InstallEvent;
44 import jgroup.relacs.events.JoinRequest;
45 import jgroup.relacs.events.LeaveRequest;
46 import jgroup.relacs.events.MemberLeftEvent;
47 import jgroup.relacs.events.PrepareAck;
48 import jgroup.relacs.events.PrepareEvent;
49
50 import org.apache.log4j.Logger;
51 import org.apache.log4j.MDC;
52
53 /**
54 * The <code>MembershipLayer</code> class implements the group membership
55 * service interface and provide membership events to listeners.
56 *
57 * @author Alberto Montresor
58 * @author Hein Meling
59 * @since Jgroup 2.2
60 */
61 public final class MembershipLayer
62 implements MembershipService, DispatcherListener, EventTags
63 {
64
65 ////////////////////////////////////////////////////////////////////////////////////////////
66 // Logger
67 ////////////////////////////////////////////////////////////////////////////////////////////
68
69 /** Obtain logger for this class */
70 private final static Logger log = Logger.getLogger(MembershipLayer.class);
71
72
73 ////////////////////////////////////////////////////////////////////////////////////////////
74 // Constants
75 ////////////////////////////////////////////////////////////////////////////////////////////
76
77 /** Used in status to identify that no group is joined at the moment */
78 private static final int S_UNUSED = 0;
79
80 /** Used in status to identify that the member is joining a group */
81 private static final int S_JOINING = 1;
82
83 /** Used in status to identify that the member is member of a group */
84 private static final int S_NORMAL = 2;
85
86 /** Used in status to identify that the member is leaving a group */
87 private static final int S_LEAVING = 3;
88
89
90 ////////////////////////////////////////////////////////////////////////////////////////////
91 // Information about the group and other objects used to provide group membership
92 ////////////////////////////////////////////////////////////////////////////////////////////
93
94 /** The lock preventing concurrent event handling */
95 private final Lock lock = new ReentrantLock();
96
97 /** Condition for awaiting the group to be joined */
98 private final Condition joined = lock.newCondition();
99
100 /** Condition for awaiting the handling of some event */
101 private final Condition eventHandlingCompleted = lock.newCondition();
102
103 /** Condition variable for enabling/disabling event handling */
104 private boolean handlingEvent = false;
105
106 /** Member identifier */
107 private final MemberId me;
108
109 /** Dispatcher layer */
110 private final DispatcherService dispatcher;
111
112 /** MemberTable provided to upper layers */
113 private final MemberTable membertable;
114
115 /**
116 * Dynamic list of membership listeners. Note that the ordering of
117 * these listeners are important.
118 */
119 private final List<MembershipListener> membershipListeners =
120 new ArrayList<MembershipListener>();
121
122 /** Current status of membership */
123 private volatile int status;
124
125 /** Group identifier */
126 private int gid;
127
128 /**
129 * Position index of this member in the array containing
130 * the members of the current view.
131 */
132 private int viewIndex;
133
134 /**
135 * Position index of the local host in the array
136 * containing the hosts of the current view.
137 */
138 private int hostIndex;
139
140 /**
141 * Position index of this member in the array containing
142 * the local members of the current view.
143 */
144 private int memberIndex;
145
146 /**
147 * The number of members of this group.
148 */
149 private int numMembers;
150
151 /**
152 * Field holding the previous view installed; it is used to suppress identical views
153 * so that the application do not see them. Duplicate suppression is the default.
154 * If the <code>viewChange()</code> method is annotated with the
155 * <code>@AllowDuplicateViews</code> marker, then applications or layers will
156 * receive duplicate views. This is sometimes necessary, when the
157 * <code>prepareChange()</code> method is used to block some processing; and the
158 * <code>viewChange()</code> is used to unblock things again.
159 */
160 private View previousView = null;
161
162
163 ////////////////////////////////////////////////////////////////////////////////////////////
164 // Constructor
165 ////////////////////////////////////////////////////////////////////////////////////////////
166
167 /**
168 * Initialize a new <code>BaseLayer</code> object using information
169 * contained in the configuration objects.
170 *
171 * @param dispatcher
172 * The dispatcher service.
173 * @exception JgroupException
174 * Raised if the distributed system configuration was not available.
175 */
176 private MembershipLayer(DispatcherService dispatcher)
177 throws JgroupException
178 {
179 /* Stores initialization information */
180 this.dispatcher = dispatcher;
181
182 /*
183 * Initialize the Jgroup daemon, either locally or remotely
184 * depending on the configuration.
185 */
186 DaemonInteraction.initDaemon();
187
188 /* Obtain an identifier for this member */
189 me = DaemonInteraction.getMemberId();
190
191 /* Initialize local data structures */
192 membertable = new MemberTable();
193 status = S_UNUSED;
194 }
195
196
197 ////////////////////////////////////////////////////////////////////////////////////////////
198 // Static factory
199 ////////////////////////////////////////////////////////////////////////////////////////////
200
201 public static MembershipLayer getLayer(DispatcherService dispatcher)
202 throws JgroupException
203 {
204 return new MembershipLayer(dispatcher);
205 }
206
207
208 ////////////////////////////////////////////////////////////////////////////////////////////
209 // Layer interface methods
210 ////////////////////////////////////////////////////////////////////////////////////////////
211
212 /**
213 * Add a listener object for this layer to provide upcalls to,
214 * in response to membership events.
215 *
216 * @see jgroup.core.Layer#addListener(java.lang.Object)
217 */
218 public void addListener(Object listener)
219 {
220 if (listener instanceof MembershipListener && !membershipListeners.contains(listener))
221 membershipListeners.add((MembershipListener)listener);
222 }
223
224
225 ////////////////////////////////////////////////////////////////////////////////////////////
226 // FinalizeLayer interface methods (invoked to complete the group manager construction)
227 ////////////////////////////////////////////////////////////////////////////////////////////
228
229 /**
230 * Post initialization of the <code>MembershipLayer</code>; invoked once
231 * all group manager layers have been constructed. This method is defined
232 * in the <code>FinalizeLayer</code> inherited through the
233 * <code>MembershipSerivce</code>. <p>
234 *
235 * This method is used to determine if the provided server should join the
236 * group automatically, as soon as the group manager layer stack has been
237 * completed. By default, automatic join is disabled. To enable auto join
238 * you must configure it in the <code>applications.xml</code> file.
239 */
240 public void complete(Object server)
241 throws JgroupException
242 {
243 /* Check if the server application has auto join enabled */
244 AppConfig thisApp = AppConfig.getApplication(server);
245 if (thisApp != null && thisApp.getBooleanParam("PGMS.auto")) {
246 int gid = thisApp.getGroupId();
247 if (log.isDebugEnabled())
248 log.debug("Automatic join enabled for group: " + gid);
249 /* Join the application group */
250 join(gid);
251 }
252 }
253
254
255 ////////////////////////////////////////////////////////////////////////////////////////////
256 // Methods from MembershipService
257 ////////////////////////////////////////////////////////////////////////////////////////////
258
259 /* (non-Javadoc)
260 * @see jgroup.core.MembershipService#join()
261 */
262 public void join()
263 throws JgroupException
264 {
265 join1(-1);
266 }
267
268 /* (non-Javadoc)
269 * @see jgroup.core.MembershipService#join(int)
270 */
271 public void join(int gid)
272 throws JgroupException
273 {
274 if (gid < 0)
275 throw new IllegalArgumentException("Negative group identifier: " + gid);
276 join1(gid);
277 }
278
279 /**
280 * The actual implementation of join.
281 *
282 * @param gid
283 * Non-negative integer identifying with the group
284 * @exception JgroupException
285 * Raised if the group manager receives a request to join a group
286 * for which it is already a member.
287 */
288 private void join1(int gid)
289 throws JgroupException
290 {
291 if (log.isDebugEnabled())
292 log.debug("Joining: " + gid);
293
294 // Acquire the lock
295 lock.lock();
296 try {
297 if (status != S_UNUSED)
298 throw new JgroupException("Already member of group " + gid);
299 this.gid = gid;
300 status = S_JOINING;
301
302 dispatcher.start(gid);
303 JoinRequest joinRequest = new JoinRequest(gid, me, dispatcher.getRemoteDispatcher());
304 DaemonInteraction.addEvent(joinRequest);
305 if (log.isDebugEnabled()) {
306 log.debug("Sent to Daemon: " + joinRequest);
307 }
308
309 /*
310 * The join method blocks pending a response from the membership service
311 * (the Jgroup daemon). However, if there is no response within ten
312 * seconds, we throw an exception.
313 */
314 while (status != S_NORMAL) {
315 try {
316 boolean success = joined.await(10, TimeUnit.SECONDS);
317 if (!success)
318 throw new JgroupException("Join timeout elapsed for group " + gid);
319 } catch (InterruptedException e1) { }
320 }
321 if (log.isDebugEnabled())
322 log.debug("Join completed");
323 } finally {
324 // Release the lock
325 lock.unlock();
326 }
327 }
328
329 /* (non-Javadoc)
330 * @see jgroup.core.MembershipService#leave()
331 */
332 public void leave()
333 throws JgroupException
334 {
335 if (log.isDebugEnabled()) {
336 log.debug("leave");
337 }
338 // Acquire the lock
339 lock.lock();
340 try {
341 if (status == S_UNUSED)
342 throw new JgroupException("The gm is not member of any group");
343 if (status == S_LEAVING)
344 throw new JgroupException("The gm has already received a request to leave the group");
345 status = S_LEAVING;
346 } finally {
347 // Release the lock
348 lock.unlock();
349 }
350 DaemonInteraction.addEvent(new LeaveRequest(gid, me));
351 if (log.isDebugEnabled()) {
352 log.debug("leave completed");
353 }
354 }
355
356 /* (non-Javadoc)
357 * @see jgroup.core.MembershipService#isMember()
358 */
359 public synchronized boolean isMember()
360 {
361 return (status == S_NORMAL);
362 }
363
364 /* (non-Javadoc)
365 * @see jgroup.core.MembershipService#isJoining()
366 */
367 public synchronized boolean isJoining()
368 {
369 return (status == S_JOINING);
370 }
371
372 /* (non-Javadoc)
373 * @see jgroup.core.MembershipService#isMemberOrJoining()
374 */
375 public boolean isMemberOrJoining()
376 {
377 lock.lock();
378 try {
379 return (status == S_NORMAL || status == S_JOINING);
380 } finally {
381 lock.unlock();
382 }
383 }
384
385 // Not accessible through the MembershipService interface (maybe added later)
386 public synchronized boolean isUnused()
387 {
388 return (status == S_UNUSED);
389 }
390
391 /* (non-Javadoc)
392 * @see jgroup.core.MembershipService#isLeader()
393 */
394 public synchronized boolean isLeader()
395 {
396 return viewIndex == 0;
397 }
398
399 /* (non-Javadoc)
400 * @see jgroup.core.MembershipService#getMyIdentifier()
401 */
402 public MemberId getMyIdentifier()
403 {
404 return me;
405 }
406
407 /* (non-Javadoc)
408 * @see jgroup.core.MembershipService#getGid()
409 */
410 public int getGid()
411 {
412 if (isUnused())
413 throw new IllegalStateException("The membership service has not yet joined a group");
414 return gid;
415 }
416
417 /* (non-Javadoc)
418 * @see jgroup.core.MembershipService#getMemberTable()
419 */
420 public MemberTable getMemberTable()
421 {
422 return membertable;
423 }
424
425 /* (non-Javadoc)
426 * @see jgroup.core.MembershipService#members()
427 */
428 public synchronized int members()
429 {
430 return numMembers;
431 }
432
433 /* (non-Javadoc)
434 * @see jgroup.core.MembershipService#getViewIndex()
435 */
436 public synchronized int getViewIndex()
437 {
438 return viewIndex;
439 }
440
441 /* (non-Javadoc)
442 * @see jgroup.core.MembershipService#getMemberIndex()
443 */
444 public synchronized int getMemberIndex()
445 {
446 return memberIndex;
447 }
448
449
450 ////////////////////////////////////////////////////////////////////////////////////////////
451 // Methods from DispatcherListener
452 ////////////////////////////////////////////////////////////////////////////////////////////
453
454 /* (non-Javadoc)
455 * @see jgroup.relacs.gm.DispatcherListener#eventTypes()
456 */
457 public int[] eventTypes()
458 {
459 return new int[] {
460 INSTALL_EVENT,
461 PREPARE_EVENT,
462 MEMBER_LEFT_EVENT,
463 DAEMON_FAILURE_EVENT
464 };
465 }
466
467
468 /* (non-Javadoc)
469 * @see jgroup.relacs.gm.DispatcherListener#notifyEvent(jgroup.relacs.events.Event)
470 */
471 public void notifyEvent(Event event)
472 {
473 /*
474 * Block waiting for completion of the current event, if any. This is
475 * so as to serialize the events received from the DispatcherService.
476 * These events are all membership events, and should only be processed
477 * in sequence, while however, the events processed by the MulticastLayer
478 * is not required to be blocked while completing one of these events.
479 */
480 lock.lock();
481 try {
482 while (handlingEvent) {
483 try {
484 eventHandlingCompleted.await();
485 } catch (InterruptedException e) { }
486 }
487 handlingEvent = true;
488 } finally {
489 lock.unlock();
490 }
491
492 switch (event.getTag()) {
493 case INSTALL_EVENT:
494 handleInstallEvent((InstallEvent) event);
495 break;
496 case PREPARE_EVENT:
497 handlePrepareEvent((PrepareEvent) event);
498 break;
499 case MEMBER_LEFT_EVENT:
500 handleMemberLeftEvent((MemberLeftEvent) event);
501 break;
502 case DAEMON_FAILURE_EVENT:
503 handleDaemonFailureEvent((DaemonFailureEvent) event);
504 break;
505 }
506
507 /*
508 * Signal the completion of the current event.
509 */
510 lock.lock();
511 try {
512 handlingEvent = false;
513 eventHandlingCompleted.signal();
514 } finally {
515 lock.unlock();
516 }
517 }
518
519
520 ////////////////////////////////////////////////////////////////////////////////////////////
521 // Private methods
522 ////////////////////////////////////////////////////////////////////////////////////////////
523
524 /**
525 * This method is invoked after a view install event is sent has been
526 * notified by the daemon. This method is only partially synchronized
527 * for updating the state of this object; it makes calls to application
528 * developed code, and we cannot keep the lock for these calls, as we
529 * have no control over what the application might do.
530 */
531 private void handleInstallEvent(final InstallEvent event)
532 {
533 if (log.isDebugEnabled()) {
534 MDC.put("group", "[Group: " + event.getGid() + "]");
535 log.debug("handleInstallEvent gid: " + event.getGid());
536 }
537
538 final View view = event.getView();
539
540 if (isMemberOrJoining()) {
541 lock.lock();
542 try {
543 // Update view information while holding the lock
544 numMembers = view.size();
545 viewIndex = view.getMemberPosition(me);
546 // FIXME hostIndex is never used. Do we need it?
547 hostIndex = event.getHostIndex();
548 memberIndex = event.getMemberIndex();
549 } finally {
550 lock.unlock();
551 }
552
553 // Update member table
554 membertable.viewChange(view);
555
556 notifyViewListeners(view, previousView, membershipListeners);
557
558 // Sends ack to daemon
559 InstallAck installAck = new InstallAck(gid, memberIndex);
560 DaemonInteraction.addEvent(installAck);
561
562 // A few more things needs to be done while we hold the lock
563 lock.lock();
564 try {
565 previousView = view;
566 if (isJoining()) {
567 // Notify the member waiting for join to complete
568 joined.signal();
569 }
570 // Now we can enter the normal membership status
571 status = S_NORMAL;
572 } finally {
573 lock.unlock();
574 }
575 }
576 }
577
578
579 /**
580 * Here we check if the new view has the same set of members as the previous view.
581 * If the two views have the same set of members, then the default is to suppress
582 * calls to the <code>listener.viewChange()</code> methods. If a particular listener
583 * marks the <code>viewChange()</code> method with the <code>@AllowDuplicateViews</code>
584 * marker, then that method will still be called even if the new view is a duplicate
585 * of the pervious.
586 *
587 * Note that, the Daemon layer could perhaps be improved to generate less such
588 * identical views, however we cannot exclude them entirely since the agreement
589 * protocol will be reexecuted if a member did not yet discover the failure of
590 * an excluded member; that is before the conclusion of the agreement protocol.
591 *
592 * @param view the new view to be installed
593 * @param prevView the previously installed view to compare against to identify duplicate views
594 * @param listeners the set of listeners
595 */
596 static void notifyViewListeners(final View view, final View prevView,
597 final List<MembershipListener> listeners)
598 {
599 if (view.hasSameMembers(prevView)) {
600 if (log.isDebugEnabled())
601 log.debug("GOT IDENTICAL VIEW");
602 for (MembershipListener listener : listeners) {
603 if (allowDuplicateViewsFor(listener)) {
604 if (log.isDebugEnabled())
605 log.debug("DELIVERING IDENTICAL VIEW TO " + listener.getClass().getSimpleName());
606 notifyListener(view, listener);
607 } else {
608 if (log.isDebugEnabled())
609 log.debug("DISCARDING IDENTICAL VIEW TO " + listener.getClass().getSimpleName());
610 }
611 }
612 } else {
613 for (MembershipListener listener : listeners) {
614 notifyListener(view, listener);
615 }
616 }
617 }
618
619 /**
620 * Returns true if the given listener should have its <code>viewChange()</code>
621 * method called to deliver duplicate views. If duplicates should be suppressed
622 * for this listener, then false is returned. Suppression is the default.
623 */
624 private static boolean allowDuplicateViewsFor(MembershipListener listener)
625 {
626 try {
627 Method m = listener.getClass().getMethod("viewChange", new Class[] { View.class } );
628 return m.isAnnotationPresent(AllowDuplicateViews.class);
629 } catch (NoSuchMethodException e) {
630 e.printStackTrace();
631 log.error("Cannot happen unless MembershipListener.viewChange() has changed its signature", e);
632 return false;
633 }
634 }
635
636 /**
637 * Notify the give membership listener of the given view.
638 *
639 * Log any exception trace that may occur when executing code in a
640 * <code>viewChange</code> handler method. Continue to invoke the
641 * remaining listeners, if an exception occurred.
642 */
643 private static void notifyListener(View view, MembershipListener listener)
644 {
645 try {
646 if (log.isDebugEnabled())
647 log.debug("notifying: " + listener.getClass().getSimpleName());
648 listener.viewChange(view);
649 } catch (Exception e) {
650 e.printStackTrace();
651 log.warn("A viewChange() implementation has thrown an exception", e);
652 }
653 }
654
655 /**
656 * This is method is invoked after a daemon has sent a notification
657 * for a PREPARE event. The membership listeners are notified about
658 * this event, and an acknowledgement for it is sent to the daemon.
659 *
660 * This method must not be synchronized since it calls external code
661 * which we do not have control over. The isMember() and isJoining()
662 * methods however does perform synchronized access to state variables
663 * of this class; only those need to be synchronized.
664 */
665 private void handlePrepareEvent(PrepareEvent event)
666 {
667 if (log.isDebugEnabled())
668 log.debug("handlePrepareEvent");
669
670 if (isMemberOrJoining()) {
671 /*
672 * We should not synchronize on calls to external code such as prepareChange()
673 * since we have no control over what such code might dream up to do.
674 */
675 for (MembershipListener listener : membershipListeners) {
676 try {
677 listener.prepareChange();
678 } catch (Exception e) {
679 e.printStackTrace();
680 log.warn("A prepareChange() implementation has thrown an exception", e);
681 }
682 }
683 // No need to synchronize here either since gid and me never change after join
684 DaemonInteraction.addEvent(new PrepareAck(gid, me));
685 }
686 }
687
688
689 /**
690 * This method is invoked after a daemon has sent a notification that
691 * the current member has left. The membership listeners are
692 * notified and the dispatcher service is halted.
693 */
694 private void handleMemberLeftEvent(MemberLeftEvent event)
695 {
696 if (log.isDebugEnabled())
697 log.debug("handleMemberLeftEvent");
698 if (!isUnused()) {
699 /*
700 * We should not synchronize on calls to external code such as hasLeft()
701 * since we have no control over what such code might dream up to do.
702 */
703 for (MembershipListener listener : membershipListeners) {
704 try {
705 listener.hasLeft();
706 } catch (Exception e) {
707 e.printStackTrace();
708 log.warn("A hasLeft() implementation has thrown an exception", e);
709 }
710 }
711 lock.lock();
712 try {
713 // Mark member as unused
714 status = S_UNUSED;
715 } finally {
716 lock.unlock();
717 }
718 }
719 dispatcher.halt();
720 }
721
722
723 /**
724 * This is invoked when this member detects that the Jgroup daemon has failed.
725 * We try to rejoin the group on a restarted daemon.
726 */
727 private void handleDaemonFailureEvent(DaemonFailureEvent event)
728 {
729 if (log.isDebugEnabled())
730 log.warn("handleDaemonFailureEvent");
731 lock.lock();
732 try {
733 // Mark member as unused
734 status = S_UNUSED;
735 } finally {
736 lock.unlock();
737 }
738 dispatcher.halt();
739 try {
740 join(gid);
741 } catch (JgroupException e) {
742 log.warn("Cannot recover from daemon failure");
743 }
744 }
745
746
747 ////////////////////////////////////////////////////////////////////////////////////////////
748 // Object methods
749 ////////////////////////////////////////////////////////////////////////////////////////////
750
751 /**
752 * Returns a string representation of this object; currently only the
753 * status and group identifier.
754 */
755 public String toString()
756 {
757 StringBuilder buf = new StringBuilder();
758 buf.append("[MembershipLayer: ");
759 buf.append("groupId=");
760 buf.append(gid);
761 buf.append(", status=");
762 switch (status) {
763 case S_UNUSED:
764 buf.append("UNUSED");
765 break;
766 case S_JOINING:
767 buf.append("JOINING");
768 break;
769 case S_NORMAL:
770 buf.append("MEMBER");
771 break;
772 case S_LEAVING:
773 buf.append("LEAVING");
774 break;
775 default:
776 buf.append("INCORRECT STATUS");
777 break;
778 }
779 buf.append("]");
780 return buf.toString();
781 }
782
783 } // END MembershipLayer