View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.ember;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.WireFormat;
25  import org.codehaus.activemq.transport.TransportChannelSupport;
26  import pyrasun.eio.EIOGlobalContext;
27  import pyrasun.eio.services.EmberServiceController;
28  import pyrasun.eio.services.EmberServiceException;
29  import pyrasun.eio.services.bytearray.ByteArrayServerClient;
30  import pyrasun.eio.services.bytearray.ByteArrayServerClientListener;
31  
32  import javax.jms.JMSException;
33  import java.io.IOException;
34  
35  /***
36   * An EmberIO (using NIO) implementation of a TransportChannel
37   *
38   * @version $Revision: 1.15 $
39   */
40  public class EmberTransportChannel extends TransportChannelSupport implements ByteArrayServerClientListener {
41  
42      private static final Log log = LogFactory.getLog(EmberTransportChannel.class);
43  
44      private WireFormat wireFormat;
45      private EIOGlobalContext context;
46      private EmberServiceController controller;
47      private ByteArrayServerClient client;
48  
49      private SynchronizedBoolean closed;
50      private SynchronizedBoolean started;
51  
52  
53      /***
54       * Construct basic helpers
55       */
56      protected EmberTransportChannel(WireFormat wireFormat) {
57          this.wireFormat = wireFormat;
58  
59          closed = new SynchronizedBoolean(false);
60          started = new SynchronizedBoolean(false);
61      }
62  
63      /***
64       * Connect to a remote Node - e.g. a Broker
65       */
66      public EmberTransportChannel(WireFormat wireFormat, EIOGlobalContext context, EmberServiceController controller, ByteArrayServerClient client) {
67          this(wireFormat);
68          this.context = context;
69          this.client = client;
70          this.controller = controller;
71          client.setListener(this);
72      }
73  
74      /***
75       * close the channel
76       */
77      public void stop() {
78          super.stop();
79          if (closed.commit(false, true)) {
80              try {
81                  // on the server side don't shut down the controller, the server does that
82                  if (controller != null) {
83                      controller.stopAll();
84                  }
85                  if (context != null) {
86                      context.stop();
87                  }
88              }
89              catch (EmberServiceException e) {
90                  log.error("Caught while closing: " + e, e);
91              }
92          }
93      }
94  
95      /***
96       * start listeneing for events
97       *
98       * @throws JMSException if an error occurs
99       */
100     public void start() throws JMSException {
101         if (started.commit(false, true)) {
102 
103             try {
104                 // when using a transport channel created from a server
105                 // we don't need to initialise these things
106                 if (context != null) {
107                     context.start();
108                 }
109                 if (controller != null) {
110                     controller.startAll();
111                 }
112             }
113             catch (EmberServiceException e) {
114                 JMSException jmsEx = new JMSException("Error starting NIO client: " + e.getMessage());
115                 jmsEx.setLinkedException(e);
116                 throw jmsEx;
117             }
118         }
119     }
120 
121 
122     /***
123      * Asynchronously send a Packet
124      *
125      * @param packet
126      * @throws JMSException
127      */
128     public void asyncSend(Packet packet) throws JMSException {
129         try {
130             byte[] bytes = wireFormat.toBytes(packet);
131             // lets sync for now to avoid multiple threads writing to the same socket
132             synchronized (client) {
133                 client.write(bytes);
134             }
135         }
136         catch (IOException e) {
137             throw createJMSException("Failed to write packet: " + packet + ". ", e);
138         }
139     }
140 
141 
142     public boolean isMulticast() {
143         return false;
144     }
145 
146     /***
147      * Factory method to create a JMSException which is linked to the base exception
148      */
149     protected JMSException createJMSException(String message, Exception ex) {
150         JMSException jmsEx = new JMSException(message + ex.getMessage());
151         jmsEx.setLinkedException(ex);
152         return jmsEx;
153     }
154 
155     /***
156      * pretty print for object
157      *
158      * @return String representation of this object
159      */
160     public String toString() {
161         return "EmberTransportChannel: " + client;
162     }
163 
164     public void newMessage(ByteArrayServerClient client, Object msg) {
165         byte[] bytes = (byte[]) msg;
166         Packet packet = null;
167         try {
168             packet = wireFormat.fromBytes(bytes);
169             doConsumePacket(packet);
170         }
171         catch (IOException e) {
172             log.error("Could not parse byte[] of size: " + bytes.length + ". Reason: " + e, e);
173         }
174 
175     }
176     
177     /***
178      * Can this wireformat process packets of this version
179      * @param version the version number to test
180      * @return true if can accept the version
181      */
182     public boolean canProcessWireFormatVersion(int version){
183         return wireFormat.canProcessWireFormatVersion(version);
184     }
185     
186     /***
187      * @return the current version of this wire format
188      */
189     public int getCurrentWireFormatVersion(){
190         return wireFormat.getCurrentWireFormatVersion();
191     }
192 }