1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.mss;
20
21 import java.io.IOException;
22 import java.net.DatagramPacket;
23 import java.net.InetAddress;
24 import java.net.MulticastSocket;
25 import java.net.SocketException;
26
27 import jgroup.core.EndPoint;
28
29 import org.apache.log4j.Logger;
30 import org.apache.log4j.NDC;
31
32 /**
33 * The <code>MulticastNetworkInterface</code> class implements a
34 * multicast version of the network interface.
35 *
36 * @author Salvatore Cammarata
37 * @author Hein Meling
38 * @author Marcin Solarski
39 * @since Jgroup 1.2
40 */
41 final class MulticastNetworkInterface
42 extends Thread
43 implements NetworkInterface, MssConstants
44 {
45
46 ////////////////////////////////////////////////////////////////////////////////////////////
47 // Logger
48 ////////////////////////////////////////////////////////////////////////////////////////////
49
50 /** Obtain logger for this class */
51 private static final Logger log = Logger.getLogger(MulticastNetworkInterface.class);
52
53
54 ////////////////////////////////////////////////////////////////////////////////////////////
55 // Fields
56 ////////////////////////////////////////////////////////////////////////////////////////////
57
58 /** Upper layer */
59 private NIListener niListener;
60
61 /** Multicast internet address */
62 private InetAddress address;
63
64 /** Multicast Socket */
65 private MulticastSocket group;
66
67 /** Multicast port */
68 private int port;
69
70 /** Receive buffer length */
71 private int bufferlen;
72
73 /** True when the thread should continue to run */
74 private boolean carryOn;
75
76
77 ////////////////////////////////////////////////////////////////////////////////////////////
78 // Constructor
79 ////////////////////////////////////////////////////////////////////////////////////////////
80
81 /**
82 * Constructs and starts the multicast network interface layer.
83 *
84 * @param name
85 * Thread identifier.
86 * @param niListener
87 * Reference to the upper listener layer through which multicast
88 * receive events are passed as upcall invocations.
89 * @param mcastEndpoint
90 * Multicast group address.
91 * @param bufferlen
92 * Size of receive buffer.
93 */
94 MulticastNetworkInterface(String name, NIListener niListener, EndPoint mcastEndpoint, int bufferlen, int multicastTTL)
95 throws IOException
96 {
97 /* Initialize thread */
98 super(name);
99 this.setPriority(NI_PRIORITY);
100 this.setDaemon(true);
101
102 if (!mcastEndpoint.isMulticastEndPoint())
103 throw new IOException("Endpoint is not a multicast endpoint: " + mcastEndpoint);
104
105 this.niListener = niListener;
106 this.bufferlen = bufferlen;
107 this.address = mcastEndpoint.getAddress();
108 this.port = mcastEndpoint.getPort();
109 this.carryOn = true;
110
111 /* Initialize multicast socket for this endpoint. */
112 boolean partitionSimulator = Boolean.getBoolean("jgroup.simulator");
113 try {
114 if (partitionSimulator)
115 group = new UnreliableMulticastSocket(port);
116 else
117 group = new MulticastSocket(port);
118 } catch (SocketException e) {
119 throw new IOException("Error initializing multicast socket: " + mcastEndpoint
120 + "\n" + e.getMessage());
121 }
122
123 try {
124 /* Join the multicast group for this endpoint address. */
125 group.joinGroup(address);
126 } catch (IOException e) {
127 throw new IOException("Unable to join multicast group: " + address);
128 }
129
130 if (multicastTTL > 0 && multicastTTL < 17)
131 group.setTimeToLive(multicastTTL);
132 else
133 log.error("Multicast TTL must be in the range [1,16].");
134 if (log.isDebugEnabled()) {
135 log.debug("Multicast TTL = " + group.getTimeToLive());
136 log.debug("Multicast socket initialized: " + mcastEndpoint + " " + bufferlen);
137 }
138 }
139
140
141 ////////////////////////////////////////////////////////////////////////////////////////////
142 // Methods from NetworkInterface
143 ////////////////////////////////////////////////////////////////////////////////////////////
144
145 // Javadoc comment inherited from NetworkInterface
146 public void send(byte[] buffer, int buflen)
147 {
148 DatagramPacket packet = new DatagramPacket(buffer, buflen, address, port);
149
150 try {
151 group.send(packet);
152 } catch (IOException e) {
153 log.warn("Error sending packet to ALL", e);
154 }
155 }
156
157
158 // Javadoc comment inherited from NetworkInterface
159 public void send(EndPoint dest, byte[] buffer, int buflen)
160 {
161 DatagramPacket packet =
162 new DatagramPacket(buffer, buflen, dest.getAddress(), dest.getPort());
163
164 try {
165 group.send(packet);
166 } catch (IOException e) {
167 log.warn("Error sending packet to " + dest, e);
168 }
169 }
170
171
172 // Javadoc comment inherited by NetworkInterface
173 public void doStart()
174 {
175 start();
176 }
177
178
179 // Javadoc comment inherited by NetworkInterface
180 public void doStop()
181 {
182 carryOn = false;
183 }
184
185
186 ////////////////////////////////////////////////////////////////////////////////////////////
187 // Methods from Object
188 ////////////////////////////////////////////////////////////////////////////////////////////
189
190 /**
191 * Clean up system resources used by this class. This method will be
192 * invoked when there are no more references to this object, and thus
193 * it should be garbage collected.
194 */
195 protected void finalize()
196 {
197 if (group != null) {
198 try {
199 group.leaveGroup(address);
200 } catch (IOException e) {
201 log.warn("Unable to cleanly leave the multicast group: " + address, e);
202 }
203 group.close();
204 group = null;
205 }
206 carryOn = false;
207 }
208
209
210 ////////////////////////////////////////////////////////////////////////////////////////////
211 // Methods from Thread
212 ////////////////////////////////////////////////////////////////////////////////////////////
213
214 /**
215 * Run method of the thread. It blocks until a
216 * <code>DatagramPacket</code> is received from the network.
217 */
218 public void run()
219 {
220 while (carryOn) {
221 try {
222 byte[] buf = new byte[bufferlen];
223 DatagramPacket packet = new DatagramPacket(buf, bufferlen);
224 group.receive(packet);
225 InetAddress sender = packet.getAddress();
226 if (packet.getLength() > 0) {
227 if (log.isDebugEnabled()) {
228 NDC.push(sender.getHostName());
229 log.debug("multicast packet.length=" + packet.getLength()
230 + ", sender=" + sender.getHostName()
231 + ", mcast-group=" + address.getHostAddress());
232 }
233 niListener.rnotify(packet);
234 } else if (log.isDebugEnabled()) {
235 NDC.push(sender.getHostName());
236 /*
237 * The <code>UnreliableMulticastSocket</code> has set the packet size to zero,
238 * which means that we should discard it.
239 */
240 log.debug("Discarding multicast packet from " + sender.getHostName());
241 }
242 } catch (IOException e) {
243 log.warn("Error receiving packets", e);
244 } finally {
245 if (log.isDebugEnabled())
246 NDC.pop();
247 }
248 }
249 /* The thread was stopped, cleaning up system resources. */
250 this.finalize();
251 }
252
253 } // END MulticastNetworkInterface