1 /*
2 *
3 * Copyright 2005 Sun Microsystems, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 * Note: Changes by the Jgroup team are marked ** GAHALO **.
18 *
19 */
20 package com.sun.jini.mahalo;
21
22 import java.io.Serializable;
23 import java.rmi.RemoteException;
24 import java.util.Date;
25 import java.util.Enumeration;
26 import java.util.List;
27 import java.util.Vector;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30
31 import net.jini.core.transaction.CannotAbortException;
32 import net.jini.core.transaction.CannotCommitException;
33 import net.jini.core.transaction.CannotJoinException;
34 import net.jini.core.transaction.TimeoutExpiredException;
35 import net.jini.core.transaction.Transaction;
36 import net.jini.core.transaction.TransactionException;
37 import net.jini.core.transaction.server.CrashCountException;
38 import net.jini.core.transaction.server.ServerTransaction;
39 import net.jini.core.transaction.server.TransactionConstants;
40 import net.jini.core.transaction.server.TransactionManager;
41 import net.jini.core.transaction.server.TransactionParticipant;
42 import net.jini.id.Uuid;
43 import net.jini.security.ProxyPreparer;
44
45 import com.sun.jini.constants.TimeConstants;
46 import com.sun.jini.constants.TxnConstants;
47 import com.sun.jini.landlord.LeasedResource;
48 import com.sun.jini.logging.Levels;
49 import com.sun.jini.mahalo.log.ClientLog;
50 import com.sun.jini.mahalo.log.LogException;
51 import com.sun.jini.mahalo.log.LogManager;
52 import com.sun.jini.thread.TaskManager;
53 import com.sun.jini.thread.WakeupManager;
54
55 /**
56 * TxnManagerTransaction is a class which
57 * captures the internal representation of a transaction
58 * in the TxnManagerImpl server. This class is associated
59 * with a transaction id.
 * The information encapsulated includes the list of participants
 * which have joined the transaction (each with its crash count)
 * and the state of the transaction.
63 *
64 * The user of a ParticipantHolder must make the association
65 * between an instance of a ParticipantHolder and some sort
66 * of key or index.
67 *
68 * @author Sun Microsystems, Inc.
69 *
70 * ** GAHALO ** Added Serializable support.
71 */
72 class TxnManagerTransaction
73 implements TransactionConstants, TimeConstants, LeasedResource, Serializable
74 {
75 static final long serialVersionUID = -2088463193687796098L;
76
77 /*
78 * Table of valid state transitions which a
79 * TransactionManager may make.
80 *
81 * This represents the following diagram from the
82 * Jini(TM) Transaction Spec:
83 *
84 * ACTIVE ----> VOTING ------->COMMITTED
85 * \ |
86 * \ |
87 * ---------------------->ABORTED
88 *
89 *
90 * ACTIVE VOTING PREPARED NOTCHANGED COMMITTED ABORTED
91 * ----------------------------------------------------------------------
92 * ACTIVE true true false false false true
93 * VOTING false true false false true true
94 * PREPARED false false false false false false
95 * NOTCHANGED false false false false false false
96 * COMMITTED false false false false true false
97 * ABORTED false false false false false true
98 *
99 * The table is indexed using the ordered pair
100 * <current_state, next_state>. A value of true means
101 * that the transition is possible, while a false
102 * means that the transition is not possible.
103 *
 * Note: The <zero>, PREPARED and NOTCHANGED rows (and columns)
 * are all-false filler to account for the fact that the
 * TransactionManager's valid states are a subset
 * of the TransactionConstants.
108 *
109 * <zero>
110 * ACTIVE = 1
111 * VOTING = 2
112 * PREPARED = 3
113 * NOTCHANGED = 4
114 * COMMITTED = 5
115 * ABORTED = 6
116 */
117 private static final boolean states[][] = {
118 /* <zero> */ {false, false, false, false, false, false, false},
119 /* ACTIVE */ {false, true, true, false, false, false, true},
120 /* VOTING */ {false, false, true, false, false, true, true},
121 /* PREPARED */ {false, false, false, false, false, false, false},
122 /* NOTCHANGED*/ {false, false, false, false, false, false, false},
123 /* COMMITTED */ {false, false, false, false, false, true, false},
124 /* ABORTED */ {false, false, false, false, false, false, true}};
125
126 /**
127 * @serial
128 */
129 private List parts = new Vector();
130
131 /**
132 * @serial
133 */
134 private final ServerTransaction str;
135
136 /**
137 * @serial
138 */
139 private int trstate;
140
141 /**
142 * @serial
143 */
144 private long expires; //expiration time
145
146 /**
147 * ** GAHALO ** Set field to transient.
148 */
149 private transient LogManager logmgr;
150
151
152 /**
153 * "Parallelizing" the interaction between the manager
154 * and participants means using threads to interact with
155 * participants on behalf of the manager. In the thread
156 * pool model, a TaskManager provides a finite set of
157 * threads used to accomplish a variety of tasks.
158 * A Job encapsulates a body of work which needs to be
159 * performed during the two-phase commit: preparing,
160 * committing and aborting. Each work item is broken
161 * into smaller pieces of work- interactions with a
162 * single participant- each assigned to a task.
163 *
164 * When a transaction is committing, a PrepareJob is
165 * created and its tasks are scheduled. After completion,
166 * the PrepareJob's outcome is computed. Depending on
167 * the outcome, either an AbortJob or CommitJob is
168 * scheduled. When a transaction is aborted, an AbortJob
169 * is scheduled.
170 *
171 * A caller may specify a timeout value for commit/abort.
172 * The timeout represents the length of time a caller is
173 * willing to wait for participants to be instructed to
174 * roll-forward/back. Should this timeout expire, a
175 * TimeoutExpiredException is thrown. This causes the
176 * caller's thread to return back to the caller. Someone
177 * needs to finish contacting all the participants. This
178 * is accomplished by the SettlerTask. SettlerTasks must
179 * use a different thread pool from what is used by the
180 * various Job types, otherwise deadlock will occur.
181 *
182 * ** GAHALO ** Set field to transient.
183 */
184 private transient TaskManager threadpool;
185
186 /**
187 * ** GAHALO ** Set field to transient.
188 */
189 private transient WakeupManager wm;
190
191 /**
192 * ** GAHALO ** Set field to transient.
193 */
194 private transient TxnSettler settler;
195
196 /**
197 * ** GAHALO ** Set field to transient.
198 */
199 private transient Job job;
200
201 /**
202 * @serial
203 */
204 private Uuid uuid;
205
206 /**
207 * Interlock for the expiration time since
208 * lease renewal which set it may compete
209 * against lease checks which read it.
210 *
211 * ** GAHALO ** Set field to transient.
212 */
213 private transient Object leaseLock = new Object();
214
215
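/**
 * Interlock for Jobs is needed since many
 * threads on behalf of many clients can
 * simultaneously access or modify the Job
 * associated with this transaction when
 * attempting to prepare, roll forward
 * or roll back participants.
 *
 * ** GAHALO ** Set field to transient.
 *
 * NOTE(review): being transient, this lock is not restored by
 * default deserialization (field initializers are not re-run for
 * a Serializable class) -- confirm that it is re-created, e.g. in
 * a readObject method, before a deserialized instance is used.
 */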
226 private transient Object jobLock = new Object();
227
228
229 /**
230 * Interlock for transaction state needed
231 * since many threads on behalf of many
232 * clients can simultaneously access or
233 * attempt to modify this transaction's
234 * state as a side effect of calling
235 * commit or abort.
236 *
237 * ** GAHALO ** Set field to transient.
238 */
239 private transient Object stateLock = new Object();
240
241
242 /** Logger for operation related messages */
243 private static final Logger operationsLogger =
244 TxnManagerImpl.operationsLogger;
245
246 /** Logger for transaction related messages */
247 private static final Logger transactionsLogger =
248 TxnManagerImpl.transactionsLogger;
249
250 /**
251 * Constructs a <code>TxnManagerTransaction</code>
252 *
253 * @param mgr <code>TransactionManager</code> which owns
254 * this internal representation.
255 * @param logmgr <code>LogManager</code> responsible for
256 * recording COMMITTED and ABORTED transactions
257 * to stable storage.
258 *
259 * @param id The transaction id
260 *
261 * @param threadpool The <code>TaskManager</code> which provides
262 * the pool of threads used to interact with
263 * participants.
264 *
265 * @param settler TxnSettler responsible for this transaction if
266 * unsettled.
267 */
268 TxnManagerTransaction(TransactionManager mgr, LogManager logmgr, long id,
269 TaskManager threadpool, WakeupManager wm, TxnSettler settler,
270 Uuid uuid)
271 {
272 if (logmgr == null)
273 throw new IllegalArgumentException("TxnManagerTransaction: " +
274 "log manager must be non-null");
275 if (mgr == null)
276 throw new IllegalArgumentException("TxnManagerTransaction: " +
277 "transaction manager must be non-null");
278
279 if (threadpool == null)
280 throw new IllegalArgumentException("TxnManagerTransaction: " +
281 "threadpool must be non-null");
282
283 if (wm == null)
284 throw new IllegalArgumentException("TxnManagerTransaction: " +
285 "wakeup manager must be non-null");
286
287 if (settler == null)
288 throw new IllegalArgumentException("TxnManagerTransaction: " +
289 "settler must be non-null");
290
291 if (uuid == null)
292 throw new IllegalArgumentException("TxnManagerTransaction: " +
293 "uuid must be non-null");
294
295 this.threadpool = threadpool;
296 this.wm = wm;
297 this.logmgr = logmgr ;
298 str = new ServerTransaction(mgr, id);
299 this.settler = settler;
300 this.uuid = uuid;
301
302 trstate = ACTIVE; //this is implied since ACTIVE is initial state
303 // Expires is set after object is created when the associated
304 // lease is constructed.
305 }
306
307
308 /**
309 * Convenience method which adds a given <code>ParticipantHandle</code>
310 * to the set of <code>ParticpantHandle</code>s associated with this
311 * transaction.
312 *
313 * @param handle The added handle
314 */
315 void add(ParticipantHandle handle)
316 throws InternalManagerException
317 {
318 if (operationsLogger.isLoggable(Level.FINER)) {
319 operationsLogger.entering(TxnManagerTransaction.class.getName(),
320 "add", handle);
321 }
322
323 if (handle == null)
324 throw new NullPointerException("ParticipantHolder: add: " +
325 "cannot add null handle");
326
327 //NOTE: if the same participant re-joins, then that is
328 // fine.
329
330 try {
331 if (transactionsLogger.isLoggable(Level.FINEST)) {
332 transactionsLogger.log(Level.FINEST,
333 "Adding ParticipantHandle: {0}", handle);
334 }
335 parts.add(handle);
336 } catch (Exception e) {
337 if (transactionsLogger.isLoggable(Level.SEVERE)) {
338 transactionsLogger.log(Level.SEVERE,
339 "Unable to add ParticipantHandle", e);
340 }
341 throw new InternalManagerException("TxnManagerTransaction: " +
342 "add: " + e.getMessage());
343 }
344 if (operationsLogger.isLoggable(Level.FINER)) {
345 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
346 "add");
347 }
348 }
349
350 /**
351 * Convenience method which allows the caller to modify the
352 * prepState associated with a given <code>ParticipantHandle</code>
353 *
354 * @param handle The <code>ParticipantHandle</code> being modified
355 *
356 * @param state The new prepstate
357 */
358 void modifyParticipant(ParticipantHandle handle, int state) {
359 if (operationsLogger.isLoggable(Level.FINER)) {
360 operationsLogger.entering(TxnManagerTransaction.class.getName(),
361 "modifyParticipant", new Object[] {handle, new Integer(state)});
362 }
363 ParticipantHandle ph = null;
364
365 if (handle == null)
366 throw new NullPointerException("ParticipantHolder: " +
367 "modifyParticipant: cannot modify null handle");
368
369 if (parts.contains(ph))
370 ph = (ParticipantHandle) parts.get(parts.indexOf(handle));
371
372 if (ph == null) {
373 if (operationsLogger.isLoggable(Level.FINER)) {
374 operationsLogger.exiting(
375 TxnManagerTransaction.class.getName(),
376 "modifyParticipant");
377 }
378 //TODO - ignore??
379 return;
380 }
381
382 ph.setPrepState(state);
383 if (operationsLogger.isLoggable(Level.FINER)) {
384 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
385 "modifyParticipant");
386 }
387 }
388
389
390 /**
391 * Changes the manager-side state of the transaction. This
392 * method makes only valid state changes and informs the
393 * caller if the change was successful. Calls to this method
394 * synchronize around the manager-side state variable appropriately.
395 *
396 * @param int state the new desired state
397 */
398 boolean modifyTxnState(int state) {
399 if (operationsLogger.isLoggable(Level.FINER)) {
400 operationsLogger.entering(TxnManagerTransaction.class.getName(),
401 "modifyTxnState", new Integer(state));
402 }
403 boolean result = false;
404 synchronized (stateLock) {
405 switch (state) {
406 case ACTIVE:
407 case VOTING:
408 case COMMITTED:
409 case ABORTED:
410 result = states[trstate][state];
411 break;
412
413 default:
414 throw new IllegalArgumentException("TxnManagerTransaction: " +
415 "modifyTxnState: invalid state");
416 }
417
418 if (result)
419 trstate = state;
420 }
421 if (operationsLogger.isLoggable(Level.FINER)) {
422 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
423 "modifyTxnState", Boolean.valueOf(result));
424 }
425 return result;
426 }
427
428
429 /**
430 * Implementation of the join method.
431 *
432 * @param part The joining <code>TransactionParticpant</code>
433 *
434 * @param crashCount The crashcount associated with the joining
435 * <code>TransactionParticipant</code>
436 *
437 * @see net.jini.core.transaction.server.TransactionParticipant
438 */
439 public void
440 join(TransactionParticipant preparedPart, long crashCount)
441 throws CannotJoinException, CrashCountException, RemoteException
442 {
443 if (operationsLogger.isLoggable(Level.FINER)) {
444 operationsLogger.entering(TxnManagerTransaction.class.getName(),
445 "join", new Object[] {preparedPart, new Long(crashCount)});
446 }
447 //if the lease has expired, or the state is not
448 //amenable there is no need to continue
449
450 if (getState() != ACTIVE)
451 throw new CannotJoinException("not active");
452
453 if ((getState() == ACTIVE) && (ensureCurrent() == false)) {
454 doAbort(0);
455 throw new CannotJoinException("Lease expired");
456 }
457
458 //Create a ParticipantHandle for the new participant
459 //and mark the transactional state as ACTIVE
460 try {
461 ParticipantHandle ph =
462 new ParticipantHandle(preparedPart, crashCount);
463 ParticipantHandle phtmp = (ParticipantHandle)
464 ((parts.contains(ph))?
465 parts.get(parts.indexOf(ph)):
466 null);
467
468 if (transactionsLogger.isLoggable(Level.FINEST)) {
469 transactionsLogger.log(Level.FINEST,
470 "Retrieved ParticipantHandle: {0}", phtmp);
471 }
472 if (phtmp != null) {
473 long oldcount = phtmp.getCrashCount();
474 if (oldcount == crashCount) {
475 return;
476 } else {
477 throw new CrashCountException("TxnManagerTransaction: " +
478 "join: old = " + oldcount +
479 " new = " + crashCount);
480 }
481 }
482
483 add(ph);
484
485 } catch (InternalManagerException ime) {
486 if (transactionsLogger.isLoggable(Level.SEVERE)) {
487 transactionsLogger.log(Level.SEVERE,
488 "TransactionParticipant unable to join", ime);
489 }
490 throw ime;
491 } catch (RemoteException re) {
492 if (transactionsLogger.isLoggable(Level.FINEST)) {
493 transactionsLogger.log(Level.FINEST,
494 "TransactionParticipant unable to be stored", re);
495 }
496 throw re;
497 }
498 if (operationsLogger.isLoggable(Level.FINER)) {
499 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
500 "join");
501 }
502 }
503
504
505 /**
506 * This method returns the state of the transaction.
507 * Since the purpose of the set of ParticipantHolders is
508 * to associate a Transaction with a group of
509 * participants joined the transaction, we would like
510 * to get the state of the transaction associated with
511 * the aforementioned set.
512 *
513 */
514 public int getState() {
515 if (operationsLogger.isLoggable(Level.FINER)) {
516 operationsLogger.entering(TxnManagerTransaction.class.getName(),
517 "getState");
518 }
519 synchronized (stateLock) {
520 if (operationsLogger.isLoggable(Level.FINER)) {
521 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
522 "getState", new Integer(trstate));
523 }
524 return trstate;
525 }
526 }
527
528
529 /**
530 * Commits the transaction.
531 * This initiates the two-phase commit protocol. First,
532 * each <code>net.jini.core.transaction.server.TransactionParticipant</code>
533 * in the set of participants joined in the
534 * <code>net.jini.core.transaction.server.Transaction</code>
535 * is instructed to vote and the votes are tallied. This is the
536 * first phase (prepare phase).
537 *
538 * Depending on the outcome of the votes, the transaction
539 * is considered committed or aborted. Once commit/abort
540 * status is known, the participants are notified with
541 * a message to either roll-forward (commit case) or
542 * roll-back (abort case). This is the roll-phase.
543 *
544 * Since there may be a one-to-many relationship between
545 * a transaction and its participants,
546 * <code>com.sun.jini.thread.TaskManager</code>s are used
547 * as a generic mechanism to provide the threads needed
548 * to interact with the participants.
549 *
550 */
551 void commit(long waitFor)
552 throws CannotCommitException, TimeoutExpiredException, RemoteException
553 {
554 if (operationsLogger.isLoggable(Level.FINER)) {
555 operationsLogger.entering(TxnManagerTransaction.class.getName(),
556 "commit", new Long(waitFor));
557 }
558 long starttime = System.currentTimeMillis();
559
560 //If the transaction has already expired or the state
561 //is not amenable, don't even try to continue
562
563 if ((getState() == ACTIVE) && (ensureCurrent() == false)) {
564 doAbort(0);
565 throw new CannotCommitException("Lease expired");
566 }
567
568 if (getState() == ABORTED)
569 throw new CannotCommitException("attempt to commit " +
570 "ABORTED transaction");
571
572
573 //Check to see if anyone joined the transaction. Even
574 //if no one has joined, at this point, attempt to
575 //get to the COMMITTED state through valid state changes
576
577 Vector joinvec = parthandles();
578
579 if (joinvec == null) {
580 if (!modifyTxnState(VOTING))
581 throw new CannotCommitException("attempt to commit " +
582 "ABORTED transaction");
583
584 if (modifyTxnState(COMMITTED))
585 return;
586 else
587 throw new CannotCommitException("attempt to commit " +
588 "ABORTED transaction");
589 }
590
591 try {
592 Enumeration joined = joinvec.elements();
593 int numparts = joinvec.size();
594 ParticipantHandle[] phs = new ParticipantHandle[numparts];
595 joinvec.copyInto(phs);
596
597 long now = starttime;
598 long transpired = 0;
599 long remainder = 0;
600
601 ClientLog log = logmgr.logFor(str.id);
602
603 if (transactionsLogger.isLoggable(Level.FINEST)) {
604 transactionsLogger.log(Level.FINEST,
605 "{0} TransactionParticipants have joined",
606 new Integer(numparts));
607 }
608
609 //If commit is called after recovery, do not
610 //log a CommitRecord since it already exists
611 //exists in the Log file for this transaction.
612 //Remember that a log is not invalidated until
613 //after the transaction is committed.
614 //
615 //Only new occurrences of activities requiring
616 //logging which happen after recovery should
617 //be added to the log file. So, we add records
618 //for voting and roll forward/back activity for
619 //ACTIVE participants.
620 //
621 //If the state cannot validly transition to VOTING,
622 //it is either because someone already aborted or
623 //committed. Only throw an exception if someone
624 //has previously aborted. In the case of a prior
625 //commit, fall through and wait for the CommitJob
626 //to complete.
627
628 int oldstate = getState();
629 Integer result = new Integer(ABORTED);
630 Exception alternateException = null;
631
632
633 //On an ACTIVE to VOTING transition, create
634 //and schedule a Prepare or PrepareAndCommitJob.
635 //If the state transition is VOTING to VOTING,
636 //then the PrepareJob has already been created
637 //in the past, so just fall through and wait
638 //for it to complete.
639 //
640 //Only log the commit on ACTIVE to VOTING
641 //transitions.
642
643 if (modifyTxnState(VOTING)) {
644
645 if (oldstate == ACTIVE)
646 log.write(new CommitRecord(phs));
647
648
649 //preparing a participant can never override
650 //the other activities (abort or commit),
651 //so only set when the job is null.
652
653 synchronized (jobLock) {
654 if (job == null) {
655 if (phs.length == 1)
656 job = new
657 PrepareAndCommitJob(
658 str, threadpool, wm, log, phs[0]);
659 else
660 job = new PrepareJob(str, threadpool, wm, log, phs);
661
662 job.scheduleTasks();
663 }
664 }
665
666 //Wait for the PrepareJob to complete.
667 //PrepareJobs are given maximum time for
668 //completion. This is required in order to
669 //know the transaction's completion status.
670 //Remember that the timeout ONLY controls how
671 //long the caller is willing to wait to inform
672 //participants. This means that a completion
673 //status for the transaction MUST be computed
674 //before consulting the timeout.
675 //Timeout is ignored until completion status
676 //is known. If extra time is left, wait for
677 //the remainder to inform participants.
678
679 //We must explicitly check for Job type
680 //because someone else could have aborted
681 //the transaction at this point.
682
683
684 synchronized (jobLock) {
685 if ((job instanceof PrepareJob) ||
686 (job instanceof PrepareAndCommitJob)) {
687 try {
688 if (job.isCompleted(Long.MAX_VALUE)) {
689 result = (Integer) job.computeResult();
690 if (result.intValue() == ABORTED &&
691 job instanceof PrepareAndCommitJob) {
692 PrepareAndCommitJob pj =
693 (PrepareAndCommitJob)job;
694 alternateException =
695 pj.getAlternateException();
696 }
697 }
698 } catch (JobNotStartedException jnse) {
699 //no participants voted, so do nothing
700 result = new Integer(NOTCHANGED);
701 } catch (ResultNotReadyException rnre) {
702 //consider aborted
703 } catch (JobException je) {
704 //consider aborted
705 }
706 }
707 }
708 } else {
709 //Cannot be VOTING, so we either have
710 //an abort or commit in progress.
711
712 if (getState() == ABORTED)
713 throw new CannotCommitException("transaction ABORTED");
714
715
716 //If a CommitJob is already in progress
717 //(the state is COMMITTED) cause a fall
718 //through to the code which waits for
719 //the CommitJob to complete.
720
721 if (getState() == COMMITTED)
722 result = new Integer(COMMITTED);
723 }
724
725 if (transactionsLogger.isLoggable(Level.FINEST)) {
726 transactionsLogger.log(Level.FINEST,
727 "Voting result: {0}",
728 TxnConstants.getName(result.intValue()));
729 }
730
731 switch (result.intValue()) {
732 case NOTCHANGED:
733 break;
734
735 case ABORTED:
736 now = System.currentTimeMillis();
737 transpired = now - starttime;
738 remainder = waitFor - transpired;
739
740 if (remainder >=0)
741 doAbort(remainder);
742 else
743 doAbort(0);
744
745 if (alternateException == null) {
746 throw new CannotCommitException(
747 "Unable to commit transaction: "
748 + getParticipantInfo());
749 } else {
750 throw new RemoteException(
751 "Problem communicating with participant",
752 alternateException);
753 }
754
755 case PREPARED:
756 //This entrypoint is entered if a PrepareJob
757 //tallied the votes with an outcome of
758 //PREPARED. In order to inform participants,
759 //a CommitJob must be scheduled.
760
761 if(modifyTxnState(COMMITTED)) {
762 //TODO - log committed state record?
763 synchronized (jobLock) {
764 job = new CommitJob(str, threadpool, wm, log, phs);
765 job.scheduleTasks();
766 }
767 } else {
768 throw new CannotCommitException("attempt to commit " +
769 "ABORTED transaction");
770 }
771
772 //Fall through to code with waits
773 //for CommitJob to complete.
774
775
776 case COMMITTED:
777 //This entrypoint is the starting place for the code
778 //which waits for a CommitJob to complete and
779 //computes its resulting outcome. In addition,
780 //the wait time is enforced. Should the wait time
781 //expire, a SettlerTask is scheduled on a thread
782 //pool. The SettlerTask is needed to complete
783 //the commit (instruct participants to roll-forward)
784 //on behalf of the thread which exits when
785 //the TimeoutExpiredException is thrown.
786 //
787 //It is reached when...
788 //
789 // a) A commit was called on the same transaction twice.
790 // When the thread comes in on the commit call, a
791 // CommitJob already exists and the state is COMMITTED.
792 //
793 // b) The normal case where a PrepareJob was found to
794 // have prepared the transaction and the tally of
795 // votes resulted in a PREPARED outcome. This causes
796 // the state to be changed to COMMITTED and a
797 // CommitJob to be created.
798 //
799 // c) A PrepareAndCommitJob has successfully prepared
800 // a participant which rolled its changes forward.
801 //
802 //Note: By checking to see if the CommitJob is already
803 // present, this check allows us to use the same
804 // wait-for-CommitJob code for the regular
805 // PREPARE/COMMIT, the COMMIT/COMMIT and
806 // the PREPAREANDCOMMIT cases.
807
808 synchronized (jobLock) {
809 //A prepareAndCommitJob is done at this
810 //point since the TransactionParticipant
811 //would have instructed itself to roll
812 //forward.
813
814 if (job instanceof PrepareAndCommitJob) {
815 if(!modifyTxnState(COMMITTED))
816 throw new CannotCommitException("transaction " +
817 "ABORTED");
818 break;
819 }
820
821
822 //If the abort already arrived, then stop
823
824 if (job instanceof AbortJob)
825 throw new CannotCommitException("transaction " +
826 "ABORTED");
827 }
828
829 if (getState() != COMMITTED)
830 throw new
831 InternalManagerException("TxnManagerTransaction: " +
832 "commit: " + job + " got bad state: " +
833 TxnConstants.getName(result.intValue()));
834
835
836 now = System.currentTimeMillis();
837 transpired = now - starttime;
838
839 boolean committed = false;
840
841 //If the commit is asynchronous then...
842 //
843 // a) check to see if the wait time has transpired
844 //
845 // b) If it hasn't, sleep for what's left from the wait time
846
847 try {
848 remainder = waitFor - transpired;
849 synchronized (jobLock) {
850 if (remainder <= 0 || !job.isCompleted(remainder)) {
851 /*
852 * Note - SettlerTask will kick off another Commit/Abort task for the same txn
853 * which will try go through the VOTING->Commit states again.
854 */
855 //TODO - Kill off existing task? Postpone SettlerTask?
856 settler.noteUnsettledTxn(str.id);
857 throw new TimeoutExpiredException(
858 "timeout expired", true);
859 } else {
860 result = (Integer) job.computeResult();
861 committed = true;
862 }
863 }
864 } catch (ResultNotReadyException rnre) {
865 //this should not happen, so flag
866 //as an error.
867 } catch (JobNotStartedException jnse) {
868 //an error
869 } catch (JobException je) {
870 //an error
871 }
872
873 if (committed)
874 break;
875
876 default:
877 throw new InternalManagerException("TxnManagerTransaction: " +
878 "commit: " + job + " got bad state: " +
879 TxnConstants.getName(result.intValue()));
880 }
881
882 //We don't care about the result from
883 //the CommitJob
884 log.invalidate();
885 } catch (RuntimeException rte) {
886 if (transactionsLogger.isLoggable(Level.FINEST)) {
887 transactionsLogger.log(Level.FINEST,
888 "Problem committing transaction",
889 rte);
890 }
891 throw rte;
892 } catch (LogException le) {
893 if (transactionsLogger.isLoggable(Level.FINEST)) {
894 transactionsLogger.log(Level.FINEST,
895 "Problem persisting transaction",
896 le);
897 }
898 throw new CannotCommitException("Unable to log");
899 }
900 if (operationsLogger.isLoggable(Level.FINER)) {
901 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
902 "commit");
903 }
904 }
905
906
907 /**
908 * Aborts the transaction.
909 * This method attempts to set the state of the transaction
910 * to the ABORTED state. If successful, it then creates
911 * an AbortJob which schedules tasks executed on a
912 * thread pool. These tasks interact with the participants
913 * joined in this transaction to inform them to roll back.
914 *
915 * @param waitFor Timeout value which controls how long,
916 * the caller is willing to wait for the
917 * participants joined in the transaction
918 * to be instructed to roll-back.
919 *
920 * @see com.sun.jini.mahalo.AbortJob
921 * @see com.sun.jini.mahalo.ParticipantTask
922 * @see com.sun.jini.thread.TaskManager
923 * @see net.jini.core.transaction.server.TransactionParticipant
924 */
925 void abort(long waitFor)
926 throws CannotAbortException, TimeoutExpiredException
927 {
928 if (operationsLogger.isLoggable(Level.FINER)) {
929 operationsLogger.entering(TxnManagerTransaction.class.getName(),
930 "abort", new Long(waitFor));
931 }
932 long starttime = System.currentTimeMillis();
933
934 /*
935 * Since lease cancellation process sets expiration to 0
936 * and then calls abort, can't reliably check expiration
937 * at this point.
938 */
939 //TODO - Change internal, lease logic to call overload w/o expiration check
940 //TODO - Add expiration check to abort for external clients
941 try {
942 Vector joinvec = parthandles();
943
944 if (joinvec == null) {
945 if (modifyTxnState(ABORTED))
946 return;
947 else
948 throw new
949 CannotAbortException("Transaction already COMMITTED");
950 }
951
952 Enumeration joined = joinvec.elements();
953 int numparts = joinvec.size();
954 ParticipantHandle[] phs = new ParticipantHandle[numparts];
955 joinvec.copyInto(phs);
956
957 ClientLog log = logmgr.logFor(str.id);
958
959
960 //When attempting to abort, if someone has already
961 //committed the transaction, then throw an Exception.
962 //
963 //If an abort is possible, and you find that an AbortJob
964 //is already in progress, let the existing AbortJob
965 //proceed.
966 //
967 //If an abort is possible, but a PrepareJob is in progress,
968 //go ahead an halt the PrepareJob and replace it with
969 //an AbortJob.
970
971 if (modifyTxnState(ABORTED)) {
972 log.write(new AbortRecord(phs));
973
974 synchronized (jobLock) {
975 if (!(job instanceof AbortJob)) {
976 if (job != null)
977 job.stop();
978 job = new AbortJob(str, threadpool, wm, log, phs);
979 job.scheduleTasks();
980 }
981 }
982 } else {
983 throw new CannotAbortException("Transaction already COMMITTED");
984 }
985
986
987 //This code waits for an AbortJob to complete and
988 //computes its resulting outcome. In addition,
989 //the wait time is enforced. Should the wait time
990 //expire, a SettlerTask is scheduled on a thread
991 //pool. The SettlerTask is needed to complete
992 //the abort (instruct participants to roll-back)
993 //on behalf of the thread which exits when
994 //the TimeoutExpiredException is thrown.
995
996 long now = System.currentTimeMillis();
997 long transpired = now - starttime;
998
999 Integer result = new Integer(ACTIVE);
1000 boolean aborted = false;
1001
1002 long remainder = waitFor - transpired;
1003
1004 try {
1005 synchronized (jobLock) {
1006 if (remainder<= 0 || !job.isCompleted(remainder)) {
1007 settler.noteUnsettledTxn(str.id);
1008 throw new TimeoutExpiredException(
1009 "timeout expired",false);
1010 } else {
1011 result = (Integer) job.computeResult();
1012 aborted = true;
1013 }
1014 }
1015 } catch (ResultNotReadyException rnre) {
1016 //should not happen, so flag as error
1017 } catch (JobNotStartedException jnse) {
1018 //error
1019 } catch (JobException je) {
1020 settler.noteUnsettledTxn(str.id);
1021 throw new TimeoutExpiredException("timeout expired", false);
1022 }
1023
1024
1025 if (!aborted)
1026 throw new InternalManagerException("TxnManagerTransaction: " +
1027 "abort: AbortJob got bad state: " +
1028 TxnConstants.getName(result.intValue()));
1029
1030 log.invalidate();
1031 } catch (RuntimeException rte) {
1032 if (transactionsLogger.isLoggable(Level.SEVERE)) {
1033 transactionsLogger.log(Level.SEVERE,
1034 "Problem aborting transaction",
1035 rte);
1036 }
1037 throw new InternalManagerException("TxnManagerTransaction: " +
1038 "abort: fatal error");
1039 } catch (LogException le) {
1040 if (transactionsLogger.isLoggable(Level.FINEST)) {
1041 transactionsLogger.log(Level.FINEST,
1042 "Problem persisting transaction",
1043 le);
1044 }
1045 throw new CannotAbortException("Unable to log");
1046 }
1047 if (operationsLogger.isLoggable(Level.FINER)) {
1048 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1049 "abort");
1050 }
1051 }
1052
1053 public Transaction getTransaction() {
1054 if (operationsLogger.isLoggable(Level.FINER)) {
1055 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1056 "getTransaction");
1057 }
1058 if (operationsLogger.isLoggable(Level.FINER)) {
1059 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1060 "getTransaction", str);
1061 }
1062 return str;
1063 }
1064
1065 public long getExpiration() {
1066 if (operationsLogger.isLoggable(Level.FINER)) {
1067 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1068 "getExpiration");
1069 }
1070 synchronized (leaseLock) {
1071 if (operationsLogger.isLoggable(Level.FINER)) {
1072 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1073 "getExpiration", new Date(expires));
1074 }
1075 return expires;
1076 }
1077 }
1078
1079 public void setExpiration(long newExpiration) {
1080 if (operationsLogger.isLoggable(Level.FINER)) {
1081 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1082 "setExpiration", new Date(newExpiration));
1083 }
1084 synchronized (leaseLock) {
1085 expires = newExpiration;
1086 }
1087 if (operationsLogger.isLoggable(Level.FINER)) {
1088 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1089 "setExpiration");
1090 }
1091 }
1092
1093 public Uuid getCookie() {
1094 if (operationsLogger.isLoggable(Level.FINER)) {
1095 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1096 "getCookie");
1097 }
1098 if (operationsLogger.isLoggable(Level.FINER)) {
1099 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1100 "getCookie", uuid);
1101 }
1102 return uuid;
1103 }
1104
1105 private void doAbort(long timeout) {
1106 if (operationsLogger.isLoggable(Level.FINER)) {
1107 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1108 "doAbort", new Long(timeout));
1109 }
1110 try {
1111 str.abort(timeout);
1112 } catch (RemoteException re) {
1113 //abort must have happened, so ignore
1114 if (transactionsLogger.isLoggable(Levels.HANDLED)) {
1115 transactionsLogger.log(Levels.HANDLED,
1116 "Trouble aborting transaction", re);
1117 }
1118 } catch (TimeoutExpiredException te) {
1119 //Swallow this because we really only
1120 //care about a scheduling a SettlerTask
1121 if (transactionsLogger.isLoggable(Levels.HANDLED)) {
1122 transactionsLogger.log(Levels.HANDLED,
1123 "Trouble aborting transaction", te);
1124 }
1125 } catch (TransactionException bte) {
1126 //If abort has problems, swallow
1127 //it because the abort must have
1128 //happened
1129 if (transactionsLogger.isLoggable(Levels.HANDLED)) {
1130 transactionsLogger.log(Levels.HANDLED,
1131 "Trouble aborting transaction", bte);
1132 }
1133 }
1134 if (operationsLogger.isLoggable(Level.FINER)) {
1135 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1136 "doAbort");
1137 }
1138
1139 }
1140
1141 synchronized boolean ensureCurrent() {
1142 if (operationsLogger.isLoggable(Level.FINER)) {
1143 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1144 "ensureCurrent");
1145 }
1146 long cur = System.currentTimeMillis();
1147 boolean result = false;
1148 long useby = getExpiration();
1149
1150 if (useby > cur)
1151 result = true;
1152 if (operationsLogger.isLoggable(Level.FINER)) {
1153 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1154 "ensureCurrent", Boolean.valueOf(result));
1155 }
1156 return result;
1157 }
1158
1159
1160 private Vector parthandles() {
1161 if (operationsLogger.isLoggable(Level.FINER)) {
1162 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1163 "parthandles");
1164 }
1165 if ( (parts == null ) || ( parts.size() == 0 ) )
1166 return null;
1167
1168 Vector vect = new Vector(parts);
1169
1170 if (transactionsLogger.isLoggable(Level.FINEST)) {
1171 transactionsLogger.log(Level.FINEST,
1172 "Retrieved {0} participants",
1173 new Integer(vect.size()));
1174 }
1175
1176 if (operationsLogger.isLoggable(Level.FINER)) {
1177 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1178 "parthandles");
1179 }
1180
1181 return vect;
1182 }
1183
1184 private String getParticipantInfo() {
1185 if (operationsLogger.isLoggable(Level.FINER)) {
1186 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1187 "getParticipantInfo");
1188 }
1189 if ( (parts == null ) || ( parts.size() == 0 ) )
1190 return "No participants";
1191
1192 if (transactionsLogger.isLoggable(Level.FINEST)) {
1193 transactionsLogger.log(Level.FINEST,
1194 "{0} participants joined", new Integer(parts.size()));
1195 }
1196 StringBuffer sb = new StringBuffer(parts.size() + " Participants: ");
1197 ParticipantHandle ph;
1198 for (int i=0; i < parts.size(); i++) {
1199 ph = (ParticipantHandle)parts.get(i);
1200 sb.append(
1201 "{" + i + ", "
1202 + ph.getPreParedParticipant().toString() + ", "
1203 + TxnConstants.getName(ph.getPrepState())
1204 + "} ");
1205 }
1206 if (operationsLogger.isLoggable(Level.FINER)) {
1207 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1208 "getParticipantInfo", sb.toString());
1209 }
1210
1211 return sb.toString();
1212 }
1213
1214 void restoreTransientState(ProxyPreparer preparer)
1215 throws RemoteException
1216 {
1217 if (operationsLogger.isLoggable(Level.FINER)) {
1218 operationsLogger.entering(TxnManagerTransaction.class.getName(),
1219 "restoreTransientState");
1220 }
1221 if ( (parts == null ) || ( parts.size() == 0 ) )
1222 return;
1223
1224 ParticipantHandle[] handles = (ParticipantHandle[])
1225 parts.toArray(new ParticipantHandle[parts.size()]);
1226 for (int i=0; i < handles.length; i++) {
1227 handles[i].restoreTransientState(preparer);
1228 if (transactionsLogger.isLoggable(Level.FINEST)) {
1229 transactionsLogger.log(Level.FINEST,
1230 "Restored transient state for {0}",
1231 handles[i]);
1232 }
1233 }
1234
1235 if (operationsLogger.isLoggable(Level.FINER)) {
1236 operationsLogger.exiting(TxnManagerTransaction.class.getName(),
1237 "restoreTransientState");
1238 }
1239 }
1240 }