1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package jgroup.relacs.gmi;
19
20 import static jgroup.util.log.ViewEvent.Type.Client;
21
22 import java.io.Externalizable;
23 import java.io.IOException;
24 import java.io.ObjectInput;
25 import java.io.ObjectOutput;
26 import java.lang.reflect.Proxy;
27 import java.net.UnknownHostException;
28 import java.rmi.RemoteException;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.Iterator;
32 import java.util.List;
33
34 import jgroup.core.GroupUnreachableException;
35 import jgroup.core.MemberId;
36 import jgroup.core.View;
37 import jgroup.core.registry.LookupRegistry;
38 import jgroup.core.registry.RegistryFactory;
39 import jgroup.relacs.config.AppConfig;
40 import jgroup.relacs.simulator.SocketStatusImpl;
41 import jgroup.util.log.Eventlogger;
42 import jgroup.util.log.ViewEvent;
43 import net.jini.core.constraint.InvocationConstraints;
44 import net.jini.jeri.ObjectEndpoint;
45 import net.jini.jeri.OutboundRequest;
46 import net.jini.jeri.OutboundRequestIterator;
47
48 import org.apache.log4j.Logger;
49
50
51
52
53
54
55
56
57
58 public class GroupEndPoint
59 implements ObjectEndpoint, Externalizable
60 {
61
62
63
64
65
66
67 private static final Logger log = Logger.getLogger(GroupEndPoint.class);
68
69
70
71
72
73
74 private static final long serialVersionUID = 4971189422261856392L;
75
76
77
78
79
80 private static SocketStatusImpl status = null;
81
82
83
84
85
86
87
88 private List<MemberId> members = new ArrayList<MemberId>(10);
89
90
91 private int gid;
92
93
94 private String serviceName;
95
96
97
98
99
100
101
102
103
104 public GroupEndPoint()
105 {
106
107 if (Boolean.getBoolean("jgroup.simulator")) {
108 status = SocketStatusImpl.instance();
109 }
110 }
111
112
113
114
115 GroupEndPoint(String serviceName)
116 {
117 this.serviceName = serviceName;
118 this.gid = -1;
119 }
120
121
122
123
124
125
126 GroupEndPoint(int gid, MemberId me)
127 {
128 this.gid = gid;
129 this.serviceName = AppConfig.getApplication(gid).getRegistryName();
130 addMember(me);
131 }
132
133
134
135
136
137
138
139
140
141 public int getGroupId()
142 {
143 return gid;
144 }
145
146
147
148
149 public void addMembers(Iterator<MemberId> iter)
150 {
151 while (iter.hasNext())
152 addMember(iter.next());
153 }
154
155
156
157
158
159 public void addMember(MemberId member)
160 {
161 if (!members.contains(member))
162 members.add(member);
163 }
164
165
166
167
168
169 public Iterator<MemberId> getMembers()
170 {
171 return members.iterator();
172 }
173
174
175
176
177
178
179
180 private void updateGroupEndpoints()
181 throws Exception
182 {
183 Proxy newProxy = null;
184 if (gid == 1) {
185
186
187
188
189
190 newProxy = (Proxy) RegistryFactory.getRegistry();
191 } else {
192 LookupRegistry reg = RegistryFactory.getLookupRegistry();
193 newProxy = (Proxy) reg.lookup(serviceName);
194 }
195 if (log.isDebugEnabled())
196 log.debug("Updated proxy for: " + serviceName);
197 GroupInvocationHandler newHandler =
198 (GroupInvocationHandler) Proxy.getInvocationHandler(newProxy);
199 members = newHandler.getGroupEndPoint().members;
200
201 if (members.isEmpty())
202 throw new RemoteException("No members in group associated with: " + serviceName);
203 Collections.sort(members);
204 if (log.isDebugEnabled())
205 log.debug("New member list: " + members);
206 if (Eventlogger.ENABLED)
207 Eventlogger.logEvent("UpdatedGroupEndpoints: members.size=" + members.size() + ": " + members);
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257 public void updateClientView(View view)
258 {
259 if (view == null)
260 return;
261 if (Eventlogger.ENABLED)
262 Eventlogger.logEvent(new ViewEvent(Client, view));
263 members.clear();
264 MemberId[] members = view.getMembers();
265 for (int i = 0; i < members.length; i++) {
266 addMember(members[i]);
267 }
268 if (log.isDebugEnabled())
269 log.debug("ClientViewUpdate: " + members);
270 }
271
272
273
274
275
276
277
278
279
280 public OutboundRequestIterator newCall(final InvocationConstraints constraints)
281 {
282 throw new UnsupportedOperationException("Invocation constraints not supported");
283 }
284
285
286
287
288
289
290 public OutboundRequestIterator newCall(MethodSemantics semantics)
291 {
292
293 final MemberId member = selectMember(semantics);
294
295 final OutboundRequestIterator iter = member.getTcpEndpoint().newRequest(InvocationConstraints.EMPTY);
296
297 return new OutboundRequestIterator() {
298
299 public boolean hasNext()
300 {
301
302
303
304
305 boolean hasMore = iter.hasNext();
306 if (!hasMore) {
307
308 members.remove(member);
309 if (log.isDebugEnabled())
310 log.debug("Removed failed member from client-side proxy: " + member);
311 }
312 return hasMore;
313 }
314
315 public OutboundRequest next()
316 throws IOException
317 {
318 if (log.isDebugEnabled())
319 log.debug(GroupEndPoint.this);
320 return iter.next();
321 }
322
323 };
324 }
325
326
327
328
329 public RemoteException executeCall(OutboundRequest call)
330 throws IOException
331 {
332 int response = call.getResponseInputStream().read();
333 switch (response) {
334 case InvocationResult.INVOCATION_COMPLETED:
335 return null;
336
337 case InvocationResult.INVOCATION_BLOCKED:
338 try {
339 Thread.sleep(100);
340 } catch (InterruptedException e) { }
341 if (log.isDebugEnabled())
342 log.debug("Invocation was blocked; retrying on another node.");
343 throw new IOException("Invocation was blocked; retrying on another node.");
344
345 case InvocationResult.INVOCATION_FAILED:
346
347 return new RemoteException("Invocation failed; server-side was unable to handle request");
348
349 case InvocationResult.UNKNOWN_INVOCATION_SEMANTICS:
350 return new RemoteException("Unknown invocation semantics");
351
352 default :
353 return new RemoteException("Unknown response code: " + response);
354 }
355 }
356
357
358
359
360
361
362 private MemberId selectMember(MethodSemantics semantics)
363 {
364 List<MemberId> eps = members;
365 Collections.sort(members);
366 if (status != null)
367 eps = accessibleMembers();
368 MemberId member = semantics.selectMember(eps);
369 if (member == null) {
370
371 try {
372 updateGroupEndpoints();
373 } catch (Exception e) {
374 throw new GroupUnreachableException(
375 "unable to update the client-proxy from the naming service", e);
376 }
377 if (status != null) {
378 eps = accessibleMembers();
379 member = semantics.selectMember(eps);
380 } else
381 member = semantics.selectMember(members);
382 }
383 if (member == null)
384 throw new GroupUnreachableException("no remaining members");
385 if (log.isDebugEnabled())
386 log.debug("Selected member: " + member);
387 return member;
388 }
389
390
391
392
393
394
395 private List<MemberId> accessibleMembers()
396 {
397 List<MemberId> list = new ArrayList<MemberId>(members.size());
398 for (Iterator<MemberId> iter = members.iterator(); iter.hasNext();) {
399 MemberId member = iter.next();
400 String hostname = member.getCanonicalHostName();
401 try {
402 if (status.isReliable(hostname)) {
403 list.add(member);
404 } else {
405 if (log.isDebugEnabled())
406 log.debug("Partition simulator: Broken link to: " + hostname);
407 }
408 } catch (UnknownHostException e) {
409 e.printStackTrace();
410 }
411 }
412 return list;
413 }
414
415
416
417
418
419
420 public int hashCode()
421 {
422 return members.hashCode();
423 }
424
425 public String toString()
426 {
427 StringBuilder buf = new StringBuilder("[GroupEndPoint: serviceName=");
428 buf.append(serviceName);
429 buf.append(", members=");
430 buf.append(members);
431 buf.append("]");
432 return buf.toString();
433 }
434
435
436
437
438
439
440
441
442 @SuppressWarnings("unchecked")
443 public void readExternal(ObjectInput in)
444 throws IOException, ClassNotFoundException
445 {
446 serviceName = in.readUTF();
447 gid = in.readInt();
448 members = (List) in.readObject();
449 }
450
451
452
453
454 public void writeExternal(ObjectOutput out)
455 throws IOException
456 {
457 out.writeUTF(serviceName);
458 out.writeInt(gid);
459 out.writeObject(members);
460 }
461
462 }