1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.daemon;
20
21
22 import java.io.IOException;
23 import java.io.ObjectInput;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.Map;
27
28 import jgroup.core.EndPoint;
29 import jgroup.relacs.mss.MssConstants;
30 import jgroup.relacs.types.Checksum;
31 import jgroup.relacs.types.EndPointImpl;
32 import jgroup.relacs.types.GroupId;
33 import jgroup.relacs.types.GroupIndex;
34 import jgroup.relacs.types.IncarnationId;
35 import jgroup.relacs.types.LocalId;
36 import jgroup.relacs.types.MessageId;
37 import jgroup.relacs.types.MessageLen;
38 import jgroup.relacs.types.VersionId;
39 import jgroup.relacs.types.ViewId;
40 import jgroup.util.Abort;
41 import jgroup.util.InMessage;
42 import jgroup.util.MsgFactory;
43 import jgroup.util.OutMessage;
44
45
46
47
48
49
50
51
52
53
54 final class MsgProp
55 implements MssConstants, DaemonMsg
56 {
57
58
59
60
61
62 private static final int START = MSS_HEADER_SIZE;
63 private static final int P_DATA =
64 START + GroupId.SIZE + (ViewId.SIZE * 2) + (MessageLen.SIZE * 4) + (Checksum.SIZE * 2);
65
66
67
68
69
70
71 long cvid;
72 long pvid;
73 int last_plen;
74 int host_plen;
75 int memb_plen;
76 int msgs_plen;
77 int last_start;
78 int host_start;
79 int memb_start;
80 int msgs_start;
81 int host_pchk;
82 int msgs_pchk;
83 InMessage stream;
84 OutMessage last_prop;
85 OutMessage host_prop;
86 OutMessage memb_prop;
87 OutMessage msgs_prop;
88
89
90
91
92
93 private OutMessage outmsg;
94
95
96
97
98
99
100
101
102
103 MsgProp(OutMessage msg)
104 throws IOException
105 {
106 stream = new InMessage(msg);
107
108 stream.seek(START);
109
110
111 int gid = GroupId.unmarshal(stream);
112 cvid = ViewId.unmarshal(stream);
113 pvid = ViewId.unmarshal(stream);
114 last_plen = MessageLen.unmarshal(stream);
115 host_plen = MessageLen.unmarshal(stream);
116 memb_plen = MessageLen.unmarshal(stream);
117 msgs_plen = MessageLen.unmarshal(stream);
118 host_pchk = Checksum.unmarshal(stream);
119 msgs_pchk = Checksum.unmarshal(stream);
120
121
122 last_start = P_DATA;
123 host_start = last_start + last_plen;
124 memb_start = host_start + host_plen;
125 msgs_start = memb_start + memb_plen;
126 }
127
128
129
130
131
132
133
134
135
136 MsgProp(ObjectInput stream)
137 throws IOException
138 {
139 this.stream = (InMessage) stream;
140
141
142
143
144
145
146
147
148 cvid = ViewId.unmarshal(stream);
149 pvid = ViewId.unmarshal(stream);
150 last_plen = MessageLen.unmarshal(stream);
151 host_plen = MessageLen.unmarshal(stream);
152 memb_plen = MessageLen.unmarshal(stream);
153 msgs_plen = MessageLen.unmarshal(stream);
154 host_pchk = Checksum.unmarshal(stream);
155 msgs_pchk = Checksum.unmarshal(stream);
156
157
158 last_start = P_DATA;
159 host_start = last_start + last_plen;
160 memb_start = host_start + host_plen;
161 msgs_start = memb_start + memb_plen;
162 }
163
164
165
166
167
168
169 MsgProp()
170 {
171
172 last_prop = new OutMessage(256);
173 host_prop = new OutMessage(256);
174 memb_prop = new OutMessage(256);
175 msgs_prop = new OutMessage(256);
176 }
177
178
179
180
181
182
183 static MsgProp unmarshal(ObjectInput stream)
184 throws IOException
185 {
186 return new MsgProp(stream);
187 }
188
189
190
191
192
193
194
195
196 void encodeLastProp(EndPoint[] hosts)
197 throws IOException
198 {
199 last_prop.reset();
200 GroupIndex.marshal(last_prop, hosts.length);
201 for (int i = 0; i < hosts.length; i++)
202 hosts[i].writeExternal(last_prop);
203 last_plen = GroupIndex.SIZE + hosts.length * EndPointImpl.SIZE;
204 }
205
206
207
208
209
210
211
212
213 void encodeHostProp(Collection estimate)
214 throws IOException
215 {
216 Iterator iterator;
217
218 host_prop.reset();
219 GroupIndex.marshal(host_prop, estimate.size());
220 iterator = estimate.iterator();
221 while (iterator.hasNext()) {
222 HostData scan = (HostData) iterator.next();
223 scan.getEndPoint().writeExternal(host_prop);
224 }
225 iterator = estimate.iterator();
226 while (iterator.hasNext()) {
227 HostData scan = (HostData) iterator.next();
228 IncarnationId.marshal(host_prop, scan.getHost().getIncarnationId());
229 }
230 iterator = estimate.iterator();
231 while (iterator.hasNext()) {
232 HostData scan = (HostData) iterator.next();
233 VersionId.marshal(host_prop, scan.getAgreed());
234 }
235 host_pchk = host_prop.computeChecksum();
236 host_plen = GroupIndex.SIZE + estimate.size() *
237 (EndPointImpl.SIZE + IncarnationId.SIZE + VersionId.SIZE);
238 }
239
240
241
242
243
244
245
246
247
248 void encodeMembProp(Map members, int size)
249 throws IOException
250 {
251 memb_prop.reset();
252 GroupIndex.marshal(memb_prop, size);
253 for (Iterator iter = members.values().iterator(); iter.hasNext();) {
254 MemberData md = (MemberData) iter.next();
255 if (!md.isLeaving() && !md.isCrashed()) {
256 md.getMemberId().getLocalId().writeExternal(memb_prop);
257 }
258 }
259 memb_plen = GroupIndex.SIZE + size * LocalId.SIZE;
260 }
261
262
263
264
265
266
267 void encodeMsgsProp(int[] delivered)
268 throws IOException
269 {
270 msgs_prop.reset();
271 for (int i=0; i < delivered.length; i++) {
272 MessageId.marshal(msgs_prop, delivered[i]);
273 }
274 msgs_pchk = msgs_prop.computeChecksum();
275 msgs_plen = delivered.length * MessageId.SIZE;
276 }
277
278
279
280
281
282
283
284 MsgProp marshal(int gid, long cvid, long pvid)
285 throws IOException
286 {
287
288 int size = P_DATA + last_plen + host_plen + memb_plen + msgs_plen;
289
290 outmsg = MsgFactory.get(size, 1);
291 outmsg.seek(START);
292
293 GroupId.marshal(outmsg, gid);
294 ViewId.marshal(outmsg, cvid);
295 ViewId.marshal(outmsg, pvid);
296 MessageLen.marshal(outmsg, last_plen);
297 MessageLen.marshal(outmsg, host_plen);
298 MessageLen.marshal(outmsg, memb_plen);
299 MessageLen.marshal(outmsg, msgs_plen);
300 Checksum.marshal(outmsg, host_pchk);
301 Checksum.marshal(outmsg, msgs_pchk);
302
303 outmsg.write(last_prop);
304 outmsg.write(host_prop);
305 outmsg.write(memb_prop);
306 outmsg.write(msgs_prop);
307
308 return this;
309 }
310
311
312
313
314
315
316
317
318
319
320 boolean checkEstimate(MsgProp msg)
321 {
322 if (msg == null || host_plen != msg.host_plen || host_pchk != msg.host_pchk)
323 return false;
324 return stream.compare(host_start, msg.stream, msg.host_start, host_plen);
325 }
326
327
328
329
330
331
332 boolean checkMessages(MsgProp msg)
333 {
334 if (msg == null)
335 return false;
336
337 if (pvid == msg.pvid) {
338 if (msgs_plen != msg.msgs_plen || msgs_pchk != msg.msgs_pchk) {
339 Group.log.debug("MSGS NOT EQUAL");
340 return false;
341 }
342 boolean cmp = stream.compare(msgs_start, msg.stream, msg.msgs_start, msgs_plen);
343 if (!cmp)
344 Group.log.debug("STREAMS NOT EQUAL");
345 else
346 Group.log.debug("STREAMS EQUAL");
347 return cmp;
348
349 }
350 Group.log.debug("MSGS EQUAL");
351 return true;
352 }
353
354
355
356
357
358
359
360
361 EndPoint[] decodeLastProp()
362 throws IOException, ClassNotFoundException
363 {
364 stream.seek(last_start);
365 int len = GroupIndex.unmarshal(stream);
366 EndPoint[] ret = new EndPoint[len];
367 for (int i=0; i < len; i++) {
368 ret[i] = new EndPointImpl();
369 ret[i].readExternal(stream);
370 }
371 return ret;
372 }
373
374
375
376
377 EndPoint[] decodeHostProp_hosts()
378 throws IOException, ClassNotFoundException
379 {
380 stream.seek(host_start);
381 int len = GroupIndex.unmarshal(stream);
382 EndPoint[] ret = new EndPoint[len];
383 for (int i=0; i < len; i++) {
384 ret[i] = new EndPointImpl();
385 ret[i].readExternal(stream);
386 }
387 return ret;
388 }
389
390
391
392
393
394 int[] decodeHostProp_incarns()
395 throws IOException
396 {
397 stream.seek(host_start);
398 int len = GroupIndex.unmarshal(stream);
399 int[] ret = new int[len];
400 stream.seek(host_start + GroupIndex.SIZE + len * EndPointImpl.SIZE);
401 for (int i=0; i < len; i++)
402 ret[i] = IncarnationId.unmarshal(stream);
403 return ret;
404 }
405
406
407
408
409
410 int[] decodeHostProp_agreed()
411 throws IOException
412 {
413 stream.seek(host_start);
414 int len = GroupIndex.unmarshal(stream);
415 int[] ret = new int[len];
416 stream.seek(host_start + GroupIndex.SIZE + len * (EndPointImpl.SIZE + IncarnationId.SIZE));
417 for (int i=0; i < len; i++)
418 ret[i] = VersionId.unmarshal(stream);
419 return ret;
420 }
421
422
423
424
425 LocalId[] decodeMembProp()
426 throws IOException, ClassNotFoundException
427 {
428 stream.seek(memb_start);
429 int len = GroupIndex.unmarshal(stream);
430 LocalId[] ret = new LocalId[len];
431 for (int i=0; i < len; i++) {
432 ret[i] = new LocalId();
433 ret[i].readExternal(stream);
434 }
435 return ret;
436 }
437
438
439
440
441
442 private int[] decodeMsgsProp()
443 throws IOException
444 {
445 stream.seek(msgs_start);
446 int len = msgs_plen / MessageId.SIZE;
447 int[] delivered = new int[len];
448 for (int i = 0; i < delivered.length; i++) {
449 delivered[i] = MessageId.unmarshal(stream);
450 }
451 return delivered;
452 }
453
454
455
456
457
458
459 public int size()
460 {
461
462
463 return outmsg.size();
464 }
465
466 public OutMessage getOutMessage()
467 {
468 return outmsg;
469 }
470
471
472
473
474
475
476
477
478
479 public String toString()
480 {
481 StringBuilder buffer = new StringBuilder();
482 buffer.append("[Proposal:");
483 try {
484 EndPoint[] endpoints = decodeHostProp_hosts();
485 buffer.append(" hosts=[");
486 buffer.append(endpoints.length);
487 buffer.append("]{");
488 for (int i=0; i < endpoints.length; i++) {
489 buffer.append(endpoints[i]);
490 if (i < endpoints.length-1)
491 buffer.append(", ");
492 }
493 } catch (Exception e) {
494 Abort.exit("Error unmarshalling MsgProp; host proposal host values", e);
495 }
496 try {
497 int[] incarns = decodeHostProp_incarns();
498 buffer.append("}, incarns=[");
499 buffer.append(incarns.length);
500 buffer.append("]{");
501 for (int i=0; i < incarns.length; i++) {
502 buffer.append(incarns[i]);
503 if (i < incarns.length-1)
504 buffer.append(", ");
505 }
506 } catch (Exception e) {
507 Abort.exit("Error unmarshalling MsgProp; host proposal incarnation values", e);
508 }
509 try {
510 int[] agreed = decodeHostProp_agreed();
511 buffer.append("}, agreed=[");
512 buffer.append(agreed.length);
513 buffer.append("]{");
514 for (int i=0; i < agreed.length; i++) {
515 buffer.append(agreed[i]);
516 if (i < agreed.length-1)
517 buffer.append(", ");
518 }
519 } catch (Exception e) {
520 Abort.exit("Error unmarshalling MsgProp; host proposal agreed values", e);
521 }
522 try {
523 int[] delivered = decodeMsgsProp();
524 buffer.append("}, delivered=[");
525 buffer.append(delivered.length);
526 buffer.append("]{");
527 for (int i=0; i < delivered.length; i++) {
528 buffer.append(delivered[i]);
529 if (i < delivered.length-1)
530 buffer.append(", ");
531 }
532 } catch (Exception e) {
533 Abort.exit("Error unmarshalling MsgProp; host proposal agreed values", e);
534 }
535 buffer.append("}, cvid=" + cvid);
536 buffer.append(", pvid=" + pvid);
537 buffer.append(", last_plen=" + last_plen);
538 buffer.append(", host_plen=" + host_plen);
539 buffer.append(", memb_plen=" + memb_plen);
540 buffer.append(", msgs_plen=" + msgs_plen);
541 buffer.append(", last_start=" + last_start);
542 buffer.append(", host_start=" + host_start);
543 buffer.append(", memb_start=" + memb_start);
544 buffer.append(", msgs_start=" + msgs_start);
545 buffer.append(", host_phck=" + host_pchk);
546 buffer.append(", msgs_phck=" + msgs_pchk);
547 try {
548 EndPoint[] endpoints = decodeLastProp();
549 buffer.append(", lastprop=[");
550 buffer.append(endpoints.length);
551 buffer.append("]{");
552 for (int i=0; i < endpoints.length; i++) {
553 buffer.append(endpoints[i]);
554 if (i < endpoints.length-1)
555 buffer.append(", ");
556 }
557 } catch (Exception e) {
558 Abort.exit("Error unmarshalling MsgProp; last proposal", e);
559 }
560 buffer.append("}");
561 return buffer.toString();
562 }
563
564 }