View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.daemon;
20  
21  import java.rmi.NoSuchObjectException;
22  import java.rmi.RemoteException;
23  import java.util.Collections;
24  import java.util.LinkedList;
25  import java.util.Queue;
26  
27  import jgroup.core.MemberId;
28  import jgroup.core.View;
29  import jgroup.core.multicast.AckListener;
30  
31  import org.apache.log4j.Logger;
32  
33  /**
34   *  Buffer for managing multicast (<code>MsgMcast</code>) messages, and
35   *  corresponding reply messages (<code>MsgResult</code>).  That
36   *  includes also acknowledgement management.
37   *
38   *  @author Alberto Montresor
39   *  @since Jgroup 0.8
40   */
41  final class SCopyOfSendBuffer
42  {
43  
44    ////////////////////////////////////////////////////////////////////////////////////////////
45    // Logger
46    ////////////////////////////////////////////////////////////////////////////////////////////
47  
48    /** Obtain logger for this class */
49    private static final Logger log = Logger.getLogger(SCopyOfSendBuffer.class);
50  
51  
52    ////////////////////////////////////////////////////////////////////////////////////////////
53    // Fields
54    ////////////////////////////////////////////////////////////////////////////////////////////
55  
56    @SuppressWarnings("unchecked")
57    private final Queue<MsgMcast> buffer = (Queue) Collections.synchronizedList(new LinkedList<MsgMcast>());;
58    @SuppressWarnings("unchecked")
59    private final Queue<MsgResult> ackbuffer = (Queue) Collections.synchronizedList(new LinkedList<MsgResult>());
60    private View        view;
61    private long        vid;
62    private int         hlen;
63    private int         nmessages;
64  
65    ////////////////////////////////////////////////////////////////////////////////////////////
66    // Constructor 
67    ////////////////////////////////////////////////////////////////////////////////////////////
68  
69    SCopyOfSendBuffer() {}
70  
71  
72    ////////////////////////////////////////////////////////////////////////////////////////////
73    // Methods
74    ////////////////////////////////////////////////////////////////////////////////////////////
75  
76    /**
77     *
78     */
79    void setView(View view, int hlen)
80    {
81      if (log.isDebugEnabled())
82        log.debug("Flushing buffers");
83      this.view = view;
84      this.hlen = hlen;
85      vid = view.getVid();
86  
87      nmessages = 0;
88      
89      // Call nack
90      for (MsgMcast msg : buffer) {
91        AckListener ackl = msg.ackl;
92        if (ackl != null) { 
93          try {
94            if (log.isDebugEnabled())
95              log.debug("viewChange to AckListener");
96            ackl.viewChange();
97            if (log.isDebugEnabled())
98              log.debug("viewChange to AckListener completed");
99          } catch (NoSuchObjectException e) {
100           /*
101            * Ignored, as this can happen when the AckListener has been
102            * unexported since all results has been received correctly,
103            * and thus the receiver does not need to consider a view change
104            * for the current multicast message (invocation).
105            */
106         } catch (RemoteException e) {
107           log.warn("Failed to notify the AckListener of view change: " + ackl, e);
108         }
109       }
110     }
111     buffer.clear();
112     MsgResult msgr;
113     while ((msgr = (MsgResult) ackbuffer.poll()) != null) {
114       if (msgr.vid == vid) 
115         insertMsgResult(msgr);
116     }
117   }
118   
119   
120   /**
121    *
122    */
123   void insertMsgMcast(MsgMcast msg)
124   {
125     try {
126       if (msg.ackl != null) {
127         if (log.isDebugEnabled())
128           log.debug("notifyView to AckListener");
129         msg.ackl.notifyView(view);
130         if (log.isDebugEnabled())
131           log.debug("notifyView completed");
132       }
133     } catch (RemoteException e) {
134       log.warn("Failed to notify the AckListener of view change.", e);
135     }
136     buffer.add(msg);
137     msg.setAckArray(hlen);
138   }
139 
140   
141   /**
142    *
143    */
144   void insertMsgResult(MsgResult result)
145   {
146     if (result.vid == vid) {
147       if (log.isDebugEnabled())
148         log.debug("Inserting result (same view): " + result);
149       for (MsgMcast message : buffer) {
150         if (message.mid <= result.dlvr)
151           message.setAck(result.hpos);
152         if (message.mid == result.mid && message.sender.equals(result.sender)) {
153           MemberId[] members = view.getMembers();
154           log.assertLog(members.length > result.vpos,
155               "BAD: Trying to send ack, but view seems to have less members: " + view);
156           try {
157             if (message.ackl != null)
158               message.ackl.ack(members[result.vpos], result.vpos, result.result);
159           } catch (RemoteException e) {
160             log.warn("Failed to ACK the message.", e);
161           }
162         }
163       }
164       removeAcknowledged();
165     } else {
166       if (log.isDebugEnabled())
167         log.debug("Inserting result (different view): " + result);
168       ackbuffer.add(result);
169     }
170   }
171 
172 
173   /**
174    *  Marks the given host as suspected.  This will mark messages in
175    *  this buffer, that was sent to the given host, as acknowledged.
176    *
177    *  @param host
178    *    The host to be marked as suspected.
179    */
180   void suspect(HostData host)
181   {
182     if (host.hasValidViewIndex()) {
183       int hpos = host.getViewIndex();
184       for (MsgMcast message : buffer) {
185         message.setAck(hpos);
186       }
187       removeAcknowledged();
188     }
189   }
190 
191 
192   /**
193    *  Remove messages that have been already acknowledged by all other
194    *  host.
195    */
196   private void removeAcknowledged()
197   {
198     MsgMcast    message;
199     while ((message = (MsgMcast) buffer.peek()) != null) {
200       if (message.isComplete()) 
201         buffer.poll();
202       else
203         break;
204     }
205   }
206 
207 
208   int getMid()
209   {
210     return ++nmessages;
211   }
212 
213   boolean isEmpty()
214   {
215     return buffer.isEmpty();
216   }
217   
218   
219 } // END SendBuffer