1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.relacs.daemon;
20
21 import java.io.IOException;
22 import java.io.ObjectInput;
23 import java.io.OutputStream;
24
25 import jgroup.core.MemberId;
26 import jgroup.core.multicast.AckListener;
27 import jgroup.relacs.events.MulticastRequest;
28 import jgroup.relacs.mss.MssConstants;
29 import jgroup.relacs.types.Flag;
30 import jgroup.relacs.types.GroupId;
31 import jgroup.relacs.types.GroupIndex;
32 import jgroup.relacs.types.MemberIdImpl;
33 import jgroup.relacs.types.MessageId;
34 import jgroup.relacs.types.MessageLen;
35 import jgroup.relacs.types.ViewId;
36 import jgroup.util.InMessage;
37 import jgroup.util.MsgFactory;
38 import jgroup.util.OutMessage;
39
40 /**
41 * The <code>MsgMcast</code> class
42 *
43 * @author Alberto Montresor
44 * @since Jgroup 0.1
45 */
46 public final class MsgMcast
47 implements MssConstants, DaemonMsg
48 {
49
50 ////////////////////////////////////////////////////////////////////////////////////////////
51 // Position constants
52 ////////////////////////////////////////////////////////////////////////////////////////////
53
54 private static final int START_FIXED = MSS_HEADER_SIZE;
55
56 private static final int FIXED_SIZE = GroupId.SIZE + Flag.SIZE*2 + MemberIdImpl.SIZE + MessageLen.SIZE;
57 private static final int START_VAR = START_FIXED + FIXED_SIZE;
58
59 private static final int VAR_SIZE = GroupIndex.SIZE + MessageId.SIZE + ViewId.SIZE;
60 private static final int START_DATA = START_VAR + VAR_SIZE;
61
62
63 ////////////////////////////////////////////////////////////////////////////////////////////
64 // Serialized fields
65 ////////////////////////////////////////////////////////////////////////////////////////////
66
67 /** True if this message carries a single object */
68 boolean isObject;
69
70 /** True if an ack is required */
71 boolean ackr;
72
73 /** Identifier of the sender */
74 MemberId sender;
75
76 /** Position of the sender in view identified by <code>vid</code> */
77 int hpos;
78
79 /** View identifier */
80 long vid;
81
82 /** Message identifier */
83 int mid;
84
85 /** Ack listener */
86 AckListener ackl;
87
88 /** Message to be sent */
89 private OutMessage outmsg;
90
91 /** Message received from the Mss */
92 private InMessage inmsg;
93
94 /** Set of acks associated with this message */
95 // FIXME: Should be moved in an external datastructure
96 private boolean[] ackset;
97
98 /** Number of false values in ackset */
99 private int nfalse;
100
101 /** MsgMcast message size (excluding the mss header) */
102 private int size;
103
104
105 ////////////////////////////////////////////////////////////////////////////////////////////
106 // Constructors and additional methods
107 ////////////////////////////////////////////////////////////////////////////////////////////
108
109 /*
110 * Constructs a <code>MsgMcast</code> data structure starting from a
111 * m-received message.
112 */
113 private MsgMcast(InMessage inmsg)
114 throws IOException, ClassNotFoundException
115 {
116 isObject = Flag.unmarshal(inmsg);
117 ackr = Flag.unmarshal(inmsg);
118 sender = new MemberIdImpl();
119 sender.readExternal(inmsg);
120 size = MessageLen.unmarshal(inmsg);
121 hpos = GroupIndex.unmarshal(inmsg);
122 vid = ViewId.unmarshal(inmsg);
123 mid = MessageId.unmarshal(inmsg);
124
125 this.inmsg = inmsg;
126 inmsg.seek(START_DATA);
127 outmsg = new OutMessage(inmsg);
128 }
129
130
131 /**
132 * Creates an uncompleted <code>MsgMcast</code> object; other fields
133 * will be set when the information will be available.
134 */
135 MsgMcast(MulticastRequest mcastRequest)
136 throws IOException
137 {
138 this.outmsg = mcastRequest.getPayload();
139 /*
140 * Computes the total size of this MsgMcast message, including that
141 * application level data of the mcast. This excludes the size
142 * of the mss header fields. Note that we can use the outmsg.size()
143 * method to compute the application data size, since only app data
144 * has been written to the outmsg stream.
145 */
146 size = outmsg.size() + FIXED_SIZE + VAR_SIZE;
147
148 // Store info for local message
149 this.isObject = mcastRequest.isObject();
150 this.ackl = mcastRequest.getAckListener();
151 this.ackr = (ackl != null);
152 this.sender = mcastRequest.getSender();
153
154 // Writes fixed part of the header
155 outmsg.seek(START_FIXED);
156 GroupId.marshal(outmsg, mcastRequest.getGid());
157 Flag.marshal(outmsg, isObject);
158 Flag.marshal(outmsg, ackr);
159 sender.writeExternal(outmsg);
160 MessageLen.marshal(outmsg, size);
161 }
162
163
164 /**
165 * Complete the message structure with the data related to the view
166 * in which the message is sent.
167 *
168 * @param hpos host index in the view.
169 * @param vid view identifier.
170 * @param mid message identifier.
171 */
172 void complete(int hpos, long vid, int mid)
173 throws IOException
174 {
175 // Writes variable part of the header
176 outmsg.seek(START_VAR);
177 GroupIndex.marshal(outmsg, hpos);
178 ViewId.marshal(outmsg, vid);
179 MessageId.marshal(outmsg, mid);
180
181 // Store info for local message
182 this.vid = vid;
183 this.hpos = hpos;
184 this.mid = mid;
185
186 inmsg = new InMessage(outmsg);
187 }
188
189
190 ////////////////////////////////////////////////////////////////////////////////////////////
191 // Static methods
192 ////////////////////////////////////////////////////////////////////////////////////////////
193
194 /**
195 *
196 */
197 public static OutputStream createOutputStream()
198 {
199 OutMessage stream = MsgFactory.get();
200 stream.seek(START_DATA);
201 return stream;
202 }
203
204
205 /**
206 * Returns a <code>MsgMcast</code> object that contains the decoding
207 * of the m-received input stream.
208 *
209 * @param inmsg the message input stream to decode
210 */
211 static MsgMcast unmarshal(ObjectInput inmsg)
212 throws IOException, ClassNotFoundException
213 {
214 return new MsgMcast((InMessage) inmsg);
215 }
216
217
218 /**
219 *
220 */
221 InMessage getInMessage()
222 {
223 /*
224 * This must be synchronized on the inmsg object so as to avoid
225 * concurrent access to the inmsg object, since most methods in
226 * the InMessage class modify its state (the position field in
227 * particular), including methods such as seek(), read(), insert()
228 * and so on. Such concurrent access may occur when two competing
229 * threads (Ehandler and Dispatcher) want to access the inmsg object.
230 * For example, one thread may be reading the inmsg object for
231 * delivery to an upper-layer (see IntGroupHandler.deliverStream()),
232 * while another thread wants to send the inmsg to another member of
233 * the group, and thus needs to seek to the START_DATA position;
234 * see MsgMcast.getInMessage().
235 */
236 synchronized (inmsg) {
237 inmsg.seek(START_DATA);
238 return inmsg;
239 }
240 }
241
242
243 ////////////////////////////////////////////////////////////////////////////////////////////
244 // Methods from DaemonMsg
245 ////////////////////////////////////////////////////////////////////////////////////////////
246
247 public int size()
248 {
249 return size;
250 }
251
252 public OutMessage getOutMessage()
253 {
254 return outmsg;
255 }
256
257
258 ////////////////////////////////////////////////////////////////////////////////////////////
259 // Ack management
260 ////////////////////////////////////////////////////////////////////////////////////////////
261
262 void setAckArray(int len)
263 {
264 ackset = new boolean[len];
265 nfalse = len;
266 }
267
268 void setAck(int pos)
269 {
270 if (!ackset[pos])
271 nfalse--;
272 ackset[pos] = true;
273 }
274
275 boolean isComplete()
276 {
277 return (nfalse == 0);
278 }
279
280
281 /**
282 * Returns a string representation of this object
283 */
284 public String toString()
285 {
286 StringBuilder b = new StringBuilder();
287 b.append("[MsgMcast: ");
288 b.append("mid=");
289 b.append(mid);
290 b.append(", sender=");
291 b.append(sender);
292 b.append(", vid=");
293 b.append(vid);
294 if (inmsg != null) {
295 b.append(", ");
296 b.append(inmsg);
297 }
298 b.append("]");
299 return b.toString();
300 }
301
302 } // END MsgMcast