1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.experiment.runnables;
20
21 import java.rmi.RemoteException;
22 import java.util.Timer;
23 import java.util.TimerTask;
24
25 import jgroup.core.ConfigurationException;
26 import jgroup.core.View;
27 import jgroup.core.arm.ARMEvent;
28 import jgroup.core.arm.ReplicationManager;
29 import jgroup.core.arm.ReplicationManager.ManagementCallback;
30 import jgroup.core.registry.DependableRegistry;
31 import jgroup.core.registry.RegistryFactory;
32 import jgroup.experiment.PropertyDefinition;
33 import jgroup.experiment.Runnable;
34 import jgroup.relacs.config.AppConfig;
35 import jgroup.relacs.config.ExperimentConfig;
36 import net.jini.export.Exporter;
37 import net.jini.jeri.BasicILFactory;
38 import net.jini.jeri.BasicJeriExporter;
39 import net.jini.jeri.tcp.TcpServerEndpoint;
40
41 import org.apache.log4j.Logger;
42
43
44
45
46
47
48
49
50
51 public class DeployReplicas
52 implements Runnable, ManagementCallback
53 {
54
55
56
57
58
59
60 private static final Logger log = Logger.getLogger(DeployReplicas.class);
61
62
63
64
65
66
67
68 private static final int WAIT_TIME_PER_REPLICA = 5000;
69
70
71
72
73
74
75
76 private AppConfig app;
77
78
79
80
81
82 private volatile boolean fullyReplicated;
83
84
85
86
87
88
89 private boolean supportCallbacks;
90
91
92
93
94
95
96
97
98
99 public void run(ExperimentConfig ec)
100 throws ConfigurationException
101 {
102 fullyReplicated = false;
103 String clazz = ec.getProperty(this, "class");
104 app = AppConfig.getApplication(clazz);
105 int minimal = ec.getIntProperty(this, "minimal.redundancy", app.getMinimalRedundancy());
106 int initial = ec.getIntProperty(this, "initial.redundancy", app.getInitialRedundancy());
107 app.setRedundancy(minimal, initial);
108 app.setProperties(ec.getProperties(this));
109 int maxWait = initial*WAIT_TIME_PER_REPLICA;
110 maxWait = ec.getIntProperty(this, "max.waiting.time", maxWait);
111 supportCallbacks = ec.getBooleanProperty(this, "supports.rmi.callbacks", true);
112 try {
113 deployApp(maxWait);
114 } catch (Exception e) {
115 throw new ConfigurationException("Could not deploy class: " + clazz, e);
116 }
117 }
118
119
120
121
122 public PropertyDefinition[] getProperties()
123 {
124 return null;
125 }
126
127
128
129
130
131
132 private void deployApp(int maxWait)
133 throws Exception
134 {
135
136
137
138
139 DependableRegistry dregistry = RegistryFactory.getRegistry();
140
141
142
143
144
145
146 final ReplicationManager replicaManager =
147 (ReplicationManager) dregistry.lookup("Jgroup/ReplicationManager");
148 log.info("Found ReplicationManager");
149 int gid = replicaManager.createGroup(app);
150 log.info("Created group: " + gid);
151
152 if (supportCallbacks) {
153
154
155
156
157 Exporter exporter =
158 new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory());
159 ManagementCallback mcProxy = (ManagementCallback) exporter.export(this);
160 replicaManager.subscribe(mcProxy, gid);
161 TimerTask awaitFullGroup = new TimerTask() {
162 public void run() {
163 log.warn("Group did not reach full replication level");
164 fullyReplicated = true;
165 }
166 };
167 Timer timer = new Timer("AwaitFullyReplicatedGroup", true);
168 timer.schedule(awaitFullGroup, maxWait);
169 log.info("Waiting for full replication level");
170 while (!fullyReplicated) {
171 Thread.yield();
172 }
173 timer.cancel();
174 replicaManager.unsubscribe(gid);
175 exporter.unexport(true);
176 }
177 }
178
179
180
181
182
183 public void notifyEvent(ARMEvent event)
184 throws RemoteException
185 {
186 log.info("Received event: " + event);
187 if (event == null)
188 return;
189 Object obj = event.getObject();
190 if (obj != null && obj instanceof View) {
191 View view = (View) obj;
192 if (app.getInitialRedundancy() == view.size()) {
193 fullyReplicated = true;
194 log.info("Group (" + view.getGid() + ") fully replicated");
195 }
196 }
197 }
198
199 }