1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.http;
19
20 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.message.Packet;
24 import org.codehaus.activemq.message.TextWireFormat;
25 import org.codehaus.activemq.transport.TransportChannelListener;
26 import org.codehaus.activemq.util.JMSExceptionHelper;
27
28 import javax.jms.JMSException;
29 import javax.servlet.ServletException;
30 import javax.servlet.http.HttpServlet;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33 import java.io.BufferedReader;
34 import java.io.DataOutputStream;
35 import java.io.IOException;
36 import java.util.HashMap;
37 import java.util.Map;
38
39 /***
40 * @version $Revision: 1.2 $
41 */
42 public class HttpTunnelServlet extends HttpServlet {
43
44 private static final Log log = LogFactory.getLog(HttpTunnelServlet.class);
45
46 private TransportChannelListener listener;
47 private TextWireFormat wireFormat;
48 private Map clients = new HashMap();
49 private long requestTimeout = 30000L;
50
51 public void init() throws ServletException {
52 super.init();
53 listener = (TransportChannelListener) getServletContext().getAttribute("transportChannelListener");
54 if (listener == null) {
55 throw new ServletException("No such attribute 'transportChannelListener' available in the ServletContext");
56 }
57 wireFormat = (TextWireFormat) getServletContext().getAttribute("wireFormat");
58 if (wireFormat == null) {
59 throw new ServletException("No such attribute 'wireFormat' available in the ServletContext");
60 }
61 }
62
63 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
64
65 Packet packet = null;
66 try {
67 HttpServerTransportChannel transportChannel = getTransportChannel(request);
68 if (transportChannel == null) {
69 return;
70 }
71 packet = (Packet) transportChannel.getChannel().poll(requestTimeout);
72 }
73 catch (InterruptedException e) {
74
75 }
76 if (packet == null) {
77 response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
78 }
79 else {
80 try {
81 wireFormat.writePacket(packet, new DataOutputStream(response.getOutputStream()));
82 }
83 catch (JMSException e) {
84 throw JMSExceptionHelper.newIOException(e);
85 }
86 }
87 }
88
89 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
90 HttpServerTransportChannel transportChannel = getTransportChannel(request);
91 if (transportChannel == null) {
92 response.setStatus(HttpServletResponse.SC_NOT_FOUND);
93 }
94 else {
95 try {
96 Packet packet = wireFormat.fromString(readRequestBody(request));
97 transportChannel.getPacketListener().consume(packet);
98 }
99 catch (IOException e) {
100 log.error("Caught: " + e, e);
101 }
102 catch (JMSException e) {
103 throw JMSExceptionHelper.newIOException(e);
104 }
105 }
106 }
107
108 protected String readRequestBody(HttpServletRequest request) throws IOException {
109 StringBuffer buffer = new StringBuffer();
110 BufferedReader reader = request.getReader();
111 while (true) {
112 String line = reader.readLine();
113 if (line == null) {
114 break;
115 }
116 else {
117 buffer.append(line);
118 buffer.append("\n");
119 }
120 }
121 return buffer.toString();
122 }
123
124 protected HttpServerTransportChannel getTransportChannel(HttpServletRequest request) {
125 String clientID = request.getHeader("clientID");
126 if (clientID == null) {
127 clientID = request.getParameter("clientID");
128 }
129 if (clientID == null) {
130 log.warn("No clientID header so ignoring request");
131 return null;
132 }
133 synchronized (this) {
134 HttpServerTransportChannel answer = (HttpServerTransportChannel) clients.get(clientID);
135 if (answer == null) {
136 answer = createTransportChannel();
137 clients.put(clientID, answer);
138 listener.addClient(answer);
139 }
140 else {
141
142 keepAlivePing(answer);
143 }
144 return answer;
145 }
146 }
147
148 /***
149 * Disable this channel from being auto-disconnected after a timeout period
150 */
151 protected void keepAlivePing(HttpServerTransportChannel channel) {
152 /*** TODO */
153 }
154
155 protected HttpServerTransportChannel createTransportChannel() {
156 return new HttpServerTransportChannel(new BoundedLinkedQueue(10));
157 }
158 }