View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gm;
20  
21  import static jgroup.util.log.ViewEvent.Type.Merge;
22  
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  import javax.naming.ConfigurationException;
30  
31  import jgroup.core.JgroupException;
32  import jgroup.core.MemberId;
33  import jgroup.core.MemberTable;
34  import jgroup.core.MembershipListener;
35  import jgroup.core.MembershipService;
36  import jgroup.core.MergingListener;
37  import jgroup.core.MergingService;
38  import jgroup.core.View;
39  import jgroup.core.multicast.MulticastListener;
40  import jgroup.core.multicast.MulticastService;
41  import jgroup.relacs.rmi.MarshalInputStream;
42  import jgroup.relacs.rmi.MarshalOutputStream;
43  import jgroup.util.log.Eventlogger;
44  import jgroup.util.log.ViewEvent;
45  
46  import org.apache.log4j.Logger;
47  
48  /**
49   *  This <code>MergingLayer</code> class implements the Jgroup state
50   *  merging service.  It intercepts and delays view change events so
51   *  that the reconciliation protocol can complete before passing the
52   *  view change on to application member.
53   *
54   *  @author Alberto Montresor
55   *  @author Hein Meling
56   *  @since 0.7
57   */
58  public class MergingLayer
59    implements MergingService, MembershipService, MembershipListener, MulticastListener
60  {
61  
62    ////////////////////////////////////////////////////////////////////////////////////////////
63    // Logger
64    ////////////////////////////////////////////////////////////////////////////////////////////
65  
66    /** Obtain logger for this class */
67    private static final Logger log = Logger.getLogger(MergingLayer.class);
68  
69  
70    ////////////////////////////////////////////////////////////////////////////////////////////
71    // Constants
72    ////////////////////////////////////////////////////////////////////////////////////////////
73  
74    /** Protocol name used to distinguish messages from other protocols. */
75    private static final String PROTOCOL_NAME = "SMS";
76  
77  
78    ////////////////////////////////////////////////////////////////////////////////////////////
79    // Fields
80    ////////////////////////////////////////////////////////////////////////////////////////////
81  
82    /* Underlying membership service */
83    private final MembershipService membershipService;
84  
85    /* Underlying multicast service */
86    private final MulticastService multicastService;
87  
88    /**
89     * Dynamic list of membership listeners.  Note that the ordering of
90     * these listeners are important.
91     */
92    private final List<MembershipListener> membershipListeners =
93      new ArrayList<MembershipListener>();
94  
95    /**
96     * Dynamic list of merging listeners.  Note that the ordering of
97     * these listeners are important.
98     */
99    private final List<MergingListener> mergingListeners =
100     new ArrayList<MergingListener>();
101 
102   /** Member table used by this layer (from MembershipLayer) */
103   private final MemberTable table;
104 
105   /**
106    * Member table provided by this layer
107    * This is to hide details stored in 'table' by this layer.
108    */
109   private final MemberTable providedTable = new MemberTable();
110 
111   /** The MemberId of this member */
112   private final MemberId me;
113 
114   /** The most recent view installed (not updated until the end of viewChange()) */
115   private View view;
116 
117   /** The composition of the last view */
118   private MemberId[] members;
119 
120   /** The number of remaining put state messages before merging is complete */
121   private int remainingPutStates;
122 
123   /** The previous view */
124   private View previousView;
125 
126 
127   ////////////////////////////////////////////////////////////////////////////////////////////
128   // Constructors
129   ////////////////////////////////////////////////////////////////////////////////////////////
130 
131   /**
132    *
133    * @exception ConfigurationException
134    *            raised when the distributed system configuration object contains an error
135    * @exception IOException
136    *            if an I/O error occurs when creating communication sockets.
137    */
138   private MergingLayer(MembershipService membershipService, MulticastService multicastService)
139     throws JgroupException
140   {
141     this.membershipService = membershipService;
142     this.multicastService = multicastService;
143 
144     table = membershipService.getMemberTable();
145     me = membershipService.getMyIdentifier();
146   }
147 
148 
149   ////////////////////////////////////////////////////////////////////////////////////////////
150   // Static factory
151   ////////////////////////////////////////////////////////////////////////////////////////////
152 
153   public static MergingLayer getLayer(MembershipService membService, MulticastService mcastService)
154     throws JgroupException
155   {
156     return new MergingLayer(membService, mcastService);
157   }
158 
159 
160   ////////////////////////////////////////////////////////////////////////////////////////////
161   // Layer interface methods
162   ////////////////////////////////////////////////////////////////////////////////////////////
163 
164   /**
165    *  Add a listener object for this layer to provide upcalls to,
166    *  in response to membership and merge events.  The same listener
167    *  object may listen to both types of events or just one of them.
168    */
169   public void addListener(Object listener)
170   {
171     if (listener instanceof MembershipListener && !membershipListeners.contains(listener))
172       membershipListeners.add((MembershipListener) listener);
173     if (listener instanceof MergingListener && !mergingListeners.contains(listener))
174       mergingListeners.add((MergingListener) listener);
175     if (!(listener instanceof MergingListener) && !(listener instanceof MembershipListener))
176       throw new IllegalArgumentException("Specified listener does not implement a MembershipListener or MergingListener");
177   }
178 
179 
180   ////////////////////////////////////////////////////////////////////////////////////////////
181   // FinalizeLayer interface methods (invoked to complete the group manager construction)
182   ////////////////////////////////////////////////////////////////////////////////////////////
183 
184   /**
185    *  Post initialization of the <code>MembershipLayer</code>; invoked once
186    *  all group manager layers have been constructed.  This method is defined
187    *  in the <code>FinalizeLayer</code> inherited through the
188    *  <code>MembershipSerivce</code>.
189    */
190   public void complete(Object server)
191     throws JgroupException
192   {
193     membershipService.complete(server);
194   }
195 
196 
197   ////////////////////////////////////////////////////////////////////////////////////////////
198   // Methods of MembershipService
199   ////////////////////////////////////////////////////////////////////////////////////////////
200 
201   /* (non-Javadoc)
202    * @see jgroup.core.MembershipService#join(int)
203    */
204   public void join(int gid)
205     throws JgroupException
206   {
207     membershipService.join(gid);
208   }
209 
210   /* (non-Javadoc)
211    * @see jgroup.core.MembershipService#join()
212    */
213   public void join()
214     throws JgroupException
215   {
216     membershipService.join();
217   }
218 
219   /* (non-Javadoc)
220    * @see jgroup.core.MembershipService#leave()
221    */
222   public void leave()
223     throws JgroupException
224   {
225     membershipService.leave();
226   }
227 
228   /* (non-Javadoc)
229    * @see jgroup.core.MembershipService#isMember()
230    */
231   public boolean isMember()
232   {
233     return membershipService.isMember();
234   }
235   
236   /* (non-Javadoc)
237    * @see jgroup.core.MembershipService#isJoining()
238    */
239   public boolean isJoining()
240   {
241     return membershipService.isJoining();
242   }
243 
244   /* (non-Javadoc)
245    * @see jgroup.core.MembershipService#isMemberOrJoining()
246    */
247   public boolean isMemberOrJoining()
248   {
249     return membershipService.isMemberOrJoining();
250   }
251   
252   /* (non-Javadoc)
253    * @see jgroup.core.MembershipService#isLeader()
254    */
255   public boolean isLeader()
256   {
257     return membershipService.isLeader();
258   }
259 
260   /* (non-Javadoc)
261    * @see jgroup.core.MembershipService#getMyIdentifier()
262    */
263   public MemberId getMyIdentifier()
264   {
265     return me;
266   }
267 
268   /* (non-Javadoc)
269    * @see jgroup.core.MembershipService#getGid()
270    */
271   public int getGid()
272   {
273     return membershipService.getGid();
274   }
275 
276   /* (non-Javadoc)
277    * @see jgroup.core.MembershipService#getMemberTable()
278    */
279   public MemberTable getMemberTable()
280   {
281     return providedTable;
282   }
283 
284   public int members()
285   {
286     return membershipService.members();
287   }
288 
289   /* (non-Javadoc)
290    * @see jgroup.core.MembershipService#getViewIndex()
291    */
292   public int getViewIndex()
293   {
294     return membershipService.getViewIndex();
295   }
296 
297 
298   /* (non-Javadoc)
299    * @see jgroup.core.MembershipService#getMemberIndex()
300    */
301   public int getMemberIndex()
302   {
303     return membershipService.getMemberIndex();
304   }
305 
306   
307   ////////////////////////////////////////////////////////////////////////////////////////////
308   // Methods from MembershipListener
309   ////////////////////////////////////////////////////////////////////////////////////////////
310 
311   /* (non-Javadoc)
312    * @see jgroup.core.MembershipListener#viewChange(jgroup.core.View)
313    */
314   @AllowDuplicateViews public void viewChange(final View view)
315   {
316     if (log.isDebugEnabled())
317       log.debug("MergingLayer: viewChange");
318     // Update the member table associated to this group
319     providedTable.viewChange(view);
320     // Store the current view for use in deliverStream
321     this.view = view;
322     remainingPutStates = view.mergingViews();
323     if (log.isDebugEnabled())
324       log.debug("REMAINING PUTSTATES: " + remainingPutStates);
325 
326     /*
327      * Associate new (empty) information with new members, and also
328      * associate new information with members that have recovered.
329      */
330     members = view.getMembers();
331     for (int i=0; i < members.length; i++) {
332       int status = table.getStatus(members[i]);
333       if (status == MemberTable.NEWMEMBER || status == MemberTable.RECOVERING)
334         table.put(members[i], new MemberInfo(members[i], members[i].equals(me)));
335     }
336 
337     /*
338      * Get the total list of members registered in table, including those in the
339      * most recent view, and those that have recently partitioned or crashed.
340      */
341     Object[] objs = table.elements();
342     MemberInfo[] mdata = new MemberInfo[objs.length];
343     for (int i=0; i < objs.length; i++) 
344       mdata[i] = (MemberInfo) objs[i];
345 
346     /*
347      * Update the mset and lset values, and compute if an update message
348      * is necessary.
349      */
350     boolean needupdate = false;
351     for (int i=0; i < mdata.length; i++) {
352       if (!table.isMember(mdata[i].id)) {
353         mdata[i].mset = false;
354         mdata[i].lset = false;
355         mdata[i].tset = false;
356       } else {
357         mdata[i].tset = true;
358         if (!mdata[i].mset && !mdata[i].id.equals(me))
359           needupdate = true;
360       }
361       if (log.isDebugEnabled()) {
362         StringBuilder buf = new StringBuilder();
363         buf.append(mdata[i]);
364         buf.append(", needupdate=");
365         buf.append(needupdate);
366         log.debug(buf.toString());
367       }
368     }
369 
370     if (needupdate) {
371       if (Eventlogger.ENABLED)
372         Eventlogger.logEventFlush(new ViewEvent(Merge, view));
373       if (hasLocalCoordinator(mdata)) {
374         if (log.isDebugEnabled())
375           log.debug("I'm merging coordinator");
376         try {
377           OutputStream stream = multicastService.getMessage(PROTOCOL_NAME);
378           MarshalOutputStream msg = new MarshalOutputStream(stream);
379           int[] owners = getOwners(mdata);
380           int[] dests = getDests(mdata);
381           MemberId[] destIds = new MemberId[dests.length];
382           for (int j = 0; j < dests.length; j++)
383             destIds[j] = members[dests[j]];
384 
385           msg.writeInt(owners.length);
386           for (int j=0; j < owners.length; j++) 
387             msg.writeInt(owners[j]);
388 
389           msg.writeInt(dests.length);
390           for (int j=0; j < dests.length; j++) 
391             msg.writeInt(dests[j]);
392 
393           try {
394             int mergingSize = mergingListeners.size();
395             msg.writeInt(mergingSize);
396             if (log.isDebugEnabled()) 
397               log.debug("Number of states to get (mergingListeners): " + mergingSize);
398             for (int j = 0; j < mergingSize; j++) {
399               MergingListener mergingListener = (MergingListener) mergingListeners.get(j);
400               msg.writeObject(mergingListener.getState(destIds));
401             }
402           } catch (Exception e) {
403             log.warn("Exception caught when getting state from members", e);
404           }
405           msg.flush();
406           multicastService.mcast(stream, null);
407         } catch(Exception e) {
408           log.warn("Exception during view change events of MergingData", e);
409         }
410       }
411     } else {
412       // Notify listeners of the new view if merging was not needed
413       notifyView();
414     } // END if (needupdate)
415   }
416 
417   /* (non-Javadoc)
418    * @see jgroup.core.MembershipListener#hasLeft()
419    */
420   public void hasLeft()
421   {
422     for (MembershipListener listener : membershipListeners) {
423       listener.hasLeft();
424     }
425   }
426 
427   /* (non-Javadoc)
428    * @see jgroup.core.MembershipListener#prepareChange()
429    */
430   public void prepareChange()
431   {
432     for (MembershipListener listener : membershipListeners) {
433       listener.prepareChange();
434     }
435   }
436 
437 
438   ////////////////////////////////////////////////////////////////////////////////////////////
439   // Methods from MulticastListener
440   ////////////////////////////////////////////////////////////////////////////////////////////
441 
442   /* (non-Javadoc)
443    * @see jgroup.core.multicast.MulticastListener#getProtocolName()
444    */
445   public String getProtocolName() 
446   {
447     return PROTOCOL_NAME;
448   }
449 
450   /* (non-Javadoc)
451    * @see jgroup.core.multicast.MulticastListener#deliverStream(java.io.InputStream, jgroup.core.MemberId, int)
452    */
453   public Object deliverStream(InputStream stream, MemberId sender, int seqNo)
454   {
455     if (log.isDebugEnabled()) {
456       log.debug("MergingLayer: deliverStream from " + sender);
457       log.debug(stream);
458     }
459 
460     try {
461       MarshalInputStream in = new MarshalInputStream(stream);
462 
463       MemberId[] owners = new MemberId[in.readInt()];
464       for (int i = 0; i < owners.length; i++) {
465         owners[i] = members[in.readInt()];
466         if (log.isDebugEnabled()) {
467           log.debug("owners[" + i + "]: " + owners[i] );
468         }
469         /*
470          * If I belong to <owners>, the <mset> contains all the members
471          * in the current view.
472          */
473         if (owners[i].equals(me)) {
474           for (int k = 0; k < members.length; k++) {
475             MemberInfo md = (MemberInfo) table.get(members[k]);
476             md.mset = true;
477           }
478         }
479 
480         // Each member in <owners> are contained in <lset>.
481         MemberInfo md = (MemberInfo) table.get(owners[i]);
482         md.lset = true;
483       }
484 
485       MemberId[] dests = new MemberId[in.readInt()];
486       boolean updateState = false;
487       for (int i = 0; i < dests.length; i++) {
488         dests[i] = members[in.readInt()];
489         if (dests[i].equals(me)) {
490           // The state message is directed to me; perform putState()
491           updateState = true;
492         }
493         if (log.isDebugEnabled()) {
494           log.debug("dests[" + i + "]: " + dests[i] );
495         }
496       }
497 
498       if (!updateState) {
499         if (log.isDebugEnabled())
500           log.debug("Message not for me; ignoring the putState message");
501         return null;
502       }
503       Object[] objs = new Object[in.readInt()];
504       for (int j = 0; j < objs.length; j++) {
505         objs[j] = in.readObject();
506         if (log.isDebugEnabled())
507           log.debug("Read STATE: " + objs[j]);
508       }
509 
510       for (int j = 0, size = mergingListeners.size(); j < size; j++)
511         ((MergingListener) mergingListeners.get(j)).putState(objs[j], dests);
512     } catch (Exception e) {
513       /*
514        * If we for some reason received an incorrectly formated message;
515        * we simply log the raised exception and ignore the message since
516        * we cannot know its origin.  That is it may be spam.
517        */
518       log.warn("MergingLayer: Unable to unmarshal stream message", e);
519     }
520 
521     remainingPutStates--;
522     if (log.isDebugEnabled())
523       log.debug("Remaining put state calls: " + remainingPutStates);
524     if (remainingPutStates == 0) {
525       // Notify listeners of the new view (only after updating the the server state)
526       notifyView();
527     }
528     return null;
529   }
530 
531 
532   /* (non-Javadoc)
533    * @see jgroup.core.multicast.MulticastListener#deliverObject(java.lang.Object)
534    */
535   public Object deliverObject(Object msg, MemberId sender, int seqNo)
536   {
537     throw new UnsupportedOperationException();
538   }
539 
540 
541   ////////////////////////////////////////////////////////////////////////////////////////////
542   // Private Methods
543   ////////////////////////////////////////////////////////////////////////////////////////////
544 
545   /**
546    * 
547    */
548   private void notifyView()
549   {
550     MembershipLayer.notifyViewListeners(view, previousView, membershipListeners);
551     previousView = view;
552   }
553 
554   /**
555    *
556    */
557   private int[] getOwners(MemberInfo[] mdata)
558   {
559     // Count owners
560     int len = 0;
561     for (int i=0; i < mdata.length; i++)
562       if (mdata[i].lset) len++;
563     
564     // Write owners
565     int[] owners = new int[len];
566     len = 0;
567     for (int i=0; i < mdata.length; i++) 
568       if (mdata[i].lset) {
569         owners[len] = table.getIndex(mdata[i].id);
570         len++;
571       }
572 
573     return owners;
574   }
575     
576   /**
577    * Retreive the set of members that needs to be updated with
578    * state information from the local coordinator.
579    */
580   private int[] getDests(MemberInfo[] mdata)
581   {
582     // Count dests
583     int len = 0;
584     for (int i=0; i < mdata.length; i++)
585       if (!mdata[i].mset && mdata[i].tset) len++;
586 
587     // Write dests
588     int[] dests = new int[len];
589     len = 0;
590     for (int i=0; i < mdata.length; i++)
591       if (!mdata[i].mset && mdata[i].tset) {
592         dests[len] = table.getIndex(mdata[i].id);
593         len++;
594       }
595 
596     return dests;
597   }
598 
599 
600   /**
601    * Returns true if the local member is elected as coordinator for
602    * its partition.  Note that we assume that the input array is
603    * ordered such that the first indecies contain members of the
604    * most recent view.
605    */
606   private boolean hasLocalCoordinator(MemberInfo[] mdata)
607   {
608     /*
609      * Search for the first member in mdata with mset equal to true.
610      */
611     int min = 0;
612     for (int i=0; i < mdata.length; i++) {
613       if (mdata[i].mset && table.getStatus(mdata[i].id) != MemberTable.CRASHED) {
614         min = i;
615         break;
616       }
617     }
618     if (log.isDebugEnabled())
619       log.debug("Elected Coordinator: " + mdata[min].id);
620     return (mdata[min].id.equals(me));
621   }
622 
623 
624   private class MemberInfo 
625   { 
626     /* True if the member may potentially know LESS updates (up-to-date set) */
627     boolean lset;
628 
629     /* True if the member may potentially know MORE updates */
630     boolean mset;
631 
632     /* True if this member is in the most recent view (vcomp) */
633     boolean tset;
634 
635     MemberId id;
636 
637     MemberInfo(MemberId id, boolean set)
638     {
639       this.id = id;
640       lset = set;
641       mset = set;
642       tset = set;
643     }
644 
645     @Override
646     public String toString()
647     {
648       StringBuilder buf = new StringBuilder("MergeInfo: ");
649       buf.append(table.toString(id));
650       buf.append(": mset=");
651       buf.append(mset);
652       buf.append(", lset=");
653       buf.append(lset);
654       buf.append(", tset=");
655       buf.append(tset);
656       return buf.toString();
657     }
658 
659   } // END MemberInfo
660 
661 } // END MergingLayer