1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.daemon;
20
21 import java.rmi.NoSuchObjectException;
22 import java.rmi.RemoteException;
23 import java.util.Iterator;
24
25 import jgroup.core.MemberId;
26 import jgroup.core.View;
27 import jgroup.core.multicast.AckListener;
28 import jgroup.util.Queue;
29
30 import org.apache.log4j.Logger;
31
32
33
34
35
36
37
38
39
40 final class SendBuffer
41 {
42
43
44
45
46
47
48 private static final Logger log = Logger.getLogger(SendBuffer.class);
49
50
51
52
53
54
55 private final Queue buffer = new Queue();
56 private final Queue ackbuffer = new Queue();
57 private View view;
58 private long vid;
59 private int hlen;
60 private int nmessages;
61
62
63
64
65
66 SendBuffer() { }
67
68
69
70
71
72
73
74
75
76 void setView(View view, int hlen)
77 {
78 if (log.isDebugEnabled())
79 log.debug("Flushing buffers");
80 this.view = view;
81 this.hlen = hlen;
82 vid = view.getVid();
83
84 nmessages = 0;
85
86
87 for (Iterator iter = buffer.iterator(); iter.hasNext(); ) {
88 AckListener ackl = ((MsgMcast) iter.next()).ackl;
89 if (ackl != null) {
90 try {
91 if (log.isDebugEnabled())
92 log.debug("viewChange to AckListener");
93 ackl.viewChange();
94 if (log.isDebugEnabled())
95 log.debug("viewChange to AckListener completed");
96 } catch (NoSuchObjectException e) {
97
98
99
100
101
102
103 } catch (RemoteException e) {
104 log.warn("Failed to notify the AckListener of view change: " + ackl, e);
105 }
106 }
107 }
108
109 buffer.clear();
110 MsgResult msgr;
111 while ((msgr = (MsgResult) ackbuffer.removeFirst()) != null) {
112 if (msgr.vid == vid)
113 insertMsgResult(msgr);
114 }
115 }
116
117
118
119
120
121 void insertMsgMcast(MsgMcast msg)
122 {
123 try {
124 if (msg.ackl != null) {
125 if (log.isDebugEnabled())
126 log.debug("notifyView to AckListener");
127 msg.ackl.notifyView(view);
128 if (log.isDebugEnabled())
129 log.debug("notifyView completed");
130 }
131 } catch (RemoteException e) {
132 log.warn("Failed to notify the AckListener of view change.", e);
133 }
134 buffer.insert(msg);
135 msg.setAckArray(hlen);
136 }
137
138
139
140
141
142 void insertMsgResult(MsgResult result)
143 {
144 if (result.vid == vid) {
145 if (log.isDebugEnabled())
146 log.debug("Inserting result (same view): " + result);
147 for (Iterator iter = buffer.iterator(); iter.hasNext(); ) {
148 MsgMcast message = (MsgMcast) iter.next();
149 if (message.mid <= result.dlvr)
150 message.setAck(result.hpos);
151 if (message.mid == result.mid && message.sender.equals(result.sender)) {
152 MemberId[] members = view.getMembers();
153 log.assertLog(members.length > result.vpos,
154 "BAD: Trying to send ack, but view seems to have less members: " + view);
155 try {
156 if (message.ackl != null)
157 message.ackl.ack(members[result.vpos], result.vpos, result.result);
158 } catch (RemoteException e) {
159 log.warn("Failed to ACK the message.", e);
160 }
161 }
162 }
163 removeAcknowledged();
164 } else {
165 if (log.isDebugEnabled())
166 log.debug("Inserting result (different view): " + result);
167 ackbuffer.insert(result);
168 }
169 }
170
171
172
173
174
175
176
177
178
179 void suspect(HostData host)
180 {
181 if (host.hasValidViewIndex()) {
182 int hpos = host.getViewIndex();
183 for (Iterator iter = buffer.iterator(); iter.hasNext(); ) {
184 MsgMcast message = (MsgMcast) iter.next();
185 message.setAck(hpos);
186 }
187 removeAcknowledged();
188 }
189 }
190
191
192
193
194
195
196 private void removeAcknowledged()
197 {
198 MsgMcast message;
199
200 while ((message = (MsgMcast) buffer.getFirst()) != null) {
201 if (message.isComplete())
202 buffer.removeFirst();
203 else
204 break;
205 }
206 }
207
208
209 int getMid()
210 {
211 return ++nmessages;
212 }
213
214 boolean isEmpty()
215 {
216 return buffer.isEmpty();
217 }
218
219
220 }