View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.arm;
20  
21  import static jgroup.util.log.ReplicaEvent.Type.ForcedRemove;
22  import static jgroup.util.log.ReplicaEvent.Type.Leaving;
23  import static jgroup.util.log.ViewEvent.Type.Server;
24  
25  import java.rmi.RemoteException;
26  import java.util.Timer;
27  import java.util.TimerTask;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import jgroup.core.JgroupException;
32  import jgroup.core.MemberId;
33  import jgroup.core.MembershipListener;
34  import jgroup.core.MembershipService;
35  import jgroup.core.View;
36  import jgroup.core.arm.RecoveryService;
37  import jgroup.core.arm.ReplicationManager;
38  import jgroup.core.arm.ShutdownListener;
39  import jgroup.core.registry.DependableRegistry;
40  import jgroup.core.registry.RegistryFactory;
41  import jgroup.relacs.config.AppConfig;
42  import jgroup.relacs.config.DistributedSystemConfig;
43  import jgroup.relacs.config.Domain;
44  import jgroup.relacs.config.DomainSet;
45  import jgroup.relacs.config.Host;
46  import jgroup.util.Util;
47  import jgroup.util.log.Eventlogger;
48  import jgroup.util.log.ReplicaEvent;
49  import jgroup.util.log.ViewEvent;
50  
51  import org.apache.log4j.Logger;
52  
53  /**
54   *  Recovery layer for providing the replication manager with view
55   *  change events from groups assoicated with the replication manager.
56   *
57   *  Note that the <code>RecoveryListener</code> does not provide any
58   *  upcalls (it is only a marker interface), thus to listen for view
59   *  change events, the replica should implement the
60   *  <code>MembershipListener</code>.  A server using the recovery layer
61   *  need not implement the <code>RecoveryListener</code>, instead it
62   *  simply needs to specify "Recovery" in the layer stack.
63   *
64   *  @author Hein Meling
65   *  @since Jgroup 1.2
66   */
67  public class RecoveryLayer
68    implements RecoveryService, MembershipListener
69  { 
70  
71    ////////////////////////////////////////////////////////////////////////////////////////////
72    // Logger
73    ////////////////////////////////////////////////////////////////////////////////////////////
74  
75    /** Obtain logger for this class */
76    private static final Logger log = Logger.getLogger(RecoveryLayer.class);
77  
78  
79    ////////////////////////////////////////////////////////////////////////////////////////////
80    // Constants
81    ////////////////////////////////////////////////////////////////////////////////////////////
82  
83    /** The default delay before removing a replica */
84    private static final int DEFAULT_REMOVE_DELAY   = 2000;
85  
86    /** The default max time before canceling a removal */
87    private static final int DEFAULT_REMOVE_LATENCY = 5000;
88  
89  
90    ////////////////////////////////////////////////////////////////////////////////////////////
91    // Fields
92    ////////////////////////////////////////////////////////////////////////////////////////////
93  
94    /** Underlying group membership service. */
95    private MembershipService membershipService;
96  
97    /**
98     *  The shutdown listener; note that the server (listener) is not required
99     *  to implement the ShutdownListener interface, but if it does this field
100    *  will be set to the implementor allowing us to invoke the shutdown method.
101    */
102   private ShutdownListener shutdownListener = null;
103 
104   /** Reference to the replication manager. */
105   private ReplicationManager replicaManager = null;
106 
107   /** Application info object for the replication manager. */
108   private AppConfig rmApp;
109 
110   /** Application info object for the server object associated with this group manager. */
111   private AppConfig thisApp;
112 
113   /** My member identifier. */
114   private MemberId thisMember = null;
115 
116   /** This applications group identifier */
117   private int gid;
118 
119   /** The local host */
120   private Host localHost;
121 
122   /** Timer used to delay the remove task when new views keep arriving */
123   private Timer timer = null;
124 
125   /** The delay before removing a replica after it has been elected */
126   private int removeDelay;
127 
128   /**
129    * The largest time allowed between canceling view changes; that is, the
130    * amount of time to wait before starting the remove replica task.  If a
131    * view change arrive before the timer expires, the timer will be canceled
132    * and a new one instansiated.
133    */
134   private int removeLatency;
135 
136   /** 
137    * Timer used to periodically renew the lease for this application.
138    */
139   private Timer livenessTimer = null;
140 
141   /** Lock object for the exclusive access to notify the RM */
142   private final Lock lock = new ReentrantLock();
143 
144 
145   ////////////////////////////////////////////////////////////////////////////////////////////
146   // Constructor
147   ////////////////////////////////////////////////////////////////////////////////////////////
148 
149   /**
150    *
151    */
152   private RecoveryLayer(MembershipService pgms)
153     throws JgroupException, RemoteException
154   {
155     membershipService = pgms;
156     thisMember = membershipService.getMyIdentifier();
157 
158     /* Obtain the replication manager application object */
159     rmApp = AppConfig.getApplication(ReplicaManagerImpl.class);
160     localHost = DistributedSystemConfig.getLocalHost();
161   }
162 
163 
164   ////////////////////////////////////////////////////////////////////////////////////////////
165   // Static factory
166   ////////////////////////////////////////////////////////////////////////////////////////////
167 
168   public static RecoveryLayer getLayer(MembershipService pgms)
169     throws JgroupException, RemoteException
170   {
171     return new RecoveryLayer(pgms);
172   }
173 
174 
175   ////////////////////////////////////////////////////////////////////////////////////////////
176   // Layer interface methods (inherited from RecoveryService interface)
177   ////////////////////////////////////////////////////////////////////////////////////////////
178 
179   /**
180    *  Add a listener for membership events.
181    * 
182    *  Since this layer sends view change events to the replication
183    *  manager, we must provide special handling to use this layer
184    *  from the replication manager.  The replication manager must
185    *  provide self-recovery through its local view changes.  This
186    *  layer is used by the replication manager to remove excessive
187    *  replicas from the RM group.
188    */
189   public void addListener(Object listener)
190   {
191     if (listener == null)
192       throw new NullPointerException("No replica specified for the RecoveryLayer");
193     if (listener instanceof DependableRegistry)
194       throw new IllegalArgumentException("RecoveryLayer cannot be used by the dependable registry");
195     if (listener instanceof ShutdownListener)
196       shutdownListener = (ShutdownListener) listener;
197     String className = listener.getClass().getName();
198     thisApp = AppConfig.getApplication(className);
199     gid = thisApp.getGroupId();
200     removeLatency = thisApp.getIntParam("Recovery.RemoveLatency", DEFAULT_REMOVE_LATENCY);
201     removeDelay = thisApp.getIntParam("Recovery.RemoveDelay", DEFAULT_REMOVE_DELAY);
202 
203     /* Check if this is the server is the replication manager or not. */
204     if (!className.equals(rmApp.getClassName())) {
205       try {
206         /* Obtain a remote reference for the dependable registry */
207         DependableRegistry dregistry = RegistryFactory.getRegistry();
208         /* Obtain a remote reference for the replication manager. */
209         replicaManager = (ReplicationManager) dregistry.lookup(rmApp.getRegistryName());
210         if (log.isDebugEnabled()) {
211           log.debug("Replication Manager is available");
212         }
213       } catch (Exception e) {
214         log.error("Replication Manager is unavailable", e);
215         throw new IllegalStateException("Replication manager is unavailable", e);
216       }
217     }
218   }
219 
220 
221   ////////////////////////////////////////////////////////////////////////////////////////////
222   // MembershipListener interface methods
223   ////////////////////////////////////////////////////////////////////////////////////////////
224 
225   /**
226    *  Upcall that is invoked by the MembershipLayer when a view change
227    *  occur in this replica's object group.
228    *
229    *  @param view
230    *    The new view object group.
231    */
232   public void viewChange(final View view)
233   {
234     if (Eventlogger.ENABLED)
235       Eventlogger.logEventFlush(new ViewEvent(Server, view));
236     if (log.isDebugEnabled())
237       log.debug("--- RecoveryLayer.viewChange: ---" + view);
238 
239     /*
240      * If the application using this layer is the replication manager,
241      * we simply avoid to execute this code, since the replication manager
242      * should not notify itself of neither view changes or ping events.
243      */
244     if (replicaManager != null) {
245 
246       /* Recompute ping rate for the application */
247       recomputePingRate(view.size());
248 
249       /*
250        * Only the leader will notify the replication manager (correlator)
251        * of view changes related to this replica's object group.  
252        */
253       if (view.memberHasPosition(0, thisMember)) {
254 //        doNotify(view);
255         doNotifyThread(view);
256       }
257     }
258 
259     /*
260      * Always cancel the remove timer if we receive a new view; it should only
261      * remain active if the view size exceeds the initial redundancy level.
262      */
263     if (timer != null) {
264       if (log.isDebugEnabled())
265         log.debug("timer.cancel");
266       timer.cancel();
267     }
268     /*
269      * Determine if the current number of replicas (view size) exceed
270      * the initial (max) redundancy for this application.  If it does,
271      * we will remove one of these replicas at a time, until the
272      * redundancy level is back to normal.  But first we wait to see
273      * if there are more views.
274      */
275     if (thisApp.getInitialRedundancy() < view.size()) {
276       timer = new Timer("RemoveReplicaTask-" + gid, true);
277       if (log.isDebugEnabled())
278         log.debug("timer.schedule");
279       timer.schedule(new RemoveReplicaTask(view), removeLatency);
280     }
281 
282   } // end of viewChange()
283 
284 
285   /**
286    * I'm the member to notify the replication manager of this view change.
287    * We do it in a separate thread to avoid length delays (due to communication
288    * latency) of the view install phase. 
289    * 
290    * @param view
291    */
292   private void doNotifyThread(final View view)
293   {
294     Thread viewNotifyThread = new Thread("ViewNotifyThread-" + gid) {
295       public void run() {
296         if (log.isDebugEnabled())
297           log.debug("Notifying the Replication Manager: Lock pending");
298         lock.lock();
299         try {
300           doNotify(view);
301         } finally {
302           lock.unlock();
303         }
304       }
305     };
306     viewNotifyThread.setDaemon(true);
307     // This thread should have priority in order to receive the result from the RM
308     viewNotifyThread.setPriority(Thread.NORM_PRIORITY+2);
309     viewNotifyThread.start();
310   }
311 
312 
313   /**
314    * I'm the member to notify the replication manager of this view change.
315    * We do it inline of the viewChange() method to ensure that views recorded
316    * at the replication manager are not reordered.  That is we want to ensure
317    * that the replication manager has received this new view, before we allow
318    * the membership service to continue installing another view.
319    * 
320    * @param view
321    */
322   private void doNotify(final View view)
323   {
324     try {
325       if (log.isDebugEnabled())
326         log.debug("Notifying the Replication Manager");
327       replicaManager.notifyEvent(new ViewChangeEvent(view));
328       if (log.isDebugEnabled())
329         log.debug("Notified the Replication Manager");
330 
331     } catch (Exception e) {
332       /*
333        * The replication manager seems to be unavailable, or it was
334        * not prepared to accept the above view change notification.
335        */
336       log.error("Failed to notify the Replication Manager of the new view", e);
337     }
338   }
339 
340   /* (non-Javadoc)
341    * @see jgroup.core.MembershipListener#hasLeft()
342    */
343   public void hasLeft()
344   {
345     if (log.isDebugEnabled())
346       log.debug("I've left the group.");
347     removeReplica();
348   }
349 
350   /* (non-Javadoc)
351    * @see jgroup.core.MembershipListener#prepareChange()
352    */
353   public void prepareChange()
354   {
355     if (log.isDebugEnabled())
356       log.debug("Invalid view; preparing for new view");
357   }
358 
359 
360   ////////////////////////////////////////////////////////////////////////////////////////////
361   // Private Methods and Inner Class
362   ////////////////////////////////////////////////////////////////////////////////////////////
363 
364   /**
365    * Remove the replica instance of the associated application on the localhost.
366    */
367   private void removeReplica()
368   {
369     if (log.isDebugEnabled())
370       log.debug("Removing replica instance from localhost");
371     boolean removed = localHost.removeReplica(thisApp);
372     if (log.isDebugEnabled())
373       log.debug(removed ? "Replica removed" : "Replica already removed");
374   }
375 
376   /**
377    * Update the ping rate according to the current group size, and
378    * restart the liveness timer.
379    */
380   private void recomputePingRate(int groupSize)
381   {
382     ReplicaPingEvent pingEvent = ReplicaPingEvent.getNewPingEvent(gid, groupSize);
383     if (pingEvent != null) {
384       if (livenessTimer != null) {
385         livenessTimer.cancel();
386       }
387       livenessTimer = new Timer("LivenessTimer-" + gid, true);
388       int pingRate = pingEvent.getPingRate();
389       livenessTimer.schedule(new LivenessTask(pingEvent), pingRate, pingRate);
390       if (log.isDebugEnabled())
391         log.debug("Recomputed ping rate: " + pingRate + "ms.");
392     }
393   }
394 
395 
396   /**
397    * A timer task used periodically renew the lease when only a single
398    * replica remains in the group.
399    */
400   private class LivenessTask
401     extends TimerTask
402   {
403     private ReplicaPingEvent pingEvent;
404 
405     public LivenessTask(ReplicaPingEvent pingEvent)
406     {
407       this.pingEvent = pingEvent;
408     }
409 
410     /* (non-Javadoc)
411      * @see java.util.TimerTask#run()
412      */
413     public void run()
414     {
415       if (replicaManager != null) {
416         try {
417           if (log.isDebugEnabled())
418             log.debug("Sending pingEvent: " + pingEvent);
419           replicaManager.notifyEvent(pingEvent);
420         } catch (Exception e) {
421           /*
422            * The replication manager seems to be unavailable.
423            */
424           log.error("Failed to renew the lease with the Replication Manager", e);
425         }
426       }
427     }
428   } // END of LivenessTask
429 
430 
431   /**
432    * A timer task used to remove the local replica if selected.
433    */
434   private class RemoveReplicaTask
435     extends TimerTask
436   {
437 
438     /* Period to wait before a force remove, halting the JVM */
439     private static final long FORCE_REMOVE_DELAY = 10000;
440 
441     /* The most recently installed view on which this task will operate */
442     private View view;
443 
444     /**
445      * Constructs a new timer task object for the provided view.
446      */
447     public RemoveReplicaTask(View view)
448     {
449       super();
450       this.view = view;
451     }
452 
453     /**
454      * The run method is invoked at the scheduled time, unless it is canceled.
455      */
456     public void run()
457     {
458       if (log.isDebugEnabled())
459         log.debug("RemoveReplicaTask.run");
460       if (removeMeNext(view)) {
461         if (log.isDebugEnabled())
462           log.debug("RemovingMe");
463         Util.sleep(removeDelay);
464         if (log.isDebugEnabled())
465           log.debug("Replica leaving from localhost: " + localHost);
466 
467         /*
468          * If for some reason the "clean" leave from the group fails to
469          * complete, we do a force remove.
470          */
471         Timer forceRemove = new Timer("ForceRemove-" + gid, true);
472         TimerTask removeTask = new TimerTask() {
473           public void run() {
474             if (Eventlogger.ENABLED) {
475               Eventlogger.logEventFlush(new ReplicaEvent(ForcedRemove, gid));
476             }
477             log.warn("ForcedRemove");
478             removeReplica();
479           }
480         };
481         forceRemove.schedule(removeTask, FORCE_REMOVE_DELAY);
482 
483         try {
484           /*
485            * Exit the server group; note that we only have one group for each
486            * group manager, thus we don't need to qualify this method with a
487            * group identifier.
488            */
489           membershipService.leave();
490           if (Eventlogger.ENABLED) {
491             Eventlogger.logEventFlush(new ReplicaEvent(Leaving, gid));
492           }
493           /* 
494            * If the server implements the ShutdownListener interface, invoke
495            * the shutdown method in order to allow for application specific
496            * clean up code.
497            */
498           if (shutdownListener != null)
499             shutdownListener.shutdown();
500         } catch (JgroupException e) {
501           log.error("Failed to shutdown local replica", e);
502         }
503       }
504     }
505 
506 
507     //////////////////////////////////////////////////////////////////////////////////////////
508     // Private Methods
509     //////////////////////////////////////////////////////////////////////////////////////////
510 
511     /**
512      * Determine if the local replica should be removed next, yet maintaining
513      * the policy of replica distribution among domains.  We do this by
514      * selecting the domain who has got the largest number of replicas in
515      * this particular view, and from that selecting an arbitrary host.
516      * If that host is the localhost we can release this replica.
517      * 
518      * @param view The current view to check if the local member is to be removed.
519      * @return True is returned if the local member should be removed; otherwise
520      *    false is returned.
521      */
522     private boolean removeMeNext(View view)
523     {
524       Domain largestReplicaDomain = DomainSet.getLargestReplicaDomain(view.getMembers());
525       /* The host from which to remove a replica */
526       Host host = largestReplicaDomain.getHostSet().removeFirst();
527       if (log.isDebugEnabled())
528         log.debug("Selected host: " + host);
529       return localHost.equals(host);
530     }
531 
532   } // END of inner class RemoveReplicaTask
533 
534 } // END RecoveryLayer