1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.test.performance.upgrade;
20
21 import jgroup.test.performance.SpeedTest;
22
23
/**
 * Test client thread that is able to work in a pool of threads for coordination.
 * The class itself manages a pool of its instances. Once the ClientThreadPool
 * instances are created and started (see {@link java.lang.Thread#start()}), they
 * are added to the pool. The threads wait until the allStart() method is invoked
 * and then all of them start executing method run1() at once.
 * The run cycle may be repeated by calling allStart() again.
 * Additionally, method getRespTime() may be invoked to check
 * the result of the run1() computations.
 *
 * Note1: Many threads running in one JVM can generate more traffic if and
 * only if the JVM runs on a multiprocessor machine. Another possibility is to
 * extend the coordination to "remote" threads.
 *
 * Note2: this class should be split into three for better reuse: one implementing
 * the coordination scheme between the threads in the pool, one implementing a pool
 * container with typical membership operations like join and leave, and one class
 * or interface for the implementation of the actual logic.
 * Thus we could have many thread pools, each managing different, potentially
 * heterogeneous "worker" Thread instances.
 *
 *
 * @author Marcin Solarski
 * @since Jgroup 2.1
 */
49 class ClientThreadPool extends Thread {
50
51 public ClientThreadPool(int method, int calibReqCallNo, int rps, int times, SpeedTest server ) throws Exception{
52
53 if( ! initiated ) throw new RuntimeException("Not initialized properly");
54 test = new UpgradeClientTest(server);
55 synchronized (ClientThreadPool.class) {id = instances ++; }
56 AvRespTime[id][0] = test.calibrate(method, calibReqCallNo);
57 startCycleTime[0] = System.currentTimeMillis();
58 this.rps = rps;
59 this.times = times;
60 setName("TestClient " + id);
61 }
62
63
64 /**
65 * initializes the pool. The operation has to be invoked before any thread
66 * is created.
67 * @param threads number of threads to be coordinated in the pool
68 * @param cycles number of cycles of synchronization
69 */
70
71 public static void init(int threads, int cycles) {
72 threads_no = threads;
73 cycles_no = cycles;
74 //init the table for results, position 0 is reserved for calibration
75 AvRespTime= new float[threads][cycles+1];
76 startCycleTime = new long[cycles+2];
77 initiated = true;
78 }
79
80 /**
81 * the method implements synchronization of starting executing method run1()
82 * at once by all the threads in the pool.
83 */
84 public void run() {
85 try {
86 while ( true ) {
87 results_ready = false;
88 synchronized (poolSemaphore) {
89 active_instances++;
90 if(active_instances < threads_no) {
91 //System.out.println(getName() + " waiting for TestTread.class, active instances: " + active_instances);
92 poolSemaphore.wait();
93 //System.out.println(getName() + " after waiting for TestTread.class");
94
95 } else {
96
97 active_instances = 0;
98 synchronized (startSemaphore) {
99 synchronized (start_request) {
100 waiting_for_start = true;
101 start_request.notify();
102 //System.out.println(getName() + " (last thread) waiting for startSemaphore...." + waiting_for_start);
103 }
104 startSemaphore.wait();
105 waiting_for_start = false;
106
107 }
108 //System.out.println(getName() + " notifies all threads waiting for poolSemaphore");
109 poolSemaphore.notifyAll();
110 startCycleTime[cur_cycle+1] = System.currentTimeMillis();
111 }
112 if( exit ) return;
113 }
114
115 //do the work
116 run1();
117 //System.out.println(getName() + " result.notify for cycle " + cur_cycle);
118 synchronized (result) {cur_cycle++; results_ready = true; result.notifyAll();}
119 //notify that the result is ready gerRespoTime()
120 }
121 } catch(InterruptedException e) {
122 System.out.println(getName() + " interrupted");
123 }
124 }
125
126 /**
127 * the method implement the actual work to do by each of the threads in the pool
128 */
129 synchronized void run1() {
130 //System.out.println("Starting " + getName());
131 AvRespTime[id][cur_cycle+1] = test.generate_traffic(rps, times);
132 }
133
134 private int id;
135 private volatile Object result = new Object();
136 private volatile int cur_cycle = 0;
137 private volatile boolean results_ready = false;
138
139
140 private volatile static int instances=0, active_instances=0;
141 private volatile static Object startSemaphore = new Object();
142 private volatile static Object poolSemaphore = new Object();
143 private volatile static Object start_request = new Object();
144 private volatile static boolean waiting_for_start = false;
145 private static boolean exit = false;
146 private static boolean first_started = true;
147
148 private volatile static int threads_no = 0;
149 private volatile static int cycles_no = -1;
150 private volatile static boolean initiated = false;
151
152 //Test specific
153 private int rps, times;
154 private UpgradeClientTest test;
155 private static float AvRespTime[][];
156 private static long startCycleTime[];
157
158 /**
159 * the method returns the number of threads in the pool.
160 */
161 public static synchronized int getRunningInstances() {
162 return instances;
163 }
164
165
166 /**
167 * the method makes all threads to stop after the current execution
168 * of method run1() is over.
169 */
170 public static synchronized void allStop() {
171 exit = true;
172 synchronized (startSemaphore) {startSemaphore.notify();}
173
174 }
175
176 /**
177 * waits until the results of the given cycle are ready
178 */
179 public void CycleDone(int cycle) {
180 synchronized (result) {
181 if(cur_cycle >= cycle) return ;
182 while (cur_cycle < cycle -1 ) {
183 try {
184 result.wait();
185 } catch(InterruptedException e) {}
186 }
187 if( results_ready) {
188 //System.out.println(getName() + " returning from CycleDone " + cur_cycle);
189 return;
190 } else {
191 try {
192 //System.out.println(getName() + " has to wait in CycleDone in the last cycle because the result are not ready " + cur_cycle);
193 result.wait();
194 } catch(InterruptedException e) {}
195 }
196 }
197 }
198
199
200
201 /**
202 * the method triggers all threads in the pool to start executing
203 * method run1()
204 */
205 public static void allStart() {
206 //System.out.println("Starting all threads");
207 if (! first_started) {
208 try {
209 synchronized (poolSemaphore ) {
210 poolSemaphore.wait();
211 }
212 } catch(InterruptedException e) {}
213 first_started = false;
214 }
215
216 while( true ) {
217 try {
218 synchronized( start_request ) {
219 if( waiting_for_start ) {
220 //System.out.println("waiting for start is true");
221 waiting_for_start = false;
222 break;
223 } else {
224 //System.out.println("waiting for start is false");
225 start_request.wait();
226 }
227 }
228 } catch(InterruptedException e) {}
229 }
230
231 //System.out.println("Sending startSemaphore.notify()");
232 synchronized( startSemaphore ) {
233 startSemaphore.notify();
234 }
235 }
236
237 /**
238 * returns the times at which a given cycle started
239 * @param cycle number of the cycle
240 * @return time of the cycle start
241 */
242 public static long getStartCycleTime(int cycle) {
243 if(cycle<0 || cycle>cycles_no) return -1;
244 return startCycleTime[cycle];
245 }
246
247 /**
248 * returns the computations of method run1(). In this case the avrage response
249 * time of calls to the server.
250 */
251 public static float getRespTime(int thread, int cycle) {
252 //if(thread>0 && thread <threads_no && cycle>=0 && cycle < cycles_no)
253 return AvRespTime[thread][cycle];
254 //else return 0;
255 }
256
257 } // END ClientThreadPool