1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.SortedMap;
26 import java.util.TreeMap;
27
28 import jgroup.relacs.config.TransportConfig;
29 import jgroup.util.InMessage;
30
31 import org.apache.log4j.Logger;
32
33 /**
34 * The <code>MsgFlowRcvrSide</code> class handles the structures needed
35 * to maintain reliability and flow control.
36 *
37 * @author Salvatore Cammarata
38 * @author Hein Meling
39 * @since Jgroup 1.2
40 */
41 final class MsgFlowRcvrSide
42 implements MssConstants
43 {
44
45 ////////////////////////////////////////////////////////////////////////////////////////////
46 // Logger
47 ////////////////////////////////////////////////////////////////////////////////////////////
48
49 /** Obtain logger for this class */
50 private static final Logger log = Logger.getLogger(MsgFlowRcvrSide.class);
51
52
53 ////////////////////////////////////////////////////////////////////////////////////////////
54 // Fields
55 ////////////////////////////////////////////////////////////////////////////////////////////
56
57 /** Local host */
58 private MssHost localHost;
59
60 /** The default payload length of each message */
61 private int payload;
62
63 /** Right edge of the sliding window */
64 private int lastMsgRcvd;
65
66 /** Sliding window */
67 SWindow swindow;
68
69 /**
70 * clusterWindow is used to obtain the minimum lastDelivered(x)
71 * where x is a member of the local cluster.
72 */
73 ClusterWindow clusterWindow;
74
75 /** List of nack sent to host */
76 private Map<Integer, ScheduledEvent> nacks =
77 Collections.synchronizedMap(new HashMap<Integer, ScheduledEvent>());
78
79 /** Queue of the message received and not yet delivered */
80 private SortedMap<Integer, FragmentHeader> rcvd = new TreeMap<Integer, FragmentHeader>();
81
82 /** Queue of the reliable message received and not yet delivered */
83 //FIXED HEIN: Was not used; consider if it is needed later?!
84 // TreeMap reliableRcvd = new TreeMap();
85
86 /** Queue of the fragments not yet composed */
87 private SortedMap<Integer, byte[]> fragments = new TreeMap<Integer, byte[]>();
88
89 /** Total message byte size */
90 private int totlen;
91
92
93 ////////////////////////////////////////////////////////////////////////////////////////////
94 // Constructors
95 ////////////////////////////////////////////////////////////////////////////////////////////
96
97 /**
98 * Creates an empty object not initialized
99 */
100 MsgFlowRcvrSide(TransportConfig config)
101 {
102 this.payload = config.getPayload();
103 swindow = new SWindow(config.getMaxWindow(), UNDEF);
104 }
105
106
107 ////////////////////////////////////////////////////////////////////////////////////////////
108 // Methods
109 ////////////////////////////////////////////////////////////////////////////////////////////
110
111 /**
112 * The receive window is closed
113 */
114 void reset()
115 {
116 reset(UNDEF);
117 }
118
119 /**
120 * We initialize the receive window with to given message fragment
121 * identifier, in order to synchronize.
122 */
123 void reset(int mid)
124 {
125 if (log.isDebugEnabled())
126 log.debug("Synchronization - lastMsgSent=" + mid);
127
128 lastMsgRcvd = mid;
129 swindow.clear(mid);
130 /*
131 * Note that we cannot initialize the localHost until here, since
132 * each host is associated with a MsgFlowRcvrSide object; and the
133 * localhost may not be the first one to be initialized. We do a
134 * test here so that we can avoid creating a new ClusterWindow
135 * object for each reset invocation; instead, we just clear it.
136 */
137 if (localHost == null) {
138 localHost = HostTable.getLocalHost();
139 clusterWindow = new ClusterWindow(localHost.getCluster().size());
140 } else {
141 clusterWindow.clear();
142 }
143 clusterWindow.set(localHost.getClusterIndex(), mid);
144 nacks.clear();
145 // reliableRcvd.clear();
146 rcvd.clear();
147 fragments.clear();
148 totlen = 0;
149 }
150
151
152 /**
153 * Set the last message received and return the old last received
154 * value.
155 *
156 * @return
157 * The identifier for old last received message.
158 */
159 int setLastMsgRcvd(int mid)
160 {
161 int oldLastRcvd = lastMsgRcvd;
162 lastMsgRcvd = mid;
163 return oldLastRcvd;
164 }
165
166
167 int getLastMsgRcvd()
168 {
169 return lastMsgRcvd;
170 }
171
172
173 int getLastMsgDlvr()
174 {
175 return swindow.lastMsgDlvr;
176 }
177
178
179 /**
180 * Returns true if the specified message has already been delivered;
181 * otherwise false is returned.
182 */
183 boolean isDelivered(int fid)
184 {
185 return swindow.lookup(fid);
186 }
187
188
189 /**
190 * Returns true if the specified message has an associated NACK
191 * timeout that has not yet expired; otherwise false is returned.
192 */
193 boolean hasNACK(int mid)
194 {
195 return (mid < lastMsgRcvd && mid > swindow.lastMsgDlvr);
196 }
197
198
199 /**
200 * Returns the number of received messages that has not yet been
201 * delivered due to not respecting FIFO-order. It also updates the
202 * sliding window datastructures accordingly.
203 */
204 int getDeliveryCount()
205 {
206 int pos = swindow.shiftFIFO();
207 clusterWindow.set(localHost.getClusterIndex(), swindow.lastMsgDlvr);
208 return pos;
209 }
210
211
212 /**
213 * Returns true if the specified message is in FIFO order; otherwise
214 * false is returned. If the message is in FIFO order, it also
215 * updates the sliding window datastructures accordingly.
216 */
217 boolean isFIFO(int mid)
218 {
219 if (mid == swindow.lastMsgDlvr + 1) {
220 swindow.shift();
221 if (lastMsgRcvd < swindow.lastMsgDlvr)
222 lastMsgRcvd = swindow.lastMsgDlvr;
223 clusterWindow.set(localHost.getClusterIndex(), mid);
224 return true;
225 } else {
226 return false;
227 }
228 }
229
230
231 /**
232 * Returns true if the specified message is not in FIFO order,
233 * causing a gap in the sequence of messages. This means that the
234 * received message must be greater than all those already delivered.
235 */
236 boolean isNotFIFO(int mid)
237 {
238 return (mid > swindow.lastMsgDlvr + 1);
239 }
240
241
242 /**
243 * Returns true if the specified message causes a gap in the sequence
244 * of received messages; otherwise false is returned.
245 */
246 boolean causeGap(int mid)
247 {
248 return (mid > lastMsgRcvd);
249 }
250
251
252 /**
253 * Insert the given message in the ordered queue. Also update the
254 * sliding window with the received messages.
255 *
256 * @param msgFrag
257 * The message fragment to insert in the ordered queue.
258 */
259 void insertMsgFrag(FragmentHeader msgFrag)
260 {
261 swindow.set(msgFrag.fragId);
262 rcvd.put(msgFrag.fragId, msgFrag);
263 }
264
265
266 /**
267 * Remove and return the first message fragment stored in the ordered
268 * queue.
269 *
270 * @return
271 * The first message stored in the ordered queue.
272 */
273 FragmentHeader removeMsgFrag()
274 {
275 return rcvd.remove(rcvd.firstKey());
276 }
277
278
279 /**
280 * Add a fragement to the list of fragments composing the message.
281 * If this is the last fragment, the message is composed and
282 * returned back to the caller.
283 *
284 * @param msg
285 * The message to be composed.
286 * @return
287 * Returns the message stream to be m-received by the upper layer
288 * if this was the last fragment of the message, and if it was for
289 * me; otherwise <code>null</code> is returned.
290 */
291 InMessage addFragment(FragmentHeader header)
292 {
293
294 byte[] msgFragment = header.getFragment();
295 int fragLen = header.getFragmentLength() - OVERHEAD_SIZE;
296 if (!header.isLastFragment()) {
297
298 /*
299 * This is not the last fragment; add it to the fragments TreeMap.
300 */
301 if (log.isDebugEnabled())
302 log.debug("NOT the last fragement of msg (" + header.fragId + ")");
303 totlen += payload;
304 fragments.put(header.fragId, msgFragment);
305 return null;
306
307 } else {
308
309 /*
310 * Last fragment; compose and deliver
311 */
312 totlen += fragLen;
313 if (log.isDebugEnabled())
314 log.debug("LAST FRAGEMENT: frag.len= "+ fragLen
315 + " total.len= " + totlen + " payload=" + payload);
316 InMessage msgstream = new InMessage(payload, HEADER_SIZE, TRAILER_SIZE, totlen);
317 /*
318 * For each fragment of the message, reconstruct the message
319 * stream in correct (ascending) order of the message
320 * identifiers (keys to the <code>fragments</code> TreeMap).
321 * After the fragment is inserted into the message stream, it is
322 * removed from the iterator (and the underlying TreeMap.)
323 */
324 for (Iterator iter = fragments.keySet().iterator(); iter.hasNext(); ) {
325 Integer fid = (Integer) iter.next();
326 byte[] msgFrag = fragments.get(fid);
327 msgstream.insert(msgFrag, payload);
328 if (log.isDebugEnabled())
329 log.debug("inserting msg (" + fid + ") in stream\n" + msgstream);
330 iter.remove();
331 }
332
333 /* Insert the last fragment into the message stream. */
334 msgstream.insert(msgFragment, fragLen);
335 if (log.isDebugEnabled())
336 log.debug("final msg (" + header.fragId + ") in stream\n" + msgstream);
337 totlen = 0;
338 return msgstream;
339
340 }
341 }
342
343
344 ////////////////////////////////////////////////////////////////////////////////////////////
345 // Event handling methods
346 ////////////////////////////////////////////////////////////////////////////////////////////
347
348 ScheduledEvent getScheduledEvent(int mid)
349 {
350 return nacks.get(mid);
351 }
352
353
354 ScheduledEvent putScheduledEvent(int mid, ScheduledEvent event)
355 {
356 return nacks.put(mid, event);
357 }
358
359
360 ScheduledEvent removeScheduledEvent(int mid)
361 {
362 return nacks.remove(mid);
363 }
364
365
366 ////////////////////////////////////////////////////////////////////////////////////////////
367 // Object methods
368 ////////////////////////////////////////////////////////////////////////////////////////////
369
370 public String toString()
371 {
372 StringBuilder buf = new StringBuilder("[MsgFlowRcvrSide: lastMsgRcvd=");
373 buf.append(lastMsgRcvd);
374 buf.append(", ");
375 buf.append(swindow);
376 buf.append(", ");
377 buf.append(clusterWindow);
378 return buf.toString();
379 }
380
381 } // END MsgFlowRcvrSide