1 /* 2 * Copyright (c) 1998-2002 The Jgroup Team. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License version 2 as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU Lesser General Public License for more details. 12 * 13 * You should have received a copy of the GNU Lesser General Public License 14 * along with this program; if not, write to the Free Software 15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 16 * 17 */ 18 19 package jgroup.relacs.mss; 20 21 import java.io.IOException; 22 23 import jgroup.core.EndPoint; 24 import jgroup.relacs.types.GroupIndex; 25 import jgroup.util.InMessage; 26 import jgroup.util.MsgFactory; 27 import jgroup.util.OutMessage; 28 29 import org.apache.log4j.Logger; 30 31 /** 32 * The <code>MsgRouting</code> class 33 * 34 * @author Hein Meling 35 * @since Jgroup 1.2 36 */ 37 public final class MsgRouting 38 implements Msg, MssConstants, MssTag 39 { 40 41 //////////////////////////////////////////////////////////////////////////////////////////// 42 // Logger 43 //////////////////////////////////////////////////////////////////////////////////////////// 44 45 /** Obtain logger for this class */ 46 private static Logger log = Logger.getLogger(MsgRouting.class); 47 48 49 //////////////////////////////////////////////////////////////////////////////////////////// 50 // Unique message instance 51 //////////////////////////////////////////////////////////////////////////////////////////// 52 53 /** 54 * As this message is never stored in a queue, a single message 55 * instance is sufficient. This allows us to save the cost of 56 * allocating and garbage collecting messages. 57 */ 58 private static final MsgRouting msg = new MsgRouting(); 59 60 61 //////////////////////////////////////////////////////////////////////////////////////////// 62 // Message pointers 63 //////////////////////////////////////////////////////////////////////////////////////////// 64 65 private transient int fcStartPosition; 66 67 68 //////////////////////////////////////////////////////////////////////////////////////////// 69 // Message Fields (static part) 70 //////////////////////////////////////////////////////////////////////////////////////////// 71 72 /* Tag assigned in upper level (@see jgroup.relacs.mss.MssTag) */ 73 private byte routingTag; 74 75 76 //////////////////////////////////////////////////////////////////////////////////////////// 77 // Message Fields (dynamic part) 78 //////////////////////////////////////////////////////////////////////////////////////////// 79 80 /** Routing table */ 81 private RoutingTable rt; 82 83 /** Array of flow control entries */ 84 private FCEntry[] fc; 85 86 87 //////////////////////////////////////////////////////////////////////////////////////////// 88 // Transient fields (recomputed during unmarshalling) 89 //////////////////////////////////////////////////////////////////////////////////////////// 90 91 /** The MssDS object */ 92 private transient MssDS mssds; 93 94 /** Message to be sent */ 95 private transient OutMessage outmsg; 96 97 /** The index of the flow control entry associated with the local host */ 98 private transient int localFCIndex = UNDEF; 99 100 /** The message sender (obtained from the fragment header) */ 101 private transient MssHost sender; 102 103 104 //////////////////////////////////////////////////////////////////////////////////////////// 105 // Marshalling and unmarshalling methods 106 //////////////////////////////////////////////////////////////////////////////////////////// 107 108 /** 109 * Marshal a ROUTING message. 110 */ 111 static MsgRouting marshal(MssDS mssds) 112 throws IOException 113 { 114 msg.routingTag = ROUTING; 115 msg.mssds = mssds; 116 msg.sender = HostTable.getLocalHost(); 117 msg.rt = mssds.getRoutingTable(); 118 119 /* 120 * Compute the max size for this message. 121 */ 122 int size = GroupIndex.SIZE + mssds.size()*FCEntry.SIZE; 123 /* 124 * Here we check if this <code>MsgRouting</code> object has already 125 * been initialized with an outmsg object, in which case we simply 126 * clear the message object and reuse it. This avoids to create a 127 * new outmsg object (allocating additional memory) and there is no 128 * need to garbage collect the old one. 129 */ 130 if (msg.outmsg == null) { 131 int maxsize = size + RoutingTable.getMaxSize(mssds.size(), mssds.numOfClusters()); 132 if (log.isDebugEnabled()) { 133 log.debug("MsgRouting.maxsize=" + maxsize); 134 } 135 msg.outmsg = MsgFactory.get(maxsize); 136 } 137 size += msg.rt.getMarshalSize(); 138 if (log.isDebugEnabled()) { 139 log.debug("MsgRouting.size=" + size); 140 } 141 msg.outmsg.clear(size); 142 143 /* Marshal the routing table */ 144 msg.rt.writeExternal(msg.outmsg); 145 msg.fcStartPosition = msg.outmsg.getPosition(); 146 /* 147 * Note that, we do not initialize the flow control here, since the 148 * mss will do so using the <code>setFCData()</code> method below, 149 * for each cluster; allowing the mss to reuse the same 150 * <code>MsgRouting</code> object for sending to all clusters. 151 */ 152 return msg; 153 } 154 155 156 /** 157 * Unmarshal an FWDROUTING message from the given stream. 158 */ 159 static MsgRouting unmarshal(InMessage in, FragmentHeader header) 160 throws IOException, ClassNotFoundException 161 { 162 /* Obtain stuff from the header */ 163 msg.routingTag = header.getTag(); 164 msg.sender = header.getSender(); 165 166 /* Obtain the routing table from the stream */ 167 msg.rt = new RoutingTable(); 168 msg.rt.readExternal(in); 169 170 /* Obtain the flow control information table from the stream */ 171 int len = GroupIndex.unmarshal(in); 172 msg.fc = new FCEntry[len]; 173 for (int i=0; i < len; i++) { 174 msg.fc[i] = new FCEntry(); 175 msg.fc[i].readExternal(in); 176 if (msg.localFCIndex == UNDEF && msg.fc[i].key.isLocal()) 177 msg.localFCIndex = i; 178 } 179 return msg; 180 } 181 182 183 //////////////////////////////////////////////////////////////////////////////////////////// 184 // MsgRouting specific method 185 //////////////////////////////////////////////////////////////////////////////////////////// 186 187 /** 188 * Return the largest possible size that a <code>MsgRouting</code> 189 * message may consume, for the given distributed system size. 190 */ 191 public static int getMaxSize(int numOfHosts, int numOfClusters) 192 { 193 return GroupIndex.SIZE + numOfHosts*FCEntry.SIZE 194 + RoutingTable.getMaxSize(numOfHosts, numOfClusters); 195 } 196 197 198 /** 199 * Update the flow control data on the receiver side. 200 */ 201 void updateFC() 202 { 203 if (routingTag == FWDROUTING) { 204 205 /* Check if there is a flow control entry for the local host */ 206 if (localFCIndex != UNDEF) { 207 /* Update congestion information (sender side flow control) */ 208 sender.flush(fc[localFCIndex].lastMsgRcvd); 209 } 210 211 } else { 212 213 log.warn("Erroneous routing tag; can only process FWDROUTING: " + routingTag); 214 215 } 216 } 217 218 219 /** 220 * Returns the sender cluster of this message. 221 */ 222 Cluster getCluster() 223 { 224 return sender.getCluster(); 225 } 226 227 228 /** 229 * Returns the topology table associated with the routing table 230 * attached with this message. 231 */ 232 public TopologyEntry[] getTopologyTable() 233 { 234 return rt.table; 235 } 236 237 238 /** 239 * Negate the cost as reverse poison 240 */ 241 void splitHorizonOn(EndPoint key) 242 { 243 rt.splitHorizonOn(key); 244 } 245 246 247 /** 248 * 249 */ 250 void splitHorizonOff() 251 { 252 rt.splitHorizonOff(); 253 } 254 255 256 void setFCData(EndPoint destCluster) 257 { 258 fc = mssds.getClusterFCEntry(destCluster); 259 outmsg.seek(fcStartPosition); 260 try { 261 GroupIndex.marshal(outmsg, fc.length); 262 for (int i=0; i < fc.length; i++) 263 fc[i].writeExternal(outmsg); 264 } catch (IOException e) { 265 log.error("setFCData: Could not update flow control information in message stream", e); 266 } 267 } 268 269 270 //////////////////////////////////////////////////////////////////////////////////////////// 271 // Fragmentation handling (from Msg interface) 272 //////////////////////////////////////////////////////////////////////////////////////////// 273 274 /* 275 * The fragmentation iterator instance for this message; it can be 276 * reused for multiple iterations over this message. 277 */ 278 private transient MsgFragmentIterator fragIter = null; 279 280 281 /** 282 * Returns a <code>FragmentIterator</code> for this 283 * <code>MsgRouting</code> object. This iterator allows to send the 284 * entire message as multiple fragments of specified size (payload). 285 * At the same time, it marks each fragment with a tag and message 286 * identifier provided through the <code>next()</code> method of the 287 * iterator. 288 */ 289 public FragmentIterator iterator(MsgCntrl msgCntrl) 290 { 291 /* If there is already an iterator for this message, we reuse it. */ 292 if (fragIter == null) 293 fragIter = new MsgFragmentIterator(this, msgCntrl); 294 else 295 fragIter.reset(outmsg); 296 return fragIter; 297 } 298 299 300 //////////////////////////////////////////////////////////////////////////////////////////// 301 // Msg interface methods 302 //////////////////////////////////////////////////////////////////////////////////////////// 303 304 /** 305 * Returns the tag associated to this message. 306 */ 307 public byte getTag() 308 { 309 return routingTag; 310 } 311 312 313 /** 314 * Returns the message identifier for this message; all routing 315 * messages have <code>UNDEF</code> as their message identifier. 316 */ 317 public int getMid() 318 { 319 return UNDEF; 320 } 321 322 323 /** 324 * Returns the sender of this message. 325 */ 326 public MssHost getSender() 327 { 328 return sender; 329 } 330 331 332 /** 333 * Returns false always, since routing messages should never be routed. 334 */ 335 public boolean hasToBeRouted() 336 { 337 return false; 338 } 339 340 341 /** 342 * Returns true if the sender of this message is the local host. 343 */ 344 boolean isLocal() 345 { 346 return sender.isLocal(); 347 } 348 349 350 /** 351 * Returns the message flow controller for the sender side. 352 */ 353 public MsgFlowSndrSide getMsgFlow() 354 { 355 return sender.getCluster().getMsgFlow(); 356 } 357 358 359 /** 360 * Returns the <code>OutMessage</code> associated with this message. 361 */ 362 public OutMessage getOutMessage() 363 { 364 return outmsg; 365 } 366 367 368 //////////////////////////////////////////////////////////////////////////////////////////// 369 // Methods from Object 370 //////////////////////////////////////////////////////////////////////////////////////////// 371 372 /** 373 * Returns a string representation of this object 374 */ 375 public String toString() 376 { 377 StringBuilder buf = new StringBuilder(); 378 switch (routingTag) { 379 case FWDROUTING: 380 buf.append("FWDROUTING, "); 381 buf.append(rt.toString()); 382 for (int i=0; i < fc.length; i++) 383 buf.append(fc[i]); 384 break; 385 386 case ROUTING: 387 buf.append("ROUTING, "); 388 buf.append(rt.toString()); 389 for (int i=0; i < fc.length; i++) 390 buf.append(fc[i]); 391 break; 392 393 default: 394 buf.append("Illegal routing type: " + routingTag); 395 } 396 return buf.toString(); 397 } 398 399 } // END MsgRouting