1 /* 2 * Copyright (c) 1998-2002 The Jgroup Team. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License version 2 as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU Lesser General Public License for more details. 12 * 13 * You should have received a copy of the GNU Lesser General Public License 14 * along with this program; if not, write to the Free Software 15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 16 * 17 */ 18 19 package jgroup.relacs.mss; 20 21 import java.io.IOException; 22 import java.io.ObjectInput; 23 import java.util.Iterator; 24 import java.util.List; 25 26 import jgroup.core.EndPoint; 27 import jgroup.relacs.daemon.DaemonMsg; 28 import jgroup.relacs.types.EndPointImpl; 29 import jgroup.relacs.types.GroupIndex; 30 import jgroup.relacs.types.MessageId; 31 import jgroup.util.InMessage; 32 import jgroup.util.OutMessage; 33 34 import org.apache.log4j.Logger; 35 36 /** 37 * The <code>MsgJG</code> class is the main <i>mss</i> level message, 38 * used for passing Jgroup related multicast messages to group members. 39 * 40 * @author Hein Meling 41 * @since Jgroup 1.2 42 */ 43 public final class MsgJG 44 implements Msg, MssConstants, MssTag 45 { 46 47 //////////////////////////////////////////////////////////////////////////////////////////// 48 // Logger 49 //////////////////////////////////////////////////////////////////////////////////////////// 50 51 /** Obtain logger for this class */ 52 private static Logger log = Logger.getLogger(MsgJG.class); 53 54 55 //////////////////////////////////////////////////////////////////////////////////////////// 56 // Static section 57 //////////////////////////////////////////////////////////////////////////////////////////// 58 59 /** 60 * Constant null array used when no receivers are set; meaning that 61 * all should receive the message. Object reuse pattern; see Item 27 62 * in Effective Java. 63 */ 64 private final static EndPoint[] NULL_ENDPOINT_ARRAY = new EndPoint[0]; 65 66 67 //////////////////////////////////////////////////////////////////////////////////////////// 68 // Message Identifier Generator 69 //////////////////////////////////////////////////////////////////////////////////////////// 70 71 private static transient int fragIdGen = 0; 72 73 74 //////////////////////////////////////////////////////////////////////////////////////////// 75 // Message Fields (static part) 76 //////////////////////////////////////////////////////////////////////////////////////////// 77 78 /** Fragment identifier */ 79 private int fragId; 80 81 /** Tag assigned in upper level (@see jgroup.relacs.daemon.Tag) */ 82 private byte JGtag; 83 84 /** The cluster to which the message will be sent (only endpoint is transmitted) */ 85 private transient Cluster cluster; 86 87 /** 88 * Pointer to the message trailer, which contains the dynamic part of 89 * the message (see below) 90 */ 91 private int trailerStartPos; 92 93 94 //////////////////////////////////////////////////////////////////////////////////////////// 95 // Message Fields (dynamic part) 96 //////////////////////////////////////////////////////////////////////////////////////////// 97 98 /** 99 * Receivers array (zero length if ALL should receive this message). 100 * This array contains all receivers, when accessed from the sending 101 * side, while on the receiver side it contains only those whom was 102 * marshalled into the message destined for a particular cluster. 103 */ 104 private EndPoint[] receivers; 105 106 /** Distinguish between local and external flow control information */ 107 private byte FCtype; 108 109 /** Array of flow control entries */ 110 private FCEntry[] fc; 111 112 113 //////////////////////////////////////////////////////////////////////////////////////////// 114 // Transient fields (recomputed during unmarshalling) 115 //////////////////////////////////////////////////////////////////////////////////////////// 116 117 /** The MssDS object */ 118 private transient MssDS mssds; 119 120 /** Message to be sent */ 121 private transient OutMessage outmsg; 122 123 /** Message received from the Mss */ 124 private transient InMessage inmsg; 125 126 /** True if this host is in the destination set of this message */ 127 private transient boolean isForMe; 128 129 /** The index of the flow control entry associated with the local host */ 130 private transient int localFCIndex = UNDEF; 131 132 /** The local host (where this message resides; sender or receiver) */ 133 private transient MssHost localHost; 134 135 /** The sender host */ 136 private transient MssHost sender; 137 138 /** The start position of the cluster entry in the outmsg stream */ 139 private transient int clusterStartPosition; 140 141 /** The start position of the receiver set in the outmsg stream */ 142 private transient int receiversStartPosition; 143 144 145 //////////////////////////////////////////////////////////////////////////////////////////// 146 // Constructors and marshalling methods 147 //////////////////////////////////////////////////////////////////////////////////////////// 148 149 /** 150 * Marshal a message with receiver set all. 151 */ 152 static MsgJG marshal(DaemonMsg dmsg, byte JGtag, MssDS mssds) 153 throws IOException 154 { 155 return new MsgJG(dmsg, JGtag, NULL_ENDPOINT_ARRAY, mssds); 156 } 157 158 159 static MsgJG marshal(DaemonMsg dmsg, byte JGtag, EndPoint[] receivers, MssDS mssds) 160 throws IOException 161 { 162 return new MsgJG(dmsg, JGtag, receivers, mssds); 163 } 164 165 166 /** 167 * Marshalling constructor: Creates a new <code>MsgJG</code> 168 * instance, based on an upper level message originated by the Jgroup 169 * daemon. 170 * 171 * @param dmsg 172 * A <code>DaemonMsg</code> from the upper level. The set 173 * of possible messages are in the @see jgroup.relacs.daemon 174 * package. 175 * @param JGtag 176 * A one byte tag indicating an upper level message type that is 177 * encapsulated within the given <code>outmsg</code>. For the set 178 * of possible values @see jgroup.relacs.daemon.Tag. 179 * @param dest a <code>Cluster</code> value 180 * @param receivers a <code>MssHost[]</code> value 181 * @param mssds a <code>MssDS</code> value 182 * 183 * @exception IOException if an error occurs 184 */ 185 private MsgJG(DaemonMsg dmsg, byte JGtag, EndPoint[] receivers, MssDS mssds) 186 throws IOException 187 { 188 localHost = HostTable.getLocalHost(); 189 sender = localHost; 190 this.receivers = receivers; 191 this.mssds = mssds; 192 this.outmsg = dmsg.getOutMessage(); 193 194 this.fragId = fragIdGen++; 195 this.JGtag = JGtag; 196 197 /* 198 * The static <i>mss</i> data associated with a <code>MsgJG</code> 199 * is kept in the beginning of the stream (in the header), while the 200 * dynamic <i>mss</i> level data is kept at the end of the 201 * <code>OutMessage</code> stream (in the trailer). In addition to 202 * the static data, we need to keep the trailer index in the header 203 * to allow unmarshalling, starting from the beginning of the 204 * <code>OutMessage</code> stream. 205 * 206 * Note that the <code>dmsg.size()</code> returns the number of 207 * bytes that originates from the daemon layer, and since the first 208 * part of every Mss level message is <code>MSS_HEADER_SIZE</code> 209 * bytes, we have to add this to get the correct trailer position. 210 */ 211 trailerStartPos = dmsg.size() + MSS_HEADER_SIZE; 212 log.assertLog(dmsg.size() != 0, "Illegal position for provided DaemonMsg"); 213 outmsg.seek(0); 214 215 /* 216 * Store the trailer index in the static section of the <i>mss</i> 217 * level data. The trailer is written using the methods 218 * <code>setCluster()</code> and <code>setReceivers()</code>. 219 */ 220 outmsg.writeInt(trailerStartPos); 221 MessageId.marshal(outmsg, fragId); 222 outmsg.writeByte(JGtag); 223 clusterStartPosition = outmsg.getPosition(); 224 } 225 226 227 /** 228 * Returns a <code>MsgJG</code> object that contains a partially 229 * decoded m-received input stream, containing only the static part 230 * of the message structure. To decode the whole stream, including 231 * the dynamic part of the <i>mss</i> level message, all message 232 * fragments must have been added into the message object using the 233 * <code>addFragment()<code> method. 234 * 235 * @param inmsg 236 * The message input stream to decode. 237 */ 238 static MsgJG unmarshal(ObjectInput inmsg, FragmentHeader header, MssDS mssds) 239 throws IOException, ClassNotFoundException 240 { 241 return new MsgJG((InMessage) inmsg, header, mssds); 242 } 243 244 245 /** 246 * Unmarshalling constructor: Constructs a new <code>MsgJG</code> 247 * data structure from an m-received message. 248 * 249 * @param inmsg 250 * The m-received <code>InMessage</code> object 251 * 252 * @exception IOException 253 * Raised if there was problem unmarshalling the message. 254 */ 255 private MsgJG(InMessage inmsg, FragmentHeader header, MssDS mssds) 256 throws IOException, ClassNotFoundException 257 { 258 this.inmsg = inmsg; 259 this.mssds = mssds; 260 261 /* Get the local host of the receiver */ 262 localHost = HostTable.getLocalHost(); 263 sender = header.getSender(); 264 265 /* 266 * Unmarshalling static header part. 267 */ 268 /* 269 * Retrive pointer to the trailer start position in which the 270 * <i>mss</i> level message data is collected. 271 */ 272 trailerStartPos = inmsg.readInt(); 273 fragId = MessageId.unmarshal(inmsg); 274 JGtag = inmsg.readByte(); 275 EndPointImpl endpoint = new EndPointImpl(); 276 endpoint.readExternal(inmsg); 277 cluster = mssds.getClusterTable().lookup(endpoint); 278 } 279 280 281 /** 282 * Unmarshalling method to complete the unmarshalling of a 283 * <code>MsgJG</code> object. It assumes that all fragments has been 284 * added to the stream using the <code>addFragment()</code> method. 285 */ 286 InMessage complete() 287 throws IOException, ClassNotFoundException 288 { 289 // Seek to the start of the dynamic data; the trailer 290 inmsg.seek(trailerStartPos); 291 292 // Flow control data 293 FCtype = inmsg.readByte(); 294 int len = GroupIndex.unmarshal(inmsg); 295 fc = new FCEntry[len]; 296 for (int i = 0; i < len; i++) { 297 fc[i] = new FCEntry(); 298 fc[i].readExternal(inmsg); 299 if (localFCIndex == UNDEF && fc[i].key.isLocal()) 300 localFCIndex = i; 301 } 302 303 // Set of receivers 304 int nreceivers = GroupIndex.unmarshal(inmsg); 305 if (nreceivers == ALL) { 306 isForMe = true; 307 receivers = NULL_ENDPOINT_ARRAY; 308 } else { 309 isForMe = false; 310 receivers = new EndPoint[nreceivers]; 311 for (int i = 0; i < nreceivers; i++) { 312 receivers[i] = new EndPointImpl(); 313 receivers[i].readExternal(inmsg); 314 if (receivers[i].isLocal()) 315 isForMe = true; 316 } 317 } 318 319 /* 320 * We are now ready to update the flow control information, attached 321 * with this message. 322 */ 323 updateFC(); 324 325 /* 326 * Seek to position <code>MSS_HEADER_SIZE</code> and mark that 327 * position so that any reset will return to that position 328 * instead; this is to avoid that external message users see the 329 * static header part of this message. <p> 330 * 331 * PS; Note that the parameter to the mark() method is not used. 332 */ 333 inmsg.seek(MSS_HEADER_SIZE); 334 inmsg.mark(0); 335 return inmsg; 336 } 337 338 339 //////////////////////////////////////////////////////////////////////////////////////////// 340 // Fragmentation handling (from Msg interface) 341 //////////////////////////////////////////////////////////////////////////////////////////// 342 343 /** 344 * Returns a <code>FragmentIterator</code> for this 345 * <code>MsgJG</code> object. This iterator allows to send the 346 * entire message as multiple fragments of specified size (payload). 347 * At the same time, it marks each fragment with a tag and message 348 * identifier provided through the <code>next()</code> method of the 349 * iterator. 350 * 351 * Note that <code>MsgJG</code> messages cannot reuse the same 352 * iterator for sending to multiple clusters. This is because, at 353 * each iteration reuse, the fragment identifier is changed, causing 354 * problems when doing lookup in the sent queue to resend a message. 355 */ 356 public FragmentIterator iterator(MsgCntrl msgCntrl) 357 { 358 return new MsgFragmentIterator(this, msgCntrl); 359 } 360 361 362 //////////////////////////////////////////////////////////////////////////////////////////// 363 // Msg interface methods 364 //////////////////////////////////////////////////////////////////////////////////////////// 365 366 /** 367 * Returns the tag associated with this message. 368 */ 369 public byte getTag() 370 { 371 return JG; 372 } 373 374 375 /** 376 * Returns the message identifier for this message. 377 */ 378 public int getMid() 379 { 380 return fragId; 381 } 382 383 384 /** 385 * Returns the sender of this message. 386 */ 387 public MssHost getSender() 388 { 389 return sender; 390 } 391 392 393 /** 394 * Returns true if this message has to be routed to a different 395 * cluster. 396 */ 397 public boolean hasToBeRouted() 398 { 399 return !cluster.isLocal(); 400 } 401 402 403 /** 404 * Returns the message flow controller for the sender side. 405 */ 406 public MsgFlowSndrSide getMsgFlow() 407 { 408 return cluster.getMsgFlow(); 409 } 410 411 412 /** 413 * Returns the <code>OutMessage</code> associated with this message. 414 */ 415 public OutMessage getOutMessage() 416 { 417 return outmsg; 418 } 419 420 421 //////////////////////////////////////////////////////////////////////////////////////////// 422 // MsgJG specified methods 423 //////////////////////////////////////////////////////////////////////////////////////////// 424 425 /** 426 * Marshal receivers into the stream of this message. If there are 427 * no receivers specified (the <code>receivers</code> list is empty), 428 * this means that all members should receive this message. <p> 429 * 430 * This method is invoked for each cluster, to marshal the receivers 431 * that belong to that cluster into the stream. That means, not all 432 * actual receivers may be marshalled into the stream. 433 * 434 * @param receivers 435 * List of receivers for this message; if the list is empty, all 436 * should receive the message. 437 */ 438 public void setReceivers(List receivers) 439 { 440 outmsg.seek(receiversStartPosition); 441 try { 442 GroupIndex.marshal(outmsg, receivers.size()); 443 for (Iterator iter = receivers.iterator(); iter.hasNext(); ) { 444 EndPoint recv = (EndPoint) iter.next(); 445 recv.writeExternal(outmsg); 446 if (log.isDebugEnabled()) { 447 log.debug("setReceivers: " + recv); 448 } 449 } 450 } catch (IOException e) { 451 log.warn("setReceivers: Could not update receiver information in message stream"); 452 } 453 } 454 455 456 public void setCluster(Cluster cluster) 457 { 458 if (log.isDebugEnabled()) { 459 log.debug("setCluster: " + cluster); 460 } 461 462 this.cluster = cluster; 463 if (cluster.isLocal()) { 464 FCtype = LOCALFC; 465 fc = mssds.getAllFCEntry(); 466 } else { 467 FCtype = EXTERNFC; 468 fc = mssds.getClusterFCEntry(cluster.getEndPoint()); 469 } 470 471 /* Find the location for writting the cluster. */ 472 outmsg.seek(clusterStartPosition); 473 try { 474 cluster.getEndPoint().writeExternal(outmsg); 475 } catch (IOException e) { 476 log.warn("setCluster: Could not update cluster in message stream"); 477 } 478 479 /* 480 * Search for the trailer location, and write out the flow control 481 * information. 482 */ 483 outmsg.seek(trailerStartPos); 484 try { 485 outmsg.writeByte(FCtype); 486 GroupIndex.marshal(outmsg, fc.length); 487 for (int i = 0; i < fc.length; i++) { 488 fc[i].writeExternal(outmsg); 489 } 490 } catch (IOException e) { 491 log.warn("setCluster: Could not update flow control information in message stream"); 492 } 493 494 /* Store the location for the receiver set. */ 495 receiversStartPosition = outmsg.getPosition(); 496 } 497 498 499 /** 500 * Returns the destination cluster of this message. 501 */ 502 public Cluster getCluster() 503 { 504 return cluster; 505 } 506 507 508 /** 509 * Returns the receiver set for this message. On the sending side, 510 * this includes all receivers, while on the receiver side this may 511 * only include those that were marshalled into the message for a 512 * particular cluster. 513 */ 514 public EndPoint[] getReceivers() 515 { 516 return receivers; 517 } 518 519 520 private void updateFC() 521 { 522 switch (FCtype) { 523 524 case EXTERNFC: 525 /* Check if there is a flow control entry for the local host */ 526 if (localFCIndex != UNDEF) { 527 /* Update congestion information (sender side flow control) */ 528 sender.flush(fc[localFCIndex].lastMsgRcvd); 529 } 530 break; 531 532 case LOCALFC: 533 if (sender.isLocal()) { 534 535 localHost.flush(); 536 537 } else { 538 539 for (int i = 0; i < fc.length; i++) { 540 MssHost host = mssds.hostLookup(fc[i].key); 541 if (host == null) { 542 log.warn("Unavailable host: " + fc[i].key); 543 continue; 544 } 545 host.getMsgFlow().clusterWindow.set(sender.getClusterIndex(), fc[i].lastMsgRcvd); 546 } 547 548 } 549 break; 550 551 default: 552 log.warn("Erroneous flow control type"); 553 } 554 } 555 556 557 /** 558 * Returns the upper level Jgroup tag associated with this message. 559 * 560 * @see jgroup.relacs.daemon.Tag 561 */ 562 public byte getJGTag() 563 { 564 return JGtag; 565 } 566 567 568 /** 569 * Returns true if this message is destined for the local host. 570 */ 571 boolean isForMe() 572 { 573 return isForMe; 574 } 575 576 577 //////////////////////////////////////////////////////////////////////////////////////////// 578 // Methods from Object 579 //////////////////////////////////////////////////////////////////////////////////////////// 580 581 /** 582 * Returns a string representation of this object 583 */ 584 public String toString() 585 { 586 StringBuilder buf = new StringBuilder("MsgJG: jgtag="); 587 buf.append(JGtag); 588 buf.append(", sender="); 589 buf.append(sender); 590 buf.append(", cluster="); 591 buf.append(cluster); 592 buf.append(", nreceivers="); 593 buf.append(receivers.length); 594 for (int i = 0; i < receivers.length; i++) { 595 buf.append(", "); 596 buf.append(receivers[i]); 597 if (receivers[i].isLocal()) 598 buf.append(" (LOCAL)"); 599 } 600 if (outmsg != null) { 601 buf.append(", "); 602 buf.append(outmsg.toString()); 603 } else if (inmsg != null) { 604 buf.append(", "); 605 buf.append(inmsg.toString()); 606 } 607 return buf.toString(); 608 } 609 610 }