View Javadoc

1   /*
2    * Copyright (c) 1998-2004 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.experiment.runnables;
20  
21  import java.rmi.RemoteException;
22  import java.util.Timer;
23  import java.util.TimerTask;
24  
25  import jgroup.core.ConfigurationException;
26  import jgroup.core.View;
27  import jgroup.core.arm.ARMEvent;
28  import jgroup.core.arm.ReplicationManager;
29  import jgroup.core.arm.ReplicationManager.ManagementCallback;
30  import jgroup.core.registry.DependableRegistry;
31  import jgroup.core.registry.RegistryFactory;
32  import jgroup.experiment.PropertyDefinition;
33  import jgroup.experiment.Runnable;
34  import jgroup.relacs.config.AppConfig;
35  import jgroup.relacs.config.ExperimentConfig;
36  import net.jini.export.Exporter;
37  import net.jini.jeri.BasicILFactory;
38  import net.jini.jeri.BasicJeriExporter;
39  import net.jini.jeri.tcp.TcpServerEndpoint;
40  
41  import org.apache.log4j.Logger;
42  
43  /**
44   * Deploy the application group specified in the properties associated
45   * with this runnable.  Also listen to callbacks from ARM to determine
46   * if the group has been installed, and reached the fully replicated
47   * level.
48   *
49   * @author Hein Meling
50   */
51  public class DeployReplicas
52    implements Runnable, ManagementCallback
53  {
54  
55    ////////////////////////////////////////////////////////////////////////////////////////////
56    // Logger
57    ////////////////////////////////////////////////////////////////////////////////////////////
58  
59    /** Obtain logger for this class */
60    private static final Logger log = Logger.getLogger(DeployReplicas.class);
61  
62  
63    ////////////////////////////////////////////////////////////////////////////////////////////
64    // Constants
65    ////////////////////////////////////////////////////////////////////////////////////////////
66  
67    /** The number of milliseconds to wait (max) per replica being deployed */
68    private static final int WAIT_TIME_PER_REPLICA = 5000;
69  
70  
71    ////////////////////////////////////////////////////////////////////////////////////////////
72    // Fields
73    ////////////////////////////////////////////////////////////////////////////////////////////
74  
75    /** The application being deployed through this runnable */
76    private AppConfig app;
77  
78    /**
79     * Indicating if the application is fully replicated;
80     * after which this runnable can exit
81     */
82    private volatile boolean fullyReplicated;
83  
84    /**
85     * True if the experiment engine node supports RMI callbacks;
86     * that is, the node must not be behind a NAT router and hence
87     * must have an IP address.  Default is true.
88     */
89    private boolean supportCallbacks;
90  
91  
92    ////////////////////////////////////////////////////////////////////////////////////////////
93    // Methods from Runnable
94    ////////////////////////////////////////////////////////////////////////////////////////////
95  
96    /* (non-Javadoc)
97     * @see jgroup.experiment.Runnable#run(jgroup.relacs.config.ExperimentConfig)
98     */
99    public void run(ExperimentConfig ec)
100     throws ConfigurationException
101   {
102     fullyReplicated = false;
103     String clazz = ec.getProperty(this, "class");
104     app = AppConfig.getApplication(clazz);
105     int minimal = ec.getIntProperty(this, "minimal.redundancy", app.getMinimalRedundancy());
106     int initial = ec.getIntProperty(this, "initial.redundancy", app.getInitialRedundancy());
107     app.setRedundancy(minimal, initial);
108     app.setProperties(ec.getProperties(this));
109     int maxWait = initial*WAIT_TIME_PER_REPLICA;
110     maxWait = ec.getIntProperty(this, "max.waiting.time", maxWait);
111     supportCallbacks = ec.getBooleanProperty(this, "supports.rmi.callbacks", true);
112     try {
113       deployApp(maxWait);
114     } catch (Exception e) {
115       throw new ConfigurationException("Could not deploy class: " + clazz, e);
116     }
117   }
118 
119   /* (non-Javadoc)
120    * @see jgroup.experiment.Runnable#getProperties()
121    */
122   public PropertyDefinition[] getProperties()
123   {
124     return null;
125   }
126 
127 
128   ////////////////////////////////////////////////////////////////////////////////////////////
129   // Private Methods
130   ////////////////////////////////////////////////////////////////////////////////////////////
131 
132   private void deployApp(int maxWait)
133     throws Exception
134   {
135     /*
136      * Obtain a proxy for the dependable registry running in the
137      * distributed system described in the config.xml file.
138      */
139     DependableRegistry dregistry = RegistryFactory.getRegistry();
140 
141     /*
142      * Retrieve a proxy for an object group implementing the
143      * ReplicationManager interface registered under the name
144      * "ReplicationManager".
145      */
146     final ReplicationManager replicaManager =
147       (ReplicationManager) dregistry.lookup("Jgroup/ReplicationManager");
148     log.info("Found ReplicationManager");
149     int gid = replicaManager.createGroup(app);
150     log.info("Created group: " + gid);
151 
152     if (supportCallbacks) {
153       /*
154        * Export the management callback interface, and subscribe to events
155        * of the recently created group.
156        */
157       Exporter exporter =
158         new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory());
159       ManagementCallback mcProxy = (ManagementCallback) exporter.export(this);
160       replicaManager.subscribe(mcProxy, gid);
161       TimerTask awaitFullGroup = new TimerTask() {
162         public void run() {
163           log.warn("Group did not reach full replication level");
164           fullyReplicated = true;
165         }
166       };
167       Timer timer = new Timer("AwaitFullyReplicatedGroup", true);
168       timer.schedule(awaitFullGroup, maxWait);
169       log.info("Waiting for full replication level");
170       while (!fullyReplicated) {
171         Thread.yield();
172       }
173       timer.cancel();
174       replicaManager.unsubscribe(gid);
175       exporter.unexport(true);
176     }
177   }
178 
179 
180   /* (non-Javadoc)
181    * @see jgroup.core.arm.ReplicationManager.ManagementCallback#notifyEvent(jgroup.core.arm.ARMEvent)
182    */
183   public void notifyEvent(ARMEvent event)
184     throws RemoteException
185   {
186     log.info("Received event: " + event);
187     if (event == null)
188       return;
189     Object obj = event.getObject();
190     if (obj != null && obj instanceof View) {
191       View view = (View) obj;
192       if (app.getInitialRedundancy() == view.size()) {
193         fullyReplicated = true;
194         log.info("Group (" + view.getGid() + ") fully replicated");
195       }
196     }
197   }
198 
199 } // END DeployReplicas