View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.Map;
25  import java.util.SortedMap;
26  import java.util.TreeMap;
27  
28  import jgroup.relacs.config.TransportConfig;
29  import jgroup.util.InMessage;
30  
31  import org.apache.log4j.Logger;
32  
33  /**
34   *  The <code>MsgFlowRcvrSide</code> class handles the structures needed
35   *  to maintain reliability and flow control.
36   *
37   *  @author Salvatore Cammarata
38   *  @author Hein Meling
39   *  @since Jgroup 1.2
40   */
41  final class MsgFlowRcvrSide 
42    implements MssConstants 
43  {
44  
45    ////////////////////////////////////////////////////////////////////////////////////////////
46    // Logger
47    ////////////////////////////////////////////////////////////////////////////////////////////
48  
49    /** Obtain logger for this class */
50    private static final Logger log = Logger.getLogger(MsgFlowRcvrSide.class);
51  
52    
53    ////////////////////////////////////////////////////////////////////////////////////////////
54    // Fields
55    ////////////////////////////////////////////////////////////////////////////////////////////
56  
57    /** Local host */
58    private MssHost localHost;
59  
60    /** The default payload length of each message */
61    private int payload;
62  
63    /** Right edge of the sliding window */
64    private int lastMsgRcvd;
65  
66    /** Sliding window */
67    SWindow swindow;
68  
69    /** 
70     *  clusterWindow is used to obtain the minimum lastDelivered(x)
71     *  where x is a member of the local cluster. 
72     */
73    ClusterWindow clusterWindow;
74  
75    /** List of nack sent to host */
76    private Map<Integer, ScheduledEvent> nacks =
77      Collections.synchronizedMap(new HashMap<Integer, ScheduledEvent>());
78  
79    /** Queue of the message received and not yet delivered */
80    private SortedMap<Integer, FragmentHeader> rcvd = new TreeMap<Integer, FragmentHeader>();
81  
82    /** Queue of the reliable message received and not yet delivered */
83    //FIXED HEIN: Was not used; consider if it is needed later?!
84    //  TreeMap reliableRcvd = new TreeMap();
85  
86    /** Queue of the fragments not yet composed */
87    private SortedMap<Integer, byte[]> fragments = new TreeMap<Integer, byte[]>();
88  
89    /** Total message byte size */
90    private int totlen;
91    
92  
93    ////////////////////////////////////////////////////////////////////////////////////////////
94    // Constructors
95    ////////////////////////////////////////////////////////////////////////////////////////////
96  
97    /**
98     *  Creates an empty object not initialized 
99     */
100   MsgFlowRcvrSide(TransportConfig config) 
101   {
102     this.payload = config.getPayload();
103     swindow = new SWindow(config.getMaxWindow(), UNDEF);
104   }
105 
106   
107   ////////////////////////////////////////////////////////////////////////////////////////////
108   // Methods
109   ////////////////////////////////////////////////////////////////////////////////////////////
110 
111   /**
112    *  The receive window is closed
113    */
114   void reset()
115   {
116     reset(UNDEF);
117   }
118 
119   /**
120    *  We initialize the receive window with to given message fragment
121    *  identifier, in order to synchronize.
122    */
123   void reset(int mid) 
124   {
125     if (log.isDebugEnabled())
126       log.debug("Synchronization - lastMsgSent=" + mid);
127 
128     lastMsgRcvd = mid;
129     swindow.clear(mid);
130     /*
131      * Note that we cannot initialize the localHost until here, since
132      * each host is associated with a MsgFlowRcvrSide object; and the
133      * localhost may not be the first one to be initialized.  We do a
134      * test here so that we can avoid creating a new ClusterWindow
135      * object for each reset invocation; instead, we just clear it.
136      */
137     if (localHost == null) {
138       localHost = HostTable.getLocalHost();
139       clusterWindow = new ClusterWindow(localHost.getCluster().size());
140     } else {
141       clusterWindow.clear();
142     }
143     clusterWindow.set(localHost.getClusterIndex(), mid);
144     nacks.clear();
145 //     reliableRcvd.clear();
146     rcvd.clear();
147     fragments.clear();
148     totlen = 0;
149   }
150 
151 
152   /**
153    *  Set the last message received and return the old last received
154    *  value.
155    *
156    *  @return
157    *    The identifier for old last received message.
158    */
159   int setLastMsgRcvd(int mid)
160   {
161     int oldLastRcvd = lastMsgRcvd;
162     lastMsgRcvd = mid;
163     return oldLastRcvd;
164   }
165 
166 
167   int getLastMsgRcvd()
168   {
169     return lastMsgRcvd;
170   }
171 
172 
173   int getLastMsgDlvr()
174   {
175     return swindow.lastMsgDlvr;
176   }
177 
178 
179   /**
180    *  Returns true if the specified message has already been delivered;
181    *  otherwise false is returned.
182    */
183   boolean isDelivered(int fid)
184   {
185     return swindow.lookup(fid);
186   }
187 
188 
189   /**
190    *  Returns true if the specified message has an associated NACK
191    *  timeout that has not yet expired; otherwise false is returned.
192    */
193   boolean hasNACK(int mid)
194   {
195     return (mid < lastMsgRcvd && mid > swindow.lastMsgDlvr);
196   }
197 
198 
199   /**
200    *  Returns the number of received messages that has not yet been
201    *  delivered due to not respecting FIFO-order.  It also updates the
202    *  sliding window datastructures accordingly.
203    */
204   int getDeliveryCount()
205   {
206     int pos = swindow.shiftFIFO();
207     clusterWindow.set(localHost.getClusterIndex(), swindow.lastMsgDlvr);
208     return pos;
209   }
210 
211 
212   /**
213    *  Returns true if the specified message is in FIFO order; otherwise
214    *  false is returned.  If the message is in FIFO order, it also
215    *  updates the sliding window datastructures accordingly.
216    */
217   boolean isFIFO(int mid)
218   {
219     if (mid == swindow.lastMsgDlvr + 1) {
220       swindow.shift();
221       if (lastMsgRcvd < swindow.lastMsgDlvr) 
222         lastMsgRcvd = swindow.lastMsgDlvr;
223       clusterWindow.set(localHost.getClusterIndex(), mid);
224       return true;
225     } else {
226       return false;
227     }
228   }
229 
230 
231   /**
232    *  Returns true if the specified message is not in FIFO order,
233    *  causing a gap in the sequence of messages.  This means that the
234    *  received message must be greater than all those already delivered.
235    */
236   boolean isNotFIFO(int mid)
237   {
238     return (mid > swindow.lastMsgDlvr + 1);
239   }
240 
241 
242   /**
243    *  Returns true if the specified message causes a gap in the sequence
244    *  of received messages; otherwise false is returned.
245    */
246   boolean causeGap(int mid)
247   {
248     return (mid > lastMsgRcvd);
249   }
250 
251 
252   /**
253    *  Insert the given message in the ordered queue.  Also update the
254    *  sliding window with the received messages.
255    *
256    *  @param msgFrag
257    *    The message fragment to insert in the ordered queue.
258    */
259   void insertMsgFrag(FragmentHeader msgFrag)
260   {
261     swindow.set(msgFrag.fragId);
262     rcvd.put(msgFrag.fragId, msgFrag);
263   }
264 
265 
266   /**
267    *  Remove and return the first message fragment stored in the ordered
268    *  queue.
269    *
270    *  @return 
271    *    The first message stored in the ordered queue.
272    */
273   FragmentHeader removeMsgFrag()
274   {
275     return rcvd.remove(rcvd.firstKey());
276   }
277 
278 
279   /**
280    *  Add a fragement to the list of fragments composing the message.
281    *  If this is the last fragment, the message is composed and
282    *  returned back to the caller.
283    *
284    *  @param msg
285    *    The message to be composed.
286    *  @return 
287    *    Returns the message stream to be m-received by the upper layer
288    *    if this was the last fragment of the message, and if it was for
289    *    me; otherwise <code>null</code> is returned.
290    */
291   InMessage addFragment(FragmentHeader header)
292   {
293 
294     byte[] msgFragment = header.getFragment();
295     int fragLen = header.getFragmentLength() - OVERHEAD_SIZE;
296     if (!header.isLastFragment()) {
297 
298       /*
299        * This is not the last fragment; add it to the fragments TreeMap.
300        */
301       if (log.isDebugEnabled())
302         log.debug("NOT the last fragement of msg (" + header.fragId + ")");
303       totlen += payload;
304       fragments.put(header.fragId, msgFragment);
305       return null;
306 
307     } else {
308 
309       /*
310        * Last fragment; compose and deliver
311        */
312       totlen += fragLen;
313       if (log.isDebugEnabled())
314         log.debug("LAST FRAGEMENT: frag.len= "+ fragLen
315                   + "  total.len= " + totlen + "  payload=" + payload);
316       InMessage msgstream = new InMessage(payload, HEADER_SIZE, TRAILER_SIZE, totlen);
317       /*
318        * For each fragment of the message, reconstruct the message
319        * stream in correct (ascending) order of the message
320        * identifiers (keys to the <code>fragments</code> TreeMap).
321        * After the fragment is inserted into the message stream, it is
322        * removed from the iterator (and the underlying TreeMap.)
323        */
324       for (Iterator iter = fragments.keySet().iterator(); iter.hasNext(); ) {
325         Integer fid = (Integer) iter.next();
326         byte[] msgFrag = fragments.get(fid);
327         msgstream.insert(msgFrag, payload);
328         if (log.isDebugEnabled())
329           log.debug("inserting msg (" + fid + ") in stream\n" + msgstream);
330         iter.remove();
331       }
332 
333       /* Insert the last fragment into the message stream. */
334       msgstream.insert(msgFragment, fragLen);
335       if (log.isDebugEnabled())
336         log.debug("final msg (" + header.fragId + ") in stream\n" + msgstream);
337       totlen = 0;
338       return msgstream;
339 
340     }
341   }
342 
343 
344   ////////////////////////////////////////////////////////////////////////////////////////////
345   // Event handling methods
346   ////////////////////////////////////////////////////////////////////////////////////////////
347 
348   ScheduledEvent getScheduledEvent(int mid)
349   {
350     return nacks.get(mid);
351   }
352 
353 
354   ScheduledEvent putScheduledEvent(int mid, ScheduledEvent event)
355   {
356     return nacks.put(mid, event);
357   }
358 
359 
360   ScheduledEvent removeScheduledEvent(int mid)
361   {
362     return nacks.remove(mid);
363   }
364 
365 
366   ////////////////////////////////////////////////////////////////////////////////////////////
367   // Object methods
368   ////////////////////////////////////////////////////////////////////////////////////////////
369 
370   public String toString()
371   {
372     StringBuilder buf = new StringBuilder("[MsgFlowRcvrSide: lastMsgRcvd=");
373     buf.append(lastMsgRcvd);
374     buf.append(", ");
375     buf.append(swindow);
376     buf.append(", ");
377     buf.append(clusterWindow);
378     return buf.toString();
379   }
380 
381 } // END MsgFlowRcvrSide