1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.rmi;
20
21 import java.lang.reflect.InvocationTargetException;
22 import java.rmi.RemoteException;
23 import java.rmi.server.ExportException;
24
25 import jgroup.core.MemberId;
26 import jgroup.core.View;
27 import jgroup.core.multicast.AckListener;
28 import jgroup.relacs.gm.DispatcherService;
29 import net.jini.export.Exporter;
30 import net.jini.jeri.BasicILFactory;
31 import net.jini.jeri.BasicJeriExporter;
32 import net.jini.jeri.tcp.TcpServerEndpoint;
33
34 import org.apache.log4j.Logger;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public final class SynchAckListener
51 implements AckListener
52 {
53
54
55
56
57
58
59 private static final Logger log = Logger.getLogger(SynchAckListener.class);
60
61
62
63
64
65
66
67 private Object[] results;
68
69
70 private boolean[] completed;
71
72
73 private int missing;
74
75
76 private DispatcherService dispatcher;
77
78
79 private Exporter exporter;
80
81
82
83
84
85
86 public SynchAckListener(DispatcherService dispatcher)
87 {
88 this.dispatcher = dispatcher;
89 }
90
91
92
93
94
95
96
97
98
99 public AckListener getRemoteAckListener()
100 throws ExportException
101 {
102
103
104
105
106
107 exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
108 new BasicILFactory(), true, true);
109 return (AckListener) exporter.export(this);
110 }
111
112
113
114
115
116
117 public synchronized void notifyView(View view)
118 throws RemoteException
119 {
120 missing = view.size();
121 completed = new boolean[missing];
122 results = new Object[missing];
123 if (log.isDebugEnabled())
124 log.debug("SynchAckListener: notifyView: " + missing);
125 }
126
127
128 public synchronized void ack(MemberId id, int vpos, Object obj)
129 throws RemoteException
130 {
131 if (log.isDebugEnabled())
132 log.debug("SynchAckListener: ack: " + id + ", " + obj + ", " + vpos);
133 if (obj instanceof InvocationTargetException) {
134 Throwable targetException = ((InvocationTargetException) obj).getTargetException();
135 log.warn("Caught: ", targetException);
136 }
137
138 if (!completed[vpos]) {
139 completed[vpos] = true;
140 results[vpos] = obj;
141 missing--;
142 if (missing == 0) {
143 if (log.isDebugEnabled())
144 log.debug("SynchAckListener: ack: completed");
145 notifyAll();
146 }
147 }
148 }
149
150
151 public synchronized void viewChange()
152 throws RemoteException
153 {
154 if (log.isDebugEnabled())
155 log.debug("SynchAckListener: viewChange");
156 if (missing > 0) {
157 for (int i=0; i < completed.length; i++) {
158 if (!completed[i]) {
159 completed[i] = true;
160 results[i] = new RemoteException("Member not reachable");
161 if (log.isDebugEnabled())
162 log.debug("SynchAckListener: viewChange: unreachable pos " + i);
163 }
164 }
165 }
166 missing = 0;
167
168 exporter.unexport(false);
169
170 notifyAll();
171 }
172
173
174
175
176
177 public synchronized Object getResults()
178 {
179 if (log.isDebugEnabled())
180 log.debug("SynchAckListener: getResults");
181 while (missing > 0 || results == null) {
182 dispatcher.dispatch(this);
183 }
184
185
186
187
188
189
190 exporter.unexport(false);
191 if (log.isDebugEnabled())
192 log.debug("SynchAckListener: getResults completed");
193 return results;
194 }
195
196 }