View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.jgroups;
19  
20  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21  import EDU.oswego.cs.dl.util.concurrent.Executor;
22  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
23  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.codehaus.activemq.message.Packet;
27  import org.codehaus.activemq.message.WireFormat;
28  import org.codehaus.activemq.transport.TransportChannelSupport;
29  import org.codehaus.activemq.util.JMSExceptionHelper;
30  import org.jgroups.Address;
31  import org.jgroups.Channel;
32  import org.jgroups.ChannelClosedException;
33  import org.jgroups.ChannelException;
34  import org.jgroups.ChannelNotConnectedException;
35  import org.jgroups.Message;
36  import org.jgroups.TimeoutException;
37  
38  import javax.jms.JMSException;
39  import java.io.IOException;
40  
41  /***
42   * A JGroups implementation of a TransportChannel
43   *
44   * @version $Revision: 1.9 $
45   */
46  public class JGroupsTransportChannel extends TransportChannelSupport implements Runnable {
47      private static final Log log = LogFactory.getLog(JGroupsTransportChannel.class);
48  
49      private Channel channel;
50      private Address localAddress = null;
51      private WireFormat wireFormat;
52      private SynchronizedBoolean closed;
53      private SynchronizedBoolean started;
54      private Object outboundLock;
55      private Executor executor;
56      private Thread thread; //need to change this - and use a thread pool
57      private boolean useAsyncSend = false;
58  
59      public JGroupsTransportChannel(WireFormat wireFormat, Channel channel, Executor executor) {
60          this.wireFormat = wireFormat;
61          this.channel = channel;
62          this.executor = executor;
63          this.localAddress = channel.getLocalAddress();
64  
65          closed = new SynchronizedBoolean(false);
66          started = new SynchronizedBoolean(false);
67          outboundLock = new Object();
68          if (useAsyncSend) {
69              executor = new PooledExecutor(new BoundedBuffer(1000), 1);
70          }
71      }
72  
73      public String toString() {
74          return "JGroupsTransportChannel: " + channel;
75      }
76  
77      /***
78       * close the channel
79       */
80      public void stop() {
81          if (closed.commit(false, true)) {
82              super.stop();
83              try {
84                  stopExecutor(executor);
85                  channel.disconnect();
86                  channel.close();
87              }
88              catch (Exception e) {
89                  log.warn("Caught while closing: " + e + ". Now Closed", e);
90              }
91          }
92      }
93  
94      /***
95       * start listeneing for events
96       *
97       * @throws javax.jms.JMSException if an error occurs
98       */
99      public void start() throws JMSException {
100         if (started.commit(false, true)) {
101             thread = new Thread(this, toString());
102             if (isServerSide()) {
103                 thread.setDaemon(true);
104             }
105             thread.start();
106         }
107     }
108 
109 
110     /***
111      * Asynchronously send a Packet
112      *
113      * @param packet
114      * @throws javax.jms.JMSException
115      */
116     public void asyncSend(final Packet packet) throws JMSException {
117         if (executor != null) {
118             try {
119                 executor.execute(new Runnable() {
120                     public void run() {
121                         try {
122                             writePacket(packet);
123                         }
124                         catch (JMSException e) {
125                             onAsyncException(e);
126                         }
127                     }
128                 });
129             }
130             catch (InterruptedException e) {
131                 log.info("Caught: " + e, e);
132             }
133         }
134         else {
135             writePacket(packet);
136         }
137     }
138 
139 
140     public boolean isMulticast() {
141         return true;
142     }
143     
144     /***
145      * Can this wireformat process packets of this version
146      * @param version the version number to test
147      * @return true if can accept the version
148      */
149     public boolean canProcessWireFormatVersion(int version){
150         return wireFormat.canProcessWireFormatVersion(version);
151     }
152     
153     /***
154      * @return the current version of this wire format
155      */
156     public int getCurrentWireFormatVersion(){
157         return wireFormat.getCurrentWireFormatVersion();
158     }
159 
160     /***
161      * reads packets from a Socket
162      */
163     public void run() {
164         log.trace("JGroups consumer thread starting");
165         while (!closed.get()) {
166             try {
167                 Object value = channel.receive(0L);
168                 if (value instanceof Message) {
169                     Message message = (Message) value;
170 
171                     // lets discard messages coming from the local address
172                     // to avoid infinite loops when used with the JMS broker
173                     if (!localAddress.equals(message.getSrc())) {
174                         byte[] data = message.getBuffer();
175                         Packet packet = wireFormat.fromBytes(data);
176                         if (packet != null) {
177                             doConsumePacket(packet);
178                         }
179                     }
180                 }
181                 /*
182                 else {
183                     String type = "";
184                     if (value != null) {
185                         type = " of type: " + value.getClass();
186                     }
187                     log.warn("Expected instanceof Message but received: " + value + type);
188                 }
189                 */
190             }
191             catch (IOException e) {
192                 doClose(e);
193             }
194             catch (ChannelClosedException e) {
195                 stop();
196             }
197             catch (ChannelNotConnectedException e) {
198                 doClose(e);
199             }
200             catch (TimeoutException e) {
201                 // ignore timeouts
202             }
203         }
204     }
205 
206     /***
207      * writes the packet to the channel
208      */
209     protected void writePacket(Packet packet) throws JMSException {
210         try {
211             synchronized (outboundLock) {
212                 Address dest = null;
213                 Message message = new Message(dest, localAddress, wireFormat.toBytes(packet));
214                 channel.send(message);
215             }
216         }
217         catch (ChannelException e) {
218             throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
219         }
220         catch (IOException e) {
221             throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
222         }
223     }
224 
225 
226     private void doClose(Exception ex) {
227         if (!closed.get()) {
228             onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
229             stop();
230         }
231     }
232 }