1 /* 2 * Copyright (c) 1998-2002 The Jgroup Team. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License version 2 as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU Lesser General Public License for more details. 12 * 13 * You should have received a copy of the GNU Lesser General Public License 14 * along with this program; if not, write to the Free Software 15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 16 * 17 */ 18 19 package jgroup.relacs.mss; 20 21 import java.util.Collections; 22 import java.util.HashMap; 23 import java.util.Iterator; 24 import java.util.Map; 25 import java.util.SortedMap; 26 import java.util.TreeMap; 27 28 import jgroup.relacs.config.TransportConfig; 29 import jgroup.util.InMessage; 30 31 import org.apache.log4j.Logger; 32 33 /** 34 * The <code>MsgFlowRcvrSide</code> class handles the structures needed 35 * to maintain reliability and flow control. 36 * 37 * @author Salvatore Cammarata 38 * @author Hein Meling 39 * @since Jgroup 1.2 40 */ 41 final class MsgFlowRcvrSide 42 implements MssConstants 43 { 44 45 //////////////////////////////////////////////////////////////////////////////////////////// 46 // Logger 47 //////////////////////////////////////////////////////////////////////////////////////////// 48 49 /** Obtain logger for this class */ 50 private static final Logger log = Logger.getLogger(MsgFlowRcvrSide.class); 51 52 53 //////////////////////////////////////////////////////////////////////////////////////////// 54 // Fields 55 //////////////////////////////////////////////////////////////////////////////////////////// 56 57 /** Local host */ 58 private MssHost localHost; 59 60 /** The default payload length of each message */ 61 private int payload; 62 63 /** Right edge of the sliding window */ 64 private int lastMsgRcvd; 65 66 /** Sliding window */ 67 SWindow swindow; 68 69 /** 70 * clusterWindow is used to obtain the minimum lastDelivered(x) 71 * where x is a member of the local cluster. 72 */ 73 ClusterWindow clusterWindow; 74 75 /** List of nack sent to host */ 76 private Map<Integer, ScheduledEvent> nacks = 77 Collections.synchronizedMap(new HashMap<Integer, ScheduledEvent>()); 78 79 /** Queue of the message received and not yet delivered */ 80 private SortedMap<Integer, FragmentHeader> rcvd = new TreeMap<Integer, FragmentHeader>(); 81 82 /** Queue of the reliable message received and not yet delivered */ 83 //FIXED HEIN: Was not used; consider if it is needed later?! 84 // TreeMap reliableRcvd = new TreeMap(); 85 86 /** Queue of the fragments not yet composed */ 87 private SortedMap<Integer, byte[]> fragments = new TreeMap<Integer, byte[]>(); 88 89 /** Total message byte size */ 90 private int totlen; 91 92 93 //////////////////////////////////////////////////////////////////////////////////////////// 94 // Constructors 95 //////////////////////////////////////////////////////////////////////////////////////////// 96 97 /** 98 * Creates an empty object not initialized 99 */ 100 MsgFlowRcvrSide(TransportConfig config) 101 { 102 this.payload = config.getPayload(); 103 swindow = new SWindow(config.getMaxWindow(), UNDEF); 104 } 105 106 107 //////////////////////////////////////////////////////////////////////////////////////////// 108 // Methods 109 //////////////////////////////////////////////////////////////////////////////////////////// 110 111 /** 112 * The receive window is closed 113 */ 114 void reset() 115 { 116 reset(UNDEF); 117 } 118 119 /** 120 * We initialize the receive window with to given message fragment 121 * identifier, in order to synchronize. 122 */ 123 void reset(int mid) 124 { 125 if (log.isDebugEnabled()) 126 log.debug("Synchronization - lastMsgSent=" + mid); 127 128 lastMsgRcvd = mid; 129 swindow.clear(mid); 130 /* 131 * Note that we cannot initialize the localHost until here, since 132 * each host is associated with a MsgFlowRcvrSide object; and the 133 * localhost may not be the first one to be initialized. We do a 134 * test here so that we can avoid creating a new ClusterWindow 135 * object for each reset invocation; instead, we just clear it. 136 */ 137 if (localHost == null) { 138 localHost = HostTable.getLocalHost(); 139 clusterWindow = new ClusterWindow(localHost.getCluster().size()); 140 } else { 141 clusterWindow.clear(); 142 } 143 clusterWindow.set(localHost.getClusterIndex(), mid); 144 nacks.clear(); 145 // reliableRcvd.clear(); 146 rcvd.clear(); 147 fragments.clear(); 148 totlen = 0; 149 } 150 151 152 /** 153 * Set the last message received and return the old last received 154 * value. 155 * 156 * @return 157 * The identifier for old last received message. 158 */ 159 int setLastMsgRcvd(int mid) 160 { 161 int oldLastRcvd = lastMsgRcvd; 162 lastMsgRcvd = mid; 163 return oldLastRcvd; 164 } 165 166 167 int getLastMsgRcvd() 168 { 169 return lastMsgRcvd; 170 } 171 172 173 int getLastMsgDlvr() 174 { 175 return swindow.lastMsgDlvr; 176 } 177 178 179 /** 180 * Returns true if the specified message has already been delivered; 181 * otherwise false is returned. 182 */ 183 boolean isDelivered(int fid) 184 { 185 return swindow.lookup(fid); 186 } 187 188 189 /** 190 * Returns true if the specified message has an associated NACK 191 * timeout that has not yet expired; otherwise false is returned. 192 */ 193 boolean hasNACK(int mid) 194 { 195 return (mid < lastMsgRcvd && mid > swindow.lastMsgDlvr); 196 } 197 198 199 /** 200 * Returns the number of received messages that has not yet been 201 * delivered due to not respecting FIFO-order. It also updates the 202 * sliding window datastructures accordingly. 203 */ 204 int getDeliveryCount() 205 { 206 int pos = swindow.shiftFIFO(); 207 clusterWindow.set(localHost.getClusterIndex(), swindow.lastMsgDlvr); 208 return pos; 209 } 210 211 212 /** 213 * Returns true if the specified message is in FIFO order; otherwise 214 * false is returned. If the message is in FIFO order, it also 215 * updates the sliding window datastructures accordingly. 216 */ 217 boolean isFIFO(int mid) 218 { 219 if (mid == swindow.lastMsgDlvr + 1) { 220 swindow.shift(); 221 if (lastMsgRcvd < swindow.lastMsgDlvr) 222 lastMsgRcvd = swindow.lastMsgDlvr; 223 clusterWindow.set(localHost.getClusterIndex(), mid); 224 return true; 225 } else { 226 return false; 227 } 228 } 229 230 231 /** 232 * Returns true if the specified message is not in FIFO order, 233 * causing a gap in the sequence of messages. This means that the 234 * received message must be greater than all those already delivered. 235 */ 236 boolean isNotFIFO(int mid) 237 { 238 return (mid > swindow.lastMsgDlvr + 1); 239 } 240 241 242 /** 243 * Returns true if the specified message causes a gap in the sequence 244 * of received messages; otherwise false is returned. 245 */ 246 boolean causeGap(int mid) 247 { 248 return (mid > lastMsgRcvd); 249 } 250 251 252 /** 253 * Insert the given message in the ordered queue. Also update the 254 * sliding window with the received messages. 255 * 256 * @param msgFrag 257 * The message fragment to insert in the ordered queue. 258 */ 259 void insertMsgFrag(FragmentHeader msgFrag) 260 { 261 swindow.set(msgFrag.fragId); 262 rcvd.put(msgFrag.fragId, msgFrag); 263 } 264 265 266 /** 267 * Remove and return the first message fragment stored in the ordered 268 * queue. 269 * 270 * @return 271 * The first message stored in the ordered queue. 272 */ 273 FragmentHeader removeMsgFrag() 274 { 275 return rcvd.remove(rcvd.firstKey()); 276 } 277 278 279 /** 280 * Add a fragement to the list of fragments composing the message. 281 * If this is the last fragment, the message is composed and 282 * returned back to the caller. 283 * 284 * @param msg 285 * The message to be composed. 286 * @return 287 * Returns the message stream to be m-received by the upper layer 288 * if this was the last fragment of the message, and if it was for 289 * me; otherwise <code>null</code> is returned. 290 */ 291 InMessage addFragment(FragmentHeader header) 292 { 293 294 byte[] msgFragment = header.getFragment(); 295 int fragLen = header.getFragmentLength() - OVERHEAD_SIZE; 296 if (!header.isLastFragment()) { 297 298 /* 299 * This is not the last fragment; add it to the fragments TreeMap. 300 */ 301 if (log.isDebugEnabled()) 302 log.debug("NOT the last fragement of msg (" + header.fragId + ")"); 303 totlen += payload; 304 fragments.put(header.fragId, msgFragment); 305 return null; 306 307 } else { 308 309 /* 310 * Last fragment; compose and deliver 311 */ 312 totlen += fragLen; 313 if (log.isDebugEnabled()) 314 log.debug("LAST FRAGEMENT: frag.len= "+ fragLen 315 + " total.len= " + totlen + " payload=" + payload); 316 InMessage msgstream = new InMessage(payload, HEADER_SIZE, TRAILER_SIZE, totlen); 317 /* 318 * For each fragment of the message, reconstruct the message 319 * stream in correct (ascending) order of the message 320 * identifiers (keys to the <code>fragments</code> TreeMap). 321 * After the fragment is inserted into the message stream, it is 322 * removed from the iterator (and the underlying TreeMap.) 323 */ 324 for (Iterator iter = fragments.keySet().iterator(); iter.hasNext(); ) { 325 Integer fid = (Integer) iter.next(); 326 byte[] msgFrag = fragments.get(fid); 327 msgstream.insert(msgFrag, payload); 328 if (log.isDebugEnabled()) 329 log.debug("inserting msg (" + fid + ") in stream\n" + msgstream); 330 iter.remove(); 331 } 332 333 /* Insert the last fragment into the message stream. */ 334 msgstream.insert(msgFragment, fragLen); 335 if (log.isDebugEnabled()) 336 log.debug("final msg (" + header.fragId + ") in stream\n" + msgstream); 337 totlen = 0; 338 return msgstream; 339 340 } 341 } 342 343 344 //////////////////////////////////////////////////////////////////////////////////////////// 345 // Event handling methods 346 //////////////////////////////////////////////////////////////////////////////////////////// 347 348 ScheduledEvent getScheduledEvent(int mid) 349 { 350 return nacks.get(mid); 351 } 352 353 354 ScheduledEvent putScheduledEvent(int mid, ScheduledEvent event) 355 { 356 return nacks.put(mid, event); 357 } 358 359 360 ScheduledEvent removeScheduledEvent(int mid) 361 { 362 return nacks.remove(mid); 363 } 364 365 366 //////////////////////////////////////////////////////////////////////////////////////////// 367 // Object methods 368 //////////////////////////////////////////////////////////////////////////////////////////// 369 370 public String toString() 371 { 372 StringBuilder buf = new StringBuilder("[MsgFlowRcvrSide: lastMsgRcvd="); 373 buf.append(lastMsgRcvd); 374 buf.append(", "); 375 buf.append(swindow); 376 buf.append(", "); 377 buf.append(clusterWindow); 378 return buf.toString(); 379 } 380 381 } // END MsgFlowRcvrSide