1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22
23 import jgroup.core.EndPoint;
24 import jgroup.relacs.types.GroupIndex;
25 import jgroup.util.InMessage;
26 import jgroup.util.MsgFactory;
27 import jgroup.util.OutMessage;
28
29 import org.apache.log4j.Logger;
30
31 /**
32 * The <code>MsgRouting</code> class
33 *
34 * @author Hein Meling
35 * @since Jgroup 1.2
36 */
37 public final class MsgRouting
38 implements Msg, MssConstants, MssTag
39 {
40
41 ////////////////////////////////////////////////////////////////////////////////////////////
42 // Logger
43 ////////////////////////////////////////////////////////////////////////////////////////////
44
45 /** Obtain logger for this class */
46 private static Logger log = Logger.getLogger(MsgRouting.class);
47
48
49 ////////////////////////////////////////////////////////////////////////////////////////////
50 // Unique message instance
51 ////////////////////////////////////////////////////////////////////////////////////////////
52
53 /**
54 * As this message is never stored in a queue, a single message
55 * instance is sufficient. This allows us to save the cost of
56 * allocating and garbage collecting messages.
57 */
58 private static final MsgRouting msg = new MsgRouting();
59
60
61 ////////////////////////////////////////////////////////////////////////////////////////////
62 // Message pointers
63 ////////////////////////////////////////////////////////////////////////////////////////////
64
65 private transient int fcStartPosition;
66
67
68 ////////////////////////////////////////////////////////////////////////////////////////////
69 // Message Fields (static part)
70 ////////////////////////////////////////////////////////////////////////////////////////////
71
72 /* Tag assigned in upper level (@see jgroup.relacs.mss.MssTag) */
73 private byte routingTag;
74
75
76 ////////////////////////////////////////////////////////////////////////////////////////////
77 // Message Fields (dynamic part)
78 ////////////////////////////////////////////////////////////////////////////////////////////
79
80 /** Routing table */
81 private RoutingTable rt;
82
83 /** Array of flow control entries */
84 private FCEntry[] fc;
85
86
87 ////////////////////////////////////////////////////////////////////////////////////////////
88 // Transient fields (recomputed during unmarshalling)
89 ////////////////////////////////////////////////////////////////////////////////////////////
90
91 /** The MssDS object */
92 private transient MssDS mssds;
93
94 /** Message to be sent */
95 private transient OutMessage outmsg;
96
97 /** The index of the flow control entry associated with the local host */
98 private transient int localFCIndex = UNDEF;
99
100 /** The message sender (obtained from the fragment header) */
101 private transient MssHost sender;
102
103
104 ////////////////////////////////////////////////////////////////////////////////////////////
105 // Marshalling and unmarshalling methods
106 ////////////////////////////////////////////////////////////////////////////////////////////
107
108 /**
109 * Marshal a ROUTING message.
110 */
111 static MsgRouting marshal(MssDS mssds)
112 throws IOException
113 {
114 msg.routingTag = ROUTING;
115 msg.mssds = mssds;
116 msg.sender = HostTable.getLocalHost();
117 msg.rt = mssds.getRoutingTable();
118
119 /*
120 * Compute the max size for this message.
121 */
122 int size = GroupIndex.SIZE + mssds.size()*FCEntry.SIZE;
123 /*
124 * Here we check if this <code>MsgRouting</code> object has already
125 * been initialized with an outmsg object, in which case we simply
126 * clear the message object and reuse it. This avoids to create a
127 * new outmsg object (allocating additional memory) and there is no
128 * need to garbage collect the old one.
129 */
130 if (msg.outmsg == null) {
131 int maxsize = size + RoutingTable.getMaxSize(mssds.size(), mssds.numOfClusters());
132 if (log.isDebugEnabled()) {
133 log.debug("MsgRouting.maxsize=" + maxsize);
134 }
135 msg.outmsg = MsgFactory.get(maxsize);
136 }
137 size += msg.rt.getMarshalSize();
138 if (log.isDebugEnabled()) {
139 log.debug("MsgRouting.size=" + size);
140 }
141 msg.outmsg.clear(size);
142
143 /* Marshal the routing table */
144 msg.rt.writeExternal(msg.outmsg);
145 msg.fcStartPosition = msg.outmsg.getPosition();
146 /*
147 * Note that, we do not initialize the flow control here, since the
148 * mss will do so using the <code>setFCData()</code> method below,
149 * for each cluster; allowing the mss to reuse the same
150 * <code>MsgRouting</code> object for sending to all clusters.
151 */
152 return msg;
153 }
154
155
156 /**
157 * Unmarshal an FWDROUTING message from the given stream.
158 */
159 static MsgRouting unmarshal(InMessage in, FragmentHeader header)
160 throws IOException, ClassNotFoundException
161 {
162 /* Obtain stuff from the header */
163 msg.routingTag = header.getTag();
164 msg.sender = header.getSender();
165
166 /* Obtain the routing table from the stream */
167 msg.rt = new RoutingTable();
168 msg.rt.readExternal(in);
169
170 /* Obtain the flow control information table from the stream */
171 int len = GroupIndex.unmarshal(in);
172 msg.fc = new FCEntry[len];
173 for (int i=0; i < len; i++) {
174 msg.fc[i] = new FCEntry();
175 msg.fc[i].readExternal(in);
176 if (msg.localFCIndex == UNDEF && msg.fc[i].key.isLocal())
177 msg.localFCIndex = i;
178 }
179 return msg;
180 }
181
182
183 ////////////////////////////////////////////////////////////////////////////////////////////
184 // MsgRouting specific method
185 ////////////////////////////////////////////////////////////////////////////////////////////
186
187 /**
188 * Return the largest possible size that a <code>MsgRouting</code>
189 * message may consume, for the given distributed system size.
190 */
191 public static int getMaxSize(int numOfHosts, int numOfClusters)
192 {
193 return GroupIndex.SIZE + numOfHosts*FCEntry.SIZE
194 + RoutingTable.getMaxSize(numOfHosts, numOfClusters);
195 }
196
197
198 /**
199 * Update the flow control data on the receiver side.
200 */
201 void updateFC()
202 {
203 if (routingTag == FWDROUTING) {
204
205 /* Check if there is a flow control entry for the local host */
206 if (localFCIndex != UNDEF) {
207 /* Update congestion information (sender side flow control) */
208 sender.flush(fc[localFCIndex].lastMsgRcvd);
209 }
210
211 } else {
212
213 log.warn("Erroneous routing tag; can only process FWDROUTING: " + routingTag);
214
215 }
216 }
217
218
219 /**
220 * Returns the sender cluster of this message.
221 */
222 Cluster getCluster()
223 {
224 return sender.getCluster();
225 }
226
227
228 /**
229 * Returns the topology table associated with the routing table
230 * attached with this message.
231 */
232 public TopologyEntry[] getTopologyTable()
233 {
234 return rt.table;
235 }
236
237
238 /**
239 * Negate the cost as reverse poison
240 */
241 void splitHorizonOn(EndPoint key)
242 {
243 rt.splitHorizonOn(key);
244 }
245
246
247 /**
248 *
249 */
250 void splitHorizonOff()
251 {
252 rt.splitHorizonOff();
253 }
254
255
256 void setFCData(EndPoint destCluster)
257 {
258 fc = mssds.getClusterFCEntry(destCluster);
259 outmsg.seek(fcStartPosition);
260 try {
261 GroupIndex.marshal(outmsg, fc.length);
262 for (int i=0; i < fc.length; i++)
263 fc[i].writeExternal(outmsg);
264 } catch (IOException e) {
265 log.error("setFCData: Could not update flow control information in message stream", e);
266 }
267 }
268
269
270 ////////////////////////////////////////////////////////////////////////////////////////////
271 // Fragmentation handling (from Msg interface)
272 ////////////////////////////////////////////////////////////////////////////////////////////
273
274 /*
275 * The fragmentation iterator instance for this message; it can be
276 * reused for multiple iterations over this message.
277 */
278 private transient MsgFragmentIterator fragIter = null;
279
280
281 /**
282 * Returns a <code>FragmentIterator</code> for this
283 * <code>MsgRouting</code> object. This iterator allows to send the
284 * entire message as multiple fragments of specified size (payload).
285 * At the same time, it marks each fragment with a tag and message
286 * identifier provided through the <code>next()</code> method of the
287 * iterator.
288 */
289 public FragmentIterator iterator(MsgCntrl msgCntrl)
290 {
291 /* If there is already an iterator for this message, we reuse it. */
292 if (fragIter == null)
293 fragIter = new MsgFragmentIterator(this, msgCntrl);
294 else
295 fragIter.reset(outmsg);
296 return fragIter;
297 }
298
299
300 ////////////////////////////////////////////////////////////////////////////////////////////
301 // Msg interface methods
302 ////////////////////////////////////////////////////////////////////////////////////////////
303
304 /**
305 * Returns the tag associated to this message.
306 */
307 public byte getTag()
308 {
309 return routingTag;
310 }
311
312
313 /**
314 * Returns the message identifier for this message; all routing
315 * messages have <code>UNDEF</code> as their message identifier.
316 */
317 public int getMid()
318 {
319 return UNDEF;
320 }
321
322
323 /**
324 * Returns the sender of this message.
325 */
326 public MssHost getSender()
327 {
328 return sender;
329 }
330
331
332 /**
333 * Returns false always, since routing messages should never be routed.
334 */
335 public boolean hasToBeRouted()
336 {
337 return false;
338 }
339
340
341 /**
342 * Returns true if the sender of this message is the local host.
343 */
344 boolean isLocal()
345 {
346 return sender.isLocal();
347 }
348
349
350 /**
351 * Returns the message flow controller for the sender side.
352 */
353 public MsgFlowSndrSide getMsgFlow()
354 {
355 return sender.getCluster().getMsgFlow();
356 }
357
358
359 /**
360 * Returns the <code>OutMessage</code> associated with this message.
361 */
362 public OutMessage getOutMessage()
363 {
364 return outmsg;
365 }
366
367
368 ////////////////////////////////////////////////////////////////////////////////////////////
369 // Methods from Object
370 ////////////////////////////////////////////////////////////////////////////////////////////
371
372 /**
373 * Returns a string representation of this object
374 */
375 public String toString()
376 {
377 StringBuilder buf = new StringBuilder();
378 switch (routingTag) {
379 case FWDROUTING:
380 buf.append("FWDROUTING, ");
381 buf.append(rt.toString());
382 for (int i=0; i < fc.length; i++)
383 buf.append(fc[i]);
384 break;
385
386 case ROUTING:
387 buf.append("ROUTING, ");
388 buf.append(rt.toString());
389 for (int i=0; i < fc.length; i++)
390 buf.append(fc[i]);
391 break;
392
393 default:
394 buf.append("Illegal routing type: " + routingTag);
395 }
396 return buf.toString();
397 }
398
399 } // END MsgRouting