001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.http;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.URI;
023import java.security.cert.X509Certificate;
024import java.util.zip.GZIPInputStream;
025import java.util.zip.GZIPOutputStream;
026
027import org.apache.activemq.command.ShutdownInfo;
028import org.apache.activemq.transport.FutureResponse;
029import org.apache.activemq.transport.util.TextWireFormat;
030import org.apache.activemq.util.ByteArrayOutputStream;
031import org.apache.activemq.util.IOExceptionSupport;
032import org.apache.activemq.util.IdGenerator;
033import org.apache.activemq.util.ServiceStopper;
034import org.apache.activemq.wireformat.WireFormat;
035import org.apache.http.Header;
036import org.apache.http.HttpHost;
037import org.apache.http.HttpRequest;
038import org.apache.http.HttpRequestInterceptor;
039import org.apache.http.HttpResponse;
040import org.apache.http.HttpStatus;
041import org.apache.http.auth.AuthScope;
042import org.apache.http.auth.UsernamePasswordCredentials;
043import org.apache.http.client.HttpClient;
044import org.apache.http.client.HttpResponseException;
045import org.apache.http.client.ResponseHandler;
046import org.apache.http.client.methods.HttpGet;
047import org.apache.http.client.methods.HttpHead;
048import org.apache.http.client.methods.HttpOptions;
049import org.apache.http.client.methods.HttpPost;
050import org.apache.http.client.params.CookiePolicy;
051import org.apache.http.client.params.HttpClientParams;
052import org.apache.http.conn.ClientConnectionManager;
053import org.apache.http.conn.params.ConnRoutePNames;
054import org.apache.http.conn.scheme.PlainSocketFactory;
055import org.apache.http.conn.scheme.Scheme;
056import org.apache.http.entity.ByteArrayEntity;
057import org.apache.http.impl.client.BasicResponseHandler;
058import org.apache.http.impl.client.DefaultHttpClient;
059import org.apache.http.impl.conn.PoolingClientConnectionManager;
060import org.apache.http.message.AbstractHttpMessage;
061import org.apache.http.params.HttpConnectionParams;
062import org.apache.http.params.HttpParams;
063import org.apache.http.protocol.HttpContext;
064import org.apache.http.util.EntityUtils;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * A HTTP {@link org.apache.activemq.transport.Transport} which uses the
070 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a>
071 * library
072 */
073public class HttpClientTransport extends HttpTransportSupport {
074
075    public static final int MAX_CLIENT_TIMEOUT = 30000;
076    private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
077    private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
078
079    private HttpClient sendHttpClient;
080    private HttpClient receiveHttpClient;
081
082    private final String clientID = CLIENT_ID_GENERATOR.generateId();
083    private boolean trace;
084    private HttpGet httpMethod;
085    private volatile int receiveCounter;
086
087    private int soTimeout = MAX_CLIENT_TIMEOUT;
088
089    private boolean useCompression = false;
090    protected boolean canSendCompressed = false;
091    private int minSendAsCompressedSize = 0;
092
093    public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
094        super(wireFormat, remoteUrl);
095    }
096
097    public FutureResponse asyncRequest(Object command) throws IOException {
098        return null;
099    }
100
101    @Override
102    public void oneway(Object command) throws IOException {
103
104        if (isStopped()) {
105            throw new IOException("stopped.");
106        }
107        HttpPost httpMethod = new HttpPost(getRemoteUrl().toString());
108        configureMethod(httpMethod);
109        String data = getTextWireFormat().marshalText(command);
110        byte[] bytes = data.getBytes("UTF-8");
111        if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
112            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
113            GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
114            stream.write(bytes);
115            stream.close();
116            httpMethod.addHeader("Content-Type", "application/x-gzip");
117            if (LOG.isTraceEnabled()) {
118                LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
119            }
120            bytes = bytesOut.toByteArray();
121        }
122        ByteArrayEntity entity = new ByteArrayEntity(bytes);
123        httpMethod.setEntity(entity);
124
125        HttpClient client = null;
126        HttpResponse answer = null;
127        try {
128            client = getSendHttpClient();
129            answer = client.execute(httpMethod);
130            int status = answer.getStatusLine().getStatusCode();
131            if (status != HttpStatus.SC_OK) {
132                throw new IOException("Failed to post command: " + command + " as response was: " + answer);
133            }
134            if (command instanceof ShutdownInfo) {
135                try {
136                    stop();
137                } catch (Exception e) {
138                    LOG.warn("Error trying to stop HTTP client: "+ e, e);
139                }
140            }
141        } catch (IOException e) {
142            throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
143        } finally {
144            if (answer != null) {
145                EntityUtils.consume(answer.getEntity());
146            }
147        }
148    }
149
150    @Override
151    public Object request(Object command) throws IOException {
152        return null;
153    }
154
155    private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
156        Header encoding = answer.getEntity().getContentEncoding();
157        if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
158            return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
159        } else {
160            return new DataInputStream(answer.getEntity().getContent());
161        }
162    }
163
164    @Override
165    public void run() {
166
167        if (LOG.isTraceEnabled()) {
168            LOG.trace("HTTP GET consumer thread starting: " + this);
169        }
170        HttpClient httpClient = getReceiveHttpClient();
171        URI remoteUrl = getRemoteUrl();
172
173        while (!isStopped() && !isStopping()) {
174
175            httpMethod = new HttpGet(remoteUrl.toString());
176            configureMethod(httpMethod);
177            HttpResponse answer = null;
178
179            try {
180                answer = httpClient.execute(httpMethod);
181                int status = answer.getStatusLine().getStatusCode();
182                if (status != HttpStatus.SC_OK) {
183                    if (status == HttpStatus.SC_REQUEST_TIMEOUT) {
184                        LOG.debug("GET timed out");
185                        try {
186                            Thread.sleep(1000);
187                        } catch (InterruptedException e) {
188                            onException(new InterruptedIOException());
189                            Thread.currentThread().interrupt();
190                            break;
191                        }
192                    } else {
193                        onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
194                        break;
195                    }
196                } else {
197                    receiveCounter++;
198                    DataInputStream stream = createDataInputStream(answer);
199                    Object command = getTextWireFormat().unmarshal(stream);
200                    if (command == null) {
201                        LOG.debug("Received null command from url: " + remoteUrl);
202                    } else {
203                        doConsume(command);
204                    }
205                    stream.close();
206                }
207            } catch (IOException e) {
208                onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
209                break;
210            } finally {
211                if (answer != null) {
212                    try {
213                        EntityUtils.consume(answer.getEntity());
214                    } catch (IOException e) {
215                    }
216                }
217            }
218        }
219    }
220
221    // Properties
222    // -------------------------------------------------------------------------
223    public HttpClient getSendHttpClient() {
224        if (sendHttpClient == null) {
225            sendHttpClient = createHttpClient();
226        }
227        return sendHttpClient;
228    }
229
230    public void setSendHttpClient(HttpClient sendHttpClient) {
231        this.sendHttpClient = sendHttpClient;
232    }
233
234    public HttpClient getReceiveHttpClient() {
235        if (receiveHttpClient == null) {
236            receiveHttpClient = createHttpClient();
237        }
238        return receiveHttpClient;
239    }
240
241    public void setReceiveHttpClient(HttpClient receiveHttpClient) {
242        this.receiveHttpClient = receiveHttpClient;
243    }
244
245    // Implementation methods
246    // -------------------------------------------------------------------------
247    @Override
248    protected void doStart() throws Exception {
249
250        if (LOG.isTraceEnabled()) {
251            LOG.trace("HTTP GET consumer thread starting: " + this);
252        }
253        HttpClient httpClient = getReceiveHttpClient();
254        URI remoteUrl = getRemoteUrl();
255
256        HttpHead httpMethod = new HttpHead(remoteUrl.toString());
257        configureMethod(httpMethod);
258
259        // Request the options from the server so we can find out if the broker we are
260        // talking to supports GZip compressed content.  If so and useCompression is on
261        // then we can compress our POST data, otherwise we must send it uncompressed to
262        // ensure backwards compatibility.
263        HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
264        ResponseHandler<String> handler = new BasicResponseHandler() {
265            @Override
266            public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {
267
268                for(Header header : response.getAllHeaders()) {
269                    if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) {
270                        LOG.info("Broker Servlet supports GZip compression.");
271                        canSendCompressed = true;
272                        break;
273                    }
274                }
275
276                return super.handleResponse(response);
277            }
278        };
279
280        try {
281            httpClient.execute(httpMethod, new BasicResponseHandler());
282            httpClient.execute(optionsMethod, handler);
283        } catch(Exception e) {
284            LOG.trace("Error on start: ", e);
285            throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
286        }
287
288        super.doStart();
289    }
290
291    @Override
292    protected void doStop(ServiceStopper stopper) throws Exception {
293        if (httpMethod != null) {
294            // In some versions of the JVM a race between the httpMethod and the completion
295            // of the method when using HTTPS can lead to a deadlock.  This hack attempts to
296            // detect that and interrupt the thread that's locked so that they can complete
297            // on another attempt.
298            for (int i = 0; i < 3; ++i) {
299                Thread abortThread = new Thread(new Runnable() {
300
301                    @Override
302                    public void run() {
303                        try {
304                            httpMethod.abort();
305                        } catch (Exception e) {
306                        }
307                    }
308                });
309
310                abortThread.start();
311                abortThread.join(2000);
312                if (abortThread.isAlive() && !httpMethod.isAborted()) {
313                    abortThread.interrupt();
314                }
315            }
316        }
317    }
318
319    protected HttpClient createHttpClient() {
320        DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager());
321        if (useCompression) {
322            client.addRequestInterceptor( new HttpRequestInterceptor() {
323                @Override
324                public void process(HttpRequest request, HttpContext context) {
325                    // We expect to received a compression response that we un-gzip
326                    request.addHeader("Accept-Encoding", "gzip");
327                }
328            });
329        }
330        if (getProxyHost() != null) {
331            HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
332            client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
333
334            if (client.getConnectionManager().getSchemeRegistry().get("http") == null) {
335                client.getConnectionManager().getSchemeRegistry().register(
336                    new Scheme("http", getProxyPort(), PlainSocketFactory.getSocketFactory()));
337            }
338
339            if(getProxyUser() != null && getProxyPassword() != null) {
340                client.getCredentialsProvider().setCredentials(
341                    new AuthScope(getProxyHost(), getProxyPort()),
342                    new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
343            }
344        }
345
346        HttpParams params = client.getParams();
347        HttpConnectionParams.setSoTimeout(params, soTimeout);
348        HttpClientParams.setCookiePolicy(params, CookiePolicy.BROWSER_COMPATIBILITY);
349
350        return client;
351    }
352
353    protected ClientConnectionManager createClientConnectionManager() {
354        return new PoolingClientConnectionManager();
355    }
356
357    protected void configureMethod(AbstractHttpMessage method) {
358        method.setHeader("clientID", clientID);
359    }
360
361    public boolean isTrace() {
362        return trace;
363    }
364
365    public void setTrace(boolean trace) {
366        this.trace = trace;
367    }
368
369    @Override
370    public int getReceiveCounter() {
371        return receiveCounter;
372    }
373
374    public int getSoTimeout() {
375        return soTimeout;
376    }
377
378    public void setSoTimeout(int soTimeout) {
379        this.soTimeout = soTimeout;
380    }
381
382    public void setUseCompression(boolean useCompression) {
383        this.useCompression = useCompression;
384    }
385
386    public boolean isUseCompression() {
387        return this.useCompression;
388    }
389
390    public int getMinSendAsCompressedSize() {
391        return minSendAsCompressedSize;
392    }
393
394    /**
395     * Sets the minimum size that must be exceeded on a send before compression is used if
396     * the useCompression option is specified.  For very small payloads compression can be
397     * inefficient compared to the transmission size savings.
398     *
399     * Default value is 0.
400     *
401     * @param minSendAsCompressedSize
402     */
403    public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
404        this.minSendAsCompressedSize = minSendAsCompressedSize;
405    }
406
407    @Override
408    public X509Certificate[] getPeerCertificates() {
409        return null;
410    }
411
412    @Override
413    public void setPeerCertificates(X509Certificate[] certificates) {
414    }
415
416    @Override
417    public WireFormat getWireFormat() {
418        return getTextWireFormat();
419    }
420}