View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.rmi;
20  
21  import java.lang.reflect.InvocationTargetException;
22  import java.rmi.RemoteException;
23  import java.rmi.server.ExportException;
24  
25  import jgroup.core.MemberId;
26  import jgroup.core.View;
27  import jgroup.core.multicast.AckListener;
28  import jgroup.relacs.gm.DispatcherService;
29  import net.jini.export.Exporter;
30  import net.jini.jeri.BasicILFactory;
31  import net.jini.jeri.BasicJeriExporter;
32  import net.jini.jeri.tcp.TcpServerEndpoint;
33  
34  import org.apache.log4j.Logger;
35  
36  /**
37   *  This class is used to collect return values of synchronous internal
38   *  group method invocations. For each invocation, an object of this
39   *  class is instantiated. When an invocation is performed, method
40   *  notifyView() is invoked to communicate to this object the number and
41   *  the identities of the members included in the view in which the
42   *  method is executed.  Method ack() is used to add the return value
43   *  returned by the specified member.  Method viewChange() is used to
44   *  notify objects of this class that a new view has been installed and
45   *  members not responding are to be considered unreachable.
46   *
47   *  @author Alberto Montresor
48   *  @since Jgroup 0.9
49   */
50  public final class SynchAckListener
51    implements AckListener
52  {
53  
54    ////////////////////////////////////////////////////////////////////////////////////////////
55    // Logger
56    ////////////////////////////////////////////////////////////////////////////////////////////
57  
58    /** Obtain logger for this class */
59    private static final Logger log = Logger.getLogger(SynchAckListener.class);
60  
61  
62    ////////////////////////////////////////////////////////////////////////////////////////////
63    // Fields
64    ////////////////////////////////////////////////////////////////////////////////////////////
65  
66    /** Array containing results */
67    private Object[] results;
68    
69    /** Boolean array for results */
70    private boolean[] completed;
71    
72    /** Counter */
73    private int missing;
74  
75    /** Dispatcher used to move the dispatcher */
76    private DispatcherService dispatcher;
77  
78    /** The BasicJeriExporter used by this AckListener */
79    private Exporter exporter;
80  
81  
82    ////////////////////////////////////////////////////////////////////////////////////////////
83    // Constructor
84    ////////////////////////////////////////////////////////////////////////////////////////////
85  
86    public SynchAckListener(DispatcherService dispatcher)
87    {
88      this.dispatcher = dispatcher;
89    }
90  
91    /**
92     *  Get a reference for the remote AckListener.
93     *
94     *  @return
95     *    A remote proxy for the <code>SynchAckListener</code>.
96     *  @throws ExportException
97     *    Raised if the <code>SyncAckListener</code> could not be exported.
98     */
99    public AckListener getRemoteAckListener()
100     throws ExportException
101   {
102     /*
103      * Create a new exporter on any available port for this AckListener,
104      * and enable the distributed garbage collection (DGC), and enable
105      * the keep alive attribute.
106      */
107     exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
108         new BasicILFactory(), true, true);
109     return (AckListener) exporter.export(this);
110   }
111 
112 
113   ////////////////////////////////////////////////////////////////////////////////////////////
114   // Methods from AckListener
115   ////////////////////////////////////////////////////////////////////////////////////////////
116 
117   public synchronized void notifyView(View view)
118     throws RemoteException
119   {
120     missing = view.size();
121     completed = new boolean[missing];
122     results = new Object[missing];
123     if (log.isDebugEnabled())
124       log.debug("SynchAckListener: notifyView: " + missing);
125   }
126 
127 
128   public synchronized void ack(MemberId id, int vpos, Object obj)
129     throws RemoteException
130   {
131     if (log.isDebugEnabled())
132       log.debug("SynchAckListener: ack: " + id + ", " + obj + ", " + vpos);
133     if (obj instanceof InvocationTargetException) {
134       Throwable targetException = ((InvocationTargetException) obj).getTargetException();
135       log.warn("Caught: ", targetException);
136     }
137 
138     if (!completed[vpos]) {
139       completed[vpos] = true;
140       results[vpos] = obj;
141       missing--;
142       if (missing == 0) {
143         if (log.isDebugEnabled())
144           log.debug("SynchAckListener: ack: completed");
145         notifyAll();
146       }
147     }
148   }
149 
150 
151   public synchronized void viewChange()
152     throws RemoteException
153   {
154     if (log.isDebugEnabled())
155       log.debug("SynchAckListener: viewChange");
156     if (missing > 0) {
157       for (int i=0; i < completed.length; i++) {
158         if (!completed[i]) {
159           completed[i] = true;
160           results[i] = new RemoteException("Member not reachable");
161           if (log.isDebugEnabled())
162             log.debug("SynchAckListener: viewChange: unreachable pos " + i);
163         }
164       }
165     }
166     missing = 0;
167     // Unexport this remote AckListener; no more remote calls should occur
168     exporter.unexport(false);
169     // Ensure that any waiting ends.
170     notifyAll();
171   }
172 
173 
174   /**
175    * Wait for and obtain the results.
176    */
177   public synchronized Object getResults()
178   {
179     if (log.isDebugEnabled())
180       log.debug("SynchAckListener: getResults");
181     while (missing > 0 || results == null) {
182       dispatcher.dispatch(this);
183     }
184     /*
185      * Unexport this remote AckListener; note that calls to the
186      * AckListener.viewChange() method may occur after the remote
187      * AckListener has been unexported.  This should be dealt with
188      * at the client-side, as this will cause a NoSuchObjectException
189      */
190     exporter.unexport(false);
191     if (log.isDebugEnabled())
192       log.debug("SynchAckListener: getResults completed");
193     return results;
194   }
195 
196 } // END SynchAckListener