View Javadoc

1   /*
2    * Copyright (c) 1998-2006 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  package com.sun.jini.mahalo;
19  
20  import java.lang.reflect.Field;
21  import java.rmi.RemoteException;
22  import java.security.AccessController;
23  import java.security.PrivilegedAction;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.Map;
28  
29  import jgroup.core.ExternalGMIListener;
30  import jgroup.core.GroupManager;
31  import jgroup.core.MembershipListener;
32  import jgroup.core.View;
33  import jgroup.core.protocols.Anycast;
34  import jgroup.core.protocols.Atomic;
35  import jgroup.jini.txn.InternalGroupTransactionManager;
36  import net.jini.core.lease.LeaseDeniedException;
37  import net.jini.core.transaction.CannotAbortException;
38  import net.jini.core.transaction.CannotCommitException;
39  import net.jini.core.transaction.CannotJoinException;
40  import net.jini.core.transaction.TimeoutExpiredException;
41  import net.jini.core.transaction.UnknownTransactionException;
42  import net.jini.core.transaction.server.CrashCountException;
43  import net.jini.core.transaction.server.TransactionManager;
44  import net.jini.core.transaction.server.TransactionParticipant;
45  
46  import org.apache.log4j.Logger;
47  
48  import com.sun.jini.mahalo.log.LogManager;
49  import com.sun.jini.start.LifeCycle;
50  import com.sun.jini.thread.TaskManager;
51  import com.sun.jini.thread.WakeupManager;
52  
53  /**
54   * Transient, non-persistent Transaction Manager with replication support.
55   * Uses GMI Leadercast semantics to create transaction and transfer state 
56   * to the replicas. GMI with total ordering is used in the transaction 
57   * join and voting process.<p>
58   * 
59   * To start the Transaction Manager, it is preferred to use the  
60   * {@link com.sun.jini.start.ServiceStarter} framework, which is included 
61   * in the Jini distribution.
62   * 
63   * @author Rohnny Moland
64   */
65  public class GroupTransientMahalo
66    implements TransactionManager, MembershipListener, ExternalGMIListener, InternalGroupTransactionManager
67  {
68  
69    ////////////////////////////////////////////////////////////////////////////////////////////
70    // Logger
71    ////////////////////////////////////////////////////////////////////////////////////////////
72    
73    private final static Logger log = Logger.getLogger(GroupTransientMahalo.class);
74  
75  
76    ////////////////////////////////////////////////////////////////////////////////////////////
77    // Fields
78    ////////////////////////////////////////////////////////////////////////////////////////////
79  
80    private static final long serialVersionUID = -1345915566687227879L;
81  
82    /** The mahalo object */
83    private TxnManagerImpl                  tm;
84    private InternalGroupTransactionManager igtm;
85    
86    /** Reflected fields from mahalo */
87    private Map                             txns;
88    private LogManager                      logmgr;
89    private TaskManager                     taskpool;
90    private WakeupManager                   taskWakeupMgr;
91  
92    /** Map holding new transactions which should be merged into txns when a state change occur */
93    private Map<Long,TxnManagerTransaction> tmpTxns;
94    
95    
96    ////////////////////////////////////////////////////////////////////////////////////////////
97    // Constructors
98    ////////////////////////////////////////////////////////////////////////////////////////////
99  
100   /**
101    * Construct a new instance of <code>TxnManagerImpl</code> and 
102    * initialize Jgroup services and fields. 
103    *
104    * @param configArgs <code>String</code> array whose elements are
105    *                   the arguments to use when creating the server.
106    * @param lifeCycle  instance of <code>LifeCycle</code> that, if
107    *                   non-<code>null</code>, will cause this object's
108    *                   <code>unregister</code> method to be invoked during
109    *                   shutdown to notify the service starter framework that
110    *                   the reference to this service's implementation can be
111    *                   'released' for garbage collection. A value of
112    *                   <code>null</code> for this argument is allowed.
113    *
114    * @throws Exception If there was a problem initializing the service.
115    */
116   public GroupTransientMahalo(String[] configArgs, LifeCycle lifeCycle) 
117     throws Exception
118   {
119     /** Obtains the group manager */
120     GroupManager gm = GroupManager.getGroupManager(this);
121     
122     igtm = (InternalGroupTransactionManager) 
123       gm.getService(InternalGroupTransactionManager.class);
124 
125     /** Create an instance of the mahalo transaction manager */
126     tm = new TxnManagerImpl(configArgs, lifeCycle, false);
127 
128     /** Get the necessary fields from mahalo using reflection */
129     this.txns = (Map) getField(tm, "txns");
130     this.logmgr = (LogManager) getField(tm, "logmgr");
131     this.taskpool = (TaskManager) getField(tm, "taskpool");
132     this.taskWakeupMgr = (WakeupManager) getField(tm, "taskWakeupMgr");
133     
134     tmpTxns = Collections.synchronizedMap(new HashMap<Long, TxnManagerTransaction>());
135   }
136 
137   
138   ////////////////////////////////////////////////////////////////////////////////////////////
139   // Methods from MembershipListener
140   ////////////////////////////////////////////////////////////////////////////////////////////
141   
142   /* (non-Javadoc)
143    * @see jgroup.core.MembershipListener#viewChange(jgroup.core.View)
144    */
145   public void viewChange(View view)
146   {
147     if (log.isDebugEnabled())
148       log.debug("view: " + view);
149   }
150 
151   
152   /* (non-Javadoc)
153    * @see jgroup.core.MembershipListener#prepareChange()
154    */
155   public void prepareChange()
156   {
157   }
158 
159 
160   /* (non-Javadoc)
161    * @see jgroup.core.MembershipListener#hasLeft()
162    */
163   public void hasLeft()
164   {
165   }
166  
167   
168   ////////////////////////////////////////////////////////////////////////////////////////////
169   // Methods from TransactionManager
170   ////////////////////////////////////////////////////////////////////////////////////////////
171 
172   /* (non-Javadoc)
173    * @see net.jini.core.transaction.server.TransactionManager#create(long)
174    */
175 //  @Leadercast
176 //  public TransactionManager.Created create(long lease)
177 //    throws LeaseDeniedException, RemoteException
178 //  {
179 //    TransactionManager.Created created = tm.create(lease);
180 //    if (log.isDebugEnabled()) 
181 //      log.debug("Created transaction with id: " + created.id);
182 //    
183 //    TxnManagerTransaction tmt = (TxnManagerTransaction) txns.get(new Long(created.id));
184 //    tmpTxns.put(new Long(created.id), tmt);
185 //    return created;
186 //  }
187 
188   
189   /* (non-Javadoc)
190    * @see net.jini.core.transaction.server.TransactionManager#create(long)
191    */
192   @Anycast
193   public TransactionManager.Created create(long lease) 
194     throws LeaseDeniedException, RemoteException
195   {
196     TransactionManager.Created created = tm.create(lease);
197     if (log.isDebugEnabled())
198       log.debug("Created transaction with id: " + created.id);
199 
200     TxnManagerTransaction tmt = (TxnManagerTransaction) txns.get(new Long(created.id));    
201     try {
202       igtm.internalCreate(tmt, created.id);
203     } catch (RemoteException e) {
204       log.debug(e.getMessage());
205     }
206     return created;
207   }
208   
209   
210   /* (non-Javadoc)
211    * @see net.jini.core.transaction.server.TransactionManager#join(long, 
212    * net.jini.core.transaction.server.TransactionParticipant, long)
213    */
214   @Atomic 
215   public void join(long id, TransactionParticipant part, long crashCount) 
216     throws UnknownTransactionException, CannotJoinException, CrashCountException, RemoteException
217   {
218     if (log.isDebugEnabled())
219       log.debug("Joining TM participant: " + part + " into transaction with ID=" + id);
220 
221     tm.join(id, part, crashCount);
222   }
223 
224 
225   /* (non-Javadoc)
226    * @see net.jini.core.transaction.server.TransactionManager#getState(long)
227    */
228   @Anycast 
229   public int getState(long id) throws UnknownTransactionException
230   {
231     int state = tm.getState(id);
232     if (log.isDebugEnabled())
233       log.debug("GroupTransientMahalo.getState() for txn id: " + id + " is " + state);
234     return state;
235   }
236 
237   
238   /* (non-Javadoc)
239    * @see net.jini.core.transaction.server.TransactionManager#commit(long)
240    */
241   @Atomic
242   public void commit(long id) 
243     throws UnknownTransactionException, CannotCommitException, RemoteException
244   {
245     if (log.isDebugEnabled())
246       log.debug("Started voting process for transaction with ID=" + id);
247     tm.commit(id);
248     
249     if (log.isDebugEnabled())
250       log.debug("GroupTransientMahalo.commit() finished with txn id: " + id);
251   }
252 
253   
254   /* (non-Javadoc)
255    * @see net.jini.core.transaction.server.TransactionManager#commit(long, long)
256    */
257   @Atomic 
258   public void commit(long id, long waitFor) 
259     throws UnknownTransactionException, CannotCommitException, TimeoutExpiredException,
260       RemoteException
261   {
262     tm.commit(id, waitFor);    
263   }
264 
265   
266   /* (non-Javadoc)
267    * @see net.jini.core.transaction.server.TransactionManager#abort(long)
268    */
269   @Atomic 
270   public void abort(long id) 
271     throws UnknownTransactionException, CannotAbortException
272   {
273     if (log.isDebugEnabled())
274       log.debug("Transaction " + id + " aborted.");
275     tm.abort(id);
276   }
277   
278   
279   /* (non-Javadoc)
280    * @see net.jini.core.transaction.server.TransactionManager#abort(long, long)
281    */
282   @Atomic 
283   public void abort(long id, long waitFor) 
284     throws UnknownTransactionException, CannotAbortException, TimeoutExpiredException
285   {
286     if (log.isDebugEnabled())
287       log.debug("Transaction " + id + " aborted.");
288     tm.abort(id, waitFor);
289   }
290 
291 
292   ////////////////////////////////////////////////////////////////////////////////////////////
293   // Methods from StateListener
294   ////////////////////////////////////////////////////////////////////////////////////////////
295 
296   /* (non-Javadoc)
297    * @see jgroup.core.StateListener#getState()
298    */
299   public Object getState()
300   {
301     Object txnsState = tmpTxns;
302 //    tmpTxns.clear();
303     /**
304      * Need a way to clear transactions which
305      * have already been transferred from leader
306      * to the followers.
307      */
308     return txnsState;
309   }
310 
311   
312   /* (non-Javadoc)
313    * @see jgroup.core.StateListener#putState(java.lang.Object)
314    */
315   @SuppressWarnings("unchecked")
316   public void putState(Object state)
317   {
318     setField(tm, "txns", this.txns);    
319     synchronized (txns) {
320       Map<Long, TxnManagerTransaction> transactions = (Map<Long, TxnManagerTransaction>) state;
321       for (Iterator<Long> iter = transactions.keySet().iterator(); iter.hasNext();) {
322         Long tmpTxnsID = iter.next();        
323         TxnManagerTransaction txnMgr = transactions.remove(tmpTxnsID);
324         
325         if (log.isDebugEnabled())
326           log.debug("TM NEWSTATE: Merged in transaction with ID=" + tmpTxnsID);
327         
328         setField(txnMgr, "logmgr", this.logmgr);
329         setField(txnMgr, "threadpool", this.taskpool);
330         setField(txnMgr, "wm", this.taskWakeupMgr);
331         setField(txnMgr, "settler", tm);
332         setField(txnMgr, "leaseLock", new Object());
333         setField(txnMgr, "jobLock", new Object());
334         setField(txnMgr, "stateLock", new Object());
335         txns.put(tmpTxnsID, txnMgr);
336       }
337     }
338   }
339 
340 
341   ////////////////////////////////////////////////////////////////////////////////////////////
342   // Methods from InternalGroupTransactionManager
343   ////////////////////////////////////////////////////////////////////////////////////////////
344 
345   /* (non-Javadoc)
346    * @see jgroup.jini.txn.InternalGroupTransactionManager#internalCreate(java.lang.Object, long)
347    */
348   @SuppressWarnings("unchecked")
349   public void internalCreate(Object txnMgr, long id) throws RemoteException
350   {
351     if (log.isDebugEnabled())
352       log.debug("internalCreate() - txn id: " + id);
353 
354     setField(tm, "txns", this.txns);
355     TxnManagerTransaction mgrTransaction = (TxnManagerTransaction) txnMgr;
356 
357     setField(mgrTransaction, "logmgr", this.logmgr);
358     setField(mgrTransaction, "threadpool", this.taskpool);
359     setField(mgrTransaction, "wm", this.taskWakeupMgr);
360     setField(mgrTransaction, "settler", tm);
361     setField(mgrTransaction, "leaseLock", new Object());
362     setField(mgrTransaction, "jobLock", new Object());
363     setField(mgrTransaction, "stateLock", new Object());
364    
365     txns.put(new Long(id), mgrTransaction);    
366   }
367 
368 
369   /* (non-Javadoc)
370    * @see jgroup.jini.txn.InternalGroupTransactionManager#internalCommit(long)
371    */
372   public void internalCommit(long id) 
373     throws UnknownTransactionException, CannotCommitException, RemoteException
374   {
375     // Well. This is not implemented at the moment.
376   }
377 
378 
379   ////////////////////////////////////////////////////////////////////////////////////////////
380   // Instance methods
381   ////////////////////////////////////////////////////////////////////////////////////////////
382     
383   /**
384    * Returns value of a field using reflection.
385    * 
386    * @param obj Object that contains the field to be reflected
387    * @param name Name of field to be reflected
388    * @return Value of the reflected field  
389    */
390   private Object getField(Object obj, String name) 
391   { 
392     Class c = obj.getClass();    
393     final Field field;
394     Object tmp = null;
395        
396     try {
397       field = c.getDeclaredField(name);      
398       AccessController.doPrivileged(new PrivilegedAction<Object>()
399       {
400         public Object run() {
401           field.setAccessible(true);
402           return null;
403         }
404       });
405       
406       tmp = field.get(obj);
407     } catch (SecurityException e) {
408       log.error(e.getMessage());
409     } catch (NoSuchFieldException e) {
410       log.error(e.getMessage());
411     } catch (IllegalArgumentException e) {
412       log.error(e.getMessage());
413     } catch (IllegalAccessException e) {
414       log.error(e.getMessage());
415     }
416     return tmp;
417   }
418   
419   
420   /**
421    * Set new value of a field using reflection.
422    * 
423    * @param obj Object that contains the field to be reflected
424    * @param name Name of field to be reflected
425    * @param value New value of reflected field
426    */
427   private void setField(Object obj, String name, Object value)
428   {
429     Class c = obj.getClass();   
430     final Field field;
431        
432     try {
433       field = c.getDeclaredField(name);
434       AccessController.doPrivileged(new PrivilegedAction<Object>()
435       {
436         public Object run() {
437           field.setAccessible(true);
438           return null;
439         }
440       });
441       
442       field.set(obj, value);
443     } catch (SecurityException e) {
444       log.error(e.getMessage());
445     } catch (NoSuchFieldException e) {
446       log.error(e.getMessage());
447     } catch (IllegalArgumentException e) {
448       log.error(e.getMessage());
449     } catch (IllegalAccessException e) {
450       log.error(e.getMessage());
451     } 
452   }
453 
454 
455   public String toString()
456   {
457     return "GroupTransientMahalo";
458   }
459 
460 
461   public int hashCode() 
462   {
463     // FIXME..
464     return tmpTxns.hashCode();
465   }
466 
467 } // END GroupTransientMahalo