1 /*
2 * Copyright (c) 1998-2006 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.gmi.protocols;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.ObjectInputStream;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.Map;
27
28 import jgroup.core.IID;
29 import jgroup.relacs.gm.TimestampService.Timestamp;
30 import jgroup.relacs.gmi.GroupInvocationDispatcher;
31 import jgroup.relacs.gmi.InvocationResult;
32 import jgroup.util.CacheMap;
33
34 import org.apache.log4j.Logger;
35
36 /**
37 * BasicDispatcher is the super class for all protocols, and must be subclassed.
38 *
39 * @author Hein Meling <hein.meling@uis.no>
40 */
41 public class BasicDispatcher
42 implements ProtocolDispatcher
43 {
44
45 ////////////////////////////////////////////////////////////////////////////////////////////
46 // Logger
47 ////////////////////////////////////////////////////////////////////////////////////////////
48
49 /** Obtain logger for this class */
50 private static final Logger log = Logger.getLogger(BasicDispatcher.class);
51
52
53 ////////////////////////////////////////////////////////////////////////////////////////////
54 // Static fields (common to all protocols)
55 ////////////////////////////////////////////////////////////////////////////////////////////
56
57 /** Map containing (con)current invocation identifiers */
58 private static final Map<Thread,IID> iids =
59 Collections.synchronizedMap(new HashMap<Thread,IID>(7));
60
61
62 ////////////////////////////////////////////////////////////////////////////////////////////
63 // Fields
64 ////////////////////////////////////////////////////////////////////////////////////////////
65
66 /** Cache of invocation results */
67 private final CacheMap<Timestamp,InvocationResult> cache =
68 new CacheMap<Timestamp,InvocationResult>();
69
70 /** Method invocation dispatcher */
71 private final GroupInvocationDispatcher dispatcher;
72
73
74 ////////////////////////////////////////////////////////////////////////////////////////////
75 // Constructor
76 ////////////////////////////////////////////////////////////////////////////////////////////
77
78 public BasicDispatcher(GroupInvocationDispatcher dispatcher)
79 {
80 this.dispatcher = dispatcher;
81 }
82
83
84 ////////////////////////////////////////////////////////////////////////////////////////////
85 // ProtocolDispatcher interface
86 ////////////////////////////////////////////////////////////////////////////////////////////
87
88 /* (non-Javadoc)
89 * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream)
90 */
91 public InvocationResult dispatch(InputStream in)
92 throws IOException
93 {
94 try {
95 ObjectInputStream oin = new ObjectInputStream(in);
96 IID iid = (IID) oin.readObject();
97 iids.put(Thread.currentThread(), iid);
98
99 InvocationResult result = checkCache(oin);
100 if (result == null) {
101 /* No timestamp attached with the invocation; just dispatch it */
102 if (log.isDebugEnabled())
103 log.debug(iid);
104 result = dispatcher.dispatch(oin);
105 }
106 return result;
107
108 } catch (IOException ioex) {
109 log.warn("Failed reading IID or Timestamp object from stream", ioex);
110 } catch (ClassNotFoundException cnfex) {
111 log.warn("ClassNotFoundException during unmarshalling of IID or Timestamp", cnfex);
112 } finally {
113 // Always remove the iid, independent of success or failure of the invocation
114 iids.remove(Thread.currentThread());
115 }
116 return null;
117 }
118
119 /**
120 * Check if the stream contains a timestamp, and if it does check if the
121 * timestamp has already been executed locally; in which case we just return
122 * the previous result stored in the cache. Otherwise we either dispatch the
123 * invocation locally and store the result in the cache. <code>null</code> is
124 * returned if the stream has no timestamp.
125 */
126 private InvocationResult checkCache(ObjectInputStream oin)
127 throws IOException, ClassNotFoundException
128 {
129 // FIXME need to clear the cache every once in a while; for instance
130 // in cases were a client group's member failed before performing the
131 // invocation while the other members did perform it.
132
133 InvocationResult result = null;
134 Timestamp ts = (Timestamp) oin.readObject();
135 if (ts != null) {
136 if (log.isDebugEnabled())
137 log.debug(ts.toString());
138 /* Check the cache, since the invocation included a timestamp */
139 synchronized (this) {
140 result = cache.get(ts);
141 if (result == null) {
142 /* Not in cache; dispatch it */
143 result = dispatcher.dispatch(oin);
144 /* Insert the result into the cache */
145 cache.put(ts, result, ts.viewSize);
146 if (log.isDebugEnabled())
147 log.debug("Invocation not in cache; inserted it: " + ts.viewSize);
148 } else {
149 if (log.isDebugEnabled())
150
151 log.debug("Invocation in cache; returning it.");
152 }
153 }
154 if (log.isDebugEnabled()) {
155 log.debug("Size of cache: " + cache.size());
156 log.debug(cache.toString());
157 }
158 }
159 return result;
160 }
161
162 /**
163 * The default implementation does not need access to the server object, and
164 * hence the call is ignored. Implementations that need access to the server
165 * object needs to override this method.
166 */
167 public void setServer(Object server) { }
168
169
170 ////////////////////////////////////////////////////////////////////////////////////////////
171 // Public methods (static)
172 ////////////////////////////////////////////////////////////////////////////////////////////
173
174 /**
175 * Returns the invocation identifier (<code>IID</code>) for the currently
176 * executing method.
177 */
178 public static IID getIdentifier()
179 {
180 return iids.get(Thread.currentThread());
181 }
182
183 } // END BasicDispatcher