View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.http;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.Packet;
23  import org.codehaus.activemq.message.TextWireFormat;
24  import org.codehaus.activemq.util.Callback;
25  import org.codehaus.activemq.util.ExceptionTemplate;
26  import org.codehaus.activemq.util.JMSExceptionHelper;
27  
28  import javax.jms.JMSException;
29  import java.io.DataInputStream;
30  import java.io.IOException;
31  import java.io.OutputStreamWriter;
32  import java.io.Writer;
33  import java.net.HttpURLConnection;
34  import java.net.MalformedURLException;
35  import java.net.URL;
36  
37  /***
38   * @version $Revision: 1.3 $
39   */
40  public class HttpTransportChannel extends HttpTransportChannelSupport {
41      private static final Log log = LogFactory.getLog(HttpTransportChannel.class);
42      private URL url;
43      private HttpURLConnection sendConnection;
44      private HttpURLConnection receiveConnection;
45  
46  
47      public HttpTransportChannel(TextWireFormat wireFormat, String remoteUrl) throws MalformedURLException {
48          super(wireFormat, remoteUrl);
49          url = new URL(remoteUrl);
50      }
51  
52      public void asyncSend(Packet packet) throws JMSException {
53          try {
54              HttpURLConnection connection = getSendConnection();
55              String text = getWireFormat().toString(packet);
56              Writer writer = new OutputStreamWriter(connection.getOutputStream());
57              writer.write(text);
58              writer.flush();
59              int answer = connection.getResponseCode();
60              if (answer != HttpURLConnection.HTTP_OK) {
61                  throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer);
62              }
63          }
64          catch (IOException e) {
65              throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e);
66          }
67      }
68  
69      public void stop() {
70          ExceptionTemplate template = new ExceptionTemplate();
71          if (sendConnection != null) {
72              template.run(new Callback() {
73                  public void execute() throws Throwable {
74                      sendConnection.disconnect();
75                  }
76              });
77          }
78          if (receiveConnection != null) {
79              template.run(new Callback() {
80                  public void execute() throws Throwable {
81                      receiveConnection.disconnect();
82                  }
83              });
84          }
85          super.stop();
86          Throwable firstException = template.getFirstException();
87          if (firstException != null) {
88              log.warn("Failed to shut down cleanly: " + firstException, firstException);
89          }
90      }
91  
92      public boolean isMulticast() {
93          return false;
94      }
95  
96      public void run() {
97          log.trace("HTTP GET consumer thread starting for clientID: " + getClientID());
98          String remoteUrl = getRemoteUrl();
99          while (!getClosed().get()) {
100             try {
101                 HttpURLConnection connection = getReceiveConnection();
102                 int answer = connection.getResponseCode();
103                 if (answer != HttpURLConnection.HTTP_OK) {
104                     if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
105                         log.trace("GET timed out");
106                     }
107                     else {
108                         log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
109                     }
110                 }
111                 else {
112                     Packet packet = getWireFormat().readPacket(new DataInputStream(connection.getInputStream()));
113                     //Packet packet = getWireFormat().fromString(connection.getContent().toString());
114                     if (packet == null) {
115                         log.warn("Received null packet from url: " + remoteUrl);
116                     }
117                     else {
118                         doConsumePacket(packet);
119                     }
120                 }
121             }
122             catch (Exception e) {
123                 if (!getClosed().get()) {
124                     log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
125                 }
126                 else {
127                     log.trace("Caught error after closed: " + e, e);
128                 }
129             }
130         }
131     }
132 
133 
134     // Implementation methods
135     //-------------------------------------------------------------------------
136     protected synchronized HttpURLConnection getSendConnection() throws IOException {
137         sendConnection = (HttpURLConnection) url.openConnection();
138         sendConnection.setDoOutput(true);
139         sendConnection.setRequestMethod("POST");
140         configureConnection(sendConnection);
141         sendConnection.connect();
142         return sendConnection;
143     }
144 
145     protected synchronized HttpURLConnection getReceiveConnection() throws IOException {
146         receiveConnection = (HttpURLConnection) url.openConnection();
147         receiveConnection.setDoOutput(false);
148         receiveConnection.setDoInput(true);
149         receiveConnection.setRequestMethod("GET");
150         configureConnection(receiveConnection);
151         receiveConnection.connect();
152         return receiveConnection;
153     }
154 
155 
156     protected void configureConnection(HttpURLConnection connection) {
157         String clientID = getClientID();
158         if (clientID != null) {
159             connection.setRequestProperty("clientID", clientID);
160             //connection.addRequestProperty("clientID", clientID);
161         }
162     }
163 }