1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.mss;
20
21 import java.util.Iterator;
22 import java.util.SortedMap;
23 import java.util.TreeMap;
24
25 import jgroup.relacs.config.TransportConfig;
26
27 import org.apache.log4j.Logger;
28
29
30
31
32
33
34
35
36
37 final class MsgFlowSndrSide
38 implements MssConstants
39 {
40
41
42
43
44
45
46 private static final Logger log = Logger.getLogger(MsgFlowSndrSide.class);
47
48
49
50
51
52
53
54 private static Token token = new Token();
55
56
57 private static Token sleepToken = new Token();
58
59
60
61
62
63
64
65
66
67
68 private static final int NACK_THRESHOLD = 1;
69
70
71 private static final int SLOW_THRESHOLD = 10;
72
73
74 private static final int SLOW_DELAY = 10;
75
76
77
78
79
80
81
82 private int startingWin;
83
84
85 private int maxWin;
86
87
88 private int minWin;
89
90
91 private int congestionWindow;
92
93
94
95
96
97
98
99 private Cluster cluster;
100
101
102 private int lastMsgSent;
103
104
105 private int lastMsgAcked;
106
107
108 private int nackCount = 0;
109
110
111 private boolean slowStart = true;
112
113
114 private int slowCount = 0;
115
116
117 private SortedMap<Integer, FragmentIterator> sent = new TreeMap<Integer, FragmentIterator>();
118
119
120
121
122
123
124
125
126
127
128 MsgFlowSndrSide(Cluster cluster, TransportConfig config)
129 {
130 this.cluster = cluster;
131 this.startingWin = config.getStartingWindow();
132 this.maxWin = config.getMaxWindow();
133 this.minWin = config.getMinWindow();
134 }
135
136
137
138
139
140
141
142
143
144 void reset(int mid)
145 {
146 token.go();
147 if (log.isDebugEnabled()) {
148 log.debug("Before: " + this);
149 }
150
151
152 lastMsgSent = mid;
153
154 lastMsgAcked = mid;
155
156 congestionWindow = startingWin;
157
158
159 sent.clear();
160
161
162 if (log.isDebugEnabled()) {
163 log.debug(" After: " + this);
164 }
165 }
166
167
168
169
170
171
172 boolean isInitialized()
173 {
174 return (lastMsgSent != UNDEF);
175 }
176
177
178
179
180
181
182
183
184
185
186
187 int insertSentMsgFrag(FragmentIterator fragIter)
188 {
189
190 int fid = ++lastMsgSent;
191 sent.put(fid, fragIter);
192 if (log.isDebugEnabled()) {
193 log.debug("Inserted msg fragment (" + fid + ") in sent queue of cluster "+ cluster);
194 log.debug(this);
195 }
196
197
198 slowDelay();
199 return fid;
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214 FragmentIterator getSentMsgFrag(MsgNACK nack)
215 {
216 if (log.isDebugEnabled()) {
217 log.debug(this);
218 }
219 return sent.get(nack.mid);
220 }
221
222
223
224
225
226 void flush(int lastMsgAcked)
227 {
228 setLastMsgAcked(lastMsgAcked);
229
230
231 for (Iterator<Integer> iter = sent.keySet().iterator(); iter.hasNext(); ) {
232 int mid = iter.next();
233 if (mid <= lastMsgAcked) {
234 if (log.isDebugEnabled())
235 log.debug("flush: Removing msg (" + mid + ") from sent queue of cluster: "
236 + cluster.getEndPoint());
237 iter.remove();
238 } else if (log.isDebugEnabled()) {
239 log.debug("flush: Keeping msg (" + mid + ") in sent queue of cluster: "
240 + cluster.getEndPoint());
241 }
242 }
243 if (log.isDebugEnabled()) {
244 log.debug("flush: LastMsgAcked (" + lastMsgAcked + ") for cluster: "
245 + cluster.getEndPoint() + "; remaining messages in sent queue: " + sent.size());
246 }
247 }
248
249
250
251
252
253 private void slowDelay()
254 {
255 if (slowStart && slowCount < SLOW_THRESHOLD) {
256 slowCount++;
257 sleepToken.sleep(SLOW_DELAY);
258 } else {
259 slowStart = false;
260 slowCount = 0;
261 }
262 }
263
264
265
266
267 int getLastMsgSent()
268 {
269 return lastMsgSent;
270 }
271
272
273
274
275 boolean isCongested()
276 {
277 return nackCount > NACK_THRESHOLD;
278 }
279
280 void clearNackCount()
281 {
282 nackCount = 0;
283 }
284
285 void incNackCount()
286 {
287 nackCount++;
288 }
289
290 synchronized int getActualWindow()
291 {
292 if (log.isDebugEnabled()) {
293 log.debug(cluster.getEndPoint() + ": * actualWindow="
294 + (congestionWindow < maxWin ? congestionWindow : maxWin));
295 }
296 return (congestionWindow < maxWin ? congestionWindow : maxWin);
297 }
298
299 synchronized int decCongWin()
300 {
301 int newWin = congestionWindow / 2;
302 congestionWindow = (minWin < newWin ? newWin : minWin);
303 if (log.isDebugEnabled()) {
304 log.debug(cluster.getEndPoint() + ": - congestionWindow=" + congestionWindow);
305 }
306 return congestionWindow;
307 }
308
309 synchronized int incCongWin(int npack)
310 {
311 if (npack > 0) {
312 int inc = (npack > SLOW_THRESHOLD) ? SLOW_THRESHOLD : npack;
313 congestionWindow = Math.min(congestionWindow + inc, maxWin);
314 if (log.isDebugEnabled())
315 log.debug(cluster.getEndPoint() + ": + congestionWindow=" + congestionWindow);
316 return congestionWindow;
317 }
318 return congestionWindow;
319 }
320
321 public void waitOpen()
322 {
323 token.stop();
324 slowStart = true;
325 slowCount = 0;
326 }
327
328
329 public int getLastMsgAcked()
330 {
331 return lastMsgAcked;
332 }
333
334
335 private void setLastMsgAcked(int mid)
336 {
337 if (mid != UNDEF && mid > lastMsgAcked) {
338 int llma = lastMsgAcked;
339 lastMsgAcked = mid;
340
341
342 incCongWin(mid - llma);
343 token.go();
344 }
345 }
346
347 boolean isWindowOpen()
348 {
349 boolean openWindow = (lastMsgSent < (lastMsgAcked + getActualWindow()));
350 if (log.isDebugEnabled()) {
351 log.debug("isWindowOpen: " + openWindow);
352 }
353 return openWindow;
354 }
355
356
357
358
359
360
361 public String toString()
362 {
363 StringBuilder buf = new StringBuilder("[MsgFlowSndrSide: ");
364 buf.append("lastMsgSent=");
365 buf.append(lastMsgSent);
366 buf.append(", lastMsgAcked=");
367 buf.append(lastMsgAcked);
368 buf.append(", actualWindow=");
369 buf.append(getActualWindow());
370 buf.append(", congestionWindow=");
371 buf.append(congestionWindow);
372 buf.append(", sent={");
373 for (Iterator iter = sent.keySet().iterator(); iter.hasNext(); ) {
374 Integer fragid = (Integer) iter.next();
375 FragmentIterator frag = sent.get(fragid);
376 buf.append("\n(");
377 buf.append(fragid);
378 buf.append(") --> ");
379 buf.append(frag);
380 }
381 buf.append(" }]\n");
382 return buf.toString();
383 }
384
385 }