1 /* 2 * Copyright (c) 1998-2006 The Jgroup Team. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License version 2 as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU Lesser General Public License for more details. 12 * 13 * You should have received a copy of the GNU Lesser General Public License 14 * along with this program; if not, write to the Free Software 15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 16 * 17 */ 18 19 package jgroup.relacs.gmi.protocols; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.io.ObjectInputStream; 24 import java.util.Collections; 25 import java.util.HashMap; 26 import java.util.Map; 27 28 import jgroup.core.IID; 29 import jgroup.relacs.gm.TimestampService.Timestamp; 30 import jgroup.relacs.gmi.GroupInvocationDispatcher; 31 import jgroup.relacs.gmi.InvocationResult; 32 import jgroup.util.CacheMap; 33 34 import org.apache.log4j.Logger; 35 36 /** 37 * BasicDispatcher is the super class for all protocols, and must be subclassed. 38 * 39 * @author Hein Meling <hein.meling@uis.no> 40 */ 41 public class BasicDispatcher 42 implements ProtocolDispatcher 43 { 44 45 //////////////////////////////////////////////////////////////////////////////////////////// 46 // Logger 47 //////////////////////////////////////////////////////////////////////////////////////////// 48 49 /** Obtain logger for this class */ 50 private static final Logger log = Logger.getLogger(BasicDispatcher.class); 51 52 53 //////////////////////////////////////////////////////////////////////////////////////////// 54 // Static fields (common to all protocols) 55 //////////////////////////////////////////////////////////////////////////////////////////// 56 57 /** Map containing (con)current invocation identifiers */ 58 private static final Map<Thread,IID> iids = 59 Collections.synchronizedMap(new HashMap<Thread,IID>(7)); 60 61 62 //////////////////////////////////////////////////////////////////////////////////////////// 63 // Fields 64 //////////////////////////////////////////////////////////////////////////////////////////// 65 66 /** Cache of invocation results */ 67 private final CacheMap<Timestamp,InvocationResult> cache = 68 new CacheMap<Timestamp,InvocationResult>(); 69 70 /** Method invocation dispatcher */ 71 private final GroupInvocationDispatcher dispatcher; 72 73 74 //////////////////////////////////////////////////////////////////////////////////////////// 75 // Constructor 76 //////////////////////////////////////////////////////////////////////////////////////////// 77 78 public BasicDispatcher(GroupInvocationDispatcher dispatcher) 79 { 80 this.dispatcher = dispatcher; 81 } 82 83 84 //////////////////////////////////////////////////////////////////////////////////////////// 85 // ProtocolDispatcher interface 86 //////////////////////////////////////////////////////////////////////////////////////////// 87 88 /* (non-Javadoc) 89 * @see jgroup.relacs.gmi.protocols.ProtocolDispatcher#dispatch(java.io.InputStream) 90 */ 91 public InvocationResult dispatch(InputStream in) 92 throws IOException 93 { 94 try { 95 ObjectInputStream oin = new ObjectInputStream(in); 96 IID iid = (IID) oin.readObject(); 97 iids.put(Thread.currentThread(), iid); 98 99 InvocationResult result = checkCache(oin); 100 if (result == null) { 101 /* No timestamp attached with the invocation; just dispatch it */ 102 if (log.isDebugEnabled()) 103 log.debug(iid); 104 result = dispatcher.dispatch(oin); 105 } 106 return result; 107 108 } catch (IOException ioex) { 109 log.warn("Failed reading IID or Timestamp object from stream", ioex); 110 } catch (ClassNotFoundException cnfex) { 111 log.warn("ClassNotFoundException during unmarshalling of IID or Timestamp", cnfex); 112 } finally { 113 // Always remove the iid, independent of success or failure of the invocation 114 iids.remove(Thread.currentThread()); 115 } 116 return null; 117 } 118 119 /** 120 * Check if the stream contains a timestamp, and if it does check if the 121 * timestamp has already been executed locally; in which case we just return 122 * the previous result stored in the cache. Otherwise we either dispatch the 123 * invocation locally and store the result in the cache. <code>null</code> is 124 * returned if the stream has no timestamp. 125 */ 126 private InvocationResult checkCache(ObjectInputStream oin) 127 throws IOException, ClassNotFoundException 128 { 129 // FIXME need to clear the cache every once in a while; for instance 130 // in cases were a client group's member failed before performing the 131 // invocation while the other members did perform it. 132 133 InvocationResult result = null; 134 Timestamp ts = (Timestamp) oin.readObject(); 135 if (ts != null) { 136 if (log.isDebugEnabled()) 137 log.debug(ts.toString()); 138 /* Check the cache, since the invocation included a timestamp */ 139 synchronized (this) { 140 result = cache.get(ts); 141 if (result == null) { 142 /* Not in cache; dispatch it */ 143 result = dispatcher.dispatch(oin); 144 /* Insert the result into the cache */ 145 cache.put(ts, result, ts.viewSize); 146 if (log.isDebugEnabled()) 147 log.debug("Invocation not in cache; inserted it: " + ts.viewSize); 148 } else { 149 if (log.isDebugEnabled()) 150 151 log.debug("Invocation in cache; returning it."); 152 } 153 } 154 if (log.isDebugEnabled()) { 155 log.debug("Size of cache: " + cache.size()); 156 log.debug(cache.toString()); 157 } 158 } 159 return result; 160 } 161 162 /** 163 * The default implementation does not need access to the server object, and 164 * hence the call is ignored. Implementations that need access to the server 165 * object needs to override this method. 166 */ 167 public void setServer(Object server) { } 168 169 170 //////////////////////////////////////////////////////////////////////////////////////////// 171 // Public methods (static) 172 //////////////////////////////////////////////////////////////////////////////////////////// 173 174 /** 175 * Returns the invocation identifier (<code>IID</code>) for the currently 176 * executing method. 177 */ 178 public static IID getIdentifier() 179 { 180 return iids.get(Thread.currentThread()); 181 } 182 183 } // END BasicDispatcher