1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package jgroup.relacs.daemon;
20
import java.rmi.NoSuchObjectException;
import java.rmi.RemoteException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
26
27 import jgroup.core.MemberId;
28 import jgroup.core.View;
29 import jgroup.core.multicast.AckListener;
30
31 import org.apache.log4j.Logger;
32
33
34
35
36
37
38
39
40
41 final class SCopyOfSendBuffer
42 {
43
44
45
46
47
48
49 private static final Logger log = Logger.getLogger(SCopyOfSendBuffer.class);
50
51
52
53
54
55
56 @SuppressWarnings("unchecked")
57 private final Queue<MsgMcast> buffer = (Queue) Collections.synchronizedList(new LinkedList<MsgMcast>());;
58 @SuppressWarnings("unchecked")
59 private final Queue<MsgResult> ackbuffer = (Queue) Collections.synchronizedList(new LinkedList<MsgResult>());
60 private View view;
61 private long vid;
62 private int hlen;
63 private int nmessages;
64
65
66
67
68
69 SCopyOfSendBuffer() {}
70
71
72
73
74
75
76
77
78
79 void setView(View view, int hlen)
80 {
81 if (log.isDebugEnabled())
82 log.debug("Flushing buffers");
83 this.view = view;
84 this.hlen = hlen;
85 vid = view.getVid();
86
87 nmessages = 0;
88
89
90 for (MsgMcast msg : buffer) {
91 AckListener ackl = msg.ackl;
92 if (ackl != null) {
93 try {
94 if (log.isDebugEnabled())
95 log.debug("viewChange to AckListener");
96 ackl.viewChange();
97 if (log.isDebugEnabled())
98 log.debug("viewChange to AckListener completed");
99 } catch (NoSuchObjectException e) {
100
101
102
103
104
105
106 } catch (RemoteException e) {
107 log.warn("Failed to notify the AckListener of view change: " + ackl, e);
108 }
109 }
110 }
111 buffer.clear();
112 MsgResult msgr;
113 while ((msgr = (MsgResult) ackbuffer.poll()) != null) {
114 if (msgr.vid == vid)
115 insertMsgResult(msgr);
116 }
117 }
118
119
120
121
122
123 void insertMsgMcast(MsgMcast msg)
124 {
125 try {
126 if (msg.ackl != null) {
127 if (log.isDebugEnabled())
128 log.debug("notifyView to AckListener");
129 msg.ackl.notifyView(view);
130 if (log.isDebugEnabled())
131 log.debug("notifyView completed");
132 }
133 } catch (RemoteException e) {
134 log.warn("Failed to notify the AckListener of view change.", e);
135 }
136 buffer.add(msg);
137 msg.setAckArray(hlen);
138 }
139
140
141
142
143
144 void insertMsgResult(MsgResult result)
145 {
146 if (result.vid == vid) {
147 if (log.isDebugEnabled())
148 log.debug("Inserting result (same view): " + result);
149 for (MsgMcast message : buffer) {
150 if (message.mid <= result.dlvr)
151 message.setAck(result.hpos);
152 if (message.mid == result.mid && message.sender.equals(result.sender)) {
153 MemberId[] members = view.getMembers();
154 log.assertLog(members.length > result.vpos,
155 "BAD: Trying to send ack, but view seems to have less members: " + view);
156 try {
157 if (message.ackl != null)
158 message.ackl.ack(members[result.vpos], result.vpos, result.result);
159 } catch (RemoteException e) {
160 log.warn("Failed to ACK the message.", e);
161 }
162 }
163 }
164 removeAcknowledged();
165 } else {
166 if (log.isDebugEnabled())
167 log.debug("Inserting result (different view): " + result);
168 ackbuffer.add(result);
169 }
170 }
171
172
173
174
175
176
177
178
179
180 void suspect(HostData host)
181 {
182 if (host.hasValidViewIndex()) {
183 int hpos = host.getViewIndex();
184 for (MsgMcast message : buffer) {
185 message.setAck(hpos);
186 }
187 removeAcknowledged();
188 }
189 }
190
191
192
193
194
195
196 private void removeAcknowledged()
197 {
198 MsgMcast message;
199 while ((message = (MsgMcast) buffer.peek()) != null) {
200 if (message.isComplete())
201 buffer.poll();
202 else
203 break;
204 }
205 }
206
207
208 int getMid()
209 {
210 return ++nmessages;
211 }
212
213 boolean isEmpty()
214 {
215 return buffer.isEmpty();
216 }
217
218
219 }