1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22
23 import jgroup.util.InMessage;
24
25 import org.apache.log4j.Logger;
26
27 /**
28 * The <code>MsgCntrl</code> class
29 *
30 * @author Salvatore Cammarata
31 * @author Hein Meling
32 * @since Jgroup 1.2
33 */
34 final class MsgCntrl
35 implements MssConstants, MssTag
36 {
37
38 ////////////////////////////////////////////////////////////////////////////////////////////
39 // Logger
40 ////////////////////////////////////////////////////////////////////////////////////////////
41
42 /** Obtain logger for this class */
43 private static final Logger log = Logger.getLogger(MsgCntrl.class);
44
45
46 ////////////////////////////////////////////////////////////////////////////////////////////
47 // Fields
48 ////////////////////////////////////////////////////////////////////////////////////////////
49
50 /** Event handler */
51 private EventHandler ehandler;
52
53 /** Upper layer */
54 private MssUser mssuser;
55
56 /** Distributed system */
57 private MssDS mssds;
58
59 /** Local host */
60 private MssHost me;
61
62 /**
63 * Counts the number of messages received before the message flow is
64 * initialized.
65 */
66 private int flowCnt = 0;
67
68
69 ////////////////////////////////////////////////////////////////////////////////////////////
70 // Constructors
71 ////////////////////////////////////////////////////////////////////////////////////////////
72
73 MsgCntrl(EventHandler ehandler, MssUser mssuser, MssDS mssds)
74 {
75 this.ehandler = ehandler;
76 this.mssuser = mssuser;
77 this.mssds = mssds;
78 this.me = HostTable.getLocalHost();
79 }
80
81
82 ////////////////////////////////////////////////////////////////////////////////////////////
83 // Message send/receive handling
84 ////////////////////////////////////////////////////////////////////////////////////////////
85
86 /**
87 * Compute a fragment identifier for the given fragment iterator (message),
88 * and store the message in the <i>sent</i> queue. A message may have many
89 * fragments, and thus multiple fragment identifiers associated with
90 * it. Each of these fragment identifiers point to the same message
91 * instance.
92 *
93 * @param fragIter
94 * The fragment iterator (message) for which to compute a fragment identifier.
95 * @return
96 * A fragment identifier associated with the given message.
97 */
98 int msgSend(FragmentIterator fragIter)
99 {
100 Msg msg = fragIter.getMsg();
101 MsgFlowSndrSide mf = msg.getMsgFlow();
102
103 if (!msg.hasToBeRouted()) {
104 /*
105 * This operation is needed for the correct computation of
106 * lastMsgDelivered, in particular when localhost is the
107 * only member of the cluster. It is needed to avoid that
108 * the sent queue grows too big. The sent queue is flushed
109 * periodically when a IAMALIVE message sent by me is received.
110 */
111 me.getMsgFlow().clusterWindow.set(me.getClusterIndex(), mf.getLastMsgSent() + 1);
112 }
113 log.assertLog(mf.getLastMsgAcked() != UNDEF,
114 "The sender side message flow has not yet been initialized");
115
116 /*
117 * Try to send a message; wait if the window is closed
118 */
119 while (true) {
120 if (mf.isWindowOpen()) {
121 /*
122 * The sender is not blocked
123 */
124 return mf.insertSentMsgFrag(fragIter);
125
126 } else {
127 /*
128 * The sender is blocked
129 */
130 if (log.isDebugEnabled())
131 log.warn("Sender is blocked waiting for window to open");
132 mf.waitOpen();
133 }
134 }
135 }
136
137
138 /**
139 * Handles the receiving of a normal message from the net. There are
140 * three cases: (i) the message is delivered immediately; (ii) the
141 * message is enqueued because is not complete; (iii) the message is
142 * enqueued because it is not in FIFO order.
143 *
144 * @return
145 * False if the message could not be delivered because the message
146 * flow was not yet initialized; otherwise, true is returned.
147 */
148 boolean msgReceive(FragmentHeader header)
149 {
150 if (log.isDebugEnabled())
151 log.debug("--- Enter: msgReceive --------------------------------------------------");
152
153 MssHost sender = header.getSender();
154 MsgFlowRcvrSide mf = sender.getMsgFlow();
155 SWindow swindow = mf.swindow;
156
157 /*
158 * Check if the receive window is closed.
159 */
160 if (swindow.getLastMsgDlvr() == UNDEF) {
161 if ((++flowCnt % 20) == 0) {
162 log.warn("The receiver side message flow has not yet been initialized: " + flowCnt);
163 }
164 return false;
165 }
166
167 log.assertLog(swindow.inWindow(header.fragId),
168 "Flow control overflow: Msg.header=" + header + ", MsgFlow=" + mf);
169 /*
170 * FIXED Hein Nov. 3rd 2005; ignore messages not in the sliding window;
171 * we return true to pretend that it has been delivered.
172 */
173 if (!swindow.inWindow(header.fragId))
174 return true;
175
176 /*
177 * Check if there is a NACK timeout not yet expired for this
178 * message; if so we remove it.
179 */
180 if (mf.hasNACK(header.fragId)) {
181 if (log.isDebugEnabled())
182 log.debug("msgReceive : Check nackTimeout event associated with msg.header=" + header);
183 ScheduledEvent event = mf.getScheduledEvent(header.fragId);
184 if (event != null) {
185 MsgNACK nack = (MsgNACK) event.getData();
186 if (log.isDebugEnabled())
187 log.debug("msgReceive : Remove NACK timeout " + nack);
188 ehandler.abortTimeout(event);
189
190 // Remove the nack from the queue
191 mf.removeScheduledEvent(header.fragId);
192
193 /* With adaptive timeouts, timeouts are delayed */
194 sender.updateTimeout(nack.getRTT());
195 }
196 }
197
198 if (log.isDebugEnabled())
199 log.debug("msgReceive: START CONTROL - mid = " + header.fragId + ", lastMsgDlvr = "
200 + swindow.lastMsgDlvr + ", lastMsgRcvd = " + mf.getLastMsgRcvd());
201
202 if (mf.isFIFO(header.fragId)) {
203
204 /*
205 * The message can be delivered in FIFO order.
206 */
207 if (log.isDebugEnabled())
208 log.debug("msgReceive: FIFO OK " + header.fragId);
209
210 InMessage msgstream = mf.addFragment(header);
211 if (msgstream != null) {
212 if (log.isDebugEnabled())
213 log.debug("msgReceive: Delivering msg (" + header.fragId + ") to mssuser (Daemon)");
214 try {
215 MsgJG msgjg = MsgJG.unmarshal(msgstream, header, mssds);
216 InMessage inmsg = msgjg.complete();
217 deliverMsg(msgjg, inmsg, sender);
218
219 } catch (IOException e) {
220 log.warn("Could not unmarshal message stream (" + header.fragId + ")\n" + msgstream, e);
221 } catch (ClassNotFoundException e) {
222 log.warn("Could not unmarshal message stream (" + header.fragId + ")\n" + msgstream, e);
223 }
224 }
225
226 /* Check if we can deliver delayed messages from this sender. */
227 checkReceived(sender);
228
229 } else if (!mf.causeGap(header.fragId) && mf.isNotFIFO(header.fragId)) {
230
231 /*
232 * The message cannot be delivered since it is not in FIFO order,
233 * and it may fill a existing hole. We insert the message in the
234 * queue unless it has already been received.
235 */
236 if (!mf.isDelivered(header.fragId)) {
237 if (log.isDebugEnabled())
238 log.debug("msgReceive: Queuing received msg (" + header.fragId
239 + "); it does not respect FIFO order");
240 mf.insertMsgFrag(header);
241 }
242
243 } else if (mf.causeGap(header.fragId)) {
244
245 /*
246 * The message cannot be delivered and there is a hole in the
247 * sequence not yet notified.
248 */
249 if (log.isDebugEnabled())
250 log.debug("msgReceive: recevied msg causing a gap in the sequence (" + header.fragId + ") from sender " + sender
251 + "; lastMsgRcvd=" + mf.getLastMsgRcvd());
252 mf.insertMsgFrag(header);
253 int oldLastMsgRcvd = mf.setLastMsgRcvd(header.fragId);
254
255 // FIXME No local recovery (???) We don't need to NACK request
256 // all msgs between, use the window to determine the missing
257 // messages. Maybe we can use: mf.isDelivered(i)
258 for (int i = oldLastMsgRcvd + 1; i < header.fragId; i++) {
259 /*
260 * Sending NACK messages for the missing message(s)
261 */
262 if (log.isDebugEnabled())
263 log.debug("msgReceive: Sending NACK to " + sender + " for msg fragment (" + i + ")");
264 sender.sendNACK(i);
265 }
266 }
267 if (log.isDebugEnabled()) {
268 log.debug(swindow.toString());
269 log.debug("--- Exit: msgReceive --------------------------------------------------");
270 }
271 return true;
272 }
273
274
275 ////////////////////////////////////////////////////////////////////////////////////////////
276 // Private message receive handling methods
277 ////////////////////////////////////////////////////////////////////////////////////////////
278
279 /**
280 * Check if the message is for me, and if so deliver to the upper
281 * layer. Messages not for me, are either routed to the correct
282 * destination or discarded.
283 */
284 private void deliverMsg(MsgJG msgjg, InMessage inmsg, MssHost sender)
285 {
286 if (msgjg.hasToBeRouted()) {
287
288 //FIXME routing is currently not supported (at least it has not
289 //been tested, at all)
290
291 /* The message has to be routed to its actual destination */
292 if (log.isDebugEnabled())
293 log.warn("Routing message to destination: " + msgjg);
294 Cluster destCluster = msgjg.getCluster();
295 // MssHost[] receivers = msgjg.getReceivers();
296
297 // /*
298 // * If the message is unicast (only one receiver) and the cluster
299 // * is directly connected, we send it directly to the receiver;
300 // * otherwise we use the route of the destination cluster.
301 // */
302 // if (receivers.length == 1 && destCluster.directlyConnected()) {
303 // if (log.isDebugEnabled())
304 // log.debug("Routing unicast msg directly to member: " + receivers[0]);
305 // receivers[0].unicastRouteSend(msgjg);
306 // } else {
307 if (log.isDebugEnabled())
308 log.debug("Routing multicast msg destined for cluster " + destCluster);
309 destCluster.send(msgjg);
310 // }
311
312 } else if (!msgjg.isForMe()) {
313
314 /* The message is not for me; discard it */
315 if (log.isDebugEnabled())
316 log.debug("Discarding message not addressed to me: " + msgjg);
317
318 } else {
319
320 /* The message is for me; deliver it to the upper layer */
321 if (log.isDebugEnabled())
322 log.debug("Delivering message: " + msgjg);
323 mssuser.remoteReceive(msgjg.getJGTag(), inmsg, sender.getEndPoint());
324
325 }
326 }
327
328
329 /**
330 * Check if there are messages in the FIFO-order queue of
331 * <code>sender</code> that can be delivered.
332 */
333 private void checkReceived(MssHost sender)
334 {
335 MsgFlowRcvrSide msgFlow = sender.getMsgFlow();
336 for (int i = 0, dlvrCount = msgFlow.getDeliveryCount(); i < dlvrCount; i++) {
337 /* Get and remove the first message in the queue */
338 FragmentHeader header = msgFlow.removeMsgFrag();
339 MsgFlowRcvrSide mFlow = header.getSender().getMsgFlow();
340 InMessage msgstream = mFlow.addFragment(header);
341 if (msgstream != null) {
342 if (log.isDebugEnabled())
343 log.debug("checkReceived: Delayed delivery msg (" + header.fragId + ") to mssuser (Daemon)");
344 try {
345 MsgJG msgjg = MsgJG.unmarshal(msgstream, header, mssds);
346 InMessage inmsg = msgjg.complete();
347 deliverMsg(msgjg, inmsg, msgjg.getSender());
348 } catch (IOException e) {
349 log.warn("Could not unmarshal message stream\n" + msgstream, e);
350 } catch (ClassNotFoundException e) {
351 log.warn("Could not unmarshal message stream\n" + msgstream, e);
352 }
353 }
354 }
355 }
356
357 } // END MsgCntrl