1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.arm;
20
import static jgroup.util.log.ReplicaEvent.Type.ForcedRemove;
import static jgroup.util.log.ReplicaEvent.Type.Leaving;
import static jgroup.util.log.ViewEvent.Type.Server;

import java.rmi.RemoteException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import jgroup.core.JgroupException;
import jgroup.core.MemberId;
import jgroup.core.MembershipListener;
import jgroup.core.MembershipService;
import jgroup.core.View;
import jgroup.core.arm.RecoveryService;
import jgroup.core.arm.ReplicationManager;
import jgroup.core.arm.ShutdownListener;
import jgroup.core.registry.DependableRegistry;
import jgroup.core.registry.RegistryFactory;
import jgroup.relacs.config.AppConfig;
import jgroup.relacs.config.DistributedSystemConfig;
import jgroup.relacs.config.Domain;
import jgroup.relacs.config.DomainSet;
import jgroup.relacs.config.Host;
import jgroup.util.Util;
import jgroup.util.log.Eventlogger;
import jgroup.util.log.ReplicaEvent;
import jgroup.util.log.ViewEvent;

import org.apache.log4j.Logger;
52
53 /**
54 * Recovery layer for providing the replication manager with view
55 * change events from groups associated with the replication manager.
56 *
57 * Note that the <code>RecoveryListener</code> does not provide any
58 * upcalls (it is only a marker interface), thus to listen for view
59 * change events, the replica should implement the
60 * <code>MembershipListener</code>. A server using the recovery layer
61 * need not implement the <code>RecoveryListener</code>, instead it
62 * simply needs to specify "Recovery" in the layer stack.
63 *
64 * @author Hein Meling
65 * @since Jgroup 1.2
66 */
67 public class RecoveryLayer
68 implements RecoveryService, MembershipListener
69 {
70
71 ////////////////////////////////////////////////////////////////////////////////////////////
72 // Logger
73 ////////////////////////////////////////////////////////////////////////////////////////////
74
75 /** Obtain logger for this class */
76 private static final Logger log = Logger.getLogger(RecoveryLayer.class);
77
78
79 ////////////////////////////////////////////////////////////////////////////////////////////
80 // Constants
81 ////////////////////////////////////////////////////////////////////////////////////////////
82
83 /** The default delay before removing a replica */
84 private static final int DEFAULT_REMOVE_DELAY = 2000;
85
86 /** The default max time before canceling a removal */
87 private static final int DEFAULT_REMOVE_LATENCY = 5000;
88
89
90 ////////////////////////////////////////////////////////////////////////////////////////////
91 // Fields
92 ////////////////////////////////////////////////////////////////////////////////////////////
93
94 /** Underlying group membership service. */
95 private MembershipService membershipService;
96
97 /**
98 * The shutdown listener; note that the server (listener) is not required
99 * to implement the ShutdownListener interface, but if it does this field
100 * will be set to the implementor allowing us to invoke the shutdown method.
101 */
102 private ShutdownListener shutdownListener = null;
103
104 /** Reference to the replication manager. */
105 private ReplicationManager replicaManager = null;
106
107 /** Application info object for the replication manager. */
108 private AppConfig rmApp;
109
110 /** Application info object for the server object associated with this group manager. */
111 private AppConfig thisApp;
112
113 /** My member identifier. */
114 private MemberId thisMember = null;
115
116 /** This applications group identifier */
117 private int gid;
118
119 /** The local host */
120 private Host localHost;
121
122 /** Timer used to delay the remove task when new views keep arriving */
123 private Timer timer = null;
124
125 /** The delay before removing a replica after it has been elected */
126 private int removeDelay;
127
128 /**
129 * The largest time allowed between canceling view changes; that is, the
130 * amount of time to wait before starting the remove replica task. If a
131 * view change arrive before the timer expires, the timer will be canceled
132 * and a new one instansiated.
133 */
134 private int removeLatency;
135
136 /**
137 * Timer used to periodically renew the lease for this application.
138 */
139 private Timer livenessTimer = null;
140
141 /** Lock object for the exclusive access to notify the RM */
142 private final Lock lock = new ReentrantLock();
143
144
145 ////////////////////////////////////////////////////////////////////////////////////////////
146 // Constructor
147 ////////////////////////////////////////////////////////////////////////////////////////////
148
149 /**
150 *
151 */
152 private RecoveryLayer(MembershipService pgms)
153 throws JgroupException, RemoteException
154 {
155 membershipService = pgms;
156 thisMember = membershipService.getMyIdentifier();
157
158 /* Obtain the replication manager application object */
159 rmApp = AppConfig.getApplication(ReplicaManagerImpl.class);
160 localHost = DistributedSystemConfig.getLocalHost();
161 }
162
163
164 ////////////////////////////////////////////////////////////////////////////////////////////
165 // Static factory
166 ////////////////////////////////////////////////////////////////////////////////////////////
167
168 public static RecoveryLayer getLayer(MembershipService pgms)
169 throws JgroupException, RemoteException
170 {
171 return new RecoveryLayer(pgms);
172 }
173
174
175 ////////////////////////////////////////////////////////////////////////////////////////////
176 // Layer interface methods (inherited from RecoveryService interface)
177 ////////////////////////////////////////////////////////////////////////////////////////////
178
179 /**
180 * Add a listener for membership events.
181 *
182 * Since this layer sends view change events to the replication
183 * manager, we must provide special handling to use this layer
184 * from the replication manager. The replication manager must
185 * provide self-recovery through its local view changes. This
186 * layer is used by the replication manager to remove excessive
187 * replicas from the RM group.
188 */
189 public void addListener(Object listener)
190 {
191 if (listener == null)
192 throw new NullPointerException("No replica specified for the RecoveryLayer");
193 if (listener instanceof DependableRegistry)
194 throw new IllegalArgumentException("RecoveryLayer cannot be used by the dependable registry");
195 if (listener instanceof ShutdownListener)
196 shutdownListener = (ShutdownListener) listener;
197 String className = listener.getClass().getName();
198 thisApp = AppConfig.getApplication(className);
199 gid = thisApp.getGroupId();
200 removeLatency = thisApp.getIntParam("Recovery.RemoveLatency", DEFAULT_REMOVE_LATENCY);
201 removeDelay = thisApp.getIntParam("Recovery.RemoveDelay", DEFAULT_REMOVE_DELAY);
202
203 /* Check if this is the server is the replication manager or not. */
204 if (!className.equals(rmApp.getClassName())) {
205 try {
206 /* Obtain a remote reference for the dependable registry */
207 DependableRegistry dregistry = RegistryFactory.getRegistry();
208 /* Obtain a remote reference for the replication manager. */
209 replicaManager = (ReplicationManager) dregistry.lookup(rmApp.getRegistryName());
210 if (log.isDebugEnabled()) {
211 log.debug("Replication Manager is available");
212 }
213 } catch (Exception e) {
214 log.error("Replication Manager is unavailable", e);
215 throw new IllegalStateException("Replication manager is unavailable", e);
216 }
217 }
218 }
219
220
221 ////////////////////////////////////////////////////////////////////////////////////////////
222 // MembershipListener interface methods
223 ////////////////////////////////////////////////////////////////////////////////////////////
224
225 /**
226 * Upcall that is invoked by the MembershipLayer when a view change
227 * occur in this replica's object group.
228 *
229 * @param view
230 * The new view object group.
231 */
232 public void viewChange(final View view)
233 {
234 if (Eventlogger.ENABLED)
235 Eventlogger.logEventFlush(new ViewEvent(Server, view));
236 if (log.isDebugEnabled())
237 log.debug("--- RecoveryLayer.viewChange: ---" + view);
238
239 /*
240 * If the application using this layer is the replication manager,
241 * we simply avoid to execute this code, since the replication manager
242 * should not notify itself of neither view changes or ping events.
243 */
244 if (replicaManager != null) {
245
246 /* Recompute ping rate for the application */
247 recomputePingRate(view.size());
248
249 /*
250 * Only the leader will notify the replication manager (correlator)
251 * of view changes related to this replica's object group.
252 */
253 if (view.memberHasPosition(0, thisMember)) {
254 // doNotify(view);
255 doNotifyThread(view);
256 }
257 }
258
259 /*
260 * Always cancel the remove timer if we receive a new view; it should only
261 * remain active if the view size exceeds the initial redundancy level.
262 */
263 if (timer != null) {
264 if (log.isDebugEnabled())
265 log.debug("timer.cancel");
266 timer.cancel();
267 }
268 /*
269 * Determine if the current number of replicas (view size) exceed
270 * the initial (max) redundancy for this application. If it does,
271 * we will remove one of these replicas at a time, until the
272 * redundancy level is back to normal. But first we wait to see
273 * if there are more views.
274 */
275 if (thisApp.getInitialRedundancy() < view.size()) {
276 timer = new Timer("RemoveReplicaTask-" + gid, true);
277 if (log.isDebugEnabled())
278 log.debug("timer.schedule");
279 timer.schedule(new RemoveReplicaTask(view), removeLatency);
280 }
281
282 } // end of viewChange()
283
284
285 /**
286 * I'm the member to notify the replication manager of this view change.
287 * We do it in a separate thread to avoid length delays (due to communication
288 * latency) of the view install phase.
289 *
290 * @param view
291 */
292 private void doNotifyThread(final View view)
293 {
294 Thread viewNotifyThread = new Thread("ViewNotifyThread-" + gid) {
295 public void run() {
296 if (log.isDebugEnabled())
297 log.debug("Notifying the Replication Manager: Lock pending");
298 lock.lock();
299 try {
300 doNotify(view);
301 } finally {
302 lock.unlock();
303 }
304 }
305 };
306 viewNotifyThread.setDaemon(true);
307 // This thread should have priority in order to receive the result from the RM
308 viewNotifyThread.setPriority(Thread.NORM_PRIORITY+2);
309 viewNotifyThread.start();
310 }
311
312
313 /**
314 * I'm the member to notify the replication manager of this view change.
315 * We do it inline of the viewChange() method to ensure that views recorded
316 * at the replication manager are not reordered. That is we want to ensure
317 * that the replication manager has received this new view, before we allow
318 * the membership service to continue installing another view.
319 *
320 * @param view
321 */
322 private void doNotify(final View view)
323 {
324 try {
325 if (log.isDebugEnabled())
326 log.debug("Notifying the Replication Manager");
327 replicaManager.notifyEvent(new ViewChangeEvent(view));
328 if (log.isDebugEnabled())
329 log.debug("Notified the Replication Manager");
330
331 } catch (Exception e) {
332 /*
333 * The replication manager seems to be unavailable, or it was
334 * not prepared to accept the above view change notification.
335 */
336 log.error("Failed to notify the Replication Manager of the new view", e);
337 }
338 }
339
340 /* (non-Javadoc)
341 * @see jgroup.core.MembershipListener#hasLeft()
342 */
343 public void hasLeft()
344 {
345 if (log.isDebugEnabled())
346 log.debug("I've left the group.");
347 removeReplica();
348 }
349
350 /* (non-Javadoc)
351 * @see jgroup.core.MembershipListener#prepareChange()
352 */
353 public void prepareChange()
354 {
355 if (log.isDebugEnabled())
356 log.debug("Invalid view; preparing for new view");
357 }
358
359
360 ////////////////////////////////////////////////////////////////////////////////////////////
361 // Private Methods and Inner Class
362 ////////////////////////////////////////////////////////////////////////////////////////////
363
364 /**
365 * Remove the replica instance of the associated application on the localhost.
366 */
367 private void removeReplica()
368 {
369 if (log.isDebugEnabled())
370 log.debug("Removing replica instance from localhost");
371 boolean removed = localHost.removeReplica(thisApp);
372 if (log.isDebugEnabled())
373 log.debug(removed ? "Replica removed" : "Replica already removed");
374 }
375
376 /**
377 * Update the ping rate according to the current group size, and
378 * restart the liveness timer.
379 */
380 private void recomputePingRate(int groupSize)
381 {
382 ReplicaPingEvent pingEvent = ReplicaPingEvent.getNewPingEvent(gid, groupSize);
383 if (pingEvent != null) {
384 if (livenessTimer != null) {
385 livenessTimer.cancel();
386 }
387 livenessTimer = new Timer("LivenessTimer-" + gid, true);
388 int pingRate = pingEvent.getPingRate();
389 livenessTimer.schedule(new LivenessTask(pingEvent), pingRate, pingRate);
390 if (log.isDebugEnabled())
391 log.debug("Recomputed ping rate: " + pingRate + "ms.");
392 }
393 }
394
395
396 /**
397 * A timer task used periodically renew the lease when only a single
398 * replica remains in the group.
399 */
400 private class LivenessTask
401 extends TimerTask
402 {
403 private ReplicaPingEvent pingEvent;
404
405 public LivenessTask(ReplicaPingEvent pingEvent)
406 {
407 this.pingEvent = pingEvent;
408 }
409
410 /* (non-Javadoc)
411 * @see java.util.TimerTask#run()
412 */
413 public void run()
414 {
415 if (replicaManager != null) {
416 try {
417 if (log.isDebugEnabled())
418 log.debug("Sending pingEvent: " + pingEvent);
419 replicaManager.notifyEvent(pingEvent);
420 } catch (Exception e) {
421 /*
422 * The replication manager seems to be unavailable.
423 */
424 log.error("Failed to renew the lease with the Replication Manager", e);
425 }
426 }
427 }
428 } // END of LivenessTask
429
430
431 /**
432 * A timer task used to remove the local replica if selected.
433 */
434 private class RemoveReplicaTask
435 extends TimerTask
436 {
437
438 /* Period to wait before a force remove, halting the JVM */
439 private static final long FORCE_REMOVE_DELAY = 10000;
440
441 /* The most recently installed view on which this task will operate */
442 private View view;
443
444 /**
445 * Constructs a new timer task object for the provided view.
446 */
447 public RemoveReplicaTask(View view)
448 {
449 super();
450 this.view = view;
451 }
452
453 /**
454 * The run method is invoked at the scheduled time, unless it is canceled.
455 */
456 public void run()
457 {
458 if (log.isDebugEnabled())
459 log.debug("RemoveReplicaTask.run");
460 if (removeMeNext(view)) {
461 if (log.isDebugEnabled())
462 log.debug("RemovingMe");
463 Util.sleep(removeDelay);
464 if (log.isDebugEnabled())
465 log.debug("Replica leaving from localhost: " + localHost);
466
467 /*
468 * If for some reason the "clean" leave from the group fails to
469 * complete, we do a force remove.
470 */
471 Timer forceRemove = new Timer("ForceRemove-" + gid, true);
472 TimerTask removeTask = new TimerTask() {
473 public void run() {
474 if (Eventlogger.ENABLED) {
475 Eventlogger.logEventFlush(new ReplicaEvent(ForcedRemove, gid));
476 }
477 log.warn("ForcedRemove");
478 removeReplica();
479 }
480 };
481 forceRemove.schedule(removeTask, FORCE_REMOVE_DELAY);
482
483 try {
484 /*
485 * Exit the server group; note that we only have one group for each
486 * group manager, thus we don't need to qualify this method with a
487 * group identifier.
488 */
489 membershipService.leave();
490 if (Eventlogger.ENABLED) {
491 Eventlogger.logEventFlush(new ReplicaEvent(Leaving, gid));
492 }
493 /*
494 * If the server implements the ShutdownListener interface, invoke
495 * the shutdown method in order to allow for application specific
496 * clean up code.
497 */
498 if (shutdownListener != null)
499 shutdownListener.shutdown();
500 } catch (JgroupException e) {
501 log.error("Failed to shutdown local replica", e);
502 }
503 }
504 }
505
506
507 //////////////////////////////////////////////////////////////////////////////////////////
508 // Private Methods
509 //////////////////////////////////////////////////////////////////////////////////////////
510
511 /**
512 * Determine if the local replica should be removed next, yet maintaining
513 * the policy of replica distribution among domains. We do this by
514 * selecting the domain who has got the largest number of replicas in
515 * this particular view, and from that selecting an arbitrary host.
516 * If that host is the localhost we can release this replica.
517 *
518 * @param view The current view to check if the local member is to be removed.
519 * @return True is returned if the local member should be removed; otherwise
520 * false is returned.
521 */
522 private boolean removeMeNext(View view)
523 {
524 Domain largestReplicaDomain = DomainSet.getLargestReplicaDomain(view.getMembers());
525 /* The host from which to remove a replica */
526 Host host = largestReplicaDomain.getHostSet().removeFirst();
527 if (log.isDebugEnabled())
528 log.debug("Selected host: " + host);
529 return localHost.equals(host);
530 }
531
532 } // END of inner class RemoveReplicaTask
533
534 } // END RecoveryLayer