1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.test.performance.upgrade;
20
21 import jgroup.test.performance.SpeedTest;
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 class ClientThreadPool extends Thread {
50
51 public ClientThreadPool(int method, int calibReqCallNo, int rps, int times, SpeedTest server ) throws Exception{
52
53 if( ! initiated ) throw new RuntimeException("Not initialized properly");
54 test = new UpgradeClientTest(server);
55 synchronized (ClientThreadPool.class) {id = instances ++; }
56 AvRespTime[id][0] = test.calibrate(method, calibReqCallNo);
57 startCycleTime[0] = System.currentTimeMillis();
58 this.rps = rps;
59 this.times = times;
60 setName("TestClient " + id);
61 }
62
63
64
65
66
67
68
69
70
71 public static void init(int threads, int cycles) {
72 threads_no = threads;
73 cycles_no = cycles;
74
75 AvRespTime= new float[threads][cycles+1];
76 startCycleTime = new long[cycles+2];
77 initiated = true;
78 }
79
80
81
82
83
84 public void run() {
85 try {
86 while ( true ) {
87 results_ready = false;
88 synchronized (poolSemaphore) {
89 active_instances++;
90 if(active_instances < threads_no) {
91
92 poolSemaphore.wait();
93
94
95 } else {
96
97 active_instances = 0;
98 synchronized (startSemaphore) {
99 synchronized (start_request) {
100 waiting_for_start = true;
101 start_request.notify();
102
103 }
104 startSemaphore.wait();
105 waiting_for_start = false;
106
107 }
108
109 poolSemaphore.notifyAll();
110 startCycleTime[cur_cycle+1] = System.currentTimeMillis();
111 }
112 if( exit ) return;
113 }
114
115
116 run1();
117
118 synchronized (result) {cur_cycle++; results_ready = true; result.notifyAll();}
119
120 }
121 } catch(InterruptedException e) {
122 System.out.println(getName() + " interrupted");
123 }
124 }
125
126
127
128
129 synchronized void run1() {
130
131 AvRespTime[id][cur_cycle+1] = test.generate_traffic(rps, times);
132 }
133
134 private int id;
135 private volatile Object result = new Object();
136 private volatile int cur_cycle = 0;
137 private volatile boolean results_ready = false;
138
139
140 private volatile static int instances=0, active_instances=0;
141 private volatile static Object startSemaphore = new Object();
142 private volatile static Object poolSemaphore = new Object();
143 private volatile static Object start_request = new Object();
144 private volatile static boolean waiting_for_start = false;
145 private static boolean exit = false;
146 private static boolean first_started = true;
147
148 private volatile static int threads_no = 0;
149 private volatile static int cycles_no = -1;
150 private volatile static boolean initiated = false;
151
152
153 private int rps, times;
154 private UpgradeClientTest test;
155 private static float AvRespTime[][];
156 private static long startCycleTime[];
157
158
159
160
161 public static synchronized int getRunningInstances() {
162 return instances;
163 }
164
165
166
167
168
169
170 public static synchronized void allStop() {
171 exit = true;
172 synchronized (startSemaphore) {startSemaphore.notify();}
173
174 }
175
176
177
178
179 public void CycleDone(int cycle) {
180 synchronized (result) {
181 if(cur_cycle >= cycle) return ;
182 while (cur_cycle < cycle -1 ) {
183 try {
184 result.wait();
185 } catch(InterruptedException e) {}
186 }
187 if( results_ready) {
188
189 return;
190 } else {
191 try {
192
193 result.wait();
194 } catch(InterruptedException e) {}
195 }
196 }
197 }
198
199
200
201
202
203
204
205 public static void allStart() {
206
207 if (! first_started) {
208 try {
209 synchronized (poolSemaphore ) {
210 poolSemaphore.wait();
211 }
212 } catch(InterruptedException e) {}
213 first_started = false;
214 }
215
216 while( true ) {
217 try {
218 synchronized( start_request ) {
219 if( waiting_for_start ) {
220
221 waiting_for_start = false;
222 break;
223 } else {
224
225 start_request.wait();
226 }
227 }
228 } catch(InterruptedException e) {}
229 }
230
231
232 synchronized( startSemaphore ) {
233 startSemaphore.notify();
234 }
235 }
236
237
238
239
240
241
242 public static long getStartCycleTime(int cycle) {
243 if(cycle<0 || cycle>cycles_no) return -1;
244 return startCycleTime[cycle];
245 }
246
247
248
249
250
251 public static float getRespTime(int thread, int cycle) {
252
253 return AvRespTime[thread][cycle];
254
255 }
256
257 }