View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  
23  import jgroup.util.InMessage;
24  
25  import org.apache.log4j.Logger;
26  
27  /**
28   *  The <code>MsgCntrl</code> class
29   *
30   *  @author Salvatore Cammarata
31   *  @author Hein Meling
32   *  @since Jgroup 1.2
33   */
34  final class MsgCntrl
35    implements MssConstants, MssTag
36  {
37  
38    ////////////////////////////////////////////////////////////////////////////////////////////
39    // Logger
40    ////////////////////////////////////////////////////////////////////////////////////////////
41  
42    /** Obtain logger for this class */
43    private static final Logger log = Logger.getLogger(MsgCntrl.class);
44  
45  
46    ////////////////////////////////////////////////////////////////////////////////////////////
47    // Fields
48    ////////////////////////////////////////////////////////////////////////////////////////////
49  
50    /** Event handler */
51    private EventHandler ehandler;
52    
53    /** Upper layer */
54    private MssUser mssuser;
55  
56    /** Distributed system */
57    private MssDS mssds;
58    
59    /** Local host */
60    private MssHost me;
61  
62    /**
63     *  Counts the number of messages received before the message flow is
64     *  initialized.
65     */
66    private int flowCnt = 0;
67  
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Constructors
71    ////////////////////////////////////////////////////////////////////////////////////////////
72  
73    MsgCntrl(EventHandler ehandler, MssUser mssuser, MssDS mssds)
74    {
75      this.ehandler = ehandler;
76      this.mssuser = mssuser;
77      this.mssds = mssds;
78      this.me = HostTable.getLocalHost();
79    }
80  
81  
82    ////////////////////////////////////////////////////////////////////////////////////////////
83    // Message send/receive handling
84    ////////////////////////////////////////////////////////////////////////////////////////////
85  
86    /**
87     *  Compute a fragment identifier for the given fragment iterator (message),
88     *  and store the message in the <i>sent</i> queue.  A message may have many
89     *  fragments, and thus multiple fragment identifiers associated with
90     *  it.  Each of these fragment identifiers point to the same message
91     *  instance.
92     *
93     *  @param fragIter
94     *    The fragment iterator (message) for which to compute a fragment identifier.
95     *  @return
96     *    A fragment identifier associated with the given message.
97     */
98    int msgSend(FragmentIterator fragIter)
99    {
100     Msg msg = fragIter.getMsg();
101     MsgFlowSndrSide mf = msg.getMsgFlow();
102 
103     if (!msg.hasToBeRouted()) {
104       /*
105        *  This operation is needed for the correct computation of
106        *  lastMsgDelivered, in particular when localhost is the
107        *  only member of the cluster. It is needed to avoid that
108        *  the sent queue grows too big. The sent queue is flushed
109        *  periodically when a IAMALIVE message sent by me is received.
110        */
111       me.getMsgFlow().clusterWindow.set(me.getClusterIndex(), mf.getLastMsgSent() + 1);
112     }
113     log.assertLog(mf.getLastMsgAcked() != UNDEF,
114       "The sender side message flow has not yet been initialized");
115 
116     /*
117      *  Try to send a message; wait if the window is closed 
118      */
119     while (true) {
120       if (mf.isWindowOpen()) {
121         /* 
122          *  The sender is not blocked 
123          */
124         return mf.insertSentMsgFrag(fragIter);
125 
126       } else {
127         /* 
128          *  The sender is blocked 
129          */
130         if (log.isDebugEnabled())
131           log.warn("Sender is blocked waiting for window to open");
132         mf.waitOpen();
133       }
134     }
135   }
136 
137 
138   /**
139    *  Handles the receiving of a normal message from the net.  There are
140    *  three cases: (i) the message is delivered immediately; (ii) the
141    *  message is enqueued because is not complete; (iii) the message is
142    *  enqueued because it is not in FIFO order.
143    *
144    *  @return 
145    *    False if the message could not be delivered because the message
146    *    flow was not yet initialized; otherwise, true is returned.
147    */
148   boolean msgReceive(FragmentHeader header)
149   {
150     if (log.isDebugEnabled())
151       log.debug("--- Enter: msgReceive --------------------------------------------------");
152 
153     MssHost sender = header.getSender();
154     MsgFlowRcvrSide mf = sender.getMsgFlow();
155     SWindow swindow = mf.swindow;
156 
157     /*
158      * Check if the receive window is closed.
159      */
160     if (swindow.getLastMsgDlvr() == UNDEF) {
161       if ((++flowCnt % 20) == 0) {
162         log.warn("The receiver side message flow has not yet been initialized: " + flowCnt);
163       }
164       return false;
165     }
166 
167     log.assertLog(swindow.inWindow(header.fragId),
168       "Flow control overflow: Msg.header=" + header + ", MsgFlow=" + mf);
169     /*
170      * FIXED Hein Nov. 3rd 2005; ignore messages not in the sliding window;
171      * we return true to pretend that it has been delivered.
172      */
173     if (!swindow.inWindow(header.fragId))
174       return true;
175 
176     /*
177      * Check if there is a NACK timeout not yet expired for this
178      * message; if so we remove it.
179      */
180     if (mf.hasNACK(header.fragId)) {
181       if (log.isDebugEnabled())
182         log.debug("msgReceive : Check nackTimeout event associated with msg.header=" + header);
183       ScheduledEvent event = mf.getScheduledEvent(header.fragId);
184       if (event != null) {
185         MsgNACK nack = (MsgNACK) event.getData();
186         if (log.isDebugEnabled())
187           log.debug("msgReceive : Remove NACK timeout " + nack);
188         ehandler.abortTimeout(event);
189 
190         // Remove the nack from the queue
191         mf.removeScheduledEvent(header.fragId);
192 
193         /* With adaptive timeouts, timeouts are delayed */
194         sender.updateTimeout(nack.getRTT());
195       }
196     }
197 
198     if (log.isDebugEnabled())
199       log.debug("msgReceive: START CONTROL - mid = " + header.fragId + ", lastMsgDlvr = "
200                 + swindow.lastMsgDlvr + ", lastMsgRcvd = " + mf.getLastMsgRcvd());
201 
202     if (mf.isFIFO(header.fragId)) {
203 
204       /*
205        * The message can be delivered in FIFO order.
206        */
207       if (log.isDebugEnabled())
208         log.debug("msgReceive: FIFO OK " + header.fragId);
209 
210       InMessage msgstream = mf.addFragment(header);
211       if (msgstream != null) {
212         if (log.isDebugEnabled())
213           log.debug("msgReceive: Delivering msg (" + header.fragId + ") to mssuser (Daemon)");
214         try {
215           MsgJG msgjg = MsgJG.unmarshal(msgstream, header, mssds);
216           InMessage inmsg = msgjg.complete();
217           deliverMsg(msgjg, inmsg, sender);
218 
219         } catch (IOException e) {
220           log.warn("Could not unmarshal message stream (" + header.fragId + ")\n" + msgstream, e);
221         } catch (ClassNotFoundException e) {
222           log.warn("Could not unmarshal message stream (" + header.fragId + ")\n" + msgstream, e);
223         }
224       }
225 
226       /* Check if we can deliver delayed messages from this sender. */
227       checkReceived(sender);
228 
229     } else if (!mf.causeGap(header.fragId) && mf.isNotFIFO(header.fragId)) {
230 
231       /*
232        * The message cannot be delivered since it is not in FIFO order,
233        * and it may fill a existing hole.  We insert the message in the
234        * queue unless it has already been received.
235        */
236       if (!mf.isDelivered(header.fragId)) {
237         if (log.isDebugEnabled())
238           log.debug("msgReceive: Queuing received msg (" + header.fragId
239                     + "); it does not respect FIFO order");
240         mf.insertMsgFrag(header);
241       }
242 
243     } else if (mf.causeGap(header.fragId)) {
244 
245       /*
246        * The message cannot be delivered and there is a hole in the
247        * sequence not yet notified.
248        */
249       if (log.isDebugEnabled())
250         log.debug("msgReceive: recevied msg causing a gap in the sequence (" + header.fragId + ") from sender " + sender
251                   + "; lastMsgRcvd=" + mf.getLastMsgRcvd());
252       mf.insertMsgFrag(header);
253       int oldLastMsgRcvd = mf.setLastMsgRcvd(header.fragId);
254 
255       // FIXME No local recovery (???)  We don't need to NACK request
256       // all msgs between, use the window to determine the missing
257       // messages.  Maybe we can use: mf.isDelivered(i)
258       for (int i = oldLastMsgRcvd + 1; i < header.fragId; i++) {
259         /*
260          * Sending NACK messages for the missing message(s)
261          */
262         if (log.isDebugEnabled())
263           log.debug("msgReceive: Sending NACK to " + sender + " for msg fragment (" + i + ")");
264         sender.sendNACK(i);
265       }
266     }
267     if (log.isDebugEnabled()) {
268       log.debug(swindow.toString());
269       log.debug("--- Exit: msgReceive --------------------------------------------------");
270     }
271     return true;
272   }
273 
274 
275   ////////////////////////////////////////////////////////////////////////////////////////////
276   // Private message receive handling methods
277   ////////////////////////////////////////////////////////////////////////////////////////////
278 
279   /**
280    *  Check if the message is for me, and if so deliver to the upper
281    *  layer.  Messages not for me, are either routed to the correct
282    *  destination or discarded.
283    */
284   private void deliverMsg(MsgJG msgjg, InMessage inmsg, MssHost sender)
285   {
286     if (msgjg.hasToBeRouted()) {
287 
288       //FIXME routing is currently not supported (at least it has not
289       //been tested, at all)
290 
291       /* The message has to be routed to its actual destination */
292       if (log.isDebugEnabled())
293         log.warn("Routing message to destination: " + msgjg);
294       Cluster destCluster = msgjg.getCluster();
295 //       MssHost[] receivers = msgjg.getReceivers();
296 
297 //       /*
298 //        * If the message is unicast (only one receiver) and the cluster
299 //        * is directly connected, we send it directly to the receiver;
300 //        * otherwise we use the route of the destination cluster.
301 //        */
302 //       if (receivers.length == 1 && destCluster.directlyConnected()) {
303 //         if (log.isDebugEnabled())
304 //           log.debug("Routing unicast msg directly to member: " + receivers[0]);
305 //         receivers[0].unicastRouteSend(msgjg);
306 //       } else {
307         if (log.isDebugEnabled())
308           log.debug("Routing multicast msg destined for cluster " + destCluster);
309         destCluster.send(msgjg);
310 //       }
311 
312     } else if (!msgjg.isForMe()) {
313 
314       /* The message is not for me; discard it */
315       if (log.isDebugEnabled())
316         log.debug("Discarding message not addressed to me: " + msgjg);
317 
318     } else {
319 
320       /* The message is for me; deliver it to the upper layer */
321       if (log.isDebugEnabled())
322         log.debug("Delivering message: " + msgjg);
323       mssuser.remoteReceive(msgjg.getJGTag(), inmsg, sender.getEndPoint());
324 
325     }
326   }
327 
328 
329   /**
330    *  Check if there are messages in the FIFO-order queue of
331    *  <code>sender</code> that can be delivered.
332    */
333   private void checkReceived(MssHost sender) 
334   {
335     MsgFlowRcvrSide msgFlow = sender.getMsgFlow();
336     for (int i = 0, dlvrCount = msgFlow.getDeliveryCount(); i < dlvrCount; i++) {
337       /* Get and remove the first message in the queue */
338       FragmentHeader header = msgFlow.removeMsgFrag();
339       MsgFlowRcvrSide mFlow = header.getSender().getMsgFlow();
340       InMessage msgstream = mFlow.addFragment(header);
341       if (msgstream != null) {
342         if (log.isDebugEnabled())
343           log.debug("checkReceived: Delayed delivery msg (" + header.fragId + ") to mssuser (Daemon)");
344         try {
345           MsgJG msgjg = MsgJG.unmarshal(msgstream, header, mssds);
346           InMessage inmsg = msgjg.complete();
347           deliverMsg(msgjg, inmsg, msgjg.getSender());
348         } catch (IOException e) {
349           log.warn("Could not unmarshal message stream\n" + msgstream, e);
350         } catch (ClassNotFoundException e) {
351           log.warn("Could not unmarshal message stream\n" + msgstream, e);
352         }
353       }
354     }
355   }
356 
357 } // END MsgCntrl