View Javadoc

1   /*
2    * Copyright (c) 1998-2002 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.test.performance.upgrade;
20  
21  import jgroup.test.performance.SpeedTest;
22  
23  
24  /**
25   *  Test client thread that is able to work in a pool of threads for coordination.
26   *  The class itself manages a pool of its instances. Once the ClientThreadPool instances are
27   *  created and started (see java.lang.Thread.start()), they are added to the
28   *  pool. The threads wait until the allStart() method is invoked and then all of them
29   *  start executing method run1() at once.
30   *  The run cycle may be repeated by calling allStart().
31   *  Additionally, method getRespTime() may be invoked to check
32   *  the result of the run1() computations.
33   *
34   *  Note1: Many threads running in one JVM can generate more traffic if and
35   *  only if the JVM runs on a multiprocessor machine. Another possibility is to
36   *  extend the coordination to "remote" threads.
37   *
38   * Note2: this class should be split into three for better reuse: one implementing
39   * the coordination scheme between the threads in pool, one implementing a pool 
40   * container with typical membership operations like join, leave and one class or
41   * interface for the implementation of the actual logic.
42   * Thus we could have many thread pools, each managing different, potentially
43   * heterogeneous "worker" Thread instances.
44   *
45   * 
46   *  @author       Marcin Solarski
47   *  @since        Jgroup 2.1
48   */
49  class ClientThreadPool extends Thread {
50      
51    public ClientThreadPool(int method, int calibReqCallNo, int rps, int times, SpeedTest server ) throws Exception{
52  
53      if( ! initiated ) throw new RuntimeException("Not initialized properly");   
54      test = new UpgradeClientTest(server);
55      synchronized (ClientThreadPool.class) {id = instances ++;  }
56      AvRespTime[id][0] = test.calibrate(method, calibReqCallNo);
57      startCycleTime[0] = System.currentTimeMillis();
58      this.rps = rps;
59      this.times = times;
60      setName("TestClient " + id);
61    }
62  
63    
64    /**
65     *  initializes the pool. The operation has to be invoked before any thread
66     *  is created.
67     *  @param threads number of threads to be coordinated in the pool
68     *  @param cycles number of cycles of synchronization 
69     */
70  
71    public static void init(int threads, int cycles) {
72      threads_no = threads;
73      cycles_no = cycles;
74      //init the table for results, position 0 is reserved for calibration
75            AvRespTime= new float[threads][cycles+1];
76      startCycleTime = new long[cycles+2];
77      initiated = true;
78    }
79  
80    /**
81     * the method implements synchronization of starting executing method run1()
82     * at once by all the threads in the pool.
83     */
84    public void run() {
85      try {
86        while ( true ) { 
87          results_ready = false;
88          synchronized (poolSemaphore) {
89             active_instances++;
90             if(active_instances < threads_no) {
91               //System.out.println(getName() + " waiting for TestTread.class, active instances: " + active_instances);
92               poolSemaphore.wait();
93               //System.out.println(getName() + " after waiting for TestTread.class");
94  
95             } else {
96  
97               active_instances = 0;
98               synchronized (startSemaphore) {
99                 synchronized (start_request) {
100                  waiting_for_start = true; 
101                  start_request.notify();
102                  //System.out.println(getName() + " (last thread) waiting for startSemaphore...." + waiting_for_start);
103                }
104                startSemaphore.wait();
105                waiting_for_start = false; 
106          
107              }
108              //System.out.println(getName() + " notifies all threads waiting for poolSemaphore");
109              poolSemaphore.notifyAll();
110              startCycleTime[cur_cycle+1] = System.currentTimeMillis();
111            }
112            if( exit ) return;
113         }
114 
115         //do the work
116         run1();
117         //System.out.println(getName() + " result.notify for cycle " + cur_cycle);
118         synchronized (result) {cur_cycle++; results_ready = true; result.notifyAll();}
119         //notify that the result is ready  gerRespoTime() 
120       }
121     } catch(InterruptedException e) { 
122       System.out.println(getName() + " interrupted");
123     }
124   }
125 
126   /**
127    * the method implement the actual work to do by each of the threads in the pool
128    */
129   synchronized void run1() {
130     //System.out.println("Starting " + getName());
131     AvRespTime[id][cur_cycle+1] = test.generate_traffic(rps, times);  
132   }
133 
134   private int id;
135   private volatile  Object  result = new Object();
136   private volatile int cur_cycle = 0;
137   private volatile boolean  results_ready = false;
138 
139 
140   private volatile static  int   instances=0, active_instances=0;
141   private volatile static Object startSemaphore = new Object();
142   private volatile static Object poolSemaphore = new Object(); 
143   private volatile static Object start_request = new Object(); 
144   private volatile static boolean  waiting_for_start = false;
145   private static boolean  exit = false;
146   private static boolean  first_started = true;
147 
148   private volatile static int threads_no = 0;
149   private volatile static int cycles_no = -1;
150   private volatile static boolean initiated = false;
151 
152   //Test specific 
153   private int rps, times;
154   private UpgradeClientTest test;
155   private static float AvRespTime[][];
156   private static long startCycleTime[];
157 
158   /**
159    * the method returns the number of threads in the pool.
160    */
161   public static synchronized int getRunningInstances() {
162     return instances;
163   }
164 
165 
166   /**
167    * the method makes all threads to stop after the current execution
168    * of method run1() is over.
169    */
170   public static synchronized void allStop() {
171     exit = true;
172     synchronized (startSemaphore) {startSemaphore.notify();}
173 
174   }
175 
176   /**
177    * waits until the results of the given cycle are ready
178    */
179   public  void CycleDone(int cycle) {
180     synchronized (result) {
181       if(cur_cycle  >= cycle) return ;
182              while (cur_cycle < cycle -1 ) {
183          try {
184            result.wait();
185                } catch(InterruptedException e) {}
186       }
187       if( results_ready) {
188         //System.out.println(getName() + " returning from CycleDone " + cur_cycle);
189         return;
190       } else {
191         try {
192           //System.out.println(getName() + " has to wait in CycleDone in the last cycle because the result are not ready " + cur_cycle);
193           result.wait();
194         } catch(InterruptedException e) {}
195       }
196     }
197   }
198 
199 
200 
201   /**
202    * the method triggers all threads in the pool to start executing
203    * method run1()
204    */
205   public static  void allStart() {
206     //System.out.println("Starting all threads");
207     if (! first_started) {
208       try {
209         synchronized (poolSemaphore ) {
210           poolSemaphore.wait();
211         }
212       } catch(InterruptedException e) {}
213       first_started = false;
214     }
215 
216     while( true ) {
217       try {
218         synchronized( start_request ) { 
219           if( waiting_for_start ) { 
220             //System.out.println("waiting for start is true"); 
221             waiting_for_start = false;
222             break;
223           } else {
224             //System.out.println("waiting for start is false"); 
225             start_request.wait();
226           }
227         }
228       } catch(InterruptedException e) {}
229     }
230     
231     //System.out.println("Sending startSemaphore.notify()");
232     synchronized( startSemaphore ) { 
233       startSemaphore.notify(); 
234     }
235   }
236 
237   /**
238    * returns the times at which a given cycle started
239    *  @param cycle number of the cycle
240    *  @return time of the cycle start
241    */
242   public static long getStartCycleTime(int cycle) {
243      if(cycle<0 || cycle>cycles_no) return -1;
244      return startCycleTime[cycle];
245   }
246     
247   /**
248    * returns the computations of method run1(). In this case the avrage response
249    * time of calls to the server.
250    */
251   public static float getRespTime(int thread, int cycle) {
252     //if(thread>0 && thread <threads_no && cycle>=0 && cycle < cycles_no)
253       return AvRespTime[thread][cycle];
254     //else return 0;
255   }
256 
257 } // END ClientThreadPool