View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.composite;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.TimeoutExpiredException;
24  import org.codehaus.activemq.message.Packet;
25  import org.codehaus.activemq.message.PacketListener;
26  import org.codehaus.activemq.message.Receipt;
27  import org.codehaus.activemq.message.WireFormat;
28  import org.codehaus.activemq.transport.TransportChannel;
29  import org.codehaus.activemq.transport.TransportChannelProvider;
30  import org.codehaus.activemq.transport.TransportChannelSupport;
31  
32  import javax.jms.ExceptionListener;
33  import javax.jms.JMSException;
34  import java.net.URI;
35  import java.util.ArrayList;
36  import java.util.Collections;
37  import java.util.List;
38  
39  /***
40   * A Compsite implementation of a TransportChannel
41   *
42   * @version $Revision: 1.20 $
43   */
44  public class CompositeTransportChannel extends TransportChannelSupport {
45      private static final Log log = LogFactory.getLog(CompositeTransportChannel.class);
46  
47      protected WireFormat wireFormat;
48      protected List uris;
49      protected TransportChannel channel;
50      protected SynchronizedBoolean closed;
51      protected SynchronizedBoolean started;
52      protected int maximumRetries = 10;
53      protected long failureSleepTime = 500L;
54      protected URI currentURI;
55      private long establishConnectionTimeout = 30000L;
56  
57  
58      public CompositeTransportChannel(WireFormat wireFormat) {
59          this.wireFormat = wireFormat;
60          this.uris = Collections.synchronizedList(new ArrayList());
61          closed = new SynchronizedBoolean(false);
62          started = new SynchronizedBoolean(false);
63      }
64  
65      public CompositeTransportChannel(WireFormat wireFormat, List uris) {
66          this(wireFormat);
67          this.uris.addAll(uris);
68      }
69  
70      public String toString() {
71          return "CompositeTransportChannel: " + channel;
72      }
73  
74      public void start() throws JMSException {
75          if (started.commit(false, true)) {
76              establishConnection(establishConnectionTimeout);
77          }
78      }
79  
80      /***
81       * close the channel
82       */
83      public void stop() {
84          if (closed.commit(false, true)) {
85              if (channel != null) {
86                  try {
87                      channel.stop();
88                  }
89                  catch (Exception e) {
90                      log.warn("Caught while closing: " + e + ". Now Closed", e);
91                  }
92                  finally {
93                      channel = null;
94                      super.stop();
95                  }
96              }
97          }
98      }
99  
100     public Receipt send(Packet packet) throws JMSException {
101         return getChannel().send(packet);
102     }
103 
104 
105     public Receipt send(Packet packet, int timeout) throws JMSException {
106         return getChannel().send(packet, timeout);
107     }
108 
109 
110     public void asyncSend(Packet packet) throws JMSException {
111         getChannel().asyncSend(packet);
112     }
113 
114     public void setPacketListener(PacketListener listener) {
115         super.setPacketListener(listener);
116         if (channel != null) {
117             channel.setPacketListener(listener);
118         }
119     }
120 
121 
122     public void setExceptionListener(ExceptionListener listener) {
123         super.setExceptionListener(listener);
124         if (channel != null) {
125             channel.setExceptionListener(listener);
126         }
127     }
128 
129 
130     public boolean isMulticast() {
131         return false;
132     }
133 
134     // Properties
135     //-------------------------------------------------------------------------
136 
137 
138     /***
139      * Return the maximum amount of time spent trying to establish a connection
140      * or a negative number to keep going forever
141      *
142      * @return
143      */
144     public long getEstablishConnectionTimeout() {
145         return establishConnectionTimeout;
146     }
147 
148     public void setEstablishConnectionTimeout(long establishConnectionTimeout) {
149         this.establishConnectionTimeout = establishConnectionTimeout;
150     }
151 
152     public int getMaximumRetries() {
153         return maximumRetries;
154     }
155 
156     public void setMaximumRetries(int maximumRetries) {
157         this.maximumRetries = maximumRetries;
158     }
159 
160     public long getFailureSleepTime() {
161         return failureSleepTime;
162     }
163 
164     public void setFailureSleepTime(long failureSleepTime) {
165         this.failureSleepTime = failureSleepTime;
166     }
167 
168     public List getUris() {
169         return uris;
170     }
171 
172     public void setUris(List list) {
173         synchronized (uris) {
174             uris.clear();
175             uris.addAll(list);
176         }
177     }
178     
179     /***
180      * Can this wireformat process packets of this version
181      * @param version the version number to test
182      * @return true if can accept the version
183      */
184     public boolean canProcessWireFormatVersion(int version){
185         return channel != null ? channel.canProcessWireFormatVersion(version) : true;
186     }
187     
188     /***
189      * @return the current version of this wire format
190      */
191     public int getCurrentWireFormatVersion(){
192         return channel != null ? channel.getCurrentWireFormatVersion() : 1;
193     }
194 
195     // Implementation methods
196     //-------------------------------------------------------------------------
197     
198     protected void establishConnection(long timeout) throws JMSException {
199 
200         // lets try connect
201         boolean connected = false;
202         long time = failureSleepTime;
203         long startTime = System.currentTimeMillis();
204 
205         for (int i = 0; !connected && (i < maximumRetries || maximumRetries <= 0) && !closed.get() && !isPendingStop(); i++) {
206             List list = new ArrayList(getUris());
207             if (i > 0) {
208                 if (maximumRetries > 0 || timeout > 0) {
209                     long current = System.currentTimeMillis();
210                     if (timeout >= 0) {
211                         if (current + time > startTime + timeout) {
212                             time = startTime + timeout - current;
213                         }
214                     }
215                     if (current > startTime + timeout || time <= 0) {
216                         throw new TimeoutExpiredException("Could not connect to any of the URIs: " + list);
217                     }
218                 }
219                 log.info("Could not connect; sleeping for: " + time + " millis and trying again");
220                 try {
221                     Thread.sleep(time);
222                 }
223                 catch (InterruptedException e) {
224                     log.warn("Sleep interupted: " + e, e);
225                 }
226                 if (maximumRetries > 0) {
227                     time *= 2;
228                 }
229             }
230 
231             while (!connected && !list.isEmpty() && !closed.get() && !isPendingStop()) {
232                 URI uri = extractURI(list);
233                 try {
234                     attemptToConnect(uri);
235                     configureChannel();
236                     connected = true;
237                     currentURI = uri;
238                 }
239                 catch (JMSException e) {
240                     log.info("Could not connect to: " + uri + ". Reason: " + e);
241                 }
242             }
243 
244         }
245         if (!connected && !closed.get()) {
246             StringBuffer buffer = new StringBuffer("");
247             Object[] uriArray = getUris().toArray();
248             for (int i = 0; i < uriArray.length; i++) {
249                 buffer.append(uriArray[i]);
250                 if (i < (uriArray.length - 1)) {
251                     buffer.append(",");
252                 }
253             }
254             JMSException jmsEx = new JMSException("Failed to connect to resource(s): " + buffer.toString());
255             throw jmsEx;
256         }
257 
258     }
259 
260 
261     protected TransportChannel getChannel() throws JMSException {
262         if (channel == null) {
263             throw new JMSException("No TransportChannel connection available");
264         }
265         return channel;
266     }
267 
268     protected void configureChannel() {
269         ExceptionListener exceptionListener = getExceptionListener();
270         if (exceptionListener != null) {
271             channel.setExceptionListener(exceptionListener);
272         }
273         PacketListener packetListener = getPacketListener();
274         if (packetListener != null) {
275             channel.setPacketListener(packetListener);
276         }
277     }
278 
279 
280     protected URI extractURI(List list) throws JMSException {
281         int idx = 0;
282         if (list.size() > 1) {
283             do {
284                 idx = (int) (Math.random() * list.size());
285             }
286             while (idx < 0 || idx >= list.size());
287         }
288         return (URI) list.remove(idx);
289     }
290 
291     protected void attemptToConnect(URI uri) throws JMSException {
292         channel = TransportChannelProvider.create(wireFormat, uri);
293         if (started.get()) {
294             channel.start();
295         }
296     }
297 }