View Javadoc

1   /*
2    * Copyright (c) 2003 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  
20  package jgroup.relacs.gm;
21  
22  import java.lang.reflect.Method;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.locks.Condition;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import jgroup.core.JgroupException;
31  import jgroup.core.MemberId;
32  import jgroup.core.MemberTable;
33  import jgroup.core.MembershipListener;
34  import jgroup.core.MembershipService;
35  import jgroup.core.View;
36  import jgroup.core.MembershipListener.AllowDuplicateViews;
37  import jgroup.relacs.config.AppConfig;
38  import jgroup.relacs.daemon.DaemonInteraction;
39  import jgroup.relacs.events.DaemonFailureEvent;
40  import jgroup.relacs.events.Event;
41  import jgroup.relacs.events.EventTags;
42  import jgroup.relacs.events.InstallAck;
43  import jgroup.relacs.events.InstallEvent;
44  import jgroup.relacs.events.JoinRequest;
45  import jgroup.relacs.events.LeaveRequest;
46  import jgroup.relacs.events.MemberLeftEvent;
47  import jgroup.relacs.events.PrepareAck;
48  import jgroup.relacs.events.PrepareEvent;
49  
50  import org.apache.log4j.Logger;
51  import org.apache.log4j.MDC;
52  
53  /**
54   *  The <code>MembershipLayer</code> class implements the group membership
55   *  service interface and provides membership events to listeners.
56   *
57   *  @author Alberto Montresor
58   *  @author Hein Meling
59   *  @since Jgroup 2.2
60   */
61  public final class MembershipLayer
62    implements MembershipService, DispatcherListener, EventTags
63  {
64  
65    ////////////////////////////////////////////////////////////////////////////////////////////
66    // Logger
67    ////////////////////////////////////////////////////////////////////////////////////////////
68  
69    /** Obtain logger for this class */
70    private final static Logger log = Logger.getLogger(MembershipLayer.class);
71  
72  
73    ////////////////////////////////////////////////////////////////////////////////////////////
74    // Constants
75    ////////////////////////////////////////////////////////////////////////////////////////////
76  
77    /** Used in status to identify that no group is joined at the moment */ 
78    private static final int S_UNUSED = 0;
79    
80    /** Used in status to identify that the member is joining a group */
81    private static final int S_JOINING = 1;
82    
83    /** Used in status to identify that the member is member of a group */
84    private static final int S_NORMAL = 2;
85    
86    /** Used in status to identify that the member is leaving a group */
87    private static final int S_LEAVING = 3;
88  
89  
90    ////////////////////////////////////////////////////////////////////////////////////////////
91    // Information about the group and other objects used to provide group membership
92    ////////////////////////////////////////////////////////////////////////////////////////////
93  
94    /** The lock preventing concurrent event handling */
95    private final Lock lock = new ReentrantLock();
96  
97    /** Condition for awaiting the group to be joined */
98    private final Condition joined = lock.newCondition();
99  
100   /** Condition for awaiting the handling of some event */
101   private final Condition eventHandlingCompleted = lock.newCondition();
102 
103   /** Flag set while a membership event is being handled; accessed under <code>lock</code> */
104   private boolean handlingEvent = false;
105 
106   /** Member identifier */
107   private final MemberId me;
108 
109   /** Dispatcher layer */
110   private final DispatcherService dispatcher;
111 
112   /** MemberTable provided to upper layers */
113   private final MemberTable membertable;
114   
115   /**
116    *  Dynamic list of membership listeners.  Note that the ordering of
117    *  these listeners are important.
118    */
119   private final List<MembershipListener> membershipListeners =
120     new ArrayList<MembershipListener>();
121 
122   /** Current status of membership */
123   private volatile int status;
124 
125   /** Group identifier */
126   private int gid;
127 
128   /**
129    *  Position index of this member in the array containing
130    *  the members of the current view.
131    */
132   private int viewIndex;
133 
134   /**
135    *  Position index of the local host in the array 
136    *  containing the hosts of the current view.
137    */
138   private int hostIndex;
139   
140   /**
141    *  Position index of this member in the array containing 
142    *  the local members of the current view.
143    */
144   private int memberIndex;
145 
146   /**
147    *  The number of members of this group.
148    */
149   private int numMembers;
150 
151   /**
152    * Field holding the previous view installed; it is used to suppress identical views
153    * so that the application does not see them.  Duplicate suppression is the default.
154    * If the <code>viewChange()</code> method is annotated with the
155    * <code>@AllowDuplicateViews</code> marker, then applications or layers will
156    * receive duplicate views.  This is sometimes necessary, when the
157    * <code>prepareChange()</code> method is used to block some processing; and the
158    * <code>viewChange()</code> is used to unblock things again.
159    */
160   private View previousView = null;
161 
162 
163   ////////////////////////////////////////////////////////////////////////////////////////////
164   // Constructor
165   ////////////////////////////////////////////////////////////////////////////////////////////
166 
167   /**
168    *  Initialize a new <code>BaseLayer</code> object using information
169    *  contained in the configuration objects.
170    *
171    *  @param dispatcher
172    *    The dispatcher service.
173    *  @exception JgroupException
174    *    Raised if the distributed system configuration was not available.
175    */
176   private MembershipLayer(DispatcherService dispatcher)
177     throws JgroupException
178   {
179     /* Stores initialization information */
180     this.dispatcher = dispatcher;
181 
182     /*
183      * Initialize the Jgroup daemon, either locally or remotely
184      * depending on the configuration.
185      */
186     DaemonInteraction.initDaemon();
187 
188     /* Obtain an identifier for this member */
189     me = DaemonInteraction.getMemberId();
190 
191     /* Initialize local data structures */
192     membertable = new MemberTable();
193     status = S_UNUSED;
194   }
195 
196 
197   ////////////////////////////////////////////////////////////////////////////////////////////
198   // Static factory
199   ////////////////////////////////////////////////////////////////////////////////////////////
200 
201   public static MembershipLayer getLayer(DispatcherService dispatcher)
202     throws JgroupException
203   {
204     return new MembershipLayer(dispatcher);
205   }
206 
207 
208   ////////////////////////////////////////////////////////////////////////////////////////////
209   // Layer interface methods
210   ////////////////////////////////////////////////////////////////////////////////////////////
211 
212   /**
213    *  Add a listener object for this layer to provide upcalls to,
214    *  in response to membership events.
215    * 
216    *  @see jgroup.core.Layer#addListener(java.lang.Object)
217    */
218   public void addListener(Object listener)
219   {
220     if (listener instanceof MembershipListener && !membershipListeners.contains(listener))
221       membershipListeners.add((MembershipListener)listener);
222   }
223 
224 
225   ////////////////////////////////////////////////////////////////////////////////////////////
226   // FinalizeLayer interface methods (invoked to complete the group manager construction)
227   ////////////////////////////////////////////////////////////////////////////////////////////
228 
229   /**
230    *  Post initialization of the <code>MembershipLayer</code>; invoked once
231    *  all group manager layers have been constructed.  This method is defined
232    *  in the <code>FinalizeLayer</code> inherited through the
233    *  <code>MembershipSerivce</code>. <p>
234    * 
235    *  This method is used to determine if the provided server should join the
236    *  group automatically, as soon as the group manager layer stack has been
237    *  completed.  By default, automatic join is disabled.  To enable auto join
238    *  you must configure it in the <code>applications.xml</code> file. 
239    */
240   public void complete(Object server)
241     throws JgroupException
242   {
243     /* Check if the server application has auto join enabled */
244     AppConfig thisApp = AppConfig.getApplication(server);
245     if (thisApp != null && thisApp.getBooleanParam("PGMS.auto")) {
246       int gid = thisApp.getGroupId();
247       if (log.isDebugEnabled())
248         log.debug("Automatic join enabled for group: " + gid);
249       /* Join the application group */
250       join(gid);
251     }
252   }
253 
254 
255   ////////////////////////////////////////////////////////////////////////////////////////////
256   // Methods from MembershipService
257   ////////////////////////////////////////////////////////////////////////////////////////////
258 
259   /* (non-Javadoc)
260    * @see jgroup.core.MembershipService#join()
261    */
262   public void join()
263     throws JgroupException
264   {
265     join1(-1);
266   }
267 
268   /* (non-Javadoc)
269    *  @see jgroup.core.MembershipService#join(int)
270    */
271   public void join(int gid)
272     throws JgroupException
273   {
274     if (gid < 0)
275       throw new IllegalArgumentException("Negative group identifier: " + gid);
276     join1(gid);
277   }
278 
279   /**
280    *  The actual implementation of join.
281    *
282    *  @param gid
283    *    Non-negative integer identifying with the group
284    *  @exception JgroupException
285    *    Raised if the group manager receives a request to join a group
286    *    for which it is already a member.
287    */
288   private void join1(int gid)
289     throws JgroupException
290   {
291     if (log.isDebugEnabled())
292       log.debug("Joining: " + gid);
293 
294     // Acquire the lock
295     lock.lock();
296     try {
297       if (status != S_UNUSED)
298         throw new JgroupException("Already member of group " + gid);
299       this.gid = gid;
300       status = S_JOINING;
301 
302       dispatcher.start(gid);
303       JoinRequest joinRequest = new JoinRequest(gid, me, dispatcher.getRemoteDispatcher());
304       DaemonInteraction.addEvent(joinRequest);
305       if (log.isDebugEnabled()) {
306         log.debug("Sent to Daemon: " + joinRequest);
307       }
308 
309       /*
310        * The join method blocks pending a response from the membership service
311        * (the Jgroup daemon).  However, if there is no response within ten
312        * seconds, we throw an exception.
313        */
314       while (status != S_NORMAL) {
315         try {
316           boolean success = joined.await(10, TimeUnit.SECONDS);
317           if (!success)
318             throw new JgroupException("Join timeout elapsed for group " + gid);
319         } catch (InterruptedException e1) { }
320       }
321       if (log.isDebugEnabled())
322         log.debug("Join completed");
323     } finally {
324       // Release the lock
325       lock.unlock();
326     }
327   }
328 
329   /* (non-Javadoc)
330    * @see jgroup.core.MembershipService#leave()
331    */
332   public void leave()
333     throws JgroupException
334   {
335     if (log.isDebugEnabled()) {
336       log.debug("leave");
337     }
338     // Acquire the lock
339     lock.lock();
340     try {
341       if (status == S_UNUSED)
342         throw new JgroupException("The gm is not member of any group");
343       if (status == S_LEAVING)
344         throw new JgroupException("The gm has already received a request to leave the group");
345       status = S_LEAVING;
346     } finally {
347       // Release the lock
348       lock.unlock();
349     }
350     DaemonInteraction.addEvent(new LeaveRequest(gid, me));
351     if (log.isDebugEnabled()) {
352       log.debug("leave completed");
353     }
354   }
355 
356   /* (non-Javadoc)
357    * @see jgroup.core.MembershipService#isMember()
358    */
359   public synchronized boolean isMember()
360   {
361     return (status == S_NORMAL);
362   }
363 
364   /* (non-Javadoc)
365    * @see jgroup.core.MembershipService#isJoining()
366    */
367   public synchronized boolean isJoining()
368   {
369     return (status == S_JOINING);
370   }
371 
372   /* (non-Javadoc)
373    * @see jgroup.core.MembershipService#isMemberOrJoining()
374    */
375   public boolean isMemberOrJoining()
376   {
377     lock.lock();
378     try {
379       return (status == S_NORMAL || status == S_JOINING);
380     } finally {
381       lock.unlock();
382     }
383   }
384 
385   // Not accessible through the MembershipService interface (maybe added later)
386   public synchronized boolean isUnused()
387   {
388     return (status == S_UNUSED);
389   }
390 
391   /* (non-Javadoc)
392    * @see jgroup.core.MembershipService#isLeader()
393    */
394   public synchronized boolean isLeader()
395   {
396     return viewIndex == 0;
397   }
398 
399   /* (non-Javadoc)
400    * @see jgroup.core.MembershipService#getMyIdentifier()
401    */
402   public MemberId getMyIdentifier()
403   {
404     return me;
405   }
406 
407   /* (non-Javadoc)
408    * @see jgroup.core.MembershipService#getGid()
409    */
410   public int getGid()
411   {
412     if (isUnused())
413       throw new IllegalStateException("The membership service has not yet joined a group");
414     return gid;
415   }
416 
417   /* (non-Javadoc)
418    * @see jgroup.core.MembershipService#getMemberTable()
419    */
420   public MemberTable getMemberTable()
421   {
422     return membertable;
423   }
424 
425   /* (non-Javadoc)
426    * @see jgroup.core.MembershipService#members()
427    */
428   public synchronized int members()
429   {
430     return numMembers;
431   }
432 
433   /* (non-Javadoc)
434    * @see jgroup.core.MembershipService#getViewIndex()
435    */
436   public synchronized int getViewIndex()
437   {
438     return viewIndex;
439   }
440 
441   /* (non-Javadoc)
442    * @see jgroup.core.MembershipService#getMemberIndex()
443    */
444   public synchronized int getMemberIndex()
445   {
446     return memberIndex;
447   }
448 
449 
450   ////////////////////////////////////////////////////////////////////////////////////////////
451   // Methods from DispatcherListener
452   ////////////////////////////////////////////////////////////////////////////////////////////
453 
454   /* (non-Javadoc)
455    * @see jgroup.relacs.gm.DispatcherListener#eventTypes()
456    */
457   public int[] eventTypes()
458   {
459     return new int[] {
460       INSTALL_EVENT,
461       PREPARE_EVENT,
462       MEMBER_LEFT_EVENT,
463       DAEMON_FAILURE_EVENT
464     };
465   }
466 
467 
468   /* (non-Javadoc)
469    * @see jgroup.relacs.gm.DispatcherListener#notifyEvent(jgroup.relacs.events.Event)
470    */
471   public void notifyEvent(Event event)
472   {
473     /*
474      * Block waiting for completion of the current event, if any.  This is
475      * so as to serialize the events received from the DispatcherService.
476      * These events are all membership events, and should only be processed
477      * in sequence, while however, the events processed by the MulticastLayer
478      * is not required to be blocked while completing one of these events.
479      */
480     lock.lock();
481     try {
482       while (handlingEvent) {
483         try {
484           eventHandlingCompleted.await();
485         } catch (InterruptedException e) { }
486       }
487       handlingEvent = true;
488     } finally {
489       lock.unlock();
490     }
491 
492     switch (event.getTag()) {
493       case INSTALL_EVENT:
494         handleInstallEvent((InstallEvent) event);
495         break;
496       case PREPARE_EVENT:
497         handlePrepareEvent((PrepareEvent) event);
498         break;
499       case MEMBER_LEFT_EVENT:
500         handleMemberLeftEvent((MemberLeftEvent) event);
501         break;
502       case DAEMON_FAILURE_EVENT:
503         handleDaemonFailureEvent((DaemonFailureEvent) event);
504         break;
505     }
506 
507     /*
508      * Signal the completion of the current event.
509      */
510     lock.lock();
511     try {
512       handlingEvent = false;
513       eventHandlingCompleted.signal();
514     } finally {
515       lock.unlock();
516     }
517   }
518 
519 
520   ////////////////////////////////////////////////////////////////////////////////////////////
521   // Private methods
522   ////////////////////////////////////////////////////////////////////////////////////////////
523 
524   /**
525    *  This method is invoked after a view install event is sent has been
526    *  notified by the daemon.  This method is only partially synchronized
527    *  for updating the state of this object; it makes calls to application
528    *  developed code, and we cannot keep the lock for these calls, as we
529    *  have no control over what the application might do.
530    */
531   private void handleInstallEvent(final InstallEvent event)
532   {
533     if (log.isDebugEnabled()) {
534       MDC.put("group", "[Group: " + event.getGid() + "]");
535       log.debug("handleInstallEvent gid: " + event.getGid());
536     }
537 
538     final View view = event.getView();
539 
540     if (isMemberOrJoining()) {
541       lock.lock();
542       try {
543         // Update view information while holding the lock
544         numMembers = view.size();
545         viewIndex = view.getMemberPosition(me);
546         // FIXME hostIndex is never used.  Do we need it?
547         hostIndex = event.getHostIndex();
548         memberIndex = event.getMemberIndex();
549       } finally {
550         lock.unlock();
551       }
552 
553       // Update member table
554       membertable.viewChange(view);
555 
556       notifyViewListeners(view, previousView, membershipListeners);
557 
558       // Sends ack to daemon
559       InstallAck installAck = new InstallAck(gid, memberIndex);
560       DaemonInteraction.addEvent(installAck);
561 
562       // A few more things needs to be done while we hold the lock
563       lock.lock();
564       try {
565         previousView = view;
566         if (isJoining()) {
567           // Notify the member waiting for join to complete
568           joined.signal();
569         }
570         // Now we can enter the normal membership status
571         status = S_NORMAL;
572       } finally {
573         lock.unlock();
574       }
575     }
576   }
577 
578 
579   /**
580    * Here we check if the new view has the same set of members as the previous view.
581    * If the two views have the same set of members, then the default is to suppress
582    * calls to the <code>listener.viewChange()</code> methods.  If a particular listener
583    * marks the <code>viewChange()</code> method with the <code>@AllowDuplicateViews</code>
584    * marker, then that method will still be called even if the new view is a duplicate
585    * of the pervious.
586    * 
587    * Note that, the Daemon layer could perhaps be improved to generate less such
588    * identical views, however we cannot exclude them entirely since the agreement
589    * protocol will be reexecuted if a member did not yet discover the failure of
590    * an excluded member; that is before the conclusion of the agreement protocol.
591    * 
592    * @param view the new view to be installed
593    * @param prevView the previously installed view to compare against to identify duplicate views
594    * @param listeners the set of listeners
595    */
596   static void notifyViewListeners(final View view, final View prevView,
597       final List<MembershipListener> listeners)
598   {
599     if (view.hasSameMembers(prevView)) {
600       if (log.isDebugEnabled())
601         log.debug("GOT IDENTICAL VIEW");
602       for (MembershipListener listener : listeners) {
603         if (allowDuplicateViewsFor(listener)) {
604           if (log.isDebugEnabled())
605             log.debug("DELIVERING IDENTICAL VIEW TO " + listener.getClass().getSimpleName());
606           notifyListener(view, listener);
607         } else {
608           if (log.isDebugEnabled())
609             log.debug("DISCARDING IDENTICAL VIEW TO " + listener.getClass().getSimpleName());
610         }
611       }
612     } else {
613       for (MembershipListener listener : listeners) {
614         notifyListener(view, listener);
615       }
616     }
617   }
618 
619   /**
620    * Returns true if the given listener should have its <code>viewChange()</code>
621    * method called to deliver duplicate views.  If duplicates should be suppressed
622    * for this listener, then false is returned.  Suppression is the default.
623    */
624   private static boolean allowDuplicateViewsFor(MembershipListener listener)
625   {
626     try {
627       Method m = listener.getClass().getMethod("viewChange", new Class[] { View.class } );
628       return m.isAnnotationPresent(AllowDuplicateViews.class);
629     } catch (NoSuchMethodException e) {
630       e.printStackTrace();
631       log.error("Cannot happen unless MembershipListener.viewChange() has changed its signature", e);
632       return false;
633     }
634   }
635 
636   /**
637    * Notify the give membership listener of the given view.
638    * 
639    * Log any exception trace that may occur when executing code in a
640    * <code>viewChange</code> handler method.  Continue to invoke the
641    * remaining listeners, if an exception occurred.
642    */
643   private static void notifyListener(View view, MembershipListener listener)
644   {
645     try {
646       if (log.isDebugEnabled())
647         log.debug("notifying: " + listener.getClass().getSimpleName());
648       listener.viewChange(view);
649     } catch (Exception e) {
650       e.printStackTrace();
651       log.warn("A viewChange() implementation has thrown an exception", e);
652     }
653   }
654 
655   /**
656    *  This is method is invoked after a daemon has sent a notification
657    *  for a PREPARE event.  The membership listeners are notified about
658    *  this event, and an acknowledgement for it is sent to the daemon.
659    *  
660    *  This method must not be synchronized since it calls external code
661    *  which we do not have control over.  The isMember() and isJoining()
662    *  methods however does perform synchronized access to state variables
663    *  of this class; only those need to be synchronized.
664    */
665   private void handlePrepareEvent(PrepareEvent event)
666   {
667     if (log.isDebugEnabled())
668       log.debug("handlePrepareEvent");
669 
670     if (isMemberOrJoining()) {
671       /*
672        * We should not synchronize on calls to external code such as prepareChange()
673        * since we have no control over what such code might dream up to do.
674        */
675       for (MembershipListener listener : membershipListeners) {
676         try {
677           listener.prepareChange();
678         } catch (Exception e) {
679           e.printStackTrace();
680           log.warn("A prepareChange() implementation has thrown an exception", e);
681         }
682       }
683       // No need to synchronize here either since gid and me never change after join
684       DaemonInteraction.addEvent(new PrepareAck(gid, me));
685     }
686   }
687 
688 
689   /**
690    *  This method is invoked after a daemon has sent a notification that
691    *  the current member has left.  The membership listeners are
692    *  notified and the dispatcher service is halted.
693    */
694   private void handleMemberLeftEvent(MemberLeftEvent event)
695   {
696     if (log.isDebugEnabled())
697       log.debug("handleMemberLeftEvent");
698     if (!isUnused()) {
699       /*
700        * We should not synchronize on calls to external code such as hasLeft()
701        * since we have no control over what such code might dream up to do.
702        */
703       for (MembershipListener listener : membershipListeners) {
704         try {
705           listener.hasLeft();
706         } catch (Exception e) {
707           e.printStackTrace();
708           log.warn("A hasLeft() implementation has thrown an exception", e);
709         }
710       }
711       lock.lock();
712       try {
713         // Mark member as unused
714         status = S_UNUSED;
715       } finally {
716         lock.unlock();
717       }
718     }
719     dispatcher.halt();
720   }
721 
722 
723   /**
724    *  This is invoked when this member detects that the Jgroup daemon has failed.
725    *  We try to rejoin the group on a restarted daemon.
726    */
727   private void handleDaemonFailureEvent(DaemonFailureEvent event)
728   {
729     if (log.isDebugEnabled())
730       log.warn("handleDaemonFailureEvent");
731     lock.lock();
732     try {
733       // Mark member as unused
734       status = S_UNUSED;
735     } finally {
736       lock.unlock();
737     }
738     dispatcher.halt();
739     try {
740       join(gid);
741     } catch (JgroupException e) {
742       log.warn("Cannot recover from daemon failure");
743     }
744   }
745 
746 
747   ////////////////////////////////////////////////////////////////////////////////////////////
748   // Object methods
749   ////////////////////////////////////////////////////////////////////////////////////////////
750 
751   /**
752    *  Returns a string representation of this object; currently only the
753    *  status and group identifier.
754    */
755   public String toString() 
756   {
757     StringBuilder buf = new StringBuilder();
758     buf.append("[MembershipLayer: ");
759     buf.append("groupId=");
760     buf.append(gid);
761     buf.append(", status=");
762     switch (status) {
763       case S_UNUSED:
764         buf.append("UNUSED");
765         break;
766       case S_JOINING:
767         buf.append("JOINING");
768         break;
769       case S_NORMAL:
770         buf.append("MEMBER");
771         break;
772       case S_LEAVING:
773         buf.append("LEAVING");
774         break;
775       default:
776         buf.append("INCORRECT STATUS");
777         break;
778     }
779     buf.append("]");
780     return buf.toString();
781   }
782 
783 } // END MembershipLayer