View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.daemon;
20  
21  
22  import java.io.IOException;
23  import java.io.ObjectInput;
24  import java.util.Collection;
25  import java.util.Iterator;
26  import java.util.Map;
27  
28  import jgroup.core.EndPoint;
29  import jgroup.relacs.mss.MssConstants;
30  import jgroup.relacs.types.Checksum;
31  import jgroup.relacs.types.EndPointImpl;
32  import jgroup.relacs.types.GroupId;
33  import jgroup.relacs.types.GroupIndex;
34  import jgroup.relacs.types.IncarnationId;
35  import jgroup.relacs.types.LocalId;
36  import jgroup.relacs.types.MessageId;
37  import jgroup.relacs.types.MessageLen;
38  import jgroup.relacs.types.VersionId;
39  import jgroup.relacs.types.ViewId;
40  import jgroup.util.Abort;
41  import jgroup.util.InMessage;
42  import jgroup.util.MsgFactory;
43  import jgroup.util.OutMessage;
44  
45  // FIXME Reorganize this class in order to be more readable; we should
46  // divide it in a "Proposal" class and an MsgProp class
47  
48  /**
49   * The <code> MsgProp</code> class
50   *
51   * @author  Alberto Montresor
52   * @since   Jgroup 0.1
53   */
54  final class MsgProp
55    implements MssConstants, DaemonMsg
56  {
57  
58    ////////////////////////////////////////////////////////////////////////////////////////////
59    // Position constants
60    ////////////////////////////////////////////////////////////////////////////////////////////
61  
62    private static final int START  = MSS_HEADER_SIZE;
63    private static final int P_DATA = 
64      START + GroupId.SIZE + (ViewId.SIZE * 2) + (MessageLen.SIZE * 4) + (Checksum.SIZE * 2);
65  
66  
67    ////////////////////////////////////////////////////////////////////////////////////////////
68    // Fields
69    ////////////////////////////////////////////////////////////////////////////////////////////
70  
71    long                  cvid;           // Last complete view identifier
72    long                  pvid;           // Last partial view identifier
73    int                   last_plen;      // last_prop field length (in bytes)
74    int                   host_plen;      // host_prop field length (in bytes)
75    int                   memb_plen;      // memb_prop field length (in bytes)
76    int                   msgs_plen;      // msgs_prop field length (in bytes)
77    int                   last_start;     // last_prop field start position
78    int                   host_start;     // host_prop field start position
79    int                   memb_start;     // memb_prop field start position
80    int                   msgs_start;     // msgs_prop field start position
81    int                   host_pchk;      // checksum of host_prop
82    int                   msgs_pchk;      // checksum of msgs_prop
83    InMessage             stream;         // Message stream of the message
84    OutMessage            last_prop;      // Proposal section: last complete view
85    OutMessage            host_prop;      // Proposal section: view composition (hosts)
86    OutMessage            memb_prop;      // Proposal section: view composition (members)
87    OutMessage            msgs_prop;      // Proposal section: delivered messages
88  
89    /*
90     * The full proposal message stream, as returned through the
91     * DaemonMsg interface.
92     */
93    private OutMessage outmsg;
94  
95  
96    ////////////////////////////////////////////////////////////////////////////////////////////
97    // Constructors
98    ////////////////////////////////////////////////////////////////////////////////////////////
99  
100   /**
101    * Creates a proposal from local delivery
102    */
103   MsgProp(OutMessage msg)
104     throws IOException
105   {
106     stream = new InMessage(msg);
107     //FIXED HEIN: Added the seek here, to ensure that it reads the right stuff!
108     stream.seek(START);
109 
110     // Read single fields
111     int gid   = GroupId.unmarshal(stream);
112     cvid      = ViewId.unmarshal(stream);
113     pvid      = ViewId.unmarshal(stream);
114     last_plen = MessageLen.unmarshal(stream);
115     host_plen = MessageLen.unmarshal(stream);
116     memb_plen = MessageLen.unmarshal(stream);
117     msgs_plen = MessageLen.unmarshal(stream);
118     host_pchk = Checksum.unmarshal(stream);
119     msgs_pchk = Checksum.unmarshal(stream);
120 
121     // Computes starting positions
122     last_start = P_DATA;
123     host_start = last_start + last_plen;
124     memb_start = host_start + host_plen;
125     msgs_start = memb_start + memb_plen;
126   }
127   
128   
129   /**
130    *  Creates a new instance of <code>MsgProp</code> starting from a
131    *  m-received message.
132    *
133    *  @param stream
134    *    The m-received message.
135    */
136   MsgProp(ObjectInput stream)
137     throws IOException
138   {
139     this.stream = (InMessage) stream;
140     /*
141      * Note that we assume that the provided stream has unmarshalled the
142      * gid value before invoking this constructor.  This is since it is
143      * m-received through the daemon, and thus differs from the locally
144      * delivered proposals (see constructor above).
145      */
146 
147     // Read single fields
148     cvid      = ViewId.unmarshal(stream);
149     pvid      = ViewId.unmarshal(stream);
150     last_plen = MessageLen.unmarshal(stream);
151     host_plen = MessageLen.unmarshal(stream);
152     memb_plen = MessageLen.unmarshal(stream);
153     msgs_plen = MessageLen.unmarshal(stream);
154     host_pchk = Checksum.unmarshal(stream);
155     msgs_pchk = Checksum.unmarshal(stream);
156 
157     // Computes starting positions
158     last_start = P_DATA;
159     host_start = last_start + last_plen;
160     memb_start = host_start + host_plen;
161     msgs_start = memb_start + memb_plen;
162   }
163 
164 
165   /**
166    *  Creates a new instance of <code>MsgProp</code> acting as a
167    *  container for a proposal.
168    */
169   MsgProp()
170   {
171     //FIXME HEIN: Should these payload values be static?
172     last_prop   = new OutMessage(256);
173     host_prop   = new OutMessage(256);
174     memb_prop   = new OutMessage(256);
175     msgs_prop   = new OutMessage(256);
176   }
177 
178 
179   ////////////////////////////////////////////////////////////////////////////////////////////
180   // Encoding methods
181   ////////////////////////////////////////////////////////////////////////////////////////////
182 
183   static MsgProp unmarshal(ObjectInput stream)
184     throws IOException
185   {
186     return new MsgProp(stream);
187   }
188   
189   
190   /**
191    *  Encodes the last complete view and stores it into the specified group.
192    *
193    *  @param    cview   last complete view
194    *  @param    length  actual length of cview
195    */
196   void encodeLastProp(EndPoint[] hosts)
197     throws IOException
198   {
199     last_prop.reset();
200     GroupIndex.marshal(last_prop, hosts.length);
201     for (int i = 0; i < hosts.length; i++)
202       hosts[i].writeExternal(last_prop);
203     last_plen = GroupIndex.SIZE + hosts.length * EndPointImpl.SIZE;
204   }
205 
206   /**
207    *  Encodes the next view estimate (hosts) and stores it into the
208    *  specified group.
209    *
210    *  @param     hdata   estimate array
211    *  @param     hsize   actuale length of the estimate array
212    */
213   void encodeHostProp(Collection estimate)
214     throws IOException
215   {
216     Iterator iterator;
217     
218     host_prop.reset();
219     GroupIndex.marshal(host_prop, estimate.size());
220     iterator = estimate.iterator();
221     while (iterator.hasNext()) {
222       HostData scan = (HostData) iterator.next();
223       scan.getEndPoint().writeExternal(host_prop);
224     }
225     iterator = estimate.iterator();
226     while (iterator.hasNext()) {
227       HostData scan = (HostData) iterator.next();
228       IncarnationId.marshal(host_prop, scan.getHost().getIncarnationId());
229     }
230     iterator = estimate.iterator();
231     while (iterator.hasNext()) {
232       HostData scan = (HostData) iterator.next();
233       VersionId.marshal(host_prop, scan.getAgreed());
234     }
235     host_pchk = host_prop.computeChecksum();
236     host_plen = GroupIndex.SIZE + estimate.size() * 
237       (EndPointImpl.SIZE + IncarnationId.SIZE + VersionId.SIZE);
238   }
239 
240 
241   /**
242    *  Encodes the next view estimate (local members) and stores it into the
243    *  specified group.
244    *
245    *  @param members     list of members
246    *  @param msize       number of members in the current estimate
247    */
248   void encodeMembProp(Map members, int size)
249     throws IOException
250   {
251     memb_prop.reset();
252     GroupIndex.marshal(memb_prop, size);
253     for (Iterator iter = members.values().iterator(); iter.hasNext();) {
254       MemberData md = (MemberData) iter.next();
255       if (!md.isLeaving() && !md.isCrashed()) {
256         md.getMemberId().getLocalId().writeExternal(memb_prop);
257       }
258     }
259     memb_plen = GroupIndex.SIZE + size * LocalId.SIZE;
260   }
261 
262   /**
263    *  Encodes the information about delivered messages and stores it
264    *  into the specified group.
265    *
266    */
267   void encodeMsgsProp(int[] delivered)
268     throws IOException
269   {
270     msgs_prop.reset();
271     for (int i=0; i < delivered.length; i++) {
272       MessageId.marshal(msgs_prop, delivered[i]);
273     }
274     msgs_pchk = msgs_prop.computeChecksum();
275     msgs_plen = delivered.length * MessageId.SIZE;
276   }
277 
278 
279   /**
280    *  Encodes a MsgProp message starting from raw data.
281    *
282    *  @param     group   object from which raw data are extracted.
283    */
284   MsgProp marshal(int gid, long cvid, long pvid)
285     throws IOException
286   {
287     // Allocates data buffer
288     int size = P_DATA + last_plen + host_plen + memb_plen + msgs_plen;
289     /* A proposal is only sent to the coordinator, thus 1 receiver only. */
290     outmsg = MsgFactory.get(size, 1);
291     outmsg.seek(START);
292 
293     GroupId.marshal(outmsg, gid);
294     ViewId.marshal(outmsg, cvid);
295     ViewId.marshal(outmsg, pvid);
296     MessageLen.marshal(outmsg, last_plen);
297     MessageLen.marshal(outmsg, host_plen);
298     MessageLen.marshal(outmsg, memb_plen);
299     MessageLen.marshal(outmsg, msgs_plen);
300     Checksum.marshal(outmsg, host_pchk);
301     Checksum.marshal(outmsg, msgs_pchk);
302 
303     outmsg.write(last_prop);
304     outmsg.write(host_prop);
305     outmsg.write(memb_prop);
306     outmsg.write(msgs_prop);
307 
308     return this;
309   }
310 
311   ////////////////////////////////////////////////////////////////////////////////////////////
312   // Equality methods
313   ////////////////////////////////////////////////////////////////////////////////////////////
314 
315   /**
316    * Compares the estimates part of two instances of MsgProp
317    *
318    * @param     msg     the reference MsgProp message with which to compare.
319    */
320   boolean checkEstimate(MsgProp msg)
321   {
322     if (msg == null || host_plen != msg.host_plen || host_pchk != msg.host_pchk)
323       return false;
324     return stream.compare(host_start, msg.stream, msg.host_start, host_plen);
325   }
326 
327   /**
328    * Compares the messages part of two instances of MsgProp
329    *
330    * @param     msg     the reference MsgProp message with which to compare.
331    */
332   boolean checkMessages(MsgProp msg)
333   {
334     if (msg == null)
335       return false;
336     //FIXME CLEANUP debugging code
337     if (pvid == msg.pvid) {
338       if (msgs_plen != msg.msgs_plen || msgs_pchk != msg.msgs_pchk) {
339         Group.log.debug("MSGS NOT EQUAL");
340         return false;
341       }
342       boolean cmp = stream.compare(msgs_start, msg.stream, msg.msgs_start, msgs_plen);
343       if (!cmp)
344         Group.log.debug("STREAMS NOT EQUAL");
345       else
346         Group.log.debug("STREAMS EQUAL");
347       return cmp;
348 //      return stream.compare(msgs_start, msg.stream, msg.msgs_start, msgs_plen);
349     }
350     Group.log.debug("MSGS EQUAL");
351     return true;
352   }
353 
354   ////////////////////////////////////////////////////////////////////////////////////////////
355   // Decoding methods
356   ////////////////////////////////////////////////////////////////////////////////////////////
357 
358   /**
359    *  Decodes the last complete view.
360    */
361   EndPoint[] decodeLastProp()
362     throws IOException, ClassNotFoundException
363   {
364     stream.seek(last_start);
365     int len = GroupIndex.unmarshal(stream);
366     EndPoint[] ret = new EndPoint[len];
367     for (int i=0; i < len; i++) {
368       ret[i] = new EndPointImpl();
369       ret[i].readExternal(stream);
370     }
371     return ret;
372   }
373 
374   /**
375    *  Decodes the estimate for the next complete view.
376    */
377   EndPoint[] decodeHostProp_hosts()
378     throws IOException, ClassNotFoundException
379   {
380     stream.seek(host_start);
381     int len = GroupIndex.unmarshal(stream);
382     EndPoint[] ret = new EndPoint[len];
383     for (int i=0; i < len; i++) {
384       ret[i] = new EndPointImpl();
385       ret[i].readExternal(stream);
386     }
387     return ret;
388   }
389 
390 
391   /**
392    * Decodes the agreed version numbers for the next complete view.
393    */
394   int[] decodeHostProp_incarns()
395     throws IOException
396   {
397     stream.seek(host_start);
398     int len = GroupIndex.unmarshal(stream);
399     int[] ret = new int[len];
400     stream.seek(host_start + GroupIndex.SIZE + len * EndPointImpl.SIZE);
401     for (int i=0; i < len; i++)
402       ret[i] = IncarnationId.unmarshal(stream);
403     return ret;
404   }
405 
406 
407   /**
408    * Decodes the agreed version numbers for the next complete view.
409    */
410   int[] decodeHostProp_agreed()
411     throws IOException
412   {
413     stream.seek(host_start);
414     int len = GroupIndex.unmarshal(stream);
415     int[] ret = new int[len];
416     stream.seek(host_start + GroupIndex.SIZE + len * (EndPointImpl.SIZE + IncarnationId.SIZE));
417     for (int i=0; i < len; i++)
418       ret[i] = VersionId.unmarshal(stream);
419     return ret;
420   }
421 
422   /**
423    * Decodes the list of local members contained in the proposal.
424    */
425   LocalId[] decodeMembProp()
426     throws IOException, ClassNotFoundException
427   {
428     stream.seek(memb_start);
429     int len = GroupIndex.unmarshal(stream);
430     LocalId[] ret = new LocalId[len];
431     for (int i=0; i < len; i++) {
432       ret[i] = new LocalId();
433       ret[i].readExternal(stream);
434     }
435     return ret;
436   }
437 
438   /**
439    * Decodes the list of messages delivered by the member who prepared
440    * this proposal.
441    */
442   private int[] decodeMsgsProp()
443     throws IOException
444   {
445     stream.seek(msgs_start);
446     int len = msgs_plen / MessageId.SIZE;
447     int[] delivered = new int[len];
448     for (int i = 0; i < delivered.length; i++) {
449       delivered[i] = MessageId.unmarshal(stream);
450     }
451     return delivered;
452   }
453 
454 
455   ////////////////////////////////////////////////////////////////////////////////////////////
456   // Methods from DaemonMsg
457   ////////////////////////////////////////////////////////////////////////////////////////////
458 
459   public int size()
460   {
461     /* Note; it is ok to use the outmsg size since MsgProp will never be
462      * forwarded or resent (it is not queued). */
463     return outmsg.size();
464   }
465 
466   public OutMessage getOutMessage()
467   {
468     return outmsg;
469   }
470 
471 
472   ////////////////////////////////////////////////////////////////////////////////////////////
473   // Methods from Object
474   ////////////////////////////////////////////////////////////////////////////////////////////
475 
476   /**
477    *  Returns a string representation of this object
478    */
479   public String toString()
480   {
481     StringBuilder buffer = new StringBuilder();
482     buffer.append("[Proposal:");
483     try {
484       EndPoint[] endpoints = decodeHostProp_hosts();
485       buffer.append(" hosts=[");
486       buffer.append(endpoints.length);
487       buffer.append("]{");
488       for (int i=0; i < endpoints.length; i++) {
489         buffer.append(endpoints[i]);
490         if (i < endpoints.length-1)
491           buffer.append(", ");
492       }
493     } catch (Exception e) {
494       Abort.exit("Error unmarshalling MsgProp; host proposal host values", e);
495     }
496     try {
497       int[] incarns = decodeHostProp_incarns();
498       buffer.append("}, incarns=[");
499       buffer.append(incarns.length);
500       buffer.append("]{");
501       for (int i=0; i < incarns.length; i++) {
502         buffer.append(incarns[i]);
503         if (i < incarns.length-1)
504           buffer.append(", ");
505       }
506     } catch (Exception e) {
507       Abort.exit("Error unmarshalling MsgProp; host proposal incarnation values", e);
508     }
509     try {
510       int[] agreed = decodeHostProp_agreed();
511       buffer.append("}, agreed=[");
512       buffer.append(agreed.length);
513       buffer.append("]{");
514       for (int i=0; i < agreed.length; i++) {
515         buffer.append(agreed[i]);
516         if (i < agreed.length-1)
517           buffer.append(", ");
518       }
519     } catch (Exception e) {
520       Abort.exit("Error unmarshalling MsgProp; host proposal agreed values", e);
521     }
522     try {
523       int[] delivered = decodeMsgsProp();
524       buffer.append("}, delivered=[");
525       buffer.append(delivered.length);
526       buffer.append("]{");
527       for (int i=0; i < delivered.length; i++) {
528         buffer.append(delivered[i]);
529         if (i < delivered.length-1)
530           buffer.append(", ");
531       }
532     } catch (Exception e) {
533       Abort.exit("Error unmarshalling MsgProp; host proposal agreed values", e);
534     }
535     buffer.append("}, cvid=" + cvid);
536     buffer.append(", pvid=" + pvid);
537     buffer.append(", last_plen=" + last_plen);
538     buffer.append(", host_plen=" + host_plen);
539     buffer.append(", memb_plen=" + memb_plen);
540     buffer.append(", msgs_plen=" + msgs_plen);
541     buffer.append(", last_start=" + last_start);
542     buffer.append(", host_start=" + host_start);
543     buffer.append(", memb_start=" + memb_start);
544     buffer.append(", msgs_start=" + msgs_start);
545     buffer.append(", host_phck=" + host_pchk);
546     buffer.append(", msgs_phck=" + msgs_pchk);
547     try {
548       EndPoint[] endpoints = decodeLastProp();
549       buffer.append(", lastprop=[");
550       buffer.append(endpoints.length);
551       buffer.append("]{");
552       for (int i=0; i < endpoints.length; i++) {
553         buffer.append(endpoints[i]);
554         if (i < endpoints.length-1)
555           buffer.append(", ");
556       }
557     } catch (Exception e) {
558       Abort.exit("Error unmarshalling MsgProp; last proposal", e);
559     }
560     buffer.append("}");
561     return buffer.toString();
562   }
563 
564 } // END MsgProp