1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22
23 import jgroup.util.InMessage;
24
25 import org.apache.log4j.Logger;
26
27
28
29
30
31
32
33
34 final class MsgCntrl
35 implements MssConstants, MssTag
36 {
37
38
39
40
41
42
43 private static final Logger log = Logger.getLogger(MsgCntrl.class);
44
45
46
47
48
49
50
51 private EventHandler ehandler;
52
53
54 private MssUser mssuser;
55
56
57 private MssDS mssds;
58
59
60 private MssHost me;
61
62
63
64
65
66 private int flowCnt = 0;
67
68
69
70
71
72
73 MsgCntrl(EventHandler ehandler, MssUser mssuser, MssDS mssds)
74 {
75 this.ehandler = ehandler;
76 this.mssuser = mssuser;
77 this.mssds = mssds;
78 this.me = HostTable.getLocalHost();
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 int msgSend(FragmentIterator fragIter)
99 {
100 Msg msg = fragIter.getMsg();
101 MsgFlowSndrSide mf = msg.getMsgFlow();
102
103 if (!msg.hasToBeRouted()) {
104
105
106
107
108
109
110
111 me.getMsgFlow().clusterWindow.set(me.getClusterIndex(), mf.getLastMsgSent() + 1);
112 }
113 log.assertLog(mf.getLastMsgAcked() != UNDEF,
114 "The sender side message flow has not yet been initialized");
115
116
117
118
119 while (true) {
120 if (mf.isWindowOpen()) {
121
122
123
124 return mf.insertSentMsgFrag(fragIter);
125
126 } else {
127
128
129
130 if (log.isDebugEnabled())
131 log.warn("Sender is blocked waiting for window to open");
132 mf.waitOpen();
133 }
134 }
135 }
136
137
138
139
140
141
142
143
144
145
146
147
148 boolean msgReceive(FragmentHeader header)
149 {
150 if (log.isDebugEnabled())
151 log.debug("--- Enter: msgReceive --------------------------------------------------");
152
153 MssHost sender = header.getSender();
154 MsgFlowRcvrSide mf = sender.getMsgFlow();
155 SWindow swindow = mf.swindow;
156
157
158
159
160 if (swindow.getLastMsgDlvr() == UNDEF) {
161 if ((++flowCnt % 20) == 0) {
162 log.warn("The receiver side message flow has not yet been initialized: " + flowCnt);
163 }
164 return false;
165 }
166
167 log.assertLog(swindow.inWindow(header.fragId),
168 "Flow control overflow: Msg.header=" + header + ", MsgFlow=" + mf);
169
170
171
172
173 if (!swindow.inWindow(header.fragId))
174 return true;
175
176
177
178
179
180 if (mf.hasNACK(header.fragId)) {
181 if (log.isDebugEnabled())
182 log.debug("msgReceive : Check nackTimeout event associated with msg.header=" + header);
183 ScheduledEvent event = mf.getScheduledEvent(header.fragId);
184 if (event != null) {
185 MsgNACK nack = (MsgNACK) event.getData();
186 if (log.isDebugEnabled())
187 log.debug("msgReceive : Remove NACK timeout " + nack);
188 ehandler.abortTimeout(event);
189
190
191 mf.removeScheduledEvent(header.fragId);
192
193
194 sender.updateTimeout(nack.getRTT());
195 }
196 }
197
198 if (log.isDebugEnabled())
199 log.debug("msgReceive: START CONTROL - mid = " + header.fragId + ", lastMsgDlvr = "
200 + swindow.lastMsgDlvr + ", lastMsgRcvd = " + mf.getLastMsgRcvd());
201
202 if (mf.isFIFO(header.fragId)) {
203
204
205
206
207 if (log.isDebugEnabled())
208 log.debug("msgReceive: FIFO OK " + header.fragId);
209
210 InMessage msgstream = mf.addFragment(header);
211 if (msgstream != null) {
212 if (log.isDebugEnabled())
213 log.debug("msgReceive: Delivering msg (" + header.fragId + ") to mssuser (Daemon)");
214 try {
215 MsgJG msgjg = MsgJG.unmarshal(msgstream, header, mssds);
216 InMessage inmsg = msgjg.complete();
217 deliverMsg(msgjg, inmsg, sender);
218
219 } catch (IOException e) {
220 log.warn("Could not unmarshal message stream (" + header.fragId + ")\n" + msgstream, e);
221 } catch (ClassNotFoundException e) {
222 log.warn("Could not unmarshal message stream (" + header.fragId + ")\n" + msgstream, e);
223 }
224 }
225
226
227 checkReceived(sender);
228
229 } else if (!mf.causeGap(header.fragId) && mf.isNotFIFO(header.fragId)) {
230
231
232
233
234
235
236 if (!mf.isDelivered(header.fragId)) {
237 if (log.isDebugEnabled())
238 log.debug("msgReceive: Queuing received msg (" + header.fragId
239 + "); it does not respect FIFO order");
240 mf.insertMsgFrag(header);
241 }
242
243 } else if (mf.causeGap(header.fragId)) {
244
245
246
247
248
249 if (log.isDebugEnabled())
250 log.debug("msgReceive: recevied msg causing a gap in the sequence (" + header.fragId + ") from sender " + sender
251 + "; lastMsgRcvd=" + mf.getLastMsgRcvd());
252 mf.insertMsgFrag(header);
253 int oldLastMsgRcvd = mf.setLastMsgRcvd(header.fragId);
254
255
256
257
258 for (int i = oldLastMsgRcvd + 1; i < header.fragId; i++) {
259
260
261
262 if (log.isDebugEnabled())
263 log.debug("msgReceive: Sending NACK to " + sender + " for msg fragment (" + i + ")");
264 sender.sendNACK(i);
265 }
266 }
267 if (log.isDebugEnabled()) {
268 log.debug(swindow.toString());
269 log.debug("--- Exit: msgReceive --------------------------------------------------");
270 }
271 return true;
272 }
273
274
275
276
277
278
279
280
281
282
283
284 private void deliverMsg(MsgJG msgjg, InMessage inmsg, MssHost sender)
285 {
286 if (msgjg.hasToBeRouted()) {
287
288
289
290
291
292 if (log.isDebugEnabled())
293 log.warn("Routing message to destination: " + msgjg);
294 Cluster destCluster = msgjg.getCluster();
295
296
297
298
299
300
301
302
303
304
305
306
307 if (log.isDebugEnabled())
308 log.debug("Routing multicast msg destined for cluster " + destCluster);
309 destCluster.send(msgjg);
310
311
312 } else if (!msgjg.isForMe()) {
313
314
315 if (log.isDebugEnabled())
316 log.debug("Discarding message not addressed to me: " + msgjg);
317
318 } else {
319
320
321 if (log.isDebugEnabled())
322 log.debug("Delivering message: " + msgjg);
323 mssuser.remoteReceive(msgjg.getJGTag(), inmsg, sender.getEndPoint());
324
325 }
326 }
327
328
329
330
331
332
333 private void checkReceived(MssHost sender)
334 {
335 MsgFlowRcvrSide msgFlow = sender.getMsgFlow();
336 for (int i = 0, dlvrCount = msgFlow.getDeliveryCount(); i < dlvrCount; i++) {
337
338 FragmentHeader header = msgFlow.removeMsgFrag();
339 MsgFlowRcvrSide mFlow = header.getSender().getMsgFlow();
340 InMessage msgstream = mFlow.addFragment(header);
341 if (msgstream != null) {
342 if (log.isDebugEnabled())
343 log.debug("checkReceived: Delayed delivery msg (" + header.fragId + ") to mssuser (Daemon)");
344 try {
345 MsgJG msgjg = MsgJG.unmarshal(msgstream, header, mssds);
346 InMessage inmsg = msgjg.complete();
347 deliverMsg(msgjg, inmsg, msgjg.getSender());
348 } catch (IOException e) {
349 log.warn("Could not unmarshal message stream\n" + msgstream, e);
350 } catch (ClassNotFoundException e) {
351 log.warn("Could not unmarshal message stream\n" + msgstream, e);
352 }
353 }
354 }
355 }
356
357 }