1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package com.sun.jini.mahalo;
20
21 import java.rmi.RemoteException;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.Vector;
25
26 import jgroup.core.GroupManager;
27 import jgroup.core.MemberId;
28 import jgroup.core.MembershipListener;
29 import jgroup.core.MembershipService;
30 import jgroup.core.View;
31 import jgroup.core.protocols.Anycast;
32 import jgroup.core.protocols.Atomic;
33 import jgroup.jini.txn.InternalGroupTransactionManager;
34 import jgroup.jini.txn.InternalPassiveGroupTransactionManager;
35 import net.jini.core.lease.LeaseDeniedException;
36 import net.jini.core.transaction.CannotAbortException;
37 import net.jini.core.transaction.CannotCommitException;
38 import net.jini.core.transaction.CannotJoinException;
39 import net.jini.core.transaction.TimeoutExpiredException;
40 import net.jini.core.transaction.UnknownTransactionException;
41 import net.jini.core.transaction.server.CrashCountException;
42 import net.jini.core.transaction.server.TransactionManager;
43 import net.jini.core.transaction.server.TransactionParticipant;
44
45 import org.apache.log4j.Logger;
46
47 import com.sun.jini.start.LifeCycle;
48
49
50
51
52
53
54
55
56
57 public class PassiveGroupMahalo
58
59 implements TransactionManager, MembershipListener, InternalPassiveGroupTransactionManager,
60 InternalGroupTransactionManager
61 {
62
63
64
65
66
67 private final static Logger log = Logger.getLogger(PassiveGroupMahalo.class);
68
69
70
71
72
73
74 private static final long serialVersionUID = -6405403802047754780L;
75
76
77 private static InternalPassiveGroupTransactionManager ipgtm;
78 private InternalGroupTransactionManager igtm;
79
80 private MembershipService pgms;
81
82
83 private MemberId me;
84
85
86 private boolean leader;
87
88
89 private GroupTxnManagerImpl tm;
90
91
92 private long prepareStart;
93 private long prepareEnd;
94 private long commitStart;
95 private long commitEnd;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 public PassiveGroupMahalo(String[] configArgs, LifeCycle lifeCycle)
118 throws Exception
119 {
120 if (log.isDebugEnabled())
121 log.debug("Trying to start pgtm...");
122
123
124 GroupManager gm = GroupManager.getGroupManager(this);
125
126
127 ipgtm = (InternalPassiveGroupTransactionManager) gm
128 .getService(InternalPassiveGroupTransactionManager.class);
129 igtm = (InternalGroupTransactionManager) gm
130 .getService(InternalGroupTransactionManager.class);
131
132
133 pgms = (MembershipService) gm.getService(MembershipService.class);
134 me = pgms.getMyIdentifier();
135
136 tm = new GroupTxnManagerImpl(configArgs, lifeCycle, false);
137
138 if (log.isDebugEnabled())
139 log.debug("Started PGTM!");
140 }
141
142
143
144
145
146
147
148
149
150 public void viewChange(View view)
151 {
152 if (log.isDebugEnabled()) {
153 log.debug("View id: " + view);
154 }
155
156 leader = view.isLeader(me);
157 if (log.isDebugEnabled() && view.size() > 1 && leader) {
158 log.debug("I am the leader replica.");
159 }
160 }
161
162
163
164
165 public void prepareChange()
166 {
167 }
168
169
170
171
172 public void hasLeft()
173 {
174 }
175
176
177
178
179
180
181
182
183
184 @Anycast
185 public TransactionManager.Created create(long lease)
186 throws LeaseDeniedException, RemoteException
187 {
188 if (log.isDebugEnabled())
189 log.debug("Invoking tm.create on mahalo..");
190
191 TransactionManager.Created created = tm.create(lease);
192
193 if (log.isDebugEnabled()) {
194 log.debug("PassiveGroupTransientMahalo.create() created txn with id: " + created.id);
195
196 }
197
198 GroupTxnManagerTransaction tmt = (GroupTxnManagerTransaction) tm.txns.get(new Long(created.id));
199 try {
200 igtm.internalCreate(tmt, created.id);
201 } catch (RemoteException e) {
202 log.debug(e.getMessage());
203 }
204 return created;
205 }
206
207
208
209
210
211
212 @Atomic
213 public void join(long id, TransactionParticipant part, long crashCount)
214 throws UnknownTransactionException, CannotJoinException, CrashCountException, RemoteException
215 {
216 if (log.isDebugEnabled()) {
217 log.debug("PassiveGroupTransientMahalo.join() triggered with txn id: " + id);
218 }
219
220 tm.join(id, part, crashCount);
221 if (log.isDebugEnabled()) {
222
223 log.debug("PassiveGroupTransientMahalo.join() done with txn id: " + id);
224 }
225 }
226
227
228
229
230
231 @Anycast
232 public int getState(long id) throws UnknownTransactionException
233 {
234 int state = tm.getState(id);
235 if (log.isDebugEnabled())
236 log.debug("PassiveGroupTransientMahalo.getState() for txn id: " + id + " is " + state);
237 return state;
238 }
239
240
241
242
243
244 @Anycast
245 public void commit(long id)
246 throws UnknownTransactionException, CannotCommitException, RemoteException
247 {
248 if (log.isDebugEnabled())
249 log.debug("PassiveGroupTransientMahalo.commit() invoked with txn id: " + id);
250
251 tm.commit(id);
252
253 if (log.isDebugEnabled())
254 log.debug("PassiveGroupTransientMahalo.commit() finished with txn id: " + id);
255 }
256
257
258
259
260
261 @Anycast
262 public void commit(long id, long waitFor)
263 throws UnknownTransactionException, CannotCommitException,
264 TimeoutExpiredException, RemoteException
265 {
266 if (log.isDebugEnabled())
267 log.debug("PassiveGroupTransientMahalo.commit() invoked with txn id: " + id);
268
269 tm.commit(id, waitFor);
270
271 if (log.isDebugEnabled())
272 log.debug("PassiveGroupTransientMahalo.commit() finished with txn id: " + id);
273 }
274
275
276
277
278
279 @Anycast
280 public void abort(long id)
281 throws UnknownTransactionException, CannotAbortException
282 {
283 tm.abort(id);
284 if (log.isDebugEnabled())
285 log.debug("Transaction " + id + " aborted.");
286 }
287
288
289
290
291
292 @Anycast
293 public void abort(long id, long waitFor) throws UnknownTransactionException,
294 CannotAbortException, TimeoutExpiredException
295 {
296 if (log.isDebugEnabled())
297 log.debug("Transaction " + id + " aborted.");
298 tm.abort(id, waitFor);
299 }
300
301
302
303
304
305 public Object getState(MemberId[] dests)
306 {
307
308
309 return tm.txns;
310 }
311
312
313
314
315 public void putState(Object state, MemberId[] sources)
316 {
317 if (log.isDebugEnabled())
318 log.debug("PassiveGroupTransientMahalo.putState() invoked.");
319
320 if (state != null) {
321 Map remoteTxns = (Map) state;
322
323 synchronized (remoteTxns)
324 {
325 for(Iterator iter=remoteTxns.keySet().iterator();iter.hasNext();) {
326 Long remoteTransactionID = (Long) iter.next();
327 GroupTxnManagerTransaction remoteTxn = (GroupTxnManagerTransaction) remoteTxns
328 .get(remoteTransactionID);
329 GroupTxnManagerTransaction localTxn = (GroupTxnManagerTransaction) tm.txns
330 .get(remoteTransactionID);
331
332 if (localTxn == null) {
333 remoteTxn.setLogManager(tm.getLogManager());
334 remoteTxn.setTaskManager(tm.getTaskManager());
335 remoteTxn.setWakeupManager(tm.getWakeupManager());
336 remoteTxn.setSettler(tm);
337 remoteTxn.setLocks();
338 tm.txns.put(remoteTransactionID, remoteTxn);
339
340 if (log.isDebugEnabled())
341 log.debug("Transaction " + remoteTransactionID + "added");
342
343
344
345
346 } else {
347 if (log.isDebugEnabled())
348 log.debug("Transaction " + remoteTransactionID + "existed locally");
349 }
350
351 }
352 }
353 }
354 }
355
356
357
358
359
360
361
362
363
364 public void transPrepared(Object txnMgr, long tid)
365 throws RemoteException
366 {
367
368 prepareStart = System.nanoTime();
369 if (log.isDebugEnabled())
370 log.debug("backupTransaction() - txn id: " + tid);
371
372 GroupTxnManagerTransaction mgrTransaction = (GroupTxnManagerTransaction) txnMgr;
373
374
375 mgrTransaction.setLocks();
376
377 if (tm.txns.containsKey(tid)) {
378
379
380
381 GroupTxnManagerTransaction current = (GroupTxnManagerTransaction) tm.txns.get(new Long(tid));
382 if (current.getState() != mgrTransaction.getState())
383
384 tm.txns.remove(tid);
385 else {
386 prepareEnd = System.nanoTime();
387 return;
388 }
389 }
390
391 mgrTransaction.setLogManager(tm.getLogManager());
392 mgrTransaction.setTaskManager(tm.getTaskManager());
393 mgrTransaction.setWakeupManager(tm.getWakeupManager());
394 mgrTransaction.setSettler(tm);
395 tm.txns.put(new Long(tid), mgrTransaction);
396 prepareEnd = System.nanoTime();
397
398 }
399
400
401
402
403
404 public void transCommitted(long id)
405 throws UnknownTransactionException, RemoteException
406 {
407 commitStart = System.nanoTime();
408
409 log.debug("entering finalCommit() - txn id: " + id);
410
411
412 Long txnid = new Long(id);
413
414 if (tm.txns.containsKey(txnid))
415 tm.txns.remove(txnid);
416 else
417 throw new UnknownTransactionException("No transaction with id " + id + " found");
418
419 if (log.isDebugEnabled())
420 log.debug("leaving finalCommit() - txn id: " + id);
421
422 commitEnd = System.nanoTime();
423 log.warn("execute: " + ((prepareEnd - prepareStart)/1000000) + "\t" + ((commitEnd - commitStart)/1000000));
424 }
425
426
427
428
429
430
431
432
433
434 public void printTxns(Map<Long,GroupTxnManagerTransaction> txns)
435 {
436 Vector<GroupTxnManagerTransaction> keys = new Vector<GroupTxnManagerTransaction>(txns
437 .values());
438 int n = keys.size();
439 log.debug("Current Transactions:\n");
440 for (int i = 0; i < n; i++) {
441 GroupTxnManagerTransaction tmt = (GroupTxnManagerTransaction) keys.get(i);
442 if (tmt != null) {
443 long id = tmt.getID();
444 int state = tmt.getState();
445 String stateString = "Not valid!";
446
447
448
449
450
451
452
453
454 log.debug("Txn-id: " + id);
455 switch (state) {
456 case 1: stateString = "active";
457 break;
458 case 2: stateString = "voting";
459 break;
460 case 3: stateString = "prepared";
461 break;
462 case 4: stateString = "notchanged";
463 break;
464 case 5: stateString = "committed";
465 break;
466 case 6: stateString = "aborted";
467 break;
468 }
469 log.debug(" State = " + stateString + "\n");
470 }
471 }
472 log.debug("");
473 }
474
475 public String toString()
476 {
477 return "PassiveGroupTransientMahalo";
478 }
479
480
481
482
483
484
485
486
487
488
489
490 protected static InternalPassiveGroupTransactionManager getIPGTM()
491 {
492 return ipgtm;
493 }
494
495
496
497
498
499
500
501
502
503 public void internalCreate(Object txnMgr, long id)
504 throws RemoteException
505 {
506 if (log.isDebugEnabled())
507 log.debug("internalCreate() - txn id: " + id);
508
509 if (leader)
510 return;
511
512
513 GroupTxnManagerTransaction mgrTransaction = (GroupTxnManagerTransaction) txnMgr;
514
515 mgrTransaction.setLogManager(tm.getLogManager());
516 mgrTransaction.setTaskManager(tm.getTaskManager());
517 mgrTransaction.setWakeupManager(tm.getWakeupManager());
518 mgrTransaction.setSettler(tm);
519 mgrTransaction.setLocks();
520 tm.txns.put(new Long(id), mgrTransaction);
521 }
522
523
524
525
526 public void internalCommit(long id)
527 throws UnknownTransactionException, CannotCommitException, RemoteException
528 {
529 throw new UnsupportedOperationException("Not implemented.");
530 }
531
532 }