1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.upgrade;
20
21 import java.rmi.NotBoundException;
22 import java.rmi.RemoteException;
23
24 import jgroup.core.ExternalGMIListener;
25 import jgroup.core.ExternalGMIService;
26 import jgroup.core.JgroupException;
27 import jgroup.core.Layer;
28 import jgroup.core.MemberId;
29 import jgroup.core.MembershipListener;
30 import jgroup.core.MembershipService;
31 import jgroup.core.View;
32 import jgroup.core.arm.ExecException;
33 import jgroup.core.arm.UnknownGroupException;
34 import jgroup.core.protocols.Multicast;
35 import jgroup.core.registry.RegistryService;
36 import jgroup.relacs.config.AppConfig;
37 import jgroup.relacs.config.DistributedSystemConfig;
38 import jgroup.relacs.config.Host;
39 import jgroup.relacs.types.EndPointImpl;
40 import jgroup.relacs.types.ViewImpl;
41
42 import org.apache.log4j.Logger;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class UpgradeLayer
66 implements MembershipListener, UpgradeService
67 {
68
69
70
71
72
73
74 private static final Logger log = Logger.getLogger(UpgradeLayer.class);
75
76
77 private static final int S_IDLE = 0;
78 private static final int S_IDLE_UPGRADING = 1;
79 private static final int S_UPGRADING = 2;
80 private static final int S_LEAVING = 3;
81 private static final int S_EXIT = 4;
82
83 private static final String S_names[] = new String[] {
84 "S_IDLE",
85 "S_IDLE_UPGRADING",
86 "S_UPGRADING",
87 "S_LEAVING",
88 "S_EXIT"
89 };
90
91
92 private static final boolean MEASUREMENT = true;
93
94
95 private long times[] = new long [S_EXIT+1];
96
97
98
99
100
101
102
103 private int state = S_IDLE;
104
105
106 private UpgradeListener upgradeListener;
107
108 private MemberId thisMember;
109
110
111 private View currentView = new ViewImpl();
112
113
114 private MemberId[] initialMembers = null;
115
116 private Host localHost = DistributedSystemConfig.getLocalHost();
117
118 private AppConfig thisApp;
119
120 private MembershipService membershipService;
121
122 private RegistryService registryService;
123
124
125 private boolean upgradable = false;
126
127
128 private boolean leavingTime = false;
129
130
131
132
133
134
135
136
137
138 private UpgradeLayer(MembershipService pgms, RegistryService regs, ExternalGMIService egmi)
139 throws JgroupException, RemoteException
140 {
141 membershipService = pgms;
142 thisMember = pgms.getMyIdentifier();
143 registryService = regs;
144 if (log.isDebugEnabled())
145 log.debug("UpgradeLayer constructed");
146 }
147
148
149
150
151
152
153 public static UpgradeLayer getLayer(MembershipService pgms, RegistryService regs,
154 ExternalGMIService egmi) throws JgroupException, RemoteException
155 {
156 return new UpgradeLayer(pgms, regs, egmi);
157 }
158
159
160
161
162
163
164
165
166
167 public void addListener(Object listener)
168 {
169 if (log.isDebugEnabled())
170 log.debug("Adding a listener of the UpgradeLayer");
171 if (listener == null)
172 throw new NullPointerException("No replica specified for addListener");
173 if (!(listener instanceof UpgradeListener))
174 throw new IllegalArgumentException("Specified listener do not implement the UpgradeListener interface");
175 if (upgradeListener != null)
176 throw new IllegalStateException("Only one upgrade listener is allowed for each replica.");
177
178 thisApp = AppConfig.getApplication(listener);
179 final String upgradeServiceName = UPGRADE_SERVICE + "-" + thisApp.getGroupId();
180
181 try {
182 registryService.bind(upgradeServiceName, this);
183 if (log.isDebugEnabled())
184 log.debug("Bound the UpgradeLayer in the registry: " + upgradeServiceName);
185 } catch (Exception e) {
186 throw new IllegalStateException("Unable to bind " + upgradeServiceName +
187 " in the dependable registry", e);
188 }
189
190
191 upgradeListener = (UpgradeListener) listener;
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 @Multicast public synchronized void upgradeRequest(AppConfig newApp)
212 throws RemoteException, UnknownGroupException, ExecException
213 {
214 if (MEASUREMENT) {
215 times[S_IDLE_UPGRADING] = System.currentTimeMillis();
216 if (log.isDebugEnabled())
217 log.debug("S_IDLE_UPGRADING@" + thisMember + "time = " + times[S_IDLE_UPGRADING]);
218 }
219 if (log.isDebugEnabled())
220 log.debug("UpgradeRequest@UpgradeLayer: " + newApp);
221 if (state == S_UPGRADING) {
222 if (log.isDebugEnabled())
223 log.warn("Upgrade request ignored; already upgrading " + newApp);
224 return;
225 }
226
227
228 state = S_IDLE_UPGRADING;
229 if (newApp.getGroupId() != currentView.getGid())
230 throw new UnknownGroupException("This replica has GroupId: ", currentView.getGid());
231
232
233
234
235
236 prepareSelection();
237
238
239 new Thread(new ReplicaUpgradingThread(this, newApp), "UPGRDADING_THREAD").start();
240 }
241
242
243
244
245
246
247 private synchronized void wait_for_new_notifications()
248 {
249 try {
250 wait();
251 } catch (InterruptedException e) {
252 if (log.isDebugEnabled())
253 log.debug("After the UPGRADING_THREAD notification");
254 }
255
256
257
258
259
260
261
262 private void prepareSelection()
263 {
264 initialMembers = currentView.getMembers();
265 if (log.isDebugEnabled())
266 log.debug("Initial view = " + currentView);
267 upgradable = amItheNext(currentView);
268 }
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283 private boolean amItheNext(View view)
284 {
285 MemberId[] members = view.commonMembers(initialMembers);
286
287 if (members.length == 0)
288 return false;
289 MemberId smallest = members[0];
290
291 for (int i = 1; i < members.length; i++) {
292 if (smallest.compareTo(members[i]) > 0)
293 smallest = members[i];
294 }
295
296 if (log.isDebugEnabled())
297 log.debug("amItheNext: Smallest member = " + smallest + " am I the next one = " + smallest.equals(thisMember));
298 if (smallest.equals(thisMember))
299 return true;
300 else
301 return false;
302 }
303
304
305
306
307
308
309
310
311 private boolean isItTimeToLeave(View view)
312 {
313 if (initialMembers == null)
314 return false;
315 int vl = view.size();
316 if (vl > initialMembers.length) {
317 int localReplicas = 0;
318 for (int i = 0; i < vl; i++) {
319 EndPointImpl ep = (EndPointImpl) view.getMembers()[i].getEndPoint();
320 if (ep.compareTo(thisMember.getEndPoint()) == 0)
321 localReplicas++;
322 }
323 if (log.isDebugEnabled())
324 log.debug("Number of replicas on the local host: " + localReplicas);
325 if (localReplicas > 1)
326 return true;
327 }
328 return false;
329
330 }
331
332
333
334
335 private void showTimes() {
336 log.info("State transition times for replica at " + thisMember.getEndPoint().getIntAddress());
337 StringBuilder b = new StringBuilder();
338 for(int i=S_IDLE_UPGRADING; i<times.length-1; i++)
339 b.append(S_names[i] + "\t");
340 log.info(b);
341
342 b = new StringBuilder();
343 for(int i=S_IDLE_UPGRADING; i<times.length-1; i++)
344 b.append(times[i] + "\t");
345 log.info(b);
346 log.info(" Time Diffs:");
347 b = new StringBuilder();
348 for(int i=S_IDLE_UPGRADING; i<times.length-1; i++)
349 b.append((times[i]- times[i-1]) + "\t");
350 log.info(b);
351 }
352
353 private boolean upgradeEnabled(AppConfig newApp)
354 {
355 int vs = currentView.size();
356 if (log.isDebugEnabled())
357 log.debug("Checking the upgrade enabling condition: View.size= " + vs + ", upgradable: " + upgradable);
358 return (vs > newApp.getMinimalRedundancy() && upgradable);
359
360 }
361
362
363
364
365
366
367
368
369
370
371
372
373
374 public synchronized void viewChange(View view)
375 {
376 if (log.isDebugEnabled())
377 log.debug("UpgradeLayer:viewChange: " + view);
378
379 currentView = view;
380
381 upgradable = amItheNext(view);
382 if (log.isDebugEnabled())
383 log.debug("Checking if I'm the replica to be upgraded next: " + upgradable);
384
385
386 leavingTime = isItTimeToLeave(view);
387
388 notifyAll();
389 }
390
391
392
393
394
395
396 public void hasLeft()
397 {
398 if (log.isDebugEnabled()) log.debug("Replica upgraded");
399 upgradeListener.upgraded();
400 boolean removed = localHost.removeReplica(thisApp);
401 }
402
403
404
405
406
407 public void prepareChange()
408 {
409 }
410
411
412
413
414
415
416
417
418
419
420
421
422
423 private class ReplicaUpgradingThread
424 implements Runnable
425 {
426 private AppConfig newApp;
427 private UpgradeLayer monitor;
428
429 ReplicaUpgradingThread(UpgradeLayer monitor, AppConfig newApp)
430 {
431 this.monitor=monitor;
432 this.newApp=newApp;
433 }
434
435 public void run()
436 {
437 if (log.isDebugEnabled())
438 log.debug("UpgradeLayer: Entering the loop checking the enabling condition upgrading of this replica");
439 while (!upgradeEnabled(newApp)) {
440 if (log.isDebugEnabled())
441 log.debug("UpgradeLayer: in the loop after having checked the condition");
442 wait_for_new_notifications();
443 }
444
445
446
447
448
449 if (log.isDebugEnabled())
450 log.debug("UpgradeLayer: Leaving the loop checking the enabling condition upgrading of this replica");
451
452
453
454 state = S_UPGRADING;
455 if (MEASUREMENT) {
456 times[S_UPGRADING] = System.currentTimeMillis();
457 if (log.isDebugEnabled())
458 log.debug("S_UPGRADING@" + thisMember + "time = " + times[S_UPGRADING]);
459 }
460
461 try {
462 localHost.createReplica(newApp);
463
464
465 } catch (Exception ex) {
466 log.error("Cannot start a new replica", ex);
467 }
468 if (log.isDebugEnabled())
469 log.debug("After a replica is triggered to be created");
470
471
472 while (!leavingTime) {
473 if (log.isDebugEnabled())
474 log.debug("UpgradeLayer: S_UPGRADING loop");
475 wait_for_new_notifications();
476 }
477
478 state = S_LEAVING;
479 if (MEASUREMENT) {
480 times[S_LEAVING] = System.currentTimeMillis();
481 if (log.isDebugEnabled())
482 log.debug("S_LEAVING@" + thisMember + "time = " + times[S_LEAVING]);
483 }
484 showTimes();
485 if (log.isDebugEnabled())
486 log.debug("Releasing the replica ...");
487 try {
488 membershipService.leave();
489 registryService.unbind();
490 if (log.isDebugEnabled())
491 log.debug("Replica successfully released.");
492 state = S_EXIT;
493 if (MEASUREMENT) {
494 times[S_EXIT] = System.currentTimeMillis();
495 if (log.isDebugEnabled())
496 log.debug("S_EXIT@" + thisMember + "time = " + times[S_EXIT]);
497 }
498
499
500 } catch (JgroupException e) {
501 log.error("An exception while releasing a replica", e);
502 } catch (RemoteException e) {
503 log.error("An exception while unbinding replica", e);
504 } catch (NotBoundException e) {
505 log.error("An exception while unbinding replica", e);
506 }
507 }
508 }
509
510
511 }