View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.daemon;
20  
21  import java.rmi.NoSuchObjectException;
22  import java.rmi.RemoteException;
23  import java.util.Iterator;
24  
25  import jgroup.core.MemberId;
26  import jgroup.core.View;
27  import jgroup.core.multicast.AckListener;
28  import jgroup.util.Queue;
29  
30  import org.apache.log4j.Logger;
31  
32  /**
33   *  Buffer for managing multicast (<code>MsgMcast</code>) messages, and
34   *  corresponding reply messages (<code>MsgResult</code>).  That
35   *  includes also acknowledgement management.
36   *
37   *  @author Alberto Montresor
38   *  @since Jgroup 0.8
39   */
40  final class SendBuffer
41  {
42  
43    ////////////////////////////////////////////////////////////////////////////////////////////
44    // Logger
45    ////////////////////////////////////////////////////////////////////////////////////////////
46  
47    /** Obtain logger for this class */
48    private static final Logger log = Logger.getLogger(SendBuffer.class);
49  
50  
51    ////////////////////////////////////////////////////////////////////////////////////////////
52    // Fields
53    ////////////////////////////////////////////////////////////////////////////////////////////
54  
55    private final Queue buffer = new Queue();
56    private final Queue ackbuffer = new Queue();
57    private View        view;
58    private long        vid;
59    private int         hlen;
60    private int         nmessages;
61  
62    ////////////////////////////////////////////////////////////////////////////////////////////
63    // Constructor 
64    ////////////////////////////////////////////////////////////////////////////////////////////
65  
66    SendBuffer() { }
67  
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Methods
71    ////////////////////////////////////////////////////////////////////////////////////////////
72  
73    /**
74     *
75     */
76    void setView(View view, int hlen)
77    {
78      if (log.isDebugEnabled())
79        log.debug("Flushing buffers");
80      this.view = view;
81      this.hlen = hlen;
82      vid = view.getVid();
83  
84      nmessages = 0;
85      
86      // Call nack
87      for (Iterator iter = buffer.iterator(); iter.hasNext(); ) {
88        AckListener ackl = ((MsgMcast) iter.next()).ackl;
89        if (ackl != null) { 
90          try {
91            if (log.isDebugEnabled())
92              log.debug("viewChange to AckListener");
93            ackl.viewChange();
94            if (log.isDebugEnabled())
95              log.debug("viewChange to AckListener completed");
96          } catch (NoSuchObjectException e) {
97            /*
98             * Ignored, as this can happen when the AckListener has been
99             * unexported since all results has been received correctly,
100            * and thus the receiver does not need to consider a view change
101            * for the current multicast message (invocation).
102            */
103         } catch (RemoteException e) {
104           log.warn("Failed to notify the AckListener of view change: " + ackl, e);
105         }
106       }
107     }
108     
109     buffer.clear();
110     MsgResult msgr;
111     while ((msgr = (MsgResult) ackbuffer.removeFirst()) != null) {
112       if (msgr.vid == vid) 
113         insertMsgResult(msgr);
114     }
115   }
116   
117   
118   /**
119    *
120    */
121   void insertMsgMcast(MsgMcast msg)
122   {
123     try {
124       if (msg.ackl != null) {
125         if (log.isDebugEnabled())
126           log.debug("notifyView to AckListener");
127         msg.ackl.notifyView(view);
128         if (log.isDebugEnabled())
129           log.debug("notifyView completed");
130       }
131     } catch (RemoteException e) {
132       log.warn("Failed to notify the AckListener of view change.", e);
133     }
134     buffer.insert(msg);
135     msg.setAckArray(hlen);
136   }
137 
138   
139   /**
140    *
141    */
142   void insertMsgResult(MsgResult result)
143   {
144     if (result.vid == vid) {
145       if (log.isDebugEnabled())
146         log.debug("Inserting result (same view): " + result);
147       for (Iterator iter = buffer.iterator(); iter.hasNext(); ) {
148         MsgMcast message = (MsgMcast) iter.next();
149         if (message.mid <= result.dlvr)
150           message.setAck(result.hpos);
151         if (message.mid == result.mid && message.sender.equals(result.sender)) {
152           MemberId[] members = view.getMembers();
153           log.assertLog(members.length > result.vpos,
154               "BAD: Trying to send ack, but view seems to have less members: " + view);
155           try {
156             if (message.ackl != null)
157               message.ackl.ack(members[result.vpos], result.vpos, result.result);
158           } catch (RemoteException e) {
159             log.warn("Failed to ACK the message.", e);
160           }
161         }
162       }
163       removeAcknowledged();
164     } else {
165       if (log.isDebugEnabled())
166         log.debug("Inserting result (different view): " + result);
167       ackbuffer.insert(result);
168     }
169   }
170 
171 
172   /**
173    *  Marks the given host as suspected.  This will mark messages in
174    *  this buffer, that was sent to the given host, as acknowledged.
175    *
176    *  @param host
177    *    The host to be marked as suspected.
178    */
179   void suspect(HostData host)
180   {
181     if (host.hasValidViewIndex()) {
182       int hpos = host.getViewIndex();
183       for (Iterator iter = buffer.iterator(); iter.hasNext(); ) {
184         MsgMcast message = (MsgMcast) iter.next();
185         message.setAck(hpos);
186       }
187       removeAcknowledged();
188     }
189   }
190 
191 
192   /**
193    *  Remove messages that have been already acknowledged by all other
194    *  host.
195    */
196   private void removeAcknowledged()
197   {
198     MsgMcast    message;
199 
200     while ((message = (MsgMcast) buffer.getFirst()) != null) {
201       if (message.isComplete()) 
202         buffer.removeFirst();
203       else
204         break;
205     }
206   }
207 
208 
209   int getMid()
210   {
211     return ++nmessages;
212   }
213 
214   boolean isEmpty()
215   {
216     return buffer.isEmpty();
217   }
218   
219   
220 } // END SendBuffer