1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.composite;
19
20 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.TimeoutExpiredException;
24 import org.codehaus.activemq.message.Packet;
25 import org.codehaus.activemq.message.PacketListener;
26 import org.codehaus.activemq.message.Receipt;
27 import org.codehaus.activemq.message.WireFormat;
28 import org.codehaus.activemq.transport.TransportChannel;
29 import org.codehaus.activemq.transport.TransportChannelProvider;
30 import org.codehaus.activemq.transport.TransportChannelSupport;
31
32 import javax.jms.ExceptionListener;
33 import javax.jms.JMSException;
34 import java.net.URI;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.List;
38
39 /***
40 * A Compsite implementation of a TransportChannel
41 *
42 * @version $Revision: 1.20 $
43 */
44 public class CompositeTransportChannel extends TransportChannelSupport {
45 private static final Log log = LogFactory.getLog(CompositeTransportChannel.class);
46
47 protected WireFormat wireFormat;
48 protected List uris;
49 protected TransportChannel channel;
50 protected SynchronizedBoolean closed;
51 protected SynchronizedBoolean started;
52 protected int maximumRetries = 10;
53 protected long failureSleepTime = 500L;
54 protected URI currentURI;
55 private long establishConnectionTimeout = 30000L;
56
57
58 public CompositeTransportChannel(WireFormat wireFormat) {
59 this.wireFormat = wireFormat;
60 this.uris = Collections.synchronizedList(new ArrayList());
61 closed = new SynchronizedBoolean(false);
62 started = new SynchronizedBoolean(false);
63 }
64
65 public CompositeTransportChannel(WireFormat wireFormat, List uris) {
66 this(wireFormat);
67 this.uris.addAll(uris);
68 }
69
70 public String toString() {
71 return "CompositeTransportChannel: " + channel;
72 }
73
74 public void start() throws JMSException {
75 if (started.commit(false, true)) {
76 establishConnection(establishConnectionTimeout);
77 }
78 }
79
80 /***
81 * close the channel
82 */
83 public void stop() {
84 if (closed.commit(false, true)) {
85 if (channel != null) {
86 try {
87 channel.stop();
88 }
89 catch (Exception e) {
90 log.warn("Caught while closing: " + e + ". Now Closed", e);
91 }
92 finally {
93 channel = null;
94 super.stop();
95 }
96 }
97 }
98 }
99
100 public Receipt send(Packet packet) throws JMSException {
101 return getChannel().send(packet);
102 }
103
104
105 public Receipt send(Packet packet, int timeout) throws JMSException {
106 return getChannel().send(packet, timeout);
107 }
108
109
110 public void asyncSend(Packet packet) throws JMSException {
111 getChannel().asyncSend(packet);
112 }
113
114 public void setPacketListener(PacketListener listener) {
115 super.setPacketListener(listener);
116 if (channel != null) {
117 channel.setPacketListener(listener);
118 }
119 }
120
121
122 public void setExceptionListener(ExceptionListener listener) {
123 super.setExceptionListener(listener);
124 if (channel != null) {
125 channel.setExceptionListener(listener);
126 }
127 }
128
129
130 public boolean isMulticast() {
131 return false;
132 }
133
134
135
136
137
138 /***
139 * Return the maximum amount of time spent trying to establish a connection
140 * or a negative number to keep going forever
141 *
142 * @return
143 */
144 public long getEstablishConnectionTimeout() {
145 return establishConnectionTimeout;
146 }
147
148 public void setEstablishConnectionTimeout(long establishConnectionTimeout) {
149 this.establishConnectionTimeout = establishConnectionTimeout;
150 }
151
152 public int getMaximumRetries() {
153 return maximumRetries;
154 }
155
156 public void setMaximumRetries(int maximumRetries) {
157 this.maximumRetries = maximumRetries;
158 }
159
160 public long getFailureSleepTime() {
161 return failureSleepTime;
162 }
163
164 public void setFailureSleepTime(long failureSleepTime) {
165 this.failureSleepTime = failureSleepTime;
166 }
167
168 public List getUris() {
169 return uris;
170 }
171
172 public void setUris(List list) {
173 synchronized (uris) {
174 uris.clear();
175 uris.addAll(list);
176 }
177 }
178
179 /***
180 * Can this wireformat process packets of this version
181 * @param version the version number to test
182 * @return true if can accept the version
183 */
184 public boolean canProcessWireFormatVersion(int version){
185 return channel != null ? channel.canProcessWireFormatVersion(version) : true;
186 }
187
188 /***
189 * @return the current version of this wire format
190 */
191 public int getCurrentWireFormatVersion(){
192 return channel != null ? channel.getCurrentWireFormatVersion() : 1;
193 }
194
195
196
197
198 protected void establishConnection(long timeout) throws JMSException {
199
200
201 boolean connected = false;
202 long time = failureSleepTime;
203 long startTime = System.currentTimeMillis();
204
205 for (int i = 0; !connected && (i < maximumRetries || maximumRetries <= 0) && !closed.get() && !isPendingStop(); i++) {
206 List list = new ArrayList(getUris());
207 if (i > 0) {
208 if (maximumRetries > 0 || timeout > 0) {
209 long current = System.currentTimeMillis();
210 if (timeout >= 0) {
211 if (current + time > startTime + timeout) {
212 time = startTime + timeout - current;
213 }
214 }
215 if (current > startTime + timeout || time <= 0) {
216 throw new TimeoutExpiredException("Could not connect to any of the URIs: " + list);
217 }
218 }
219 log.info("Could not connect; sleeping for: " + time + " millis and trying again");
220 try {
221 Thread.sleep(time);
222 }
223 catch (InterruptedException e) {
224 log.warn("Sleep interupted: " + e, e);
225 }
226 if (maximumRetries > 0) {
227 time *= 2;
228 }
229 }
230
231 while (!connected && !list.isEmpty() && !closed.get() && !isPendingStop()) {
232 URI uri = extractURI(list);
233 try {
234 attemptToConnect(uri);
235 configureChannel();
236 connected = true;
237 currentURI = uri;
238 }
239 catch (JMSException e) {
240 log.info("Could not connect to: " + uri + ". Reason: " + e);
241 }
242 }
243
244 }
245 if (!connected && !closed.get()) {
246 StringBuffer buffer = new StringBuffer("");
247 Object[] uriArray = getUris().toArray();
248 for (int i = 0; i < uriArray.length; i++) {
249 buffer.append(uriArray[i]);
250 if (i < (uriArray.length - 1)) {
251 buffer.append(",");
252 }
253 }
254 JMSException jmsEx = new JMSException("Failed to connect to resource(s): " + buffer.toString());
255 throw jmsEx;
256 }
257
258 }
259
260
261 protected TransportChannel getChannel() throws JMSException {
262 if (channel == null) {
263 throw new JMSException("No TransportChannel connection available");
264 }
265 return channel;
266 }
267
268 protected void configureChannel() {
269 ExceptionListener exceptionListener = getExceptionListener();
270 if (exceptionListener != null) {
271 channel.setExceptionListener(exceptionListener);
272 }
273 PacketListener packetListener = getPacketListener();
274 if (packetListener != null) {
275 channel.setPacketListener(packetListener);
276 }
277 }
278
279
280 protected URI extractURI(List list) throws JMSException {
281 int idx = 0;
282 if (list.size() > 1) {
283 do {
284 idx = (int) (Math.random() * list.size());
285 }
286 while (idx < 0 || idx >= list.size());
287 }
288 return (URI) list.remove(idx);
289 }
290
291 protected void attemptToConnect(URI uri) throws JMSException {
292 channel = TransportChannelProvider.create(wireFormat, uri);
293 if (started.get()) {
294 channel.start();
295 }
296 }
297 }