1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.daemon;
20
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.Map;
24
25 import jgroup.core.EndPoint;
26 import jgroup.core.View;
27 import jgroup.relacs.events.DeliveryAck;
28 import jgroup.relacs.events.DeliveryEvent;
29 import jgroup.relacs.events.InstallAck;
30 import jgroup.relacs.events.InstallEvent;
31 import jgroup.relacs.mss.Mss;
32 import jgroup.relacs.types.LocalId;
33 import jgroup.util.InMessage;
34 import jgroup.util.Queue;
35 import jgroup.util.Util;
36
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44
45 final class ViewDescription
46 implements Tag
47 {
48
49
50
51
52
53
54 private static final Logger log = Logger.getLogger(ViewDescription.class);
55
56
57
58
59
60
61
62 private HostData[] hosts;
63
64
65 private HostViewData[] data;
66
67
68 private EndPoint[] endpoints;
69
70
71
72
73
74
75 private boolean stable;
76
77
78 private boolean containsCrashed;
79
80
81 private MemberData[] members;
82
83
84
85
86
87 private boolean[] hasInstalled;
88
89
90 private int localIndex;
91
92
93
94
95
96
97
98
99
100
101
102
103
104 ViewDescription(HostData hd, MemberData md)
105 {
106 hosts = new HostData[] { hd };
107 data = new HostViewData[] { new HostViewData(1, 1) };
108 endpoints = new EndPoint[] { hd.getEndPoint() };
109 members = new MemberData[] { md };
110 hasInstalled = new boolean[1];
111 localIndex = 0;
112 stable = true;
113 containsCrashed = false;
114 }
115
116
117
118
119
120
121
122 ViewDescription(Map thosts, EndPoint[] endpoints, EndPoint[] rset,
123 Map tmembers, LocalId[] localMembers, int localIndex)
124 {
125 this.stable = true;
126
127
128
129
130 this.hosts = new HostData[endpoints.length];
131 this.endpoints = new EndPoint[endpoints.length];
132 this.data = new HostViewData[endpoints.length];
133 this.hasInstalled = new boolean[tmembers.size()];
134 for (int i=0; i < hosts.length; i++) {
135 this.endpoints[i] = endpoints[i];
136 hosts[i] = (HostData) thosts.get(endpoints[i]);
137 if (hosts[i] == null)
138 throw new IllegalStateException("Error: installing inconsistent view");
139
140
141
142
143
144
145
146
147
148
149
150
151 data[i] = new HostViewData(endpoints.length, localMembers.length);
152
153 if (hosts[i].getVersion() > hosts[i].getAgreed()) {
154 if (log.isDebugEnabled())
155 log.debug(endpoints[i] + " version " + hosts[i].getVersion()
156 + " > agreed " + hosts[i].getAgreed() + "(unstable view)");
157 stable = false;
158 }
159 }
160
161 for (Iterator iterator = thosts.values().iterator(); iterator.hasNext(); ) {
162 HostData host = (HostData) iterator.next();
163 EndPoint endpoint = host.getEndPoint();
164 if (!host.isLeaving() && Util.in(rset, endpoint)) {
165 boolean found = false;
166 for (int i=0; i < endpoints.length && !found; i++) {
167 if (endpoints[i].equals(endpoint)) {
168 found = true;
169 }
170 }
171 if (!found) {
172 if (log.isDebugEnabled())
173 log.debug(endpoint + " excluded, but is in the reachable set (unstable view)");
174 stable = false;
175 }
176 }
177 if (log.isDebugEnabled()) {
178 log.debug(endpoint + ": is " + (host.isLeaving() ? "" : "not ")
179 + "leaving, and is " + (Util.in(rset, endpoint) ? "" : "not ")
180 + "in the reachable set.");
181 }
182 }
183 if (log.isDebugEnabled()) {
184 StringBuilder buf = new StringBuilder("endpoints: ");
185 for (int i = 0; i < endpoints.length; i++) {
186 buf.append(endpoints[i]);
187 buf.append(" ");
188 }
189 log.debug(buf);
190 buf = new StringBuilder(" reachset: ");
191 for (int i = 0; i < rset.length; i++) {
192 buf.append(rset[i]);
193 buf.append(" ");
194 }
195 log.debug(buf);
196 }
197
198
199
200
201 this.members = new MemberData[localMembers.length];
202 for (int i=0; i < localMembers.length; i++) {
203 members[i] = (MemberData) tmembers.get(localMembers[i]);
204 if (members[i] == null)
205 throw new IllegalStateException("Error: installing inconsistent view");
206 }
207
208 this.localIndex = localIndex;
209 }
210
211
212
213
214
215
216
217
218 int hostSize()
219 {
220 return hosts.length;
221 }
222
223
224
225
226 int memberSize()
227 {
228 return members.length;
229 }
230
231
232
233
234 HostData getHost(int index)
235 {
236 return hosts[index];
237 }
238
239
240
241
242 MemberData getMember(int index)
243 {
244 return members[index];
245 }
246
247
248
249
250 EndPoint[] getEndPoints()
251 {
252 return endpoints;
253 }
254
255
256
257
258 int size()
259 {
260 return endpoints.length;
261 }
262
263
264
265
266
267
268
269
270
271
272
273 boolean isDuplicate(EndPoint[] prev_cvcomp)
274 {
275 if (endpoints.length == prev_cvcomp.length) {
276 for (int i = 0; i < prev_cvcomp.length; i++) {
277 if (!prev_cvcomp[i].equals(endpoints[i])) {
278 return false;
279 }
280 }
281 return true;
282 } else {
283 return false;
284 }
285 }
286
287
288
289
290
291
292 boolean isStable()
293 {
294 return stable;
295 }
296
297
298
299
300
301
302 boolean containsCrashedMembers()
303 {
304 return containsCrashed;
305 }
306
307 int getLocalIndex()
308 {
309 return localIndex;
310 }
311
312 int getDelivered(int index)
313 {
314 log.assertLog(data.length > index, "Provided host index=" + index
315 + " is greater than data.length=" + data.length);
316 return data[index].delivered;
317 }
318
319
320
321
322 int[] getAcks()
323 {
324 int[] acks = new int[data.length];
325 for (int i=0; i < acks.length; i++) {
326 acks[i] = data[i].hack[localIndex];
327 }
328 return acks;
329 }
330
331
332
333
334 int[] getDelivered(Collection estimate)
335 {
336 int[] delivered = new int[estimate.size()];
337 int i = 0;
338 for (Iterator iter = estimate.iterator(); iter.hasNext(); i++) {
339 HostData host = (HostData) iter.next();
340 if (host.hasValidViewIndex())
341 delivered[i] = data[host.getViewIndex()].delivered;
342 else
343 delivered[i] = 0;
344 if (log.isDebugEnabled())
345 log.debug(host + ": delivered=" + delivered[i]);
346 }
347 if (log.isDebugEnabled()) {
348 StringBuilder buffer = new StringBuilder();
349 buffer.append("Delivered: ");
350 for (i=0; i < delivered.length; i++)
351 buffer.append(delivered[i] + " ");
352 log.debug(buffer.toString());
353 }
354 return delivered;
355 }
356
357
358
359
360
361
362
363
364
365 boolean isNew(int index, int mid)
366 {
367 return (mid > data[index].hack[localIndex]);
368 }
369
370
371
372
373
374 boolean localViewAck(InstallAck ack)
375 {
376 boolean viewStable = true;
377 hasInstalled[ack.getMemberIndex()] = true;
378 for (int i=0; i < members.length && viewStable; i++)
379 viewStable = (members[i].isLeaving() ||
380 members[i].isCrashed() || hasInstalled[i]);
381 return viewStable;
382 }
383
384
385
386
387 void remoteMessageAck(MsgAck msg)
388 {
389
390 log.assertLog(msg.ack.length == hosts.length, "Received incorrect ACK message");
391
392
393 for (int i=0; i < hosts.length; i++) {
394 if (data[i].hack[msg.hpos] < msg.ack[i]) {
395 data[i].hack[msg.hpos] = msg.ack[i];
396 data[i].checkAck();
397 }
398 }
399 }
400
401
402
403
404 boolean localDeliveryAck(DeliveryAck ack)
405 {
406 int mid = ack.getMessageId();
407 int hostIndex = ack.getHostIndex();
408 HostViewData sender = data[hostIndex];
409 sender.mack[ack.getMemberIndex()] = mid;
410 if (log.isDebugEnabled()) {
411 log.debug("DeliveryAck from " + hosts[hostIndex]
412 + " with mid=" + mid + ", sender.delivered=" + sender.delivered);
413 }
414 boolean carryon = true;
415
416
417
418
419
420 if (mid >= sender.delivered+1) {
421 carryon = updateDelivered(sender, hostIndex, mid);
422 }
423 if (log.isDebugEnabled()) {
424 log.debug("carryon=" + carryon);
425 }
426 return carryon;
427 }
428
429
430
431
432
433
434
435 private boolean updateDelivered(HostViewData sender, int hostIndex, int mid)
436 {
437 boolean carryon = true;
438
439
440
441
442 for (int i=0; i<members.length && carryon; i++)
443 carryon = (members[i].isCrashed() || members[i].isLeaving() || sender.mack[i] >= mid);
444 if (carryon) {
445 sender.delivered = mid;
446 if (log.isDebugEnabled())
447 log.debug("Updated delivered for " + hosts[hostIndex] + ": " + mid);
448
449
450
451 for (int i=0; i < data.length && carryon; i++) {
452 carryon = (data[i].hack[localIndex] == data[i].delivered);
453 }
454 }
455 return carryon;
456 }
457
458
459
460
461
462 void forwardMessages(Mss mss)
463 {
464
465
466
467
468 if (endpoints.length > 1) {
469 for (int i = 0; i < data.length; i++) {
470 for (Iterator iter = data[i].buffer.iterator(); iter.hasNext(); ) {
471 MsgMcast msg = (MsgMcast) iter.next();
472 if (log.isDebugEnabled())
473 log.debug("Forwarding msg " + msg.mid + " to group members");
474 mss.msend(R_FORWARD, msg, endpoints);
475 }
476 }
477 }
478 }
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493 void notifyView(int gid, View view)
494 {
495 if (log.isDebugEnabled()) {
496 log.debug(view);
497 }
498 for (int i=0; i < members.length; i++) {
499 if (!members[i].isLeaving() && !members[i].isCrashed()) {
500 InstallEvent event = new InstallEvent(gid, view, localIndex, i);
501 boolean success = DaemonInteraction.addEvent(members[i], event);
502 if (!success) {
503 members[i].setCrashed();
504 containsCrashed = true;
505 }
506 }
507 }
508 }
509
510
511
512
513
514 boolean notifyMessage(int gid, MsgMcast msg)
515 {
516 boolean newCrashed = false;
517 if (log.isDebugEnabled())
518 log.debug("notifyMessage for group: " + gid + ", msg (" + msg.mid + ")" );
519
520
521 for (int i=0; i < members.length; i++) {
522 if (!members[i].isLeaving() && !members[i].isCrashed()) {
523 InMessage inmsg = msg.getInMessage();
524 boolean success;
525 synchronized (inmsg) {
526 DeliveryEvent event = new DeliveryEvent(gid, msg.isObject, msg.mid,
527 msg.hpos, msg.sender, msg.ackr, inmsg);
528 success = DaemonInteraction.addEvent(members[i], event);
529 }
530 if (!success) {
531 members[i].setCrashed();
532 newCrashed = true;
533 containsCrashed = true;
534 } else {
535 members[i].setAck(msg.mid);
536 }
537 }
538 }
539
540
541 data[msg.hpos].hack[localIndex] = msg.mid;
542 data[msg.hpos].hack[msg.hpos] = msg.mid;
543 data[msg.hpos].buffer.insert(msg);
544
545 data[msg.hpos].checkAck();
546 return newCrashed;
547 }
548
549
550 public String toString()
551 {
552 StringBuilder b = new StringBuilder("[ViewDescription: ");
553 b.append((stable ? "stable" : "not stable"));
554 b.append(", hosts={");
555 for (int i = 0; i < hosts.length; i++) {
556 if (i != 0)
557 b.append(", ");
558 b.append(hosts[i]);
559 }
560 b.append("} delivered={");
561 for (int i = 0; i < data.length; i++) {
562 if (i != 0)
563 b.append(", ");
564 b.append(data[i].delivered);
565 }
566 b.append("}, acks={");
567 int[] acks = getAcks();
568 for (int i = 0; i < acks.length; i++) {
569 if (i != 0)
570 b.append(", ");
571 b.append(acks[i]);
572 }
573 b.append("}");
574 return b.toString();
575 }
576
577
578 private class HostViewData
579 {
580
581
582 int[] hack;
583
584
585 int[] mack;
586
587
588 int hmin;
589
590
591 int delivered;
592
593
594 Queue buffer;
595
596
597
598
599 HostViewData(int hostSize, int memberSize)
600 {
601 hack = new int[hostSize];
602 mack = new int[memberSize];
603 hmin = 0;
604 delivered = 0;
605 buffer = new Queue();
606 }
607
608
609
610
611 void checkAck()
612 {
613 int min = Integer.MAX_VALUE;
614 for (int i=0; i < hack.length; i++)
615 if (min > hack[i])
616 min = hack[i];
617 if (min > hmin) {
618 for (int i=hmin; i < min; i++)
619 buffer.removeFirst();
620 hmin = min;
621 }
622 }
623
624 }
625
626 }