1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.http;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.Packet;
23 import org.codehaus.activemq.message.TextWireFormat;
24 import org.codehaus.activemq.util.Callback;
25 import org.codehaus.activemq.util.ExceptionTemplate;
26 import org.codehaus.activemq.util.JMSExceptionHelper;
27
28 import javax.jms.JMSException;
29 import java.io.DataInputStream;
30 import java.io.IOException;
31 import java.io.OutputStreamWriter;
32 import java.io.Writer;
33 import java.net.HttpURLConnection;
34 import java.net.MalformedURLException;
35 import java.net.URL;
36
37 /***
38 * @version $Revision: 1.3 $
39 */
40 public class HttpTransportChannel extends HttpTransportChannelSupport {
41 private static final Log log = LogFactory.getLog(HttpTransportChannel.class);
42 private URL url;
43 private HttpURLConnection sendConnection;
44 private HttpURLConnection receiveConnection;
45
46
47 public HttpTransportChannel(TextWireFormat wireFormat, String remoteUrl) throws MalformedURLException {
48 super(wireFormat, remoteUrl);
49 url = new URL(remoteUrl);
50 }
51
52 public void asyncSend(Packet packet) throws JMSException {
53 try {
54 HttpURLConnection connection = getSendConnection();
55 String text = getWireFormat().toString(packet);
56 Writer writer = new OutputStreamWriter(connection.getOutputStream());
57 writer.write(text);
58 writer.flush();
59 int answer = connection.getResponseCode();
60 if (answer != HttpURLConnection.HTTP_OK) {
61 throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer);
62 }
63 }
64 catch (IOException e) {
65 throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e);
66 }
67 }
68
69 public void stop() {
70 ExceptionTemplate template = new ExceptionTemplate();
71 if (sendConnection != null) {
72 template.run(new Callback() {
73 public void execute() throws Throwable {
74 sendConnection.disconnect();
75 }
76 });
77 }
78 if (receiveConnection != null) {
79 template.run(new Callback() {
80 public void execute() throws Throwable {
81 receiveConnection.disconnect();
82 }
83 });
84 }
85 super.stop();
86 Throwable firstException = template.getFirstException();
87 if (firstException != null) {
88 log.warn("Failed to shut down cleanly: " + firstException, firstException);
89 }
90 }
91
92 public boolean isMulticast() {
93 return false;
94 }
95
96 public void run() {
97 log.trace("HTTP GET consumer thread starting for clientID: " + getClientID());
98 String remoteUrl = getRemoteUrl();
99 while (!getClosed().get()) {
100 try {
101 HttpURLConnection connection = getReceiveConnection();
102 int answer = connection.getResponseCode();
103 if (answer != HttpURLConnection.HTTP_OK) {
104 if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
105 log.trace("GET timed out");
106 }
107 else {
108 log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
109 }
110 }
111 else {
112 Packet packet = getWireFormat().readPacket(new DataInputStream(connection.getInputStream()));
113
114 if (packet == null) {
115 log.warn("Received null packet from url: " + remoteUrl);
116 }
117 else {
118 doConsumePacket(packet);
119 }
120 }
121 }
122 catch (Exception e) {
123 if (!getClosed().get()) {
124 log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
125 }
126 else {
127 log.trace("Caught error after closed: " + e, e);
128 }
129 }
130 }
131 }
132
133
134
135
136 protected synchronized HttpURLConnection getSendConnection() throws IOException {
137 sendConnection = (HttpURLConnection) url.openConnection();
138 sendConnection.setDoOutput(true);
139 sendConnection.setRequestMethod("POST");
140 configureConnection(sendConnection);
141 sendConnection.connect();
142 return sendConnection;
143 }
144
145 protected synchronized HttpURLConnection getReceiveConnection() throws IOException {
146 receiveConnection = (HttpURLConnection) url.openConnection();
147 receiveConnection.setDoOutput(false);
148 receiveConnection.setDoInput(true);
149 receiveConnection.setRequestMethod("GET");
150 configureConnection(receiveConnection);
151 receiveConnection.connect();
152 return receiveConnection;
153 }
154
155
156 protected void configureConnection(HttpURLConnection connection) {
157 String clientID = getClientID();
158 if (clientID != null) {
159 connection.setRequestProperty("clientID", clientID);
160
161 }
162 }
163 }