1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.arm;
20
21 import static jgroup.util.log.ViewEvent.Type.ARM;
22
23 import java.io.Serializable;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.locks.Lock;
27 import java.util.concurrent.locks.ReentrantLock;
28
29 import jgroup.core.MemberId;
30 import jgroup.core.MemberTable;
31 import jgroup.core.View;
32 import jgroup.core.arm.RecoveryStrategy;
33 import jgroup.relacs.config.AppConfig;
34 import jgroup.relacs.config.Host;
35 import jgroup.relacs.config.HostSet;
36 import jgroup.util.log.Eventlogger;
37 import jgroup.util.log.ViewEvent;
38
39 import org.apache.log4j.Logger;
40
41
42
43
44 public class GroupData
45 {
46
47
48
49
50
51
52 private static final Logger log = Logger.getLogger(GroupData.class);
53
54
55
56
57
58
59
60 private transient int awaitViewPeriod;
61
62
63
64
65
66
67
68 private final AppConfig app;
69
70
71 private final MemberTable membertable = new MemberTable();
72
73
74 private HostSet assignedHosts = new HostSet();
75
76
77 private final HostSet removedHosts = new HostSet();
78
79
80 private final HostSet newHosts = new HostSet();
81
82
83 private Timer viewMonitorTimer = null;
84
85
86 private boolean iamLeader;
87
88
89 private View view;
90
91
92 private final Lock lock = new ReentrantLock();
93
94
95
96
97
98
99 public GroupData(AppConfig app, int awaitViewPeriod)
100 {
101 this.app = app;
102 this.awaitViewPeriod = awaitViewPeriod;
103 }
104
105
106
107
108
109
110 void addHosts(HostSet hosts)
111 {
112 if (assignedHosts.isEmpty()) {
113 this.assignedHosts = hosts;
114 newHosts.addHosts(hosts);
115 } else {
116 for (Host host : hosts) {
117 addHost(host);
118 }
119 }
120 if (log.isDebugEnabled()) {
121 log.debug("assignedHosts: " + assignedHosts);
122 log.debug(" newHosts: " + newHosts);
123 }
124 }
125
126 void addHost(Host newHost)
127 {
128 if (log.isDebugEnabled())
129 log.debug("adding: " + newHost);
130 if (!removedHosts.containsHost(newHost)) {
131 assignedHosts.addHost(newHost);
132 newHosts.addHost(newHost);
133 ReplicaCount.updateHost(newHost);
134 }
135 if (log.isDebugEnabled()) {
136 log.debug("assignedHosts: " + assignedHosts);
137 log.debug(" newHosts: " + newHosts);
138 }
139 }
140
141 void removeHost(Host host)
142 {
143 if (log.isDebugEnabled())
144 log.debug("removing: " + host);
145 removedHosts.addHost(host);
146 boolean removed = assignedHosts.removeHost(host);
147 if (removed) {
148
149
150
151
152 ReplicaCount.removeReplica(host);
153 }
154 if (log.isDebugEnabled()) {
155 log.debug("assignedHosts: " + assignedHosts);
156 log.debug(" removedHosts: " + removedHosts);
157 }
158 }
159
160 HostSet getAssignedHosts()
161 {
162 return assignedHosts;
163 }
164
165 void viewChangeEvent(View view)
166 {
167
168 app.viewChange(view);
169 logStatus();
170
171
172
173
174
175 if (Eventlogger.ENABLED || log.isDebugEnabled()) {
176 ViewEvent viewEvent = new ViewEvent(ARM, view);
177 if (Eventlogger.ENABLED)
178 Eventlogger.logEventFlush(viewEvent);
179 if (log.isDebugEnabled())
180 log.debug(viewEvent.shortToString());
181 }
182
183 scheduleViewMonitor();
184 if (iamLeader) {
185 System.out.println(view);
186 }
187 }
188
189 void viewChange(View view, boolean iamLeader)
190 {
191 this.view = view;
192 this.iamLeader = iamLeader;
193 if (log.isDebugEnabled())
194 log.debug("Updating group: " + app.getGroupId());
195 membertable.viewChange(view);
196 MemberId[] members = membertable.members();
197 for (int i=0; i < members.length; i++) {
198 switch (membertable.getStatus(members[i])) {
199 case MemberTable.NEWMEMBER:
200
201 if (log.isDebugEnabled())
202 log.debug("New member " + members[i]);
203 membertable.put(members[i], new HostSet());
204 break;
205 case MemberTable.RECOVERING:
206
207 if (log.isDebugEnabled())
208 log.debug("New incarnation " + members[i]);
209 membertable.put(members[i], new HostSet());
210 break;
211 case MemberTable.SURVIVED:
212
213 if (newHosts.isEmpty())
214 break;
215 if (log.isDebugEnabled())
216 log.debug("Inserting new hosts: " + newHosts);
217 HostSet hosts = (HostSet) membertable.get(members[i]);
218 if (log.isDebugEnabled())
219 log.debug("Before: " + members[i] + " = " + hosts);
220 hosts.addHosts(newHosts);
221 if (log.isDebugEnabled())
222 log.debug("After : " + members[i] + " = " + hosts);
223 break;
224 }
225 }
226 newHosts.removeAllHosts();
227 }
228
229
230
231
232
233
234 public static class GroupMergeRecord
235 implements Serializable
236 {
237
238 private static final long serialVersionUID = 932291238562617927L;
239
240
241
242
243
244
245
246 private AppConfig app;
247
248
249
250
251 private HostSet removedHosts;
252
253
254
255
256 private HostSet hosts;
257
258 GroupMergeRecord(AppConfig app, HostSet removedHosts, HostSet hosts)
259 {
260 this.app = app;
261 this.removedHosts = (HostSet) removedHosts.clone();
262
263 this.hosts = hosts;
264 if (log.isDebugEnabled()) {
265 log.debug("getState: gid: " + getGroupId() +
266 ", toadd: " + hosts +
267 ", toremove: " + removedHosts);
268 }
269 }
270
271 public int getGroupId()
272 {
273 return app.getGroupId();
274 }
275
276 }
277
278 GroupMergeRecord getMergeRecord(MemberId[] dests, MemberId me)
279 {
280 synchronized (this) {
281 return new GroupMergeRecord(app, removedHosts, getState(dests, me));
282 }
283 }
284
285 void putMergeRecord(GroupMergeRecord merge)
286 {
287 if (log.isDebugEnabled()) {
288 logStatus();
289 log.debug("putState: gid: " + merge.getGroupId());
290 log.debug("toremove: " + merge.removedHosts);
291 log.debug(" toadd: " + merge.hosts);
292 }
293 putState(merge.removedHosts, merge.hosts);
294 logStatus();
295 }
296
297 private HostSet getState(MemberId[] dests, MemberId me)
298 {
299 HostSet hosts = ((HostSet) membertable.get(me));
300 hosts = (HostSet) hosts.clone();
301 if (log.isDebugEnabled())
302 log.debug("Starting slist: " + hosts);
303 for (int i = 0; i < dests.length; i++) {
304 if (dests[i].equals(me))
305 hosts.removeHosts((HostSet) membertable.get(dests[i]));
306 }
307 if (log.isDebugEnabled())
308 log.debug("Local removed: " + removedHosts);
309 hosts.removeHosts(removedHosts);
310 if (log.isDebugEnabled())
311 log.debug("Final: " + hosts);
312 return hosts;
313 }
314
315 synchronized private void putState(HostSet toremove, HostSet toadd)
316 {
317 toremove.removeHosts(removedHosts);
318 removedHosts.addHosts(toremove);
319 for (Host host : toremove) {
320 if (log.isDebugEnabled())
321 log.debug("Removing " + host + " from assigned host set");
322 boolean removed = assignedHosts.removeHost(host);
323 if (removed)
324 ReplicaCount.removeReplica(host);
325 }
326
327 for (Host host : toadd) {
328 if (!removedHosts.containsHost(host) && !assignedHosts.containsHost(host)) {
329 if (log.isDebugEnabled())
330 log.debug("Adding " + host + " to assigned host set");
331 assignedHosts.addHost(host);
332
333 newHosts.addHost(host);
334 ReplicaCount.updateHost(host);
335 }
336 }
337 }
338
339
340
341
342
343
344 public void logStatus()
345 {
346 if (log.isDebugEnabled()) {
347 log.debug("GroupData: " + app);
348 log.debug(" assignedHosts=" + assignedHosts);
349 log.debug(" removedHosts =" + removedHosts);
350 log.debug(" newHosts =" + newHosts);
351 log.debug(" ViewMonitor is " + (viewMonitorTimer != null ? "" : "not ") + "scheduled");
352 }
353 }
354
355
356
357
358
359
360
361
362
363 public void cancelViewMonitor()
364 {
365 lock.lock();
366 try {
367 if (viewMonitorTimer != null) {
368 viewMonitorTimer.cancel();
369 viewMonitorTimer = null;
370 }
371 } finally {
372 lock.unlock();
373 }
374 }
375
376
377
378
379
380 public void scheduleViewMonitor()
381 {
382 lock.lock();
383 try {
384 if (viewMonitorTimer != null) {
385
386 viewMonitorTimer.cancel();
387 }
388 if (app.needsRecovery()) {
389
390 viewMonitorTimer = new Timer("ViewMonitor-" + app.getGroupId(), true);
391 if (log.isDebugEnabled()) {
392 log.debug("View monitor expires in " + awaitViewPeriod + "ms for " + app);
393 }
394 viewMonitorTimer.schedule(new ViewMonitor(), awaitViewPeriod);
395 } else {
396
397 viewMonitorTimer = null;
398 if (log.isDebugEnabled()) {
399 log.debug("View monitor removed for " + app);
400 logStatus();
401 }
402 }
403 } finally {
404 lock.unlock();
405 }
406 }
407
408
409
410
411
412
413
414
415
416 private class ViewMonitor
417 extends TimerTask
418 {
419
420
421
422
423 public void run()
424 {
425
426
427
428 cancelViewMonitor();
429 if (log.isDebugEnabled()) {
430 log.debug("View monitor expired for " + app);
431 }
432 if (!iamLeader) {
433 if (log.isDebugEnabled())
434 log.debug("I am not the leader RM replica; rescheduling view monitor");
435 scheduleViewMonitor();
436 return;
437 } else if (view.size() == 1) {
438 MemberId myId = view.getLeader();
439 if (membertable.getStatus(myId) == MemberTable.NEWMEMBER) {
440 if (log.isDebugEnabled())
441 log.info("I am a single new member; rescheduling view monitor");
442 scheduleViewMonitor();
443 return;
444 }
445 }
446
447 RecoveryStrategy rs = app.getRecoveryStrategy();
448 boolean recovered = rs.handleFailure(assignedHosts);
449 if (recovered) {
450 if (log.isDebugEnabled())
451 log.debug("Successful recovery for " + app);
452 scheduleViewMonitor();
453 } else {
454 if (log.isDebugEnabled())
455 log.warn("Unsuccessful recovery for " + app);
456 }
457 }
458
459 }
460
461 }