View Javadoc

1   /*
2    * Copyright (c) 1998-2004 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gmi;
20  
21  import java.rmi.RemoteException;
22  import java.rmi.server.ExportException;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.locks.Condition;
25  import java.util.concurrent.locks.Lock;
26  import java.util.concurrent.locks.ReentrantLock;
27  
28  import jgroup.core.MemberId;
29  import jgroup.core.View;
30  import jgroup.core.multicast.AckListener;
31  import net.jini.export.Exporter;
32  import net.jini.jeri.BasicILFactory;
33  import net.jini.jeri.BasicJeriExporter;
34  import net.jini.jeri.tcp.TcpServerEndpoint;
35  
36  import org.apache.log4j.Logger;
37  import org.apache.log4j.MDC;
38  
39  /**
40   * Handler for multicast return messages from all members of the current view.
41   *
42   * @author Hein Meling
43   * @author Tor Arve Stangeland
44   */
45  public class GroupAckListener
46    implements AckListener
47  {
48  
49    ////////////////////////////////////////////////////////////////////////////////////////////
50    // Logger
51    ////////////////////////////////////////////////////////////////////////////////////////////
52  
53    /** Obtain logger for this class */
54    private static final Logger log = Logger.getLogger(GroupAckListener.class);
55  
56  
57    ////////////////////////////////////////////////////////////////////////////////////////////
58    // Constants
59    ////////////////////////////////////////////////////////////////////////////////////////////
60  
61    /** States */
62    private static final int STARTING = 1;
63  
64    private static final int WAITING = 2;
65  
66    private static final int COMPLETED = 3;
67  
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Fields
71    ////////////////////////////////////////////////////////////////////////////////////////////
72  
73    /** Current state */
74    private volatile int state = STARTING;
75  
76    /** Number of unacknowledged messages */
77    private volatile int missing;
78  
79    /** Results from servers */
80    protected Object[] results;
81  
82    /** Flag completed servers */
83    protected boolean[] completed;
84  
85    /** The BasicJeriExporter used by this AckListener */
86    private Exporter exporter;
87  
88    /** Lock and condition used to determine completion */
89    private final Lock lock = new ReentrantLock();
90    private final Condition allCompleted = lock.newCondition();
91  
92    /** The group identifier associated with this AckListener; initialized in notifyView() */
93    private int groupId;
94  
95  
96    ////////////////////////////////////////////////////////////////////////////////////////////
97    // Constructor
98    ////////////////////////////////////////////////////////////////////////////////////////////
99  
100   public GroupAckListener(Object lockObj)
101   {
102   }
103 
104 
105   ////////////////////////////////////////////////////////////////////////////////////////////
106   // Methods
107   ////////////////////////////////////////////////////////////////////////////////////////////
108 
109   /**
110    * Get a reference for the remote <code>AckListener</code>.
111    * 
112    * @return A remote proxy for the <code>AckListener</code>.
113    * @throws ExportException
114    *   Raised if the <code>AckListener</code> could not be exported.
115    */
116   public AckListener getRemoteAckListener()
117     throws ExportException
118   {
119     /*
120      * Create a new exporter on any available port for this AckListener, and
121      * enable the distributed garbage collection (DGC), and enable the keep
122      * alive attribute.
123      */
124     exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
125         new BasicILFactory(), true, true);
126     return (AckListener) exporter.export(this);
127   }
128 
129 
130   ////////////////////////////////////////////////////////////////////////////////////////////
131   // Methods from AckListener
132   ////////////////////////////////////////////////////////////////////////////////////////////
133 
134   /* (non-Javadoc)
135    * @see jgroup.core.multicast.AckListener#ack(jgroup.core.MemberId, int, java.lang.Object)
136    */
137   public void ack(MemberId id, int pos, Object obj)
138     throws RemoteException
139   {
140     if (log.isDebugEnabled()) {
141       MDC.put("group", "[Group: " + groupId + "]");
142       log.debug("Got ACK from " + id + ", pos=" + pos + ", reply=" + obj);
143     }
144     completed[pos] = true;
145     results[pos] = obj;
146     missing--;
147     updateState();
148   }
149 
150   /* (non-Javadoc)
151    * @see jgroup.core.multicast.AckListener#notifyView(jgroup.core.View)
152    */
153   public void notifyView(View view)
154     throws RemoteException
155   {
156     groupId = view.getGid();
157     if (log.isDebugEnabled())
158       MDC.put("group", "[Group: " + groupId + "]");
159     missing = view.size();
160     completed = new boolean[missing];
161     results = new Object[missing];
162     state = WAITING;
163     if (log.isDebugEnabled())
164       log.debug(this);
165   }
166 
167   /* (non-Javadoc)
168    * @see jgroup.core.multicast.AckListener#viewChange()
169    */
170   public void viewChange()
171     throws RemoteException
172   {
173     if (log.isDebugEnabled()) {
174       MDC.put("group", "[Group: " + groupId + "]");
175       log.debug(this);
176     }
177     if (missing > 0) {
178       for (int i = 0; i < completed.length; i++) {
179         if (!completed[i]) {
180           completed[i] = true;
181           results[i] = new RemoteException("Member not reachable");
182           if (log.isDebugEnabled())
183             log.debug("Unreachable member at position " + i);
184         }
185       }
186     }
187     missing = 0;
188     updateState();
189     // Unexport this remote AckListener; no more remote calls should occur
190     exporter.unexport(false);
191   }
192 
193   /**
194    * Update the internal state.
195    */
196   private void updateState()
197   {
198     lock.lock();
199     try {
200       if (missing == 0) {
201         state = COMPLETED;
202         if (log.isDebugEnabled())
203           log.debug(this);
204         allCompleted.signal();
205       } else {
206         if (log.isDebugEnabled())
207           log.debug(this);
208       }
209     } finally {
210       lock.unlock();
211     }
212   }
213 
214 
215   ////////////////////////////////////////////////////////////////////////////////////////////
216   // Methods for accessing the results array
217   ////////////////////////////////////////////////////////////////////////////////////////////
218 
219   /**
220    * Retrieves the invocation result of any completed member.
221    */
222   public Object getResult()
223   {
224     pendingCompletion();
225     // Select first completed result that is not a RemoteException
226     for (int i = 0; i < results.length; ++i)
227       if (completed[i] && !(results[i] instanceof RemoteException))
228         return results[i];
229 
230     /*
231      * If no actual results were returned, check if there is a RemoteException
232      * caused by a viewChange() and if so wrap it in a InvocationResult object
233      * for delivery to the client.
234      */
235     for (int i = 0; i < results.length; ++i)
236       if (completed[i] && results[i] instanceof RemoteException)
237         return new InvocationResult((RemoteException) results[i]);
238 
239     log.warn("No results have been received!");
240     return null;
241   }
242 
243   /**
244    * Retrieves the invocation result obtained from the leader.
245    */
246   public Object getLeaderResult()
247   {
248     pendingCompletion();
249     if (completed[0]) {
250       if (results[0] instanceof RemoteException)
251         return new InvocationResult((RemoteException) results[0]);
252       else
253         return results[0];
254     }
255 
256     log.warn("No results from leader found!");
257     return null;
258   }
259 
260   /**
261    * Retrieves the whole array of invocation results (from all members).
262    * The results array <i>may</i> contain one or more <code>Exception</code>s.
263    */
264   public Object[] getResults()
265   {
266     pendingCompletion();
267     return results;
268   }
269 
270   /**
271    * Blocks until the state of the listener is <code>COMPLETED</code>.
272    */
273   protected void pendingCompletion()
274   {
275     if (log.isDebugEnabled())
276       log.debug(this);
277     lock.lock();
278     try {
279       while (state != COMPLETED) {
280         // wait for signal
281         allCompleted.awaitUninterruptibly();
282         if (log.isDebugEnabled())
283           log.debug(this);
284       }
285     } finally {
286       lock.unlock();
287       /*
288        * Unexport this remote AckListener; note that calls to the
289        * AckListener.viewChange() method may occur after the remote AckListener
290        * has been unexported. This should be dealt with at the client-side, as
291        * this will cause a NoSuchObjectException
292        */
293       exporter.unexport(false);
294     }
295   }
296 
297 
298   /**
299    * Blocks until the state of the listener is <code>COMPLETED</code>
300    * or until the timeout expires.
301    * 
302    * @param timeout the timeout delay (in seconds) to wait before returning.
303    * @return true if results are available.
304    */
305   public boolean pendingCompletionOrTimeout(long timeout)
306   {
307     if (log.isDebugEnabled())
308       log.debug(this);
309     boolean done = false;
310     lock.lock();
311     try {
312       while (state != COMPLETED) {
313         // wait for signal or timeout
314         try {
315           done = allCompleted.await(timeout, TimeUnit.SECONDS);
316           if (!done) {
317             done = markUncompleted();
318           }
319         } catch (InterruptedException e) { }
320         if (log.isDebugEnabled())
321           log.debug(this);
322       }
323     } finally {
324       lock.unlock();
325       /*
326        * Unexport this remote AckListener; note that calls to the
327        * AckListener.ack() method may occur after the remote AckListener
328        * has been unexported. This should be dealt with at the client-side, as
329        * this will cause a NoSuchObjectException
330        */
331 //      exporter.unexport(false);
332     }
333     return done;
334   }
335 
336 
337   private boolean markUncompleted()
338   {
339     boolean someCompleted = false;
340     if (missing > 0) {
341       for (int i = 0; i < completed.length; i++) {
342         if (!completed[i]) {
343           completed[i] = true;
344           results[i] = new RemoteException("Timeout for member");
345           if (log.isDebugEnabled())
346             log.debug("Timeout for member at position " + i);
347         } else {
348           someCompleted = true;
349         }
350       }
351     }
352     missing = 0;
353     updateState();
354     return someCompleted;
355   }
356 
357 
358   ////////////////////////////////////////////////////////////////////////////////////////////
359   // Methods from Object
360   ////////////////////////////////////////////////////////////////////////////////////////////
361 
362   public String toString()
363   {
364     StringBuilder b = new StringBuilder("GroupAckListener: missing=");
365     b.append(missing);
366     switch (state) {
367       case STARTING:
368         b.append(", STARTING");
369         break;
370 
371       case WAITING:
372         b.append(", WAITING");
373         break;
374 
375       case COMPLETED:
376         b.append(", COMPLETED");
377         break;
378     }
379     b.append(", hash=");
380     b.append(hashCode());
381     return b.toString();
382   }
383 
384 } // END GroupAckListener