View Javadoc

1   /*
2    * Copyright (c) 1998-2003 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  package jgroup.relacs.gmi;
19  
20  import static jgroup.util.log.ViewEvent.Type.Client;
21  
22  import java.io.Externalizable;
23  import java.io.IOException;
24  import java.io.ObjectInput;
25  import java.io.ObjectOutput;
26  import java.lang.reflect.Proxy;
27  import java.net.UnknownHostException;
28  import java.rmi.RemoteException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import jgroup.core.GroupUnreachableException;
35  import jgroup.core.MemberId;
36  import jgroup.core.View;
37  import jgroup.core.registry.LookupRegistry;
38  import jgroup.core.registry.RegistryFactory;
39  import jgroup.relacs.config.AppConfig;
40  import jgroup.relacs.simulator.SocketStatusImpl;
41  import jgroup.util.log.Eventlogger;
42  import jgroup.util.log.ViewEvent;
43  import net.jini.core.constraint.InvocationConstraints;
44  import net.jini.jeri.ObjectEndpoint;
45  import net.jini.jeri.OutboundRequest;
46  import net.jini.jeri.OutboundRequestIterator;
47  
48  import org.apache.log4j.Logger;
49  
50  /**
51   * Handles identity of groups.
52   * Knows endpoints of the current view and the group leader.
53   * Responsible for updating itself when needed.
54   * 
55   * @author Tor Arve Stangeland
56   * @author Hein Meling
57   */
58  public class GroupEndPoint
59    implements ObjectEndpoint, Externalizable
60  {
61  
62    ////////////////////////////////////////////////////////////////////////////////////////////
63    // Logger
64    ////////////////////////////////////////////////////////////////////////////////////////////
65  
66    /** Obtain logger for this class */
67    private static final Logger log = Logger.getLogger(GroupEndPoint.class);
68  
69  
70    ////////////////////////////////////////////////////////////////////////////////////////////
71    // Constants
72    ////////////////////////////////////////////////////////////////////////////////////////////
73  
74    private static final long serialVersionUID = 4971189422261856392L;
75  
76    /**
77     * An object used to obtain information about link connectivity when
78     * the partition simulator has been activated.
79     */
80    private static SocketStatusImpl status = null;
81  
82  
83    ////////////////////////////////////////////////////////////////////////////////////////////
84    // Fields
85    ////////////////////////////////////////////////////////////////////////////////////////////
86  
87    /** Contains all members */
88    private List<MemberId> members = new ArrayList<MemberId>(10);
89  
90    /** The group identifier */
91    private int gid;
92  
93    /** The service lookup name associated with this group endpoint */
94    private String serviceName;
95  
96  
97    ////////////////////////////////////////////////////////////////////////////////////////////
98    // Constructors
99    ////////////////////////////////////////////////////////////////////////////////////////////
100 
101   /**
102    * Constructor for externaliztion.
103    */
104   public GroupEndPoint()
105   {
106     /* Check if the partition simulator should be activated in the client JVM. */
107     if (Boolean.getBoolean("jgroup.simulator")) {
108       status = SocketStatusImpl.instance();
109     }
110   } 
111 
112   /**
113    * Creates an empty group endpoint for the given serviceName.
114    */
115   GroupEndPoint(String serviceName)
116   {
117     this.serviceName = serviceName;
118     this.gid = -1;
119   }
120 
121   /**
122    * Creates a group endpoint with given <code>MemberId</code>.  This 
123    * constructor uses the group identifier to obtain the registry service 
124    * name.
125    */
126   GroupEndPoint(int gid, MemberId me)
127   {
128     this.gid = gid;
129     this.serviceName = AppConfig.getApplication(gid).getRegistryName();
130     addMember(me);
131   }
132 
133 
134   ////////////////////////////////////////////////////////////////////////////////////////////
135   // Access methods
136   ////////////////////////////////////////////////////////////////////////////////////////////
137 
138   /**
139    * Returns the group identifier associated with this group endpoint proxy.
140    */
141   public int getGroupId()
142   {
143     return gid;
144   }
145 
146   /**
147    * Add <code>MemberId</code>s to the given set of members.
148    */
149   public void addMembers(Iterator<MemberId> iter)
150   {
151     while (iter.hasNext())
152       addMember(iter.next());
153   }
154 
155   /**
156    * Add the given <code>MemberId</code> to the this 
157    * group endpoint.
158    */
159   public void addMember(MemberId member)
160   {
161     if (!members.contains(member))
162       members.add(member);
163   }
164 
165   /**
166    * Returns an iterator over the set of <code>MemberId</code>s 
167    * in this group endpoint.
168    */
169   public Iterator<MemberId> getMembers()
170   {
171     return members.iterator();
172   }
173 
174   /**
175    *  Update our list of group member endpoints with the
176    *  most recent entries known to the naming service used,
177    *  for example the dependable registry or the group enabled
178    *  Reggie (greg).
179    */
180   private void updateGroupEndpoints()
181     throws Exception
182   {
183     Proxy newProxy = null;
184     if (gid == 1) {
185       /*
186        * If the proxy itself is the dependable registry, then we
187        * we search the bootstrap registries for new dependable
188        * registry instances.
189        */
190       newProxy = (Proxy) RegistryFactory.getRegistry();
191     } else {
192       LookupRegistry reg = RegistryFactory.getLookupRegistry();
193       newProxy = (Proxy) reg.lookup(serviceName);
194     }
195     if (log.isDebugEnabled())
196       log.debug("Updated proxy for: " + serviceName);
197     GroupInvocationHandler newHandler =
198       (GroupInvocationHandler) Proxy.getInvocationHandler(newProxy);
199     members = newHandler.getGroupEndPoint().members;
200     /* Check if the reference list is still empty */
201     if (members.isEmpty())
202       throw new RemoteException("No members in group associated with: " + serviceName);
203     Collections.sort(members);
204     if (log.isDebugEnabled())
205       log.debug("New member list: " + members);
206     if (Eventlogger.ENABLED)
207       Eventlogger.logEvent("UpdatedGroupEndpoints: members.size=" + members.size() + ": " + members);
208   }
209 
210 //  private long clientViewId = -1;
211 //  private boolean first = true;
212 
213   /**
214    * Set the most recent view identifier for this group.  This method is
215    * invoked on the client-side to update the client-side view identifier.
216    * This is used to detect changes in the membership, and perform client-side
217    * proxy updates.
218    * 
219    * This method is used only to test the periodic refresh technique, and
220    * should not be used to update the group endpoints in a real system.
221    */
222 //  public void notifyView(long serverViewId)
223 //    throws Exception
224 //  {
225 //    if (serverViewId != clientViewId) {
226 //      if (clientViewId != -1) {
227 //        if (Debug.MEASURE)
228 //          Debug.logMem("NewViewDetected view.vid=" + serverViewId);
229 //        if (log.isDebugEnabled())
230 //          log.debug("New server-side view detected");
231 //        if (!first) {
232 //          new Thread("UpdateGroupEndpoints") {
233 //            public void run() {
234 //              try {
235 //                Thread.sleep(rnd.nextInt(15000));
236 //                updateGroupEndpoints();
237 //              } catch (Exception e) {
238 //              }
239 //            }
240 //          }.start();
241 //        }
242 //        first = false;
243 //      }
244 //      clientViewId = serverViewId;
245 //    }
246 //  }
247 
248   /**
249    * Update the client-side side member list according to the given view.
250    * <p>
251    * This updating is only done on the client-side in response to receiving
252    * a new view from the server-side.
253    *
254    * @param view the new server-side view; <code>null</code> views will
255    *   be ignored.
256    */
257   public void updateClientView(View view)
258   {
259     if (view == null)
260       return;
261     if (Eventlogger.ENABLED)
262       Eventlogger.logEvent(new ViewEvent(Client, view));
263     members.clear();
264     MemberId[] members = view.getMembers();
265     for (int i = 0; i < members.length; i++) {
266       addMember(members[i]);
267     }
268     if (log.isDebugEnabled())
269       log.debug("ClientViewUpdate: " + members);
270   }
271 
272 
273   ////////////////////////////////////////////////////////////////////////////////////////////
274   // Methods from ObjectEndpoint
275   ////////////////////////////////////////////////////////////////////////////////////////////
276 
277   /* (non-Javadoc)
278    * @see net.jini.jeri.ObjectEndpoint#newCall(net.jini.core.constraint.InvocationConstraints)
279    */
280   public OutboundRequestIterator newCall(final InvocationConstraints constraints)
281   {
282     throw new UnsupportedOperationException("Invocation constraints not supported");
283   }
284   
285   
286   /** 
287    * Returns an <code>OutboundRequestIterator</code> to use to send remote call to the 
288    * referenced remote object. 
289    */
290   public OutboundRequestIterator newCall(MethodSemantics semantics) 
291   {
292     // Get the member containing the endpoint to use
293     final MemberId member = selectMember(semantics);
294     // Get the request iterator to wrap
295     final OutboundRequestIterator iter = member.getTcpEndpoint().newRequest(InvocationConstraints.EMPTY);
296 
297     return new OutboundRequestIterator() {
298 
299       public boolean hasNext()
300       {
301         /*
302          * Assume that Endpoint returns false if unable to communicate
303          * with the server endpoint; i.e. there is no point in retrying.
304          */
305         boolean hasMore = iter.hasNext();
306         if (!hasMore) {
307           // Remove failed members from client-side proxy
308           members.remove(member);
309           if (log.isDebugEnabled())
310             log.debug("Removed failed member from client-side proxy: " + member);
311         }
312         return hasMore;
313       }
314 
315       public OutboundRequest next()
316         throws IOException
317       {
318         if (log.isDebugEnabled())
319           log.debug(GroupEndPoint.this);
320         return iter.next();
321       }
322       
323     };
324   } 
325 
326   /* (non-Javadoc)
327    * @see net.jini.jeri.ObjectEndpoint#executeCall(net.jini.jeri.OutboundRequest)
328    */
329   public RemoteException executeCall(OutboundRequest call)
330     throws IOException
331   {
332     int response = call.getResponseInputStream().read();
333     switch (response) {
334       case InvocationResult.INVOCATION_COMPLETED:
335         return null;
336 
337       case InvocationResult.INVOCATION_BLOCKED:
338         try {
339           Thread.sleep(100);
340         } catch (InterruptedException e) { }
341         if (log.isDebugEnabled())
342           log.debug("Invocation was blocked; retrying on another node.");
343         throw new IOException("Invocation was blocked; retrying on another node.");
344 
345       case InvocationResult.INVOCATION_FAILED:
346         // There was an error at the server-side request handler level
347         return new RemoteException("Invocation failed; server-side was unable to handle request");
348 
349       case InvocationResult.UNKNOWN_INVOCATION_SEMANTICS:
350         return new RemoteException("Unknown invocation semantics");
351 
352       default :
353         return new RemoteException("Unknown response code: " + response);
354     }
355   }
356 
357 
358   ////////////////////////////////////////////////////////////////////////////////////////////
359   // Methods for selecting the communication endpoint (the group member)
360   ////////////////////////////////////////////////////////////////////////////////////////////
361 
362   private MemberId selectMember(MethodSemantics semantics)
363   {
364     List<MemberId> eps = members;
365     Collections.sort(members);
366     if (status != null)
367       eps = accessibleMembers();
368     MemberId member = semantics.selectMember(eps);
369     if (member == null) {
370       /* Try to refresh the proxy endpoint list */
371       try {
372         updateGroupEndpoints();
373       } catch (Exception e) {
374         throw new GroupUnreachableException(
375             "unable to update the client-proxy from the naming service", e);
376       }
377       if (status != null) {
378         eps = accessibleMembers();
379         member = semantics.selectMember(eps);
380       } else
381         member = semantics.selectMember(members);
382     }
383     if (member == null)
384       throw new GroupUnreachableException("no remaining members");
385     if (log.isDebugEnabled())
386       log.debug("Selected member: " + member);
387     return member;
388   }
389 
390 
391   /**
392    * Returns a list of <code>MemberId</code>s that is accessible
393    * according to the partition simulator.
394    */
395   private List<MemberId> accessibleMembers()
396   {
397     List<MemberId> list = new ArrayList<MemberId>(members.size());
398     for (Iterator<MemberId> iter = members.iterator(); iter.hasNext();) {
399       MemberId member = iter.next();
400       String hostname = member.getCanonicalHostName();
401       try {
402         if (status.isReliable(hostname)) {
403           list.add(member);
404         } else {
405           if (log.isDebugEnabled())
406             log.debug("Partition simulator: Broken link to: " + hostname);
407         }
408       } catch (UnknownHostException e) {
409         e.printStackTrace();
410       }
411     }
412     return list;
413   }
414 
415 
416   ////////////////////////////////////////////////////////////////////////////////////////////
417   // Methods from Object
418   ////////////////////////////////////////////////////////////////////////////////////////////
419 
420   public int hashCode()
421   {
422     return members.hashCode();
423   }
424 
425   public String toString()
426   {
427     StringBuilder buf = new StringBuilder("[GroupEndPoint: serviceName=");
428     buf.append(serviceName);
429     buf.append(", members=");
430     buf.append(members);
431     buf.append("]");
432     return buf.toString();
433   }
434 
435   ////////////////////////////////////////////////////////////////////////////////////////////
436   // Methods from Externalization
437   ////////////////////////////////////////////////////////////////////////////////////////////
438 
439   /* (non-Javadoc)
440    * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
441    */
442   @SuppressWarnings("unchecked")
443   public void readExternal(ObjectInput in)
444     throws IOException, ClassNotFoundException
445   {
446     serviceName = in.readUTF();
447     gid = in.readInt();
448     members = (List) in.readObject();
449   }
450 
451   /* (non-Javadoc)
452    * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
453    */
454   public void writeExternal(ObjectOutput out)
455     throws IOException
456   {
457     out.writeUTF(serviceName);
458     out.writeInt(gid);
459     out.writeObject(members);
460   }
461 
462 } // END GroupEndPoint