001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.http; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.net.URI; 023import java.security.cert.X509Certificate; 024import java.util.zip.GZIPInputStream; 025import java.util.zip.GZIPOutputStream; 026 027import org.apache.activemq.command.ShutdownInfo; 028import org.apache.activemq.transport.FutureResponse; 029import org.apache.activemq.transport.util.TextWireFormat; 030import org.apache.activemq.util.ByteArrayOutputStream; 031import org.apache.activemq.util.IOExceptionSupport; 032import org.apache.activemq.util.IdGenerator; 033import org.apache.activemq.util.ServiceStopper; 034import org.apache.activemq.wireformat.WireFormat; 035import org.apache.http.Header; 036import org.apache.http.HttpHost; 037import org.apache.http.HttpRequest; 038import org.apache.http.HttpRequestInterceptor; 039import org.apache.http.HttpResponse; 040import org.apache.http.HttpStatus; 041import org.apache.http.auth.AuthScope; 042import org.apache.http.auth.UsernamePasswordCredentials; 043import org.apache.http.client.HttpClient; 044import org.apache.http.client.HttpResponseException; 045import org.apache.http.client.ResponseHandler; 046import org.apache.http.client.methods.HttpGet; 047import org.apache.http.client.methods.HttpHead; 048import org.apache.http.client.methods.HttpOptions; 049import org.apache.http.client.methods.HttpPost; 050import org.apache.http.client.params.CookiePolicy; 051import org.apache.http.client.params.HttpClientParams; 052import org.apache.http.conn.ClientConnectionManager; 053import org.apache.http.conn.params.ConnRoutePNames; 054import org.apache.http.conn.scheme.PlainSocketFactory; 055import org.apache.http.conn.scheme.Scheme; 056import org.apache.http.entity.ByteArrayEntity; 057import org.apache.http.impl.client.BasicResponseHandler; 058import org.apache.http.impl.client.DefaultHttpClient; 059import org.apache.http.impl.conn.PoolingClientConnectionManager; 060import org.apache.http.message.AbstractHttpMessage; 061import org.apache.http.params.HttpConnectionParams; 062import org.apache.http.params.HttpParams; 063import org.apache.http.protocol.HttpContext; 064import org.apache.http.util.EntityUtils; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068/** 069 * A HTTP {@link org.apache.activemq.transport.Transport} which uses the 070 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a> 071 * library 072 */ 073public class HttpClientTransport extends HttpTransportSupport { 074 075 public static final int MAX_CLIENT_TIMEOUT = 30000; 076 private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class); 077 private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator(); 078 079 private HttpClient sendHttpClient; 080 private HttpClient receiveHttpClient; 081 082 private final String clientID = CLIENT_ID_GENERATOR.generateId(); 083 private boolean trace; 084 private HttpGet httpMethod; 085 private volatile int receiveCounter; 086 087 private int soTimeout = MAX_CLIENT_TIMEOUT; 088 089 private boolean useCompression = false; 090 protected boolean canSendCompressed = false; 091 private int minSendAsCompressedSize = 0; 092 093 public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { 094 super(wireFormat, remoteUrl); 095 } 096 097 public FutureResponse asyncRequest(Object command) throws IOException { 098 return null; 099 } 100 101 @Override 102 public void oneway(Object command) throws IOException { 103 104 if (isStopped()) { 105 throw new IOException("stopped."); 106 } 107 HttpPost httpMethod = new HttpPost(getRemoteUrl().toString()); 108 configureMethod(httpMethod); 109 String data = getTextWireFormat().marshalText(command); 110 byte[] bytes = data.getBytes("UTF-8"); 111 if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) { 112 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 113 GZIPOutputStream stream = new GZIPOutputStream(bytesOut); 114 stream.write(bytes); 115 stream.close(); 116 httpMethod.addHeader("Content-Type", "application/x-gzip"); 117 if (LOG.isTraceEnabled()) { 118 LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size()); 119 } 120 bytes = bytesOut.toByteArray(); 121 } 122 ByteArrayEntity entity = new ByteArrayEntity(bytes); 123 httpMethod.setEntity(entity); 124 125 HttpClient client = null; 126 HttpResponse answer = null; 127 try { 128 client = getSendHttpClient(); 129 answer = client.execute(httpMethod); 130 int status = answer.getStatusLine().getStatusCode(); 131 if (status != HttpStatus.SC_OK) { 132 throw new IOException("Failed to post command: " + command + " as response was: " + answer); 133 } 134 if (command instanceof ShutdownInfo) { 135 try { 136 stop(); 137 } catch (Exception e) { 138 LOG.warn("Error trying to stop HTTP client: "+ e, e); 139 } 140 } 141 } catch (IOException e) { 142 throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); 143 } finally { 144 if (answer != null) { 145 EntityUtils.consume(answer.getEntity()); 146 } 147 } 148 } 149 150 @Override 151 public Object request(Object command) throws IOException { 152 return null; 153 } 154 155 private DataInputStream createDataInputStream(HttpResponse answer) throws IOException { 156 Header encoding = answer.getEntity().getContentEncoding(); 157 if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) { 158 return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent())); 159 } else { 160 return new DataInputStream(answer.getEntity().getContent()); 161 } 162 } 163 164 @Override 165 public void run() { 166 167 if (LOG.isTraceEnabled()) { 168 LOG.trace("HTTP GET consumer thread starting: " + this); 169 } 170 HttpClient httpClient = getReceiveHttpClient(); 171 URI remoteUrl = getRemoteUrl(); 172 173 while (!isStopped() && !isStopping()) { 174 175 httpMethod = new HttpGet(remoteUrl.toString()); 176 configureMethod(httpMethod); 177 HttpResponse answer = null; 178 179 try { 180 answer = httpClient.execute(httpMethod); 181 int status = answer.getStatusLine().getStatusCode(); 182 if (status != HttpStatus.SC_OK) { 183 if (status == HttpStatus.SC_REQUEST_TIMEOUT) { 184 LOG.debug("GET timed out"); 185 try { 186 Thread.sleep(1000); 187 } catch (InterruptedException e) { 188 onException(new InterruptedIOException()); 189 Thread.currentThread().interrupt(); 190 break; 191 } 192 } else { 193 onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer)); 194 break; 195 } 196 } else { 197 receiveCounter++; 198 DataInputStream stream = createDataInputStream(answer); 199 Object command = getTextWireFormat().unmarshal(stream); 200 if (command == null) { 201 LOG.debug("Received null command from url: " + remoteUrl); 202 } else { 203 doConsume(command); 204 } 205 stream.close(); 206 } 207 } catch (IOException e) { 208 onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e)); 209 break; 210 } finally { 211 if (answer != null) { 212 try { 213 EntityUtils.consume(answer.getEntity()); 214 } catch (IOException e) { 215 } 216 } 217 } 218 } 219 } 220 221 // Properties 222 // ------------------------------------------------------------------------- 223 public HttpClient getSendHttpClient() { 224 if (sendHttpClient == null) { 225 sendHttpClient = createHttpClient(); 226 } 227 return sendHttpClient; 228 } 229 230 public void setSendHttpClient(HttpClient sendHttpClient) { 231 this.sendHttpClient = sendHttpClient; 232 } 233 234 public HttpClient getReceiveHttpClient() { 235 if (receiveHttpClient == null) { 236 receiveHttpClient = createHttpClient(); 237 } 238 return receiveHttpClient; 239 } 240 241 public void setReceiveHttpClient(HttpClient receiveHttpClient) { 242 this.receiveHttpClient = receiveHttpClient; 243 } 244 245 // Implementation methods 246 // ------------------------------------------------------------------------- 247 @Override 248 protected void doStart() throws Exception { 249 250 if (LOG.isTraceEnabled()) { 251 LOG.trace("HTTP GET consumer thread starting: " + this); 252 } 253 HttpClient httpClient = getReceiveHttpClient(); 254 URI remoteUrl = getRemoteUrl(); 255 256 HttpHead httpMethod = new HttpHead(remoteUrl.toString()); 257 configureMethod(httpMethod); 258 259 // Request the options from the server so we can find out if the broker we are 260 // talking to supports GZip compressed content. If so and useCompression is on 261 // then we can compress our POST data, otherwise we must send it uncompressed to 262 // ensure backwards compatibility. 263 HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString()); 264 ResponseHandler<String> handler = new BasicResponseHandler() { 265 @Override 266 public String handleResponse(HttpResponse response) throws HttpResponseException, IOException { 267 268 for(Header header : response.getAllHeaders()) { 269 if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) { 270 LOG.info("Broker Servlet supports GZip compression."); 271 canSendCompressed = true; 272 break; 273 } 274 } 275 276 return super.handleResponse(response); 277 } 278 }; 279 280 try { 281 httpClient.execute(httpMethod, new BasicResponseHandler()); 282 httpClient.execute(optionsMethod, handler); 283 } catch(Exception e) { 284 LOG.trace("Error on start: ", e); 285 throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage()); 286 } 287 288 super.doStart(); 289 } 290 291 @Override 292 protected void doStop(ServiceStopper stopper) throws Exception { 293 if (httpMethod != null) { 294 // In some versions of the JVM a race between the httpMethod and the completion 295 // of the method when using HTTPS can lead to a deadlock. This hack attempts to 296 // detect that and interrupt the thread that's locked so that they can complete 297 // on another attempt. 298 for (int i = 0; i < 3; ++i) { 299 Thread abortThread = new Thread(new Runnable() { 300 301 @Override 302 public void run() { 303 try { 304 httpMethod.abort(); 305 } catch (Exception e) { 306 } 307 } 308 }); 309 310 abortThread.start(); 311 abortThread.join(2000); 312 if (abortThread.isAlive() && !httpMethod.isAborted()) { 313 abortThread.interrupt(); 314 } 315 } 316 } 317 } 318 319 protected HttpClient createHttpClient() { 320 DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager()); 321 if (useCompression) { 322 client.addRequestInterceptor( new HttpRequestInterceptor() { 323 @Override 324 public void process(HttpRequest request, HttpContext context) { 325 // We expect to received a compression response that we un-gzip 326 request.addHeader("Accept-Encoding", "gzip"); 327 } 328 }); 329 } 330 if (getProxyHost() != null) { 331 HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort()); 332 client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy); 333 334 if (client.getConnectionManager().getSchemeRegistry().get("http") == null) { 335 client.getConnectionManager().getSchemeRegistry().register( 336 new Scheme("http", getProxyPort(), PlainSocketFactory.getSocketFactory())); 337 } 338 339 if(getProxyUser() != null && getProxyPassword() != null) { 340 client.getCredentialsProvider().setCredentials( 341 new AuthScope(getProxyHost(), getProxyPort()), 342 new UsernamePasswordCredentials(getProxyUser(), getProxyPassword())); 343 } 344 } 345 346 HttpParams params = client.getParams(); 347 HttpConnectionParams.setSoTimeout(params, soTimeout); 348 HttpClientParams.setCookiePolicy(params, CookiePolicy.BROWSER_COMPATIBILITY); 349 350 return client; 351 } 352 353 protected ClientConnectionManager createClientConnectionManager() { 354 return new PoolingClientConnectionManager(); 355 } 356 357 protected void configureMethod(AbstractHttpMessage method) { 358 method.setHeader("clientID", clientID); 359 } 360 361 public boolean isTrace() { 362 return trace; 363 } 364 365 public void setTrace(boolean trace) { 366 this.trace = trace; 367 } 368 369 @Override 370 public int getReceiveCounter() { 371 return receiveCounter; 372 } 373 374 public int getSoTimeout() { 375 return soTimeout; 376 } 377 378 public void setSoTimeout(int soTimeout) { 379 this.soTimeout = soTimeout; 380 } 381 382 public void setUseCompression(boolean useCompression) { 383 this.useCompression = useCompression; 384 } 385 386 public boolean isUseCompression() { 387 return this.useCompression; 388 } 389 390 public int getMinSendAsCompressedSize() { 391 return minSendAsCompressedSize; 392 } 393 394 /** 395 * Sets the minimum size that must be exceeded on a send before compression is used if 396 * the useCompression option is specified. For very small payloads compression can be 397 * inefficient compared to the transmission size savings. 398 * 399 * Default value is 0. 400 * 401 * @param minSendAsCompressedSize 402 */ 403 public void setMinSendAsCompressedSize(int minSendAsCompressedSize) { 404 this.minSendAsCompressedSize = minSendAsCompressedSize; 405 } 406 407 @Override 408 public X509Certificate[] getPeerCertificates() { 409 return null; 410 } 411 412 @Override 413 public void setPeerCertificates(X509Certificate[] certificates) { 414 } 415 416 @Override 417 public WireFormat getWireFormat() { 418 return getTextWireFormat(); 419 } 420}