View Javadoc

1   /*
2    * Copyright (c) 1998-2004 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gmi;
20  
21  import java.rmi.RemoteException;
22  import java.rmi.server.ExportException;
23  
24  import jgroup.core.MemberId;
25  import jgroup.core.View;
26  import jgroup.core.multicast.AckListener;
27  import net.jini.export.Exporter;
28  import net.jini.jeri.BasicILFactory;
29  import net.jini.jeri.BasicJeriExporter;
30  import net.jini.jeri.tcp.TcpServerEndpoint;
31  
32  import org.apache.log4j.Logger;
33  import org.apache.log4j.MDC;
34  
35  /**
36   * Handler for multicast return messages from all members of the current view.
37   *
38   * @author Hein Meling
39   * @author Tor Arve Stangeland
40   */
41  public class GroupAckListenerOld
42    implements AckListener
43  {
44  
45    ////////////////////////////////////////////////////////////////////////////////////////////
46    // Logger
47    ////////////////////////////////////////////////////////////////////////////////////////////
48  
49    /** Obtain logger for this class */
50    private static final Logger log = Logger.getLogger(GroupAckListenerOld.class);
51  
52  
53    ////////////////////////////////////////////////////////////////////////////////////////////
54    // Constants
55    ////////////////////////////////////////////////////////////////////////////////////////////
56  
57    /** States */
58    private static final int STARTING = 1;
59  
60    private static final int WAITING = 2;
61  
62    private static final int COMPLETED = 3;
63  
64  
65    ////////////////////////////////////////////////////////////////////////////////////////////
66    // Fields
67    ////////////////////////////////////////////////////////////////////////////////////////////
68  
69    /** Current state */
70    private volatile int state = STARTING;
71  
72    /** Number of unacknowledged messages */
73    private volatile int missing;
74  
75    /** Results from servers */
76    protected Object[] results;
77  
78    /** Flag completed servers */
79    protected boolean[] completed;
80  
81    /** The BasicJeriExporter used by this AckListener */
82    private Exporter exporter;
83  
84    /** The object on which to synchronize */
85    private final Object lockObj;
86  
87    /** The group identifier associated with this AckListener; initialized in notifyView() */
88    private int groupId;
89  
90  
91    ////////////////////////////////////////////////////////////////////////////////////////////
92    // Constructor
93    ////////////////////////////////////////////////////////////////////////////////////////////
94  
95    public GroupAckListenerOld(Object lockObj)
96    {
97      this.lockObj = lockObj;
98    }
99  
100 
101   ////////////////////////////////////////////////////////////////////////////////////////////
102   // Methods
103   ////////////////////////////////////////////////////////////////////////////////////////////
104 
105   /**
106    * Get a reference for the remote <code>AckListener</code>.
107    * 
108    * @return A remote proxy for the <code>AckListener</code>.
109    * @throws ExportException
110    *   Raised if the <code>AckListener</code> could not be exported.
111    */
112   public AckListener getRemoteAckListener()
113     throws ExportException
114   {
115     /*
116      * Create a new exporter on any available port for this AckListener, and
117      * enable the distributed garbage collection (DGC), and enable the keep
118      * alive attribute.
119      */
120     exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
121         new BasicILFactory(), true, true);
122     return (AckListener) exporter.export(this);
123   }
124 
125 
126   ////////////////////////////////////////////////////////////////////////////////////////////
127   // Methods from AckListener
128   ////////////////////////////////////////////////////////////////////////////////////////////
129 
130   /* (non-Javadoc)
131    * @see jgroup.core.multicast.AckListener#ack(jgroup.core.MemberId, int, java.lang.Object)
132    */
133   public void ack(MemberId id, int pos, Object obj)
134     throws RemoteException
135   {
136     if (log.isDebugEnabled()) {
137       MDC.put("group", "[Group: " + groupId + "]");
138       log.debug("Got ACK from " + id + ", pos=" + pos + ", reply=" + obj);
139     }
140     completed[pos] = true;
141     results[pos] = obj;
142     missing--;
143     updateState();
144   }
145 
146   /* (non-Javadoc)
147    * @see jgroup.core.multicast.AckListener#notifyView(jgroup.core.View)
148    */
149   public void notifyView(View view)
150     throws RemoteException
151   {
152     groupId = view.getGid();
153     if (log.isDebugEnabled())
154       MDC.put("group", "[Group: " + groupId + "]");
155     missing = view.size();
156     completed = new boolean[missing];
157     results = new Object[missing];
158     state = WAITING;
159     if (log.isDebugEnabled())
160       log.debug(this);
161   }
162 
163   /* (non-Javadoc)
164    * @see jgroup.core.multicast.AckListener#viewChange()
165    */
166   public void viewChange()
167     throws RemoteException
168   {
169     if (log.isDebugEnabled()) {
170       MDC.put("group", "[Group: " + groupId + "]");
171       log.debug(this);
172     }
173     if (missing > 0) {
174       for (int i = 0; i < completed.length; i++) {
175         if (!completed[i]) {
176           completed[i] = true;
177           results[i] = new RemoteException("Member not reachable");
178           if (log.isDebugEnabled())
179             log.debug("Unreachable member at position " + i);
180         }
181       }
182     }
183     missing = 0;
184     updateState();
185     // Unexport this remote AckListener; no more remote calls should occur
186     exporter.unexport(false);
187   }
188 
189   /**
190    * Update the internal state.
191    */
192   private void updateState()
193   {
194     synchronized (lockObj) {
195       if (missing == 0) {
196         state = COMPLETED;
197         if (log.isDebugEnabled())
198           log.debug(this);
199         lockObj.notifyAll();
200       } else {
201         if (log.isDebugEnabled())
202           log.debug(this);
203       }
204     }
205   }
206 
207 
208   ////////////////////////////////////////////////////////////////////////////////////////////
209   // Methods for accessing the results array
210   ////////////////////////////////////////////////////////////////////////////////////////////
211 
212   /**
213    * Retrieves the invocation result of any completed member.
214    */
215   public Object getResult()
216   {
217     pendingCompletion();
218     // Select first completed result that is not a RemoteException
219     for (int i = 0; i < results.length; ++i)
220       if (completed[i] && !(results[i] instanceof RemoteException))
221         return results[i];
222 
223     /*
224      * If no actual results were returned, check if there is a RemoteException
225      * caused by a viewChange() and if so wrap it in a InvocationResult object
226      * for delivery to the client.
227      */
228     for (int i = 0; i < results.length; ++i)
229       if (completed[i] && results[i] instanceof RemoteException)
230         return new InvocationResult((RemoteException) results[i]);
231 
232     log.warn("No results have been received!");
233     return null;
234   }
235 
236   /**
237    * Retrieves the invocation result obtained from the leader.
238    */
239   public Object getLeaderResult()
240   {
241     pendingCompletion();
242     if (completed[0]) {
243       if (results[0] instanceof RemoteException)
244         return new InvocationResult((RemoteException) results[0]);
245       else
246         return results[0];
247     }
248 
249     log.warn("No results from leader found!");
250     return null;
251   }
252 
253   /**
254    * Retrieves the whole array of invocation results (from all members).
255    * The results array <i>may</i> contain one or more <code>Exception</code>s.
256    */
257   public Object[] getResults()
258   {
259     pendingCompletion();
260     return results;
261   }
262 
263   /**
264    * Blocks until the state of the listener is <code>COMPLETED</code>.
265    */
266   protected void pendingCompletion()
267   {
268     if (log.isDebugEnabled())
269       log.debug(this);
270     synchronized (lockObj) {
271       while (state != COMPLETED) {
272         try {
273           lockObj.wait();
274         } catch (InterruptedException e) {
275         }
276         if (log.isDebugEnabled())
277           log.debug(this);
278       }
279     }
280     /*
281      * Unexport this remote AckListener; note that calls to the
282      * AckListener.viewChange() method may occur after the remote AckListener
283      * has been unexported. This should be dealt with at the client-side, as
284      * this will cause a NoSuchObjectException
285      */
286     exporter.unexport(false);
287   }
288 
289 
290   ////////////////////////////////////////////////////////////////////////////////////////////
291   // Methods from Object
292   ////////////////////////////////////////////////////////////////////////////////////////////
293 
294   public String toString()
295   {
296     StringBuilder b = new StringBuilder("GroupAckListener: missing=");
297     b.append(missing);
298     switch (state) {
299       case STARTING:
300         b.append(", STARTING");
301         break;
302 
303       case WAITING:
304         b.append(", WAITING");
305         break;
306 
307       case COMPLETED:
308         b.append(", COMPLETED");
309         break;
310     }
311     b.append(", hash=");
312     b.append(hashCode());
313     return b.toString();
314   }
315 
316 } // END GroupAckListener