View Javadoc

1   /*
2    * Copyright (c) 1998-2006 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.relacs.gmi.protocols;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.ObjectInputStream;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.Map;
27  
28  import jgroup.core.IID;
29  import jgroup.relacs.gm.TimestampService.Timestamp;
30  import jgroup.relacs.gmi.GroupInvocationDispatcher;
31  import jgroup.relacs.gmi.InvocationResult;
32  import jgroup.util.CacheMap;
33  
34  import org.apache.log4j.Logger;
35  
36  /**
37   * BasicDispatcher is the super class for all protocols, and must be subclassed.
38   *
39   * @author Hein Meling <hein.meling@uis.no>
40   */
41  public class BasicDispatcher
42    implements ProtocolDispatcher
43  {
44  
45    ////////////////////////////////////////////////////////////////////////////////////////////
46    // Logger
47    ////////////////////////////////////////////////////////////////////////////////////////////
48  
49    /** Obtain logger for this class */
50    private static final Logger log = Logger.getLogger(BasicDispatcher.class);
51  
52  
53    ////////////////////////////////////////////////////////////////////////////////////////////
54    // Static fields (common to all protocols)
55    ////////////////////////////////////////////////////////////////////////////////////////////
56  
57    /** Map containing (con)current invocation identifiers */
58    private static final Map<Thread,IID> iids =
59      Collections.synchronizedMap(new HashMap<Thread,IID>(7));
60  
61  
62    ////////////////////////////////////////////////////////////////////////////////////////////
63    // Fields
64    ////////////////////////////////////////////////////////////////////////////////////////////
65  
66    /** Cache of invocation results */
67    private final CacheMap<Timestamp,InvocationResult> cache =
68      new CacheMap<Timestamp,InvocationResult>();
69  
70    /** Method invocation dispatcher */
71    private final GroupInvocationDispatcher dispatcher;
72  
73  
74    ////////////////////////////////////////////////////////////////////////////////////////////
75    // Constructor
76    ////////////////////////////////////////////////////////////////////////////////////////////
77  
78    public BasicDispatcher(GroupInvocationDispatcher dispatcher)
79    {
80      this.dispatcher = dispatcher;
81    }
82  
83  
84    ////////////////////////////////////////////////////////////////////////////////////////////
85    // ProtocolDispatcher interface
86    ////////////////////////////////////////////////////////////////////////////////////////////
87  
88    /* (non-Javadoc)
89     * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream)
90     */
91    public InvocationResult dispatch(InputStream in)
92      throws IOException
93    {
94      try {
95        ObjectInputStream oin = new ObjectInputStream(in);
96        IID iid = (IID) oin.readObject();
97        iids.put(Thread.currentThread(), iid);
98  
99        InvocationResult result = checkCache(oin);
100       if (result == null) {
101         /* No timestamp attached with the invocation; just dispatch it */
102         if (log.isDebugEnabled())
103           log.debug(iid);
104         result = dispatcher.dispatch(oin);
105       }
106       return result;
107 
108     } catch (IOException ioex) {
109       log.warn("Failed reading IID or Timestamp object from stream", ioex);
110     } catch (ClassNotFoundException cnfex) {
111       log.warn("ClassNotFoundException during unmarshalling of IID or Timestamp", cnfex);
112     } finally {
113       // Always remove the iid, independent of success or failure of the invocation
114       iids.remove(Thread.currentThread());
115     }
116     return null;
117   }
118 
119   /**
120    * Check if the stream contains a timestamp, and if it does check if the
121    * timestamp has already been executed locally; in which case we just return
122    * the previous result stored in the cache.  Otherwise we either dispatch the
123    * invocation locally and store the result in the cache.  <code>null</code> is
124    * returned if the stream has no timestamp.
125    */
126   private InvocationResult checkCache(ObjectInputStream oin)
127     throws IOException, ClassNotFoundException
128   {
129     // FIXME need to clear the cache every once in a while; for instance
130     // in cases were a client group's member failed before performing the
131     // invocation while the other members did perform it.
132 
133     InvocationResult result = null;
134     Timestamp ts = (Timestamp) oin.readObject();
135     if (ts != null) {
136       if (log.isDebugEnabled())
137         log.debug(ts.toString());
138       /* Check the cache, since the invocation included a timestamp */
139       synchronized (this) {
140         result = cache.get(ts);
141         if (result == null) {
142           /* Not in cache; dispatch it */
143           result = dispatcher.dispatch(oin);
144           /* Insert the result into the cache */
145           cache.put(ts, result, ts.viewSize);
146           if (log.isDebugEnabled())
147             log.debug("Invocation not in cache; inserted it: " + ts.viewSize);
148         } else {
149           if (log.isDebugEnabled())
150 
151             log.debug("Invocation in cache; returning it.");
152         }
153       }
154       if (log.isDebugEnabled()) {
155         log.debug("Size of cache: " + cache.size());
156         log.debug(cache.toString());
157       }
158     }
159     return result;
160   }
161 
162   /**
163    * The default implementation does not need access to the server object, and
164    * hence the call is ignored. Implementations that need access to the server
165    * object needs to override this method.
166    */
167   public void setServer(Object server) { }
168 
169 
170   ////////////////////////////////////////////////////////////////////////////////////////////
171   // Public methods (static)
172   ////////////////////////////////////////////////////////////////////////////////////////////
173 
174   /**
175    * Returns the invocation identifier (<code>IID</code>) for the currently
176    * executing method.
177    */
178   public static IID getIdentifier()
179   {
180     return iids.get(Thread.currentThread());
181   }
182 
183 } // END BasicDispatcher