View Javadoc

1   /*
2    * Copyright (c) 1998-2005 The Jgroup Team.
3    *
4    * This program is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU Lesser General Public License version 2 as
6    * published by the Free Software Foundation.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Lesser General Public License for more details.
12   *
13   * You should have received a copy of the GNU Lesser General Public License
14   * along with this program; if not, write to the Free Software
15   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16   *
17   */
18  package jgroup.relacs.mss;
19  
20  import java.net.DatagramPacket;
21  import java.util.concurrent.Executors;
22  import java.util.concurrent.ScheduledExecutorService;
23  import java.util.concurrent.ScheduledFuture;
24  import java.util.concurrent.ThreadFactory;
25  import java.util.concurrent.TimeUnit;
26  
27  import jgroup.relacs.events.Event;
28  import jgroup.util.ThreadMonitor;
29  
30  import org.apache.log4j.Logger;
31  
32  /**
33   *  The <code>EventHandler</code> class handles all events, packets and
34   *  timeout events generated by the various mss layer components and by
35   *  the network interface threads.
36   *
37   *  @author Hein Meling
38   *  @since Jgroup 3.0
39   */
40  public class EventHandler
41    implements NIListener
42  {
43  
44    ////////////////////////////////////////////////////////////////////////////////////////////
45    // Logger
46    ////////////////////////////////////////////////////////////////////////////////////////////
47  
48    /** Obtain logger for this class */
49    private static final Logger log = Logger.getLogger(EventHandler.class);
50  
51  
52    ////////////////////////////////////////////////////////////////////////////////////////////
53    // Fields
54    ////////////////////////////////////////////////////////////////////////////////////////////
55  
56    /** Upper layer */
57    private final EhandlerUser ehuser;
58  
59    /** The executor used to schedule events, timeouts and packets */
60    private final ScheduledExecutorService executorService;
61  
62  
63    ////////////////////////////////////////////////////////////////////////////////////////////
64    // Constructor
65    ////////////////////////////////////////////////////////////////////////////////////////////
66  
67    /**
68     * Constructs and starts the event handler.
69     *  
70     * @param ehuser
71     *   reference to the receiver of upcall events.
72     */
73    EventHandler(EhandlerUser ehuser)
74    {
75      /* Initialize the upper layer reference */
76      this.ehuser = ehuser;
77      /* Thread management */
78      ThreadFactory tFactory = new ThreadFactory() {
79        public Thread newThread(Runnable r) {
80          Thread ehandler = new Thread(r, "MssEventHandler");
81          ehandler.setDaemon(true);
82          ThreadMonitor.add(ehandler);
83          return ehandler;
84        }
85      };
86      executorService = Executors.newSingleThreadScheduledExecutor(tFactory);
87    }
88  
89  
90    ////////////////////////////////////////////////////////////////////////////////////////////
91    // Methods
92    ////////////////////////////////////////////////////////////////////////////////////////////
93  
94    /** 
95     * Stops the EventHandler task execution.
96     */
97    void kill()
98    {
99      executorService.shutdown();
100   }
101 
102   /**
103    * Schedule remote packet for delivery up to the mss layer.
104    * 
105    * @see jgroup.relacs.mss.NIListener#rnotify(java.net.DatagramPacket)
106    */
107   public void rnotify(final DatagramPacket msg)
108   {
109     Runnable task = new Runnable() {
110       /* Insert the packet in the upcall queue */
111       public void run() {
112         if (log.isDebugEnabled())
113           log.debug("remote msg: " + msg);
114         ehuser.rreceive(msg);
115         if (log.isDebugEnabled())
116           log.debug("remote msg execution completed");
117       }
118     };
119     executorService.submit(task);
120     if (log.isDebugEnabled())
121       log.debug("remote msg submitted for execution");
122   }
123 
124   /**
125    * Schedule event for delivery down to the mss layer.
126    * 
127    * @param event the event to be passed to the mss layer.
128    */
129   void lnotify(final Event event)
130   {
131     Runnable task = new Runnable() {
132       /* Insert the event in the downcall queue */
133       public void run() {
134         if (log.isDebugEnabled())
135           log.debug("local msg: " + event);
136         ehuser.lreceive(event);
137         if (log.isDebugEnabled())
138           log.debug("local msg execution completed");
139       }
140     };
141     executorService.submit(task);
142     if (log.isDebugEnabled())
143       log.debug("local msg submitted for execution");
144   }
145 
146   /**
147    * Schedule a new timeout event.  The event will be handled
148    * appropriately at the given time.
149    *
150    * @param timeout
151    *   Timeout length in milliseconds till the event should be handled.
152    * @param event
153    *   The event description to associate this timeout value with.
154    */
155   ScheduledEvent setTimeout(long timeout, final ScheduledEvent event)
156   {
157     Runnable task = new Runnable() {
158       /* Insert the event in the downcall queue */
159       public void run() {
160         if (log.isDebugEnabled())
161           log.debug("Delivering timeout event: " + event);
162         ehuser.treceive(event);
163         if (log.isDebugEnabled())
164           log.debug("Timeout execution completed");
165       }
166     };
167     ScheduledFuture handle = executorService.schedule(task, timeout, TimeUnit.MILLISECONDS);
168     event.setTimeout(timeout, handle);
169     return event;
170   }
171 
172 
173   /**
174    * Abort the timeout associated to the specified event.
175    */
176   void abortTimeout(ScheduledEvent event)
177   {
178     if (event != null)
179       event.abortTimeout();
180   }
181   
182 } // END EventHandler