001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import org.apache.activemq.util.FactoryFinder; 020import org.apache.activemq.util.IOExceptionSupport; 021import org.apache.activemq.util.IntrospectionSupport; 022import org.apache.activemq.util.URISupport; 023import org.apache.activemq.wireformat.WireFormat; 024import org.apache.activemq.wireformat.WireFormatFactory; 025 026import java.io.IOException; 027import java.net.MalformedURLException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.net.UnknownHostException; 031import java.util.HashMap; 032import java.util.Map; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.Executor; 035 036public abstract class TransportFactory { 037 038 private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/"); 039 private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); 040 private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>(); 041 042 private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout"; 043 private static final String THREAD_NAME_FILTER = "threadName"; 044 045 public abstract TransportServer doBind(URI location) throws IOException; 046 047 public Transport doConnect(URI location, Executor ex) throws Exception { 048 return doConnect(location); 049 } 050 051 public Transport doCompositeConnect(URI location, Executor ex) throws Exception { 052 return doCompositeConnect(location); 053 } 054 055 /** 056 * Creates a normal transport. 057 * 058 * @param location 059 * @return the transport 060 * @throws Exception 061 */ 062 public static Transport connect(URI location) throws Exception { 063 TransportFactory tf = findTransportFactory(location); 064 return tf.doConnect(location); 065 } 066 067 /** 068 * Creates a normal transport. 069 * 070 * @param location 071 * @param ex 072 * @return the transport 073 * @throws Exception 074 */ 075 public static Transport connect(URI location, Executor ex) throws Exception { 076 TransportFactory tf = findTransportFactory(location); 077 return tf.doConnect(location, ex); 078 } 079 080 /** 081 * Creates a slimmed down transport that is more efficient so that it can be 082 * used by composite transports like reliable and HA. 083 * 084 * @param location 085 * @return the Transport 086 * @throws Exception 087 */ 088 public static Transport compositeConnect(URI location) throws Exception { 089 TransportFactory tf = findTransportFactory(location); 090 return tf.doCompositeConnect(location); 091 } 092 093 /** 094 * Creates a slimmed down transport that is more efficient so that it can be 095 * used by composite transports like reliable and HA. 096 * 097 * @param location 098 * @param ex 099 * @return the Transport 100 * @throws Exception 101 */ 102 public static Transport compositeConnect(URI location, Executor ex) throws Exception { 103 TransportFactory tf = findTransportFactory(location); 104 return tf.doCompositeConnect(location, ex); 105 } 106 107 public static TransportServer bind(URI location) throws IOException { 108 TransportFactory tf = findTransportFactory(location); 109 return tf.doBind(location); 110 } 111 112 public Transport doConnect(URI location) throws Exception { 113 try { 114 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); 115 if( !options.containsKey("wireFormat.host") ) { 116 options.put("wireFormat.host", location.getHost()); 117 } 118 WireFormat wf = createWireFormat(options); 119 Transport transport = createTransport(location, wf); 120 Transport rc = configure(transport, wf, options); 121 if (!options.isEmpty()) { 122 throw new IllegalArgumentException("Invalid connect parameters: " + options); 123 } 124 return rc; 125 } catch (URISyntaxException e) { 126 throw IOExceptionSupport.create(e); 127 } 128 } 129 130 public Transport doCompositeConnect(URI location) throws Exception { 131 try { 132 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); 133 WireFormat wf = createWireFormat(options); 134 Transport transport = createTransport(location, wf); 135 Transport rc = compositeConfigure(transport, wf, options); 136 if (!options.isEmpty()) { 137 throw new IllegalArgumentException("Invalid connect parameters: " + options); 138 } 139 return rc; 140 } catch (URISyntaxException e) { 141 throw IOExceptionSupport.create(e); 142 } 143 } 144 145 /** 146 * Allow registration of a transport factory without wiring via META-INF classes 147 * @param scheme 148 * @param tf 149 */ 150 public static void registerTransportFactory(String scheme, TransportFactory tf) { 151 TRANSPORT_FACTORYS.put(scheme, tf); 152 } 153 154 /** 155 * Factory method to create a new transport 156 * 157 * @throws IOException 158 * @throws UnknownHostException 159 */ 160 protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException { 161 throw new IOException("createTransport() method not implemented!"); 162 } 163 164 /** 165 * @param location 166 * @return 167 * @throws IOException 168 */ 169 public static TransportFactory findTransportFactory(URI location) throws IOException { 170 String scheme = location.getScheme(); 171 if (scheme == null) { 172 throw new IOException("Transport not scheme specified: [" + location + "]"); 173 } 174 TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); 175 if (tf == null) { 176 // Try to load if from a META-INF property. 177 try { 178 tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); 179 TRANSPORT_FACTORYS.put(scheme, tf); 180 } catch (Throwable e) { 181 throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); 182 } 183 } 184 return tf; 185 } 186 187 protected WireFormat createWireFormat(Map<String, String> options) throws IOException { 188 WireFormatFactory factory = createWireFormatFactory(options); 189 WireFormat format = factory.createWireFormat(); 190 return format; 191 } 192 193 protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException { 194 String wireFormat = (String)options.remove("wireFormat"); 195 if (wireFormat == null) { 196 wireFormat = getDefaultWireFormatType(); 197 } 198 199 try { 200 WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat); 201 IntrospectionSupport.setProperties(wff, options, "wireFormat."); 202 return wff; 203 } catch (Throwable e) { 204 throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e); 205 } 206 } 207 208 protected String getDefaultWireFormatType() { 209 return "default"; 210 } 211 212 /** 213 * Fully configures and adds all need transport filters so that the 214 * transport can be used by the JMS client. 215 * 216 * @param transport 217 * @param wf 218 * @param options 219 * @return 220 * @throws Exception 221 */ 222 @SuppressWarnings("rawtypes") 223 public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { 224 transport = compositeConfigure(transport, wf, options); 225 226 transport = new MutexTransport(transport); 227 transport = new ResponseCorrelator(transport); 228 229 return transport; 230 } 231 232 /** 233 * Fully configures and adds all need transport filters so that the 234 * transport can be used by the ActiveMQ message broker. The main difference 235 * between this and the configure() method is that the broker does not issue 236 * requests to the client so the ResponseCorrelator is not needed. 237 * 238 * @param transport 239 * @param format 240 * @param options 241 * @return 242 * @throws Exception 243 */ 244 @SuppressWarnings("rawtypes") 245 public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { 246 if (options.containsKey(THREAD_NAME_FILTER)) { 247 transport = new ThreadNameFilter(transport); 248 } 249 transport = compositeConfigure(transport, format, options); 250 transport = new MutexTransport(transport); 251 return transport; 252 } 253 254 /** 255 * Similar to configure(...) but this avoid adding in the MutexTransport and 256 * ResponseCorrelator transport layers so that the resulting transport can 257 * more efficiently be used as part of a composite transport. 258 * 259 * @param transport 260 * @param format 261 * @param options 262 * @return 263 */ 264 @SuppressWarnings("rawtypes") 265 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { 266 if (options.containsKey(WRITE_TIMEOUT_FILTER)) { 267 transport = new WriteTimeoutFilter(transport); 268 String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER); 269 if (soWriteTimeout!=null) { 270 ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout)); 271 } 272 } 273 IntrospectionSupport.setProperties(transport, options); 274 return transport; 275 } 276 277 @SuppressWarnings("rawtypes") 278 protected String getOption(Map options, String key, String def) { 279 String rc = (String) options.remove(key); 280 if( rc == null ) { 281 rc = def; 282 } 283 return rc; 284 } 285}