View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.util.Iterator;
22  import java.util.SortedMap;
23  import java.util.TreeMap;
24  
25  import jgroup.relacs.config.TransportConfig;
26  
27  import org.apache.log4j.Logger;
28  
29  /**
30   *  The <code>MsgFlowSndrSide</code> class holds the data structures
31   *  needed to maintain reliable message delivery.
32   *
33   *  @author Salvatore Cammarata
34   *  @author Hein Meling
35   *  @since Jgroup 1.2
36   */
37  final class MsgFlowSndrSide
38    implements MssConstants
39  {
40  
41    ////////////////////////////////////////////////////////////////////////////////////////////
42    // Logger
43    ////////////////////////////////////////////////////////////////////////////////////////////
44  
45    /** Obtain logger for this class */
46    private static final Logger log = Logger.getLogger(MsgFlowSndrSide.class);
47  
48  
49    ////////////////////////////////////////////////////////////////////////////////////////////
50    // Static section
51    ////////////////////////////////////////////////////////////////////////////////////////////
52  
53    /** Token for blocking the sender if the sent window is closed */
54    private static Token token = new Token();
55  
56    /** */
57    private static Token sleepToken = new Token();  
58  
59  
60    ////////////////////////////////////////////////////////////////////////////////////////////
61    // Constants
62    ////////////////////////////////////////////////////////////////////////////////////////////
63  
64    /**
65     *  The NACK threshold; the number of NACKs required to mark the local
66     *  cluster as congested.
67     */
68    private static final int NACK_THRESHOLD = 1;
69  
70    /** The slow start threshold */
71    private static final int SLOW_THRESHOLD = 10;
72  
73    /** The delay for each iteration of the slow start */
74    private static final int SLOW_DELAY  = 10;
75  
76  
77    ////////////////////////////////////////////////////////////////////////////////////////////
78    // Sliding window parameters
79    ////////////////////////////////////////////////////////////////////////////////////////////
80  
81    /** The width of the sending window (when starting) */
82    private int startingWin;
83  
84    /** Maximum width of the sending window */
85    private int maxWin;
86  
87    /** Mimimum width of the sending window */
88    private int minWin;
89  
90    /** Width of the congestion window */
91    private int congestionWindow;
92  
93  
94    ////////////////////////////////////////////////////////////////////////////////////////////
95    // Fields
96    ////////////////////////////////////////////////////////////////////////////////////////////
97  
98    /** Associated cluster */
99    private Cluster cluster;
100 
101   /** Right edge of the sliding window */
102   private int lastMsgSent;
103 
104   /** Left edge of the sliding window */
105   private int lastMsgAcked; 
106 
107   /** Number of NACKs received */
108   private int nackCount = 0;
109 
110   /** Slow start activated */
111   private boolean slowStart = true; 
112 
113   /** Number of iterations so far for the slow start algorithm */
114   private int slowCount = 0;
115 
116   /** Queue of the message sent and not yet acked */
117   private SortedMap<Integer, FragmentIterator> sent = new TreeMap<Integer, FragmentIterator>();
118 
119   /** Queue of the message sent and not yet acked */
120   //FIXED HEIN: was not used; consider if it will be used later?!
121   //  TreeMap frozen = new TreeMap();
122 
123 
124   ////////////////////////////////////////////////////////////////////////////////////////////
125   // Constructors
126   ////////////////////////////////////////////////////////////////////////////////////////////
127 
128   MsgFlowSndrSide(Cluster cluster, TransportConfig config) 
129   {
130     this.cluster = cluster;
131     this.startingWin = config.getStartingWindow();
132     this.maxWin  = config.getMaxWindow();
133     this.minWin  = config.getMinWindow();
134   }
135 
136 
137   ////////////////////////////////////////////////////////////////////////////////////////////
138   // Methods
139   ////////////////////////////////////////////////////////////////////////////////////////////
140 
141   /**
142    * 
143    */
144   void reset(int mid) 
145   {
146     token.go();
147     if (log.isDebugEnabled()) {
148       log.debug("Before: " + this);
149     }
150 
151     // right edge of the sliding window
152     lastMsgSent = mid;
153     // left edge of the sliding window
154     lastMsgAcked = mid;
155     // reset the congestion window size
156     congestionWindow = startingWin;
157 
158     /* Clear the sent queue */
159     sent.clear();
160     //     frozen.clear();
161 
162     if (log.isDebugEnabled()) {
163       log.debug(" After: " + this);
164     }
165   }
166 
167 
168   /**
169    *  Returns true if the sender side message flow is initialized and
170    *  can be used; otherwise false is returned.
171    */
172   boolean isInitialized()
173   {
174     return (lastMsgSent != UNDEF);
175   }
176 
177 
178   /**
179    *  Insert the given message fragment (that will be sent next) in the
180    *  ordered <i>sent</i> queue.
181    *
182    *  @param msgFragment
183    *    The message fragment to store in the <i>sent</i> queue.
184    *  @return
185    *    The fragment identifier for this message.
186    */
187   int insertSentMsgFrag(FragmentIterator fragIter)
188   {
189     /* Increments the number of messages sent. */
190     int fid = ++lastMsgSent;
191     sent.put(fid, fragIter);
192     if (log.isDebugEnabled()) {
193       log.debug("Inserted msg fragment (" + fid + ") in sent queue of cluster "+ cluster);
194       log.debug(this);
195     }
196 
197     /* Slow start */
198     slowDelay();
199     return fid;
200   }
201 
202 
203   /**
204    *  Get a previously sent message fragment, based on the NACK message
205    *  identifier.  This will be used to resend the message fragment.
206    *
207    *  @param nack
208    *    The <code>MsgNACK</code> object containing the message
209    *    identifier that should be resent.
210    *  @return 
211    *    The fragment iterator associated with the identifier of the NACK
212    *    message.
213    */
214   FragmentIterator getSentMsgFrag(MsgNACK nack)
215   {
216     if (log.isDebugEnabled()) {
217       log.debug(this);
218     }
219     return sent.get(nack.mid);
220   }
221 
222 
223   /**
224    *  Flush the sent queue.  Remove all messages already acknowledged.
225    */
226   void flush(int lastMsgAcked)
227   {
228     setLastMsgAcked(lastMsgAcked);
229 
230     /* Remove received messages from the sent queue in sorted order. */
231     for (Iterator<Integer> iter = sent.keySet().iterator(); iter.hasNext(); ) {
232       int mid = iter.next();
233       if (mid <= lastMsgAcked) {
234         if (log.isDebugEnabled())
235           log.debug("flush: Removing msg (" + mid + ") from sent queue of cluster: "
236             + cluster.getEndPoint());
237         iter.remove();
238       } else if (log.isDebugEnabled()) {
239         log.debug("flush: Keeping msg (" + mid + ") in sent queue of cluster: "
240           + cluster.getEndPoint());
241       }
242     }
243     if (log.isDebugEnabled()) {
244       log.debug("flush: LastMsgAcked (" + lastMsgAcked + ") for cluster: "
245         + cluster.getEndPoint() + "; remaining messages in sent queue: " + sent.size());
246     }
247   }
248 
249 
250   /**
251    *  Slow start
252    */
253   private void slowDelay()
254   {
255     if (slowStart && slowCount < SLOW_THRESHOLD) {
256       slowCount++;
257       sleepToken.sleep(SLOW_DELAY);
258     } else {
259       slowStart = false;
260       slowCount = 0;
261     }
262   }
263 
264   /**
265    *  Returns the number of messages sent.
266    */
267   int getLastMsgSent() 
268   {
269     return lastMsgSent;
270   }
271 
272   /**
273    * 
274    */
275   boolean isCongested()
276   {
277     return nackCount > NACK_THRESHOLD;
278   }
279 
280   void clearNackCount()
281   {
282     nackCount = 0;
283   }
284 
285   void incNackCount()
286   {
287     nackCount++;
288   }
289 
290   synchronized int getActualWindow()
291   {
292     if (log.isDebugEnabled()) {
293       log.debug(cluster.getEndPoint() + ": * actualWindow=" 
294         + (congestionWindow < maxWin ? congestionWindow : maxWin));
295     }
296     return (congestionWindow < maxWin ? congestionWindow : maxWin);
297   }
298 
299   synchronized int decCongWin()
300   {
301     int newWin = congestionWindow / 2;
302     congestionWindow = (minWin < newWin ? newWin : minWin);
303     if (log.isDebugEnabled()) {
304       log.debug(cluster.getEndPoint() + ": - congestionWindow=" + congestionWindow);
305     }
306     return congestionWindow; 
307   }
308   
309   synchronized int incCongWin(int npack)
310   {
311     if (npack > 0) {
312       int inc = (npack > SLOW_THRESHOLD) ? SLOW_THRESHOLD : npack;
313       congestionWindow = Math.min(congestionWindow + inc, maxWin);
314       if (log.isDebugEnabled()) 
315         log.debug(cluster.getEndPoint() + ": + congestionWindow=" + congestionWindow);
316       return congestionWindow;
317     }
318     return congestionWindow;
319   }
320 
321   public void waitOpen()
322   {
323     token.stop();
324     slowStart = true;
325     slowCount = 0;
326   }
327 
328 
329   public int getLastMsgAcked()
330   {
331     return lastMsgAcked;
332   }
333 
334 
335   private void setLastMsgAcked(int mid)
336   {
337     if (mid != UNDEF && mid > lastMsgAcked) {
338       int llma = lastMsgAcked;
339       lastMsgAcked = mid;
340 
341       // congestion control
342       incCongWin(mid - llma);
343       token.go(); 
344     }
345   }
346 
347   boolean isWindowOpen()
348   {
349     boolean openWindow = (lastMsgSent < (lastMsgAcked + getActualWindow()));
350     if (log.isDebugEnabled()) {
351       log.debug("isWindowOpen: " + openWindow);
352     }
353     return openWindow;
354   }
355 
356 
357   ////////////////////////////////////////////////////////////////////////////////////////////
358   // Object methods
359   ////////////////////////////////////////////////////////////////////////////////////////////
360 
361   public String toString()
362   {
363     StringBuilder buf = new StringBuilder("[MsgFlowSndrSide: ");
364     buf.append("lastMsgSent=");
365     buf.append(lastMsgSent);
366     buf.append(", lastMsgAcked=");
367     buf.append(lastMsgAcked);
368     buf.append(", actualWindow=");
369     buf.append(getActualWindow());
370     buf.append(", congestionWindow=");
371     buf.append(congestionWindow);
372     buf.append(", sent={");
373     for (Iterator iter = sent.keySet().iterator(); iter.hasNext(); ) {
374       Integer fragid = (Integer) iter.next();
375       FragmentIterator frag = sent.get(fragid);
376       buf.append("\n(");
377       buf.append(fragid);
378       buf.append(") --> ");
379       buf.append(frag);
380     }
381     buf.append(" }]\n");
382     return buf.toString();
383   }
384 
385 } // END MsgFlowSndrSide