View Javadoc

1   /*
2    * Copyright (c) 1998-2004 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  
19  package jgroup.test.multicast;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.ObjectInputStream;
24  import java.io.ObjectOutputStream;
25  import java.io.OutputStream;
26  import java.rmi.RemoteException;
27  
28  import jgroup.core.GroupManager;
29  import jgroup.core.JgroupException;
30  import jgroup.core.MemberId;
31  import jgroup.core.MembershipListener;
32  import jgroup.core.View;
33  import jgroup.core.multicast.AckListener;
34  import jgroup.core.multicast.MulticastListener;
35  import jgroup.core.multicast.MulticastService;
36  import jgroup.relacs.gm.DispatcherService;
37  
38  /**
39   * Simple multicast/total order test server.
40   *
41   * @author Hein Meling
42   * @since Jgroup 2.2
43   */
44  public class TotalOrderTestSend
45    implements MulticastListener, MembershipListener
46  {
47  
48    ////////////////////////////////////////////////////////////////////////////////////////////
49    // Static fields
50    ////////////////////////////////////////////////////////////////////////////////////////////
51  
52    /** Protocol name used to distinguish messages from other protocols. */
53    private static final String PROTOCOL_NAME = "Total2";
54  
55  
56    ////////////////////////////////////////////////////////////////////////////////////////////
57    // Fields
58    ////////////////////////////////////////////////////////////////////////////////////////////
59  
60    private volatile boolean waitForView = false;
61  
62  
63    ////////////////////////////////////////////////////////////////////////////////////////////
64    // Constructor
65    ////////////////////////////////////////////////////////////////////////////////////////////
66  
67    private TotalOrderTestSend()
68      throws JgroupException, IOException
69    {
70      GroupManager gm = GroupManager.getGroupManager(this);
71      final DispatcherService dispatcher =
72        (DispatcherService) gm.getService(DispatcherService.class);
73      final MulticastService multicastService =
74        (MulticastService) gm.getService(MulticastService.class);
75      Thread t = new Thread() {
76        public void run() {
77          byte[] b = new byte[100];
78          boolean doMcast;
79          while (true) {
80            doMcast = true;
81            System.out.println("Here");
82            try {
83              OutputStream bout = multicastService.getMessage(PROTOCOL_NAME);
84              ObjectOutputStream out = new ObjectOutputStream(bout);
85              out.writeUTF("Hei du!");
86              out.close();
87  //            SynchAckListener ackListener = new SynchAckListener(dispatcher);
88              GroupAckListener ackListener = new GroupAckListener();
89              while (!waitForView && doMcast) {
90                System.out.println("cnt=" + cnt++);
91                multicastService.mcast(bout, ackListener);
92                doMcast = false;
93              }
94              Thread.sleep(200);
95              System.out.println(ackListener.getResult());
96  //            Object[] results = (Object[]) ackListener.getResults();
97  //            for (int i = 0; i < results.length; i++) {
98  //              System.out.println("i=" + i + ": " + results[i]);
99  //            }
100           } catch (Exception e) {
101             e.printStackTrace();
102           }
103         }
104       }
105     };
106     t.start();  
107   }
108 
109 int cnt = 0;
110   ////////////////////////////////////////////////////////////////////////////////////////////
111   // Main method
112   ////////////////////////////////////////////////////////////////////////////////////////////
113 
114   public static void main(String[] argv)
115     throws Exception
116   {
117     TotalOrderTestSend tots = new TotalOrderTestSend();
118   }
119 
120   ////////////////////////////////////////////////////////////////////////////////////////////
121   // Methods from MembershipListener
122   ////////////////////////////////////////////////////////////////////////////////////////////
123 
124   public void viewChange(View view)
125   {
126     System.out.println("View: " + view);
127     waitForView = false;
128   }
129 
130   public void prepareChange()
131   {
132     waitForView = true;
133   }
134 
135   public void hasLeft() { }
136 
137 
138   ////////////////////////////////////////////////////////////////////////////////////////////
139   // Methods from MulticastListener
140   ////////////////////////////////////////////////////////////////////////////////////////////
141 
142   /**
143    *  Returns a string naming the protocol implemented by this multicast
144    *  listener.
145    */
146   public String getProtocolName() 
147   {
148     return PROTOCOL_NAME;
149   }
150 
151   public Object deliverStream(InputStream stream, MemberId sender, int seqNo)
152   {
153     try {
154       ObjectInputStream ios = new ObjectInputStream(stream);
155       String s = ios.readUTF();
156 //      System.out.println("TotalOrderTestSend: deliverStream: " + s);
157     } catch (IOException e) {
158       e.printStackTrace();
159     }
160     return "Hei du og! " + sender + ": " + seqNo;
161   }
162 
163 
164   public Object deliverObject(Object obj, MemberId sender, int seqNo)
165   {
166     System.out.println("MulticastTestServer: deliverObject: " + obj);
167     return null;
168   }
169 
170   ////////////////////////////////////////////////////////////////////////////////////////////
171   // Nested class implementing the AckListener interface
172   ////////////////////////////////////////////////////////////////////////////////////////////
173 
174   /** Handles multicast return messages */
175   private class GroupAckListener
176     implements AckListener
177   {
178 
179     /** States */
180     private static final int STARTING  = 1;
181     private static final int WAITING   = 2;
182     private static final int COMPLETED = 3;
183 
184     /** Current state */
185     private int state = STARTING;
186 
187     /** Number of unacknowledge messages */
188     private int missing;
189 
190     /** Results from servers */
191     private Object[] results;
192 
193     /** Flag completed servers */
194     private boolean[] completed;
195 
196     /** Update internal state */
197     private synchronized void updateState()
198     {
199       if (missing==0) {
200         state = COMPLETED;
201         notifyAll();
202       }
203     }
204 
205     /* (non-javadoc)
206      * @see jgroup.core.multicast.AckListener#ack(MemberId, int, Object)
207      */
208     public void ack(MemberId id, int pos, Object obj)
209       throws RemoteException 
210     {
211       System.out.println("Recieved ACK from "+id+", pos="+pos+", with reply " + obj);
212 
213       results[pos] = obj;
214       completed[pos] = true;
215       missing--;
216       updateState();
217     }
218 
219     /* (non-javadoc)
220      * @see jgroup.core.multicast.AckListener#notifyView(View)
221      */
222     public void notifyView(View view)
223       throws RemoteException
224     {
225       missing = view.size();
226       results = new Object[missing];
227       completed = new boolean[missing];
228       state = WAITING;
229     }
230 
231     /* (non-javadoc)
232      * @see jgroup.core.multicast.AckListener#viewChange()
233      */
234     public void viewChange()
235       throws RemoteException
236     {
237       missing = 0;
238       updateState();
239     }
240     
241     /** Retrieves the invocation result */
242     public synchronized Object getResult()
243     {
244       while (state != COMPLETED) {
245         try { wait(); } catch (InterruptedException iex) {}
246       }
247 
248       // Select first result
249       for (int i = 0; i < results.length; ++i)
250         if (completed[i])
251           return results[i];
252 
253       // No results returned
254       System.out.println("No results!");
255       return null;      
256     }
257 
258   } // END GroupAckListener
259 
260 } // END MulticastObjectServer