1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22 import java.util.Timer;
23 import java.util.TimerTask;
24
25 import jgroup.core.ConfigurationException;
26 import jgroup.core.EndPoint;
27 import jgroup.relacs.config.TransportConfig;
28
29 import org.apache.log4j.Logger;
30
31
32
33
34
35
36
37
38 public class MssHost
39 implements MssConstants, MssTag
40 {
41
42
43
44
45
46
47 private static final Logger log = Logger.getLogger(MssHost.class);
48
49
50
51
52
53
54
55 private TransportConfig config;
56
57
58 private NI ni;
59
60
61 private EventHandler ehandler;
62
63
64 private EndPoint endpoint;
65
66
67 private int incarnationId;
68
69
70 private int hostIndex;
71
72
73 private int clusterIndex;
74
75
76 private Cluster cluster;
77
78
79 private int rtt;
80
81
82 private int dev;
83
84
85 private int timeout;
86
87
88
89
90
91
92 private int alive;
93
94
95
96
97
98 private int wasAlive;
99
100
101 private MsgFlowRcvrSide msgFlow;
102
103
104 private MsgSYN syn = null;
105
106
107 private Timer synTimer = null;
108
109
110
111
112
113
114
115
116
117 public MssHost(TransportConfig config, EndPoint hostEndPoint, NI ni, Cluster cluster, EventHandler ehandler)
118 throws ConfigurationException
119 {
120 this.ni = ni;
121 this.config = config;
122 this.endpoint = hostEndPoint;
123 this.ehandler = ehandler;
124
125 if (log.isDebugEnabled())
126 log.debug("MssHost:<init>: " + endpoint + ", " + cluster);
127
128 if (endpoint.isLocal()) {
129 alive = wasAlive = config.getMaxTTL();
130 incarnationId = (int) (System.currentTimeMillis()/1000);
131 } else {
132 alive = wasAlive = UNREACHABLE;
133 incarnationId = UNDEF;
134 }
135
136
137
138
139 updateTimeout(100);
140
141
142
143
144 msgFlow = new MsgFlowRcvrSide(config);
145
146
147
148
149 this.cluster = cluster;
150 }
151
152
153
154
155
156
157
158
159
160 public EndPoint getEndPoint()
161 {
162 return endpoint;
163 }
164
165
166
167
168 public boolean isLocal()
169 {
170 return endpoint.isLocal();
171 }
172
173
174
175
176 public int getIncarnationId()
177 {
178 return incarnationId;
179 }
180
181
182
183
184 public void setIncarnationId(int incarnationId)
185 {
186 this.incarnationId = incarnationId;
187 }
188
189
190
191
192
193
194 public boolean isIn(Cluster theCluster)
195 {
196 return theCluster.equals(cluster);
197 }
198
199
200
201
202
203 public Cluster getCluster()
204 {
205 return cluster;
206 }
207
208
209
210
211
212 public int getClusterIndex()
213 {
214 return clusterIndex;
215 }
216
217
218
219
220
221 public void setClusterIndex(int index)
222 {
223 clusterIndex = index;
224 }
225
226
227
228
229
230 public void setIndex(int index)
231 {
232 hostIndex = index;
233 }
234
235
236
237
238
239 public int getIndex()
240 {
241 return hostIndex;
242 }
243
244
245
246
247
248 public int getTimeout()
249 {
250 return timeout;
251 }
252
253
254
255
256
257
258
259
260 public void updateTimeout(int t)
261 {
262 int alfan = config.getAlfan();
263 int alfad = config.getAlfad();
264 rtt = (rtt * alfan + t * (alfad - alfan)) / alfad;
265 dev = (dev * alfan + (rtt > t ? rtt - t : t - rtt) * (alfad - alfan)) / alfad;
266 timeout = rtt + dev * 4;
267 if (timeout < 1)
268 timeout = 1;
269 log.assertLog(timeout > 3, "Timeout is too small: " + timeout + ", t=" + t
270 + ", rtt=" + rtt + ", dev=" + dev + ", alphaN=" + alfan + ", alphaD=" + alfad);
271 if (log.isDebugEnabled())
272 log.debug("rtt: " + rtt + ", dev: " + dev + ", timeout: " + timeout);
273 }
274
275
276
277
278
279
280 public void abortTimeout()
281 {
282 if (synTimer != null) {
283 synTimer.cancel();
284 syn = null;
285 synTimer = null;
286 if (log.isDebugEnabled())
287 log.debug("Aborted SYN-timer for " + endpoint);
288 if (log.isDebugEnabled())
289 printReachParams();
290 }
291 }
292
293
294
295
296
297
298 public void scheduleTimeout()
299 {
300 try {
301 syn = MsgSYN.marshal(QSYN, this);
302 } catch (IOException e) {
303 log.error("Unable to marshal QSYN message for: " + this, e);
304 return;
305 }
306 TimerTask synSender = new TimerTask() {
307 public void run() {
308 if (syn != null) {
309 unicastRouteSend(syn);
310 if (log.isDebugEnabled())
311 log.debug("Sent " + syn + " for " + endpoint);
312 }
313 }
314 };
315 synTimer = new Timer("SYN-timer" + endpoint, true);
316 synTimer.schedule(synSender, 0, timeout);
317 if (log.isDebugEnabled())
318 log.debug("Scheduled SYN-timer for " + endpoint);
319 }
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334 public boolean checkSynId(int msgId)
335 {
336 if (syn != null) {
337 int nonce = syn.getMid();
338 if (log.isDebugEnabled())
339 log.debug(endpoint + " received ASYN with " + msgId + "=?" + nonce);
340 return nonce == msgId;
341 }
342 return false;
343 }
344
345
346
347
348
349
350 public boolean isReachable(int threshold)
351 {
352 return (alive >= threshold);
353 }
354
355
356
357
358
359
360
361 public boolean isReachable()
362 {
363 return isReachable(MINIMUM_ALIVE_VALUE);
364 }
365
366
367
368
369
370
371 public boolean wasReachable()
372 {
373 return (wasAlive != UNREACHABLE);
374 }
375
376
377
378
379
380 int getReachability()
381 {
382 return alive;
383 }
384
385
386
387
388
389
390
391
392
393
394
395 boolean pingOK(int newAlive)
396 {
397 wasAlive = alive;
398 alive = newAlive;
399 if (log.isDebugEnabled())
400 log.debug("MssHost:pingOK:" + endpoint + ": alive=" + alive + ", wasAlive=" + wasAlive);
401 return (wasAlive != UNREACHABLE);
402 }
403
404
405
406
407
408
409
410
411
412 boolean pingOK()
413 {
414 return pingOK(config.getMaxTTL());
415 }
416
417
418
419
420
421
422 boolean pingKO()
423 {
424 wasAlive = alive;
425 alive = (alive == UNREACHABLE ? alive : alive - 1);
426 if (log.isDebugEnabled())
427 log.debug("MssHost:pingKO:" + endpoint + ": alive=" + alive + ", wasAlive=" + wasAlive);
428 return (alive != UNREACHABLE);
429 }
430
431
432
433
434
435 void setAsUnreachable()
436 {
437 wasAlive = alive;
438 alive = UNREACHABLE;
439 }
440
441
442
443
444
445 public void setAsReachable()
446 {
447 wasAlive = alive;
448 alive = config.getMaxTTL();
449 }
450
451
452
453
454
455 public void flush()
456 {
457
458 cluster.getMsgFlow().flush(msgFlow.clusterWindow.getLastMsgDelivered());
459 }
460
461
462
463
464
465
466
467 public void flush(int lastMsgAcked)
468 {
469
470 cluster.getMsgFlow().flush(lastMsgAcked);
471 }
472
473
474
475
476
477 public MsgFlowRcvrSide getMsgFlow()
478 {
479 return msgFlow;
480 }
481
482
483
484
485
486 void resetMsgFlow()
487 {
488 msgFlow.reset();
489 }
490
491
492
493
494
495 void resetMsgFlow(int mid)
496 {
497 msgFlow.reset(mid);
498 }
499
500
501
502
503
504
505 int monitorAlive = 0;
506 long[] measure = new long[10];
507 int position = 0;
508
509
510
511
512
513 public void printReachParams()
514 {
515 if (monitorAlive != alive) {
516 monitorAlive = alive;
517 if (alive == config.getMaxTTL()) {
518
519 position = 0;
520 } else {
521
522 long now = System.currentTimeMillis();
523 long diff = now - measure[0];
524 if (log.isDebugEnabled()) {
525 log.debug(endpoint.toString() + ": alive=" + alive + ", diff=" + diff);
526 }
527 measure[position++] = now;
528 }
529 }
530 }
531
532
533
534
535
536
537 void sendNACK(int mid)
538 {
539 assert !isLocal() : "Trying to send REMOTENACK to myself";
540 MsgNACK nack = null;
541 try {
542 nack = MsgNACK.marshal(REMOTENACK, this, mid);
543 } catch (IOException e) {
544 log.warn("Unable to marshal REMOTENACK for host: " + this, e);
545 return;
546 }
547 ScheduledEvent event = new ScheduledEvent(REMOTENACK, nack);
548
549 if (NACKSUPPRESSION) {
550 long random_timeout = RandomGenerator.getRandomTimeout(getTimeout());
551 ehandler.setTimeout(random_timeout, event);
552 msgFlow.putScheduledEvent(nack.mid, event);
553 } else {
554 ehandler.setTimeout(getTimeout(), event);
555 msgFlow.putScheduledEvent(nack.mid, event);
556
557
558 }
559 }
560
561
562 void simpleSend(byte[] buf, int buflen)
563 {
564 assert !isLocal() : "Trying to send to myself";
565 if (log.isDebugEnabled())
566 log.debug("MssHost:simpleSend: send to " + endpoint);
567 ni.send(endpoint, buf, buflen);
568 }
569
570
571
572
573
574 void unicastRouteSend(Msg msg)
575 {
576 assert !isLocal() : "Trying to send to myself: " + msg;
577 if (log.isDebugEnabled())
578 log.debug("MssHost:unicastRouteSend: send to " + endpoint);
579
580 if (cluster.directlyConnected()) {
581
582 MsgCntrl msgCntrl = cluster.getControl();
583 for (FragmentIterator frag = msg.iterator(msgCntrl); frag.hasNext(); ) {
584
585 byte[] buffer = frag.next(false);
586 int bufLen = frag.fragmentLength();
587 if (log.isDebugEnabled())
588 log.debug("send: Sending msg fragment (" + frag.getFid() + ") to host " + endpoint);
589 ni.send(endpoint, buffer, bufLen);
590 }
591
592 } else {
593 log.warn("Cluster appears not to be directly connected (sending through cluster instead): " + cluster);
594 cluster.send(msg);
595
596 }
597 }
598
599
600
601
602
603
604
605
606
607 public int hashCode()
608 {
609 return endpoint.hashCode();
610 }
611
612
613
614
615
616 public String toString()
617 {
618 return endpoint.toString();
619 }
620
621
622
623
624
625
626
627
628 public boolean equals(Object obj)
629 {
630 if (this == obj)
631 return true;
632 else
633 if (obj instanceof MssHost)
634 return endpoint.equals(((MssHost)obj).endpoint);
635 else
636 return false;
637 }
638
639 }