1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22 import java.io.ObjectInput;
23 import java.util.Iterator;
24 import java.util.List;
25
26 import jgroup.core.EndPoint;
27 import jgroup.relacs.daemon.DaemonMsg;
28 import jgroup.relacs.types.EndPointImpl;
29 import jgroup.relacs.types.GroupIndex;
30 import jgroup.relacs.types.MessageId;
31 import jgroup.util.InMessage;
32 import jgroup.util.OutMessage;
33
34 import org.apache.log4j.Logger;
35
36 /**
37 * The <code>MsgJG</code> class is the main <i>mss</i> level message,
38 * used for passing Jgroup related multicast messages to group members.
39 *
40 * @author Hein Meling
41 * @since Jgroup 1.2
42 */
43 public final class MsgJG
44 implements Msg, MssConstants, MssTag
45 {
46
47 ////////////////////////////////////////////////////////////////////////////////////////////
48 // Logger
49 ////////////////////////////////////////////////////////////////////////////////////////////
50
51 /** Obtain logger for this class */
52 private static Logger log = Logger.getLogger(MsgJG.class);
53
54
55 ////////////////////////////////////////////////////////////////////////////////////////////
56 // Static section
57 ////////////////////////////////////////////////////////////////////////////////////////////
58
59 /**
60 * Constant null array used when no receivers are set; meaning that
61 * all should receive the message. Object reuse pattern; see Item 27
62 * in Effective Java.
63 */
64 private final static EndPoint[] NULL_ENDPOINT_ARRAY = new EndPoint[0];
65
66
67 ////////////////////////////////////////////////////////////////////////////////////////////
68 // Message Identifier Generator
69 ////////////////////////////////////////////////////////////////////////////////////////////
70
71 private static transient int fragIdGen = 0;
72
73
74 ////////////////////////////////////////////////////////////////////////////////////////////
75 // Message Fields (static part)
76 ////////////////////////////////////////////////////////////////////////////////////////////
77
78 /** Fragment identifier */
79 private int fragId;
80
81 /** Tag assigned in upper level (@see jgroup.relacs.daemon.Tag) */
82 private byte JGtag;
83
84 /** The cluster to which the message will be sent (only endpoint is transmitted) */
85 private transient Cluster cluster;
86
87 /**
88 * Pointer to the message trailer, which contains the dynamic part of
89 * the message (see below)
90 */
91 private int trailerStartPos;
92
93
94 ////////////////////////////////////////////////////////////////////////////////////////////
95 // Message Fields (dynamic part)
96 ////////////////////////////////////////////////////////////////////////////////////////////
97
98 /**
99 * Receivers array (zero length if ALL should receive this message).
100 * This array contains all receivers, when accessed from the sending
101 * side, while on the receiver side it contains only those whom was
102 * marshalled into the message destined for a particular cluster.
103 */
104 private EndPoint[] receivers;
105
106 /** Distinguish between local and external flow control information */
107 private byte FCtype;
108
109 /** Array of flow control entries */
110 private FCEntry[] fc;
111
112
113 ////////////////////////////////////////////////////////////////////////////////////////////
114 // Transient fields (recomputed during unmarshalling)
115 ////////////////////////////////////////////////////////////////////////////////////////////
116
117 /** The MssDS object */
118 private transient MssDS mssds;
119
120 /** Message to be sent */
121 private transient OutMessage outmsg;
122
123 /** Message received from the Mss */
124 private transient InMessage inmsg;
125
126 /** True if this host is in the destination set of this message */
127 private transient boolean isForMe;
128
129 /** The index of the flow control entry associated with the local host */
130 private transient int localFCIndex = UNDEF;
131
132 /** The local host (where this message resides; sender or receiver) */
133 private transient MssHost localHost;
134
135 /** The sender host */
136 private transient MssHost sender;
137
138 /** The start position of the cluster entry in the outmsg stream */
139 private transient int clusterStartPosition;
140
141 /** The start position of the receiver set in the outmsg stream */
142 private transient int receiversStartPosition;
143
144
145 ////////////////////////////////////////////////////////////////////////////////////////////
146 // Constructors and marshalling methods
147 ////////////////////////////////////////////////////////////////////////////////////////////
148
149 /**
150 * Marshal a message with receiver set all.
151 */
152 static MsgJG marshal(DaemonMsg dmsg, byte JGtag, MssDS mssds)
153 throws IOException
154 {
155 return new MsgJG(dmsg, JGtag, NULL_ENDPOINT_ARRAY, mssds);
156 }
157
158
159 static MsgJG marshal(DaemonMsg dmsg, byte JGtag, EndPoint[] receivers, MssDS mssds)
160 throws IOException
161 {
162 return new MsgJG(dmsg, JGtag, receivers, mssds);
163 }
164
165
166 /**
167 * Marshalling constructor: Creates a new <code>MsgJG</code>
168 * instance, based on an upper level message originated by the Jgroup
169 * daemon.
170 *
171 * @param dmsg
172 * A <code>DaemonMsg</code> from the upper level. The set
173 * of possible messages are in the @see jgroup.relacs.daemon
174 * package.
175 * @param JGtag
176 * A one byte tag indicating an upper level message type that is
177 * encapsulated within the given <code>outmsg</code>. For the set
178 * of possible values @see jgroup.relacs.daemon.Tag.
179 * @param dest a <code>Cluster</code> value
180 * @param receivers a <code>MssHost[]</code> value
181 * @param mssds a <code>MssDS</code> value
182 *
183 * @exception IOException if an error occurs
184 */
185 private MsgJG(DaemonMsg dmsg, byte JGtag, EndPoint[] receivers, MssDS mssds)
186 throws IOException
187 {
188 localHost = HostTable.getLocalHost();
189 sender = localHost;
190 this.receivers = receivers;
191 this.mssds = mssds;
192 this.outmsg = dmsg.getOutMessage();
193
194 this.fragId = fragIdGen++;
195 this.JGtag = JGtag;
196
197 /*
198 * The static <i>mss</i> data associated with a <code>MsgJG</code>
199 * is kept in the beginning of the stream (in the header), while the
200 * dynamic <i>mss</i> level data is kept at the end of the
201 * <code>OutMessage</code> stream (in the trailer). In addition to
202 * the static data, we need to keep the trailer index in the header
203 * to allow unmarshalling, starting from the beginning of the
204 * <code>OutMessage</code> stream.
205 *
206 * Note that the <code>dmsg.size()</code> returns the number of
207 * bytes that originates from the daemon layer, and since the first
208 * part of every Mss level message is <code>MSS_HEADER_SIZE</code>
209 * bytes, we have to add this to get the correct trailer position.
210 */
211 trailerStartPos = dmsg.size() + MSS_HEADER_SIZE;
212 log.assertLog(dmsg.size() != 0, "Illegal position for provided DaemonMsg");
213 outmsg.seek(0);
214
215 /*
216 * Store the trailer index in the static section of the <i>mss</i>
217 * level data. The trailer is written using the methods
218 * <code>setCluster()</code> and <code>setReceivers()</code>.
219 */
220 outmsg.writeInt(trailerStartPos);
221 MessageId.marshal(outmsg, fragId);
222 outmsg.writeByte(JGtag);
223 clusterStartPosition = outmsg.getPosition();
224 }
225
226
227 /**
228 * Returns a <code>MsgJG</code> object that contains a partially
229 * decoded m-received input stream, containing only the static part
230 * of the message structure. To decode the whole stream, including
231 * the dynamic part of the <i>mss</i> level message, all message
232 * fragments must have been added into the message object using the
233 * <code>addFragment()<code> method.
234 *
235 * @param inmsg
236 * The message input stream to decode.
237 */
238 static MsgJG unmarshal(ObjectInput inmsg, FragmentHeader header, MssDS mssds)
239 throws IOException, ClassNotFoundException
240 {
241 return new MsgJG((InMessage) inmsg, header, mssds);
242 }
243
244
245 /**
246 * Unmarshalling constructor: Constructs a new <code>MsgJG</code>
247 * data structure from an m-received message.
248 *
249 * @param inmsg
250 * The m-received <code>InMessage</code> object
251 *
252 * @exception IOException
253 * Raised if there was problem unmarshalling the message.
254 */
255 private MsgJG(InMessage inmsg, FragmentHeader header, MssDS mssds)
256 throws IOException, ClassNotFoundException
257 {
258 this.inmsg = inmsg;
259 this.mssds = mssds;
260
261 /* Get the local host of the receiver */
262 localHost = HostTable.getLocalHost();
263 sender = header.getSender();
264
265 /*
266 * Unmarshalling static header part.
267 */
268 /*
269 * Retrive pointer to the trailer start position in which the
270 * <i>mss</i> level message data is collected.
271 */
272 trailerStartPos = inmsg.readInt();
273 fragId = MessageId.unmarshal(inmsg);
274 JGtag = inmsg.readByte();
275 EndPointImpl endpoint = new EndPointImpl();
276 endpoint.readExternal(inmsg);
277 cluster = mssds.getClusterTable().lookup(endpoint);
278 }
279
280
281 /**
282 * Unmarshalling method to complete the unmarshalling of a
283 * <code>MsgJG</code> object. It assumes that all fragments has been
284 * added to the stream using the <code>addFragment()</code> method.
285 */
286 InMessage complete()
287 throws IOException, ClassNotFoundException
288 {
289 // Seek to the start of the dynamic data; the trailer
290 inmsg.seek(trailerStartPos);
291
292 // Flow control data
293 FCtype = inmsg.readByte();
294 int len = GroupIndex.unmarshal(inmsg);
295 fc = new FCEntry[len];
296 for (int i = 0; i < len; i++) {
297 fc[i] = new FCEntry();
298 fc[i].readExternal(inmsg);
299 if (localFCIndex == UNDEF && fc[i].key.isLocal())
300 localFCIndex = i;
301 }
302
303 // Set of receivers
304 int nreceivers = GroupIndex.unmarshal(inmsg);
305 if (nreceivers == ALL) {
306 isForMe = true;
307 receivers = NULL_ENDPOINT_ARRAY;
308 } else {
309 isForMe = false;
310 receivers = new EndPoint[nreceivers];
311 for (int i = 0; i < nreceivers; i++) {
312 receivers[i] = new EndPointImpl();
313 receivers[i].readExternal(inmsg);
314 if (receivers[i].isLocal())
315 isForMe = true;
316 }
317 }
318
319 /*
320 * We are now ready to update the flow control information, attached
321 * with this message.
322 */
323 updateFC();
324
325 /*
326 * Seek to position <code>MSS_HEADER_SIZE</code> and mark that
327 * position so that any reset will return to that position
328 * instead; this is to avoid that external message users see the
329 * static header part of this message. <p>
330 *
331 * PS; Note that the parameter to the mark() method is not used.
332 */
333 inmsg.seek(MSS_HEADER_SIZE);
334 inmsg.mark(0);
335 return inmsg;
336 }
337
338
339 ////////////////////////////////////////////////////////////////////////////////////////////
340 // Fragmentation handling (from Msg interface)
341 ////////////////////////////////////////////////////////////////////////////////////////////
342
343 /**
344 * Returns a <code>FragmentIterator</code> for this
345 * <code>MsgJG</code> object. This iterator allows to send the
346 * entire message as multiple fragments of specified size (payload).
347 * At the same time, it marks each fragment with a tag and message
348 * identifier provided through the <code>next()</code> method of the
349 * iterator.
350 *
351 * Note that <code>MsgJG</code> messages cannot reuse the same
352 * iterator for sending to multiple clusters. This is because, at
353 * each iteration reuse, the fragment identifier is changed, causing
354 * problems when doing lookup in the sent queue to resend a message.
355 */
356 public FragmentIterator iterator(MsgCntrl msgCntrl)
357 {
358 return new MsgFragmentIterator(this, msgCntrl);
359 }
360
361
362 ////////////////////////////////////////////////////////////////////////////////////////////
363 // Msg interface methods
364 ////////////////////////////////////////////////////////////////////////////////////////////
365
366 /**
367 * Returns the tag associated with this message.
368 */
369 public byte getTag()
370 {
371 return JG;
372 }
373
374
375 /**
376 * Returns the message identifier for this message.
377 */
378 public int getMid()
379 {
380 return fragId;
381 }
382
383
384 /**
385 * Returns the sender of this message.
386 */
387 public MssHost getSender()
388 {
389 return sender;
390 }
391
392
393 /**
394 * Returns true if this message has to be routed to a different
395 * cluster.
396 */
397 public boolean hasToBeRouted()
398 {
399 return !cluster.isLocal();
400 }
401
402
403 /**
404 * Returns the message flow controller for the sender side.
405 */
406 public MsgFlowSndrSide getMsgFlow()
407 {
408 return cluster.getMsgFlow();
409 }
410
411
412 /**
413 * Returns the <code>OutMessage</code> associated with this message.
414 */
415 public OutMessage getOutMessage()
416 {
417 return outmsg;
418 }
419
420
421 ////////////////////////////////////////////////////////////////////////////////////////////
422 // MsgJG specified methods
423 ////////////////////////////////////////////////////////////////////////////////////////////
424
425 /**
426 * Marshal receivers into the stream of this message. If there are
427 * no receivers specified (the <code>receivers</code> list is empty),
428 * this means that all members should receive this message. <p>
429 *
430 * This method is invoked for each cluster, to marshal the receivers
431 * that belong to that cluster into the stream. That means, not all
432 * actual receivers may be marshalled into the stream.
433 *
434 * @param receivers
435 * List of receivers for this message; if the list is empty, all
436 * should receive the message.
437 */
438 public void setReceivers(List receivers)
439 {
440 outmsg.seek(receiversStartPosition);
441 try {
442 GroupIndex.marshal(outmsg, receivers.size());
443 for (Iterator iter = receivers.iterator(); iter.hasNext(); ) {
444 EndPoint recv = (EndPoint) iter.next();
445 recv.writeExternal(outmsg);
446 if (log.isDebugEnabled()) {
447 log.debug("setReceivers: " + recv);
448 }
449 }
450 } catch (IOException e) {
451 log.warn("setReceivers: Could not update receiver information in message stream");
452 }
453 }
454
455
456 public void setCluster(Cluster cluster)
457 {
458 if (log.isDebugEnabled()) {
459 log.debug("setCluster: " + cluster);
460 }
461
462 this.cluster = cluster;
463 if (cluster.isLocal()) {
464 FCtype = LOCALFC;
465 fc = mssds.getAllFCEntry();
466 } else {
467 FCtype = EXTERNFC;
468 fc = mssds.getClusterFCEntry(cluster.getEndPoint());
469 }
470
471 /* Find the location for writting the cluster. */
472 outmsg.seek(clusterStartPosition);
473 try {
474 cluster.getEndPoint().writeExternal(outmsg);
475 } catch (IOException e) {
476 log.warn("setCluster: Could not update cluster in message stream");
477 }
478
479 /*
480 * Search for the trailer location, and write out the flow control
481 * information.
482 */
483 outmsg.seek(trailerStartPos);
484 try {
485 outmsg.writeByte(FCtype);
486 GroupIndex.marshal(outmsg, fc.length);
487 for (int i = 0; i < fc.length; i++) {
488 fc[i].writeExternal(outmsg);
489 }
490 } catch (IOException e) {
491 log.warn("setCluster: Could not update flow control information in message stream");
492 }
493
494 /* Store the location for the receiver set. */
495 receiversStartPosition = outmsg.getPosition();
496 }
497
498
499 /**
500 * Returns the destination cluster of this message.
501 */
502 public Cluster getCluster()
503 {
504 return cluster;
505 }
506
507
508 /**
509 * Returns the receiver set for this message. On the sending side,
510 * this includes all receivers, while on the receiver side this may
511 * only include those that were marshalled into the message for a
512 * particular cluster.
513 */
514 public EndPoint[] getReceivers()
515 {
516 return receivers;
517 }
518
519
520 private void updateFC()
521 {
522 switch (FCtype) {
523
524 case EXTERNFC:
525 /* Check if there is a flow control entry for the local host */
526 if (localFCIndex != UNDEF) {
527 /* Update congestion information (sender side flow control) */
528 sender.flush(fc[localFCIndex].lastMsgRcvd);
529 }
530 break;
531
532 case LOCALFC:
533 if (sender.isLocal()) {
534
535 localHost.flush();
536
537 } else {
538
539 for (int i = 0; i < fc.length; i++) {
540 MssHost host = mssds.hostLookup(fc[i].key);
541 if (host == null) {
542 log.warn("Unavailable host: " + fc[i].key);
543 continue;
544 }
545 host.getMsgFlow().clusterWindow.set(sender.getClusterIndex(), fc[i].lastMsgRcvd);
546 }
547
548 }
549 break;
550
551 default:
552 log.warn("Erroneous flow control type");
553 }
554 }
555
556
557 /**
558 * Returns the upper level Jgroup tag associated with this message.
559 *
560 * @see jgroup.relacs.daemon.Tag
561 */
562 public byte getJGTag()
563 {
564 return JGtag;
565 }
566
567
568 /**
569 * Returns true if this message is destined for the local host.
570 */
571 boolean isForMe()
572 {
573 return isForMe;
574 }
575
576
577 ////////////////////////////////////////////////////////////////////////////////////////////
578 // Methods from Object
579 ////////////////////////////////////////////////////////////////////////////////////////////
580
581 /**
582 * Returns a string representation of this object
583 */
584 public String toString()
585 {
586 StringBuilder buf = new StringBuilder("MsgJG: jgtag=");
587 buf.append(JGtag);
588 buf.append(", sender=");
589 buf.append(sender);
590 buf.append(", cluster=");
591 buf.append(cluster);
592 buf.append(", nreceivers=");
593 buf.append(receivers.length);
594 for (int i = 0; i < receivers.length; i++) {
595 buf.append(", ");
596 buf.append(receivers[i]);
597 if (receivers[i].isLocal())
598 buf.append(" (LOCAL)");
599 }
600 if (outmsg != null) {
601 buf.append(", ");
602 buf.append(outmsg.toString());
603 } else if (inmsg != null) {
604 buf.append(", ");
605 buf.append(inmsg.toString());
606 }
607 return buf.toString();
608 }
609
610 }