View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.mss;
20  
21  import java.io.IOException;
22  import java.util.Timer;
23  import java.util.TimerTask;
24  
25  import jgroup.core.ConfigurationException;
26  import jgroup.core.EndPoint;
27  import jgroup.relacs.config.TransportConfig;
28  
29  import org.apache.log4j.Logger;
30  
31  /**
32   *  The <code>MssHost</code> class
33   *
34   *  @author Salvatore Cammarrata
35   *  @author Hein Meling
36   *  @since Jgroup 0.5
37   */
38  public class MssHost
39    implements MssConstants, MssTag
40  {
41  
42    ////////////////////////////////////////////////////////////////////////////////////////////
43    // Logger
44    ////////////////////////////////////////////////////////////////////////////////////////////
45  
46    /** Obtain logger for this class */
47    private static final Logger log = Logger.getLogger(MssHost.class);
48  
49  
50    ////////////////////////////////////////////////////////////////////////////////////////////
51    // Fields
52    ////////////////////////////////////////////////////////////////////////////////////////////
53  
54    /** Configuration parameters */
55    private TransportConfig config;
56  
57    /** Datalink layer */
58    private NI ni;
59  
60    /** Event scheduler */
61    private EventHandler ehandler;
62  
63    /** The endpoint used to identify a host (and port) */
64    private EndPoint endpoint;
65  
66    /** Incarnation number */
67    private int incarnationId;
68  
69    /** Index in the HostTable */
70    private int hostIndex;
71  
72    /** Index in the Cluster */
73    private int clusterIndex; 
74  
75    /** The local cluster */
76    private Cluster cluster;
77  
78    /** Roundtrip timeout */
79    private int rtt;
80  
81    /** Standard deviation */
82    private int dev;
83  
84    /** Next timeout */
85    private int timeout;
86  
87    /**
88     *  Set to max TTL when the member is considered reachable.  This
89     *  value is decremented for every TOPOLOGY timeout.  If it reaches
90     *  zero the member is considered unreachable.
91     */
92    private int alive;  
93  
94    /**
95     *  Last reachability information.  Used to determine if the
96     *  reachability perception is changed.
97     */
98    private int wasAlive;
99  
100   /** Flow Control information */
101   private MsgFlowRcvrSide msgFlow;
102 
103   /** The most recent synchronization message */
104   private MsgSYN syn = null;
105 
106   /** The timer used to resend the QSYN message (if no ASYN is received) */
107   private Timer synTimer = null;
108 
109 
110   ////////////////////////////////////////////////////////////////////////////////////////////
111   // Constructors
112   ////////////////////////////////////////////////////////////////////////////////////////////
113 
114   /**
115    *  Builds a host object identified by <code>hostEndPoint</code>.
116    */
117   public MssHost(TransportConfig config, EndPoint hostEndPoint, NI ni, Cluster cluster, EventHandler ehandler)
118     throws ConfigurationException
119   {
120     this.ni = ni;
121     this.config = config;
122     this.endpoint = hostEndPoint;
123     this.ehandler = ehandler;
124 
125     if (log.isDebugEnabled())
126       log.debug("MssHost:<init>: " + endpoint + ", " + cluster);
127 
128     if (endpoint.isLocal()) {
129       alive = wasAlive = config.getMaxTTL();
130       incarnationId = (int) (System.currentTimeMillis()/1000);
131     } else {
132       alive = wasAlive = UNREACHABLE;
133       incarnationId = UNDEF;
134     }
135 
136     /*
137      *  Timeout initialization
138      */
139     updateTimeout(100); //FIXME constant value?
140 
141     /*
142      *  Flow control initialization
143      */
144     msgFlow = new MsgFlowRcvrSide(config);
145 
146     /*
147      *  Cluster initialization
148      */
149     this.cluster = cluster;
150   }
151 
152 
153   ////////////////////////////////////////////////////////////////////////////////////////////
154   // Methods
155   ////////////////////////////////////////////////////////////////////////////////////////////
156 
157   /**
158    *
159    */
160   public EndPoint getEndPoint()
161   {
162     return endpoint;
163   }
164 
165   /**
166    *
167    */
168   public boolean isLocal()
169   {
170     return endpoint.isLocal();
171   }
172 
173   /**
174    *
175    */
176   public int getIncarnationId()
177   {
178     return incarnationId;   
179   }
180 
181   /**
182    *
183    */
184   public void setIncarnationId(int incarnationId)
185   {
186     this.incarnationId = incarnationId;   
187   }
188 
189 
190   /**
191    *  Returns true if this host is in the given cluster; otherwise false
192    *  is returned.
193    */
194   public boolean isIn(Cluster theCluster)
195   {
196     return theCluster.equals(cluster);
197   }
198 
199 
200   /**
201    *
202    */
203   public Cluster getCluster() 
204   {
205     return cluster;
206   }
207 
208 
209   /**
210    *  Get this host's position (index) in the cluster.
211    */
212   public int getClusterIndex()
213   {
214     return clusterIndex;
215   }
216 
217 
218   /**
219    *  Set this host's position (index) in the cluster.
220    */
221   public void setClusterIndex(int index)
222   {
223     clusterIndex = index;
224   }
225 
226 
227   /**
228    *  Set this host's position (index) in the hosttable.
229    */
230   public void setIndex(int index)
231   {
232     hostIndex = index;
233   }
234 
235 
236   /**
237    *  Get this host's position (index) in the hosttable.
238    */
239   public int getIndex()
240   {
241     return hostIndex;
242   }
243 
244 
245   /**
246    *  Returns the timeout value associated with this host.
247    */
248   public int getTimeout()
249   {
250     return timeout;
251   }
252 
253 
254   /**
255    *  Recompute the round-trip timeout, standard deviation and timeout
256    *  values associated with this host.
257    *
258    *  @param t  an <code>int</code> value
259    */
260   public void updateTimeout(int t)
261   {
262     int alfan = config.getAlfan();
263     int alfad = config.getAlfad();
264     rtt = (rtt * alfan + t * (alfad - alfan)) / alfad;
265     dev = (dev * alfan + (rtt > t ? rtt - t : t - rtt) * (alfad - alfan)) / alfad;
266     timeout = rtt + dev * 4;
267     if (timeout < 1)
268       timeout = 1;
269     log.assertLog(timeout > 3, "Timeout is too small: " + timeout + ", t=" + t
270         + ", rtt=" + rtt + ", dev=" + dev + ", alphaN=" + alfan + ", alphaD=" + alfad);
271     if (log.isDebugEnabled())
272       log.debug("rtt: " + rtt + ", dev: " + dev + ", timeout: " + timeout);
273   }
274 
275 
276   /**
277    *  Aborts the timeout that has been set up for performing periodic
278    *  synchronization ping.
279    */
280   public void abortTimeout()
281   {
282     if (synTimer != null) {
283       synTimer.cancel();
284       syn = null;
285       synTimer = null;
286       if (log.isDebugEnabled())
287         log.debug("Aborted SYN-timer for " + endpoint);
288       if (log.isDebugEnabled())
289         printReachParams();
290     }
291   }
292 
293 
294   /**
295    *  Schedule a timer for the synchronization (QSYN) message sent
296    *  to this member.  The initial QSYN message is sent after no delay.
297    */
298   public void scheduleTimeout()
299   {
300     try {
301       syn = MsgSYN.marshal(QSYN, this);
302     } catch (IOException e) {
303       log.error("Unable to marshal QSYN message for: " + this, e);
304       return;
305     }
306     TimerTask synSender = new TimerTask() {
307       public void run() {
308         if (syn != null) {
309           unicastRouteSend(syn);
310           if (log.isDebugEnabled())
311             log.debug("Sent " + syn + " for " + endpoint);
312         }
313       }
314     };
315     synTimer = new Timer("SYN-timer" + endpoint, true);
316     synTimer.schedule(synSender, 0, timeout);
317     if (log.isDebugEnabled())
318       log.debug("Scheduled SYN-timer for " + endpoint);
319   }
320 
321 
322   /**
323    *  Checks if the given message identifier corresponds to the last SYN
324    *  message sent from this host.  True is returned if this is the
325    *  case; false otherwise.
326    *
327    *  @param msgId
328    *    The message identifier to check if was the last SYN message sent.
329    *  @return 
330    *    True is returned if the last SYN message sent by this <code> MssHost 
331    *    </code> is equal to the given message identifier; otherwise false
332    *    is returned.
333    */
334   public boolean checkSynId(int msgId)
335   {
336     if (syn != null) {
337       int nonce = syn.getMid();
338       if (log.isDebugEnabled())
339         log.debug(endpoint + " received ASYN with " + msgId + "=?" + nonce);
340       return nonce == msgId;
341     }
342     return false;
343   }
344 
345 
346   /**
347    *  Returns true if this host is considered reachable within the given
348    *  <code>threshold</code>.
349    */
350   public boolean isReachable(int threshold)
351   {
352     return (alive >= threshold);
353   }
354 
355 
356   /**
357    *  Returns true if this host is considered reachable.  That is with a
358    *  reachability <code>threshold</code> equal to
359    *  <code>MINIMUM_ALIVE_VALUE</code>.
360    */
361   public boolean isReachable()
362   {
363     return isReachable(MINIMUM_ALIVE_VALUE);
364   }
365 
366 
367   /**
368    *  Returns true if this host was reachable on prior to the last
369    *  reachability update.
370    */
371   public boolean wasReachable()
372   {
373     return (wasAlive != UNREACHABLE);
374   }
375 
376 
377   /**
378    *  Returns the current reachability value.
379    */
380   int getReachability()
381   {
382     return alive;
383   }
384 
385 
386   /**
387    *  Set the alive counter to the specified value and return the
388    *  previous reachability status.
389    *
390    *  @param newAlive
391    *    The new reachability value to be set.
392    *  @return 
393    *    True is returned if this host was reachable; false otherwise.
394    */
395   boolean pingOK(int newAlive)
396   {
397     wasAlive = alive;
398     alive = newAlive;
399     if (log.isDebugEnabled()) 
400       log.debug("MssHost:pingOK:" + endpoint + ": alive=" + alive + ", wasAlive=" + wasAlive);
401     return (wasAlive != UNREACHABLE);
402   }
403 
404 
405   /**
406    *  Set the alive counter to the maximum TTL value and return the
407    *  previous status.
408    *
409    *  @return 
410    *    True is returned if this host was reachable; false otherwise.
411    */
412   boolean pingOK()
413   {
414     return pingOK(config.getMaxTTL());
415   }
416     
417 
418   /**
419    *  Decrements the alive counter and returns true if the member is
420    *  still reachable; false is returned otherwise.
421    */
422   boolean pingKO()
423   {
424     wasAlive = alive;
425     alive = (alive == UNREACHABLE ? alive : alive - 1);
426     if (log.isDebugEnabled()) 
427       log.debug("MssHost:pingKO:" + endpoint + ": alive=" + alive + ", wasAlive=" + wasAlive);
428     return (alive != UNREACHABLE);
429   }
430 
431 
432   /**
433    *  Mark this host as unreachable.
434    */
435   void setAsUnreachable()
436   {
437     wasAlive = alive;
438     alive = UNREACHABLE;
439   }
440 
441 
442   /**
443    *  Mark this host as reachable.
444    */
445   public void setAsReachable()
446   {
447     wasAlive = alive;
448     alive = config.getMaxTTL();
449   }
450 
451 
452   /**
453    * Flush the sent queue up until the last message ACKed.
454    */
455   public void flush()
456   {
457     /* Clear the queues */
458     cluster.getMsgFlow().flush(msgFlow.clusterWindow.getLastMsgDelivered());
459   }
460 
461 
462   /**
463    * Flush the sent queue up until the last message ACKed.
464    *
465    * @param lastMsgAcked The last message ACKed.
466    */
467   public void flush(int lastMsgAcked)
468   {
469     /* Clear the queues */
470     cluster.getMsgFlow().flush(lastMsgAcked);
471   }
472 
473 
474   /**
475    *
476    */
477   public MsgFlowRcvrSide getMsgFlow()
478   {
479     return msgFlow;   
480   }
481 
482 
483   /**
484    *
485    */
486   void resetMsgFlow() 
487   {
488     msgFlow.reset();
489   }
490 
491 
492   /**
493    *
494    */
495   void resetMsgFlow(int mid)
496   {
497     msgFlow.reset(mid);
498   }
499 
500 
501   ////////////////////////////////////////////////////////////////////////////////////////////
502   // Methods for debugging
503   ////////////////////////////////////////////////////////////////////////////////////////////
504 
505   int monitorAlive = 0;
506   long[] measure = new long[10];
507   int position = 0;
508 
509   /**
510    * Method used to print the reachability and timeout parameters associated
511    * with this host, if they have changed since its last invocation.
512    */
513   public void printReachParams()
514   {
515     if (monitorAlive != alive) {
516       monitorAlive = alive;
517       if (alive == config.getMaxTTL()) {
518         // reset measurements
519         position = 0;
520       } else {
521         // read clock and compute diff to the first measurement
522         long now = System.currentTimeMillis();
523         long diff = now - measure[0];
524         if (log.isDebugEnabled()) {
525           log.debug(endpoint.toString() + ": alive=" + alive + ", diff=" + diff);
526         }
527         measure[position++] = now;
528       }
529     }
530   }
531 
532 
533   ////////////////////////////////////////////////////////////////////////////////////////////
534   // Methods for sending to this host
535   ////////////////////////////////////////////////////////////////////////////////////////////
536 
537   void sendNACK(int mid)
538   {
539     assert !isLocal() : "Trying to send REMOTENACK to myself";
540     MsgNACK nack = null;
541     try {
542       nack = MsgNACK.marshal(REMOTENACK, this, mid);
543     } catch (IOException e) {
544       log.warn("Unable to marshal REMOTENACK for host: " + this, e);
545       return;
546     }
547     ScheduledEvent event = new ScheduledEvent(REMOTENACK, nack);
548 
549     if (NACKSUPPRESSION) {
550       long random_timeout = RandomGenerator.getRandomTimeout(getTimeout());
551       ehandler.setTimeout(random_timeout, event);
552       msgFlow.putScheduledEvent(nack.mid, event);
553     } else {
554       ehandler.setTimeout(getTimeout(), event);
555       msgFlow.putScheduledEvent(nack.mid, event);
556       // Comment the following line to delay the ack sending
557 //      unicastRouteSend(nack);
558     }
559   }
560 
561 
562   void simpleSend(byte[] buf, int buflen)
563   {
564     assert !isLocal() : "Trying to send to myself";
565     if (log.isDebugEnabled())
566       log.debug("MssHost:simpleSend: send to " + endpoint);
567     ni.send(endpoint, buf, buflen);
568   }
569 
570 
571   /**
572    *
573    */
574   void unicastRouteSend(Msg msg)
575   {
576     assert !isLocal() : "Trying to send to myself: " + msg;
577     if (log.isDebugEnabled())
578       log.debug("MssHost:unicastRouteSend: send to " + endpoint);
579 
580     if (cluster.directlyConnected()) {
581 
582       MsgCntrl msgCntrl = cluster.getControl();
583       for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
584         /* Generate and send a fragment */
585         byte[] buffer = frag.next(false);
586         int bufLen = frag.fragmentLength();
587         if (log.isDebugEnabled())
588           log.debug("send: Sending msg fragment (" + frag.getFid() + ") to host " + endpoint);
589         ni.send(endpoint, buffer, bufLen);
590       }
591 
592     } else {
593       log.warn("Cluster appears not to be directly connected (sending through cluster instead): " + cluster);
594       cluster.send(msg);
595 
596     }
597   }
598 
599 
600   ////////////////////////////////////////////////////////////////////////////////////////////
601   // Methods from Object
602   ////////////////////////////////////////////////////////////////////////////////////////////
603 
604   /**
605    *  Returns a hash code value for this host.
606    */
607   public int hashCode()
608   {
609     return endpoint.hashCode();
610   }
611 
612 
613   /**
614    *  Returns a string representation of this object.
615    */
616   public String toString()
617   {
618     return endpoint.toString();
619   }
620 
621 
622   /**
623    *  Compares two objects for content equality.
624    *
625    *  @param obj the object to compare with
626    *  @return  true if these objects are equal; false otherwise.
627    */
628   public boolean equals(Object obj)
629   {
630     if (this == obj)
631       return true;
632     else
633       if (obj instanceof MssHost)
634         return endpoint.equals(((MssHost)obj).endpoint);
635       else
636         return false;
637   }
638 
639 } // END MssHost