View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.http;
19  
20  import org.apache.commons.httpclient.HttpClient;
21  import org.apache.commons.httpclient.HttpMethod;
22  import org.apache.commons.httpclient.HttpStatus;
23  import org.apache.commons.httpclient.methods.GetMethod;
24  import org.apache.commons.httpclient.methods.PostMethod;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.codehaus.activemq.message.Packet;
28  import org.codehaus.activemq.message.TextWireFormat;
29  import org.codehaus.activemq.util.JMSExceptionHelper;
30  
31  import javax.jms.JMSException;
32  import java.io.DataInputStream;
33  import java.io.IOException;
34  
35  /***
36   * A HTTP {@link org.codehaus.activemq.transport.TransportChannel} which uses the
37   * <a href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a> library
38   *
39   * @version $Revision: 1.1 $
40   */
41  public class HttpClientTransportChannel extends HttpTransportChannelSupport {
42      private static final Log log = LogFactory.getLog(HttpClientTransportChannel.class);
43  
44      private HttpClient sendHttpClient;
45      private HttpClient receiveHttpClient;
46  
47      public HttpClientTransportChannel(TextWireFormat wireFormat, String remoteUrl) {
48          super(wireFormat, remoteUrl);
49      }
50  
51      public void asyncSend(Packet packet) throws JMSException {
52          PostMethod httpMethod = new PostMethod(getRemoteUrl());
53          configureMethod(httpMethod);
54          httpMethod.setRequestBody(getWireFormat().toString(packet));
55          try {
56              int answer = getSendHttpClient().executeMethod(httpMethod);
57              if (answer != HttpStatus.SC_OK) {
58                  throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer);
59              }
60          }
61          catch (IOException e) {
62              throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e);
63          }
64      }
65  
66      public boolean isMulticast() {
67          return false;
68      }
69  
70      public void run() {
71          log.trace("HTTP GET consumer thread starting for clientID: " + getClientID());
72          HttpClient httpClient = getReceiveHttpClient();
73          String remoteUrl = getRemoteUrl();
74          while (!getClosed().get()) {
75              GetMethod httpMethod = new GetMethod(remoteUrl);
76              configureMethod(httpMethod);
77              try {
78                  int answer = httpClient.executeMethod(httpMethod);
79                  if (answer != HttpStatus.SC_OK) {
80                      if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
81                          log.info("GET timed out");
82                      }
83                      else {
84                          log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
85                      }
86                  }
87                  else {
88                      Packet packet = getWireFormat().readPacket(new DataInputStream(httpMethod.getResponseBodyAsStream()));
89                      if (packet == null) {
90                          log.warn("Received null packet from url: " + remoteUrl);
91                      }
92                      else {
93                          doConsumePacket(packet);
94                      }
95                  }
96              }
97              catch (IOException e) {
98                  log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
99              }
100         }
101     }
102 
103     // Properties
104     //-------------------------------------------------------------------------
105     public HttpClient getSendHttpClient() {
106         if (sendHttpClient == null) {
107             sendHttpClient = createHttpClient();
108         }
109         return sendHttpClient;
110     }
111 
112     public void setSendHttpClient(HttpClient sendHttpClient) {
113         this.sendHttpClient = sendHttpClient;
114     }
115 
116     public HttpClient getReceiveHttpClient() {
117         if (receiveHttpClient == null) {
118             receiveHttpClient = createHttpClient();
119         }
120         return receiveHttpClient;
121     }
122 
123     public void setReceiveHttpClient(HttpClient receiveHttpClient) {
124         this.receiveHttpClient = receiveHttpClient;
125     }
126 
127     // Implementation methods
128     //-------------------------------------------------------------------------
129     protected HttpClient createHttpClient() {
130         return new HttpClient();
131     }
132 
133     protected void configureMethod(HttpMethod method) {
134         String clientID = getClientID();
135         if (clientID != null) {
136             method.setRequestHeader("clientID", clientID);
137         }
138     }
139 }