1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.config;
20
21 import static jgroup.relacs.simulator.SocketStatus.SOCKET_STATUS;
22 import static jgroup.util.log.ConnectionPatternEvent.Type.Merge;
23 import static jgroup.util.log.ConnectionPatternEvent.Type.Partition;
24
25 import java.io.Externalizable;
26 import java.io.IOException;
27 import java.io.ObjectInput;
28 import java.io.ObjectOutput;
29 import java.net.InetAddress;
30 import java.net.UnknownHostException;
31 import java.rmi.RemoteException;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.Map;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.locks.Condition;
37 import java.util.concurrent.locks.Lock;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import jgroup.core.EndPoint;
41 import jgroup.core.registry.BootstrapRegistry;
42 import jgroup.relacs.simulator.SocketStatus;
43 import jgroup.relacs.types.EndPointImpl;
44 import jgroup.util.Abort;
45 import jgroup.util.Network;
46 import jgroup.util.log.ConnectionPatternEvent;
47 import jgroup.util.log.Eventlogger;
48
49 import org.apache.log4j.Logger;
50
51
52
53
54
55
56
57 public class Domain
58 extends EndPointImpl
59 implements Cloneable, Comparable, Externalizable
60 {
61
62
63
64
65
66
67 private static final Logger log = Logger.getLogger(Domain.class);
68
69
70
71
72
73
74 private static final long serialVersionUID = -2367179058534386338L;
75
76
77 private static InetAddress localMcastAddress = null;
78
79 private static final String DEFAULT_MCAST_ADR = "226.7.8.9";
80
81 private static final int DEFAULT_MCAST_PORT = 61341;
82
83
84
85
86
87
88
89 private String domainName;
90
91
92 private HostSet hosts = new HostSet();
93
94
95 private Map<String,Object> content = new HashMap<String,Object>();
96
97
98
99
100
101 private int numOfDaemons = -1;
102
103
104 private int replicaCount = 0;
105
106
107
108
109
110
111
112
113
114 public Domain()
115 {
116 }
117
118
119
120
121
122
123
124
125
126 public Domain(Domain domain, boolean copyHosts)
127 {
128 this(domain.domainName, domain.address, domain.port, domain.numOfDaemons);
129 if (copyHosts)
130 this.hosts = (HostSet) domain.hosts.clone();
131 }
132
133 public Domain(String domainName)
134 throws UnknownHostException
135 {
136 this(domainName, InetAddress.getByName(DEFAULT_MCAST_ADR), DEFAULT_MCAST_PORT, -1);
137 }
138
139 public Domain(String domainName, String mcastAdr, int mcastPort)
140 throws UnknownHostException
141 {
142 this(domainName, InetAddress.getByName(mcastAdr), mcastPort, -1);
143 }
144
145 public Domain(String domainName, String mcastAdr, int mcastPort, int jdaemons)
146 throws UnknownHostException
147 {
148 this(domainName, InetAddress.getByName(mcastAdr), mcastPort, jdaemons);
149 }
150
151 private Domain(String domainName, InetAddress mcastAddress, int mcastPort, int jdaemons)
152 {
153 super(mcastAddress, mcastPort);
154 this.numOfDaemons = jdaemons;
155 if (jdaemons < -1) {
156 throw new IllegalArgumentException("Invalid number Jgroup daemons");
157 }
158
159 this.domainName = domainName;
160
161
162
163
164
165 local = false;
166 if (!mcastAddress.isMulticastAddress())
167 throw new IllegalArgumentException("Domain IP address is not a multicast address.");
168 }
169
170
171
172
173
174
175
176
177
178 public boolean isLocal()
179 {
180 if (localMcastAddress == null)
181 throw new IllegalStateException("Local domain multicast address not initialized");
182 return local;
183 }
184
185
186
187
188
189
190 public boolean isMulticastEndPoint()
191 {
192 return true;
193 }
194
195
196
197
198
199
200
201
202
203 public void setLocal(boolean local)
204 {
205 this.local = local;
206 if (local)
207 localMcastAddress = address;
208 }
209
210
211
212
213
214 public String getName()
215 {
216 return domainName;
217 }
218
219 public EndPoint getEndpoint()
220 {
221 return new EndPointImpl(super.address, super.port);
222 }
223
224
225
226
227 public int size()
228 {
229 return hosts.size();
230 }
231
232
233
234
235
236 public boolean isEmpty()
237 {
238 return hosts.isEmpty();
239 }
240
241
242
243
244
245 public HostSet getHostSet()
246 {
247 return hosts;
248 }
249
250
251
252
253
254
255
256
257
258
259 public boolean addHost(Host host)
260 {
261 return hosts.addHost(host);
262 }
263
264
265
266
267
268 public Object get(String key)
269 {
270 return content.get(key);
271 }
272
273
274
275
276
277 public Object put(String key, Object value)
278 {
279 return content.put(key, value);
280 }
281
282
283
284
285
286 public boolean allDaemons()
287 {
288 return (numOfDaemons == -1);
289 }
290
291
292
293
294
295 public boolean hasNoDaemons()
296 {
297 return (numOfDaemons == 0);
298 }
299
300
301
302
303
304
305
306
307 public int numOfDaemons()
308 {
309 return numOfDaemons;
310 }
311
312
313 public void decReplicaCount()
314 {
315 replicaCount--;
316 }
317
318 public void incReplicaCount()
319 {
320 replicaCount++;
321 }
322
323 public int getReplicaCount()
324 {
325 return replicaCount;
326 }
327
328 public void setReplicaCount(int newReplicaCount)
329 {
330 this.replicaCount = newReplicaCount;
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344
345 @SuppressWarnings("unchecked")
346 public Object clone()
347 {
348 try {
349 Domain newDomain = (Domain) super.clone();
350 newDomain.content = (HashMap) ((HashMap) content).clone();
351 newDomain.hosts = (HostSet) hosts.clone();
352 return newDomain;
353 } catch (CloneNotSupportedException e) {
354
355 throw new Abort("Internal error", e);
356 }
357 }
358
359
360
361
362
363
364
365
366
367
368
369
370
371 public int compareTo(Object obj)
372 {
373 if (this == obj) {
374 return 0;
375 } else {
376 Domain domain = (Domain) obj;
377 if (replicaCount < domain.replicaCount) {
378 return -1;
379 } else if (replicaCount == domain.replicaCount) {
380 if (size() < domain.size()) {
381
382
383 return 1;
384 } else if (size() == domain.size()) {
385
386
387 return super.compareTo(obj);
388 } else {
389 return -1;
390 }
391 } else {
392 return 1;
393 }
394 }
395 }
396
397
398
399
400
401
402
403
404
405 public String toString()
406 {
407 return toString(false);
408 }
409
410
411
412
413 public String toString(boolean full)
414 {
415 StringBuilder buf = new StringBuilder();
416
417
418
419 buf.append(super.toString());
420
421 if (full) {
422 for (Iterator<Host> i = hosts.iterator(); i.hasNext();) {
423 buf.append("\n ");
424 buf.append((i.next()).toString());
425 }
426 }
427 return buf.toString();
428 }
429
430
431
432
433
434
435
436
437
438
439
440
441 public void readExternal(ObjectInput in)
442 throws ClassNotFoundException, IOException
443 {
444 super.readExternal(in);
445 replicaCount = in.readInt();
446 hosts = (HostSet) in.readObject();
447 content = (Map) in.readObject();
448 }
449
450
451
452
453
454
455 public void writeExternal(ObjectOutput out)
456 throws IOException
457 {
458 super.writeExternal(out);
459 out.writeInt(replicaCount);
460 out.writeObject(hosts);
461 out.writeObject(content);
462 }
463
464
465
466
467
468
469
470
471
472 private static final Map<Host, SocketStatus> statusMap = new HashMap<Host, SocketStatus>();
473
474
475 private static ConnectionPatternEvent connEvent = null;
476
477
478 private static volatile int remainingCommits;
479
480
481 private static final Lock lock = new ReentrantLock();
482 private static final Condition completed = lock.newCondition();
483
484
485
486
487
488
489
490
491
492 public void partition(Domain domain)
493 {
494 if (connEvent == null)
495 connEvent = new ConnectionPatternEvent();
496 connEvent.addEvent(Partition, this, domain);
497 boolean success = setReachability(0, domain);
498 if (!success) {
499
500 clearStatus();
501 if (log.isDebugEnabled())
502 log.debug("Retrying partition: " + domain);
503 success = setReachability(0, domain);
504 if (!success)
505 log.error("FAILED TO RETRY PARTITION: " + domain);
506 }
507 }
508
509
510
511
512
513
514
515 public void merge(Domain domain)
516 {
517 if (connEvent == null)
518 connEvent = new ConnectionPatternEvent();
519 connEvent.addEvent(Merge, this, domain);
520 boolean success = setReachability(100, domain);
521 if (!success) {
522
523 clearStatus();
524 if (log.isDebugEnabled())
525 log.debug("Retrying merge: " + domain);
526 success = setReachability(100, domain);
527 if (!success)
528 log.error("FAILED TO RETRY MERGE: " + domain);
529 }
530 }
531
532
533
534
535
536
537
538
539
540
541 public static void commit(String pattern)
542 {
543 if (connEvent == null)
544 throw new IllegalStateException("Cannot call commit() before partition() or merge()");
545 ConnectionPatternEvent preEvent =
546 new ConnectionPatternEvent("PRE-COMMIT", pattern, connEvent.getConnectionEvents());
547 if (Eventlogger.ENABLED)
548 Eventlogger.logEventFlush(preEvent);
549
550
551 lock.lock();
552 try {
553 remainingCommits = statusMap.size();
554 } finally {
555 lock.unlock();
556 }
557
558
559
560
561
562 for (final Map.Entry<Host, SocketStatus> entry : statusMap.entrySet()) {
563 final Host host = entry.getKey();
564 Thread commitThread = new Thread("Commit-" + host) {
565 public void run() {
566 if (log.isDebugEnabled())
567 log.debug("Committing: " + host);
568 try {
569 entry.getValue().commit();
570 if (log.isDebugEnabled())
571 log.debug("Have committed: " + host);
572 } catch (RemoteException e) {
573 log.warn("Failed to commit: " + host, e);
574 } finally {
575 lock.lock();
576 try {
577
578 remainingCommits--;
579
580
581
582
583 if (remainingCommits == 0)
584 completed.signal();
585 } finally {
586 lock.unlock();
587 }
588 }
589 }
590 };
591 commitThread.setDaemon(true);
592 commitThread.setPriority(Thread.MAX_PRIORITY);
593 commitThread.start();
594 }
595
596
597 lock.lock();
598 try {
599
600 while (remainingCommits > 0) {
601 log.info("Commit threads remaining: " + remainingCommits);
602 try {
603 boolean timeout = !completed.await(5, TimeUnit.SECONDS);
604 if (timeout && remainingCommits > 0)
605 throw new RuntimeException("Timeout during commit; threads remaining: " + remainingCommits);
606 } catch (InterruptedException e) {
607 log.warn("Waiting for commit thread interrupted (should not happen)", e);
608 }
609 }
610 } finally {
611 lock.unlock();
612 }
613
614 ConnectionPatternEvent postEvent =
615 new ConnectionPatternEvent("POST-COMMIT", pattern, connEvent.getConnectionEvents());
616 if (Eventlogger.ENABLED)
617 Eventlogger.logEventFlush(postEvent);
618 if (log.isDebugEnabled())
619 log.debug("ALL COMMITTED");
620
621
622
623
624 connEvent = null;
625 }
626
627
628
629
630
631 public void clearStatus()
632 {
633 statusMap.clear();
634 }
635
636
637
638
639
640
641
642
643
644
645 private boolean setReachability(int reachability, Domain domain)
646 {
647 if (!domain.equals(this)) {
648 for (Host h1 : hosts) {
649 StringBuilder buf = new StringBuilder();
650 buf.append(h1.getHostName());
651 buf.append("(");
652 buf.append(reachability);
653 buf.append("): ");
654 try {
655 SocketStatus h1status = getStatus(h1);
656 for (Host h2 : domain.getHostSet()) {
657 h1status.setStatus(h2.getAddress(), reachability);
658 buf.append(h2.getHostName());
659 buf.append(" ");
660
661 SocketStatus h2status = getStatus(h2);
662 h2status.setStatus(h1.getAddress(), reachability);
663 }
664 if (log.isDebugEnabled())
665 log.debug(buf);
666 } catch (Exception e) {
667 log.warn("Failed to reach host", e);
668
669 return false;
670 }
671 }
672 } else {
673 log.warn(domain + " is the same as this");
674 }
675
676 return true;
677 }
678
679 private SocketStatus getStatus(Host host)
680 throws Exception
681 {
682 SocketStatus status = statusMap.get(host);
683 if (status == null) {
684 try {
685 status = (SocketStatus) host.lookup(SOCKET_STATUS);
686 if (log.isDebugEnabled())
687 log.debug("SocketStatus for " + host + ": " + status);
688 } catch (Exception e) {
689 log.warn(host.getHostName() + " is not available", e);
690 throw e;
691 }
692 statusMap.put(host, status);
693 }
694 return status;
695 }
696
697 }