001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.Map; 024 025import javax.jms.IllegalStateException; 026import javax.jms.InvalidDestinationException; 027import javax.jms.JMSException; 028 029import org.apache.activemq.command.ActiveMQBytesMessage; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.apache.activemq.command.ActiveMQMessage; 032import org.apache.activemq.command.CommandTypes; 033import org.apache.activemq.command.ConsumerId; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageDispatch; 037import org.apache.activemq.command.ProducerId; 038import org.apache.activemq.selector.SelectorParser; 039import org.apache.activemq.util.IOExceptionSupport; 040import org.apache.activemq.util.IntrospectionSupport; 041import org.apache.activemq.util.JMSExceptionSupport; 042 043/** 044 * 045 */ 046@Deprecated 047public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { 048 049 private final ActiveMQConnection connection; 050 private final ConsumerInfo info; 051 // These are the messages waiting to be delivered to the client 052 private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel(); 053 054 private int deliveredCounter; 055 private MessageDispatch lastDelivered; 056 private boolean eosReached; 057 private byte buffer[]; 058 private int pos; 059 private Map<String, Object> jmsProperties; 060 061 private ProducerId producerId; 062 private long nextSequenceId; 063 private final long timeout; 064 private boolean firstReceived; 065 066 public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout) 067 throws JMSException { 068 this.connection = connection; 069 070 if (dest == null) { 071 throw new InvalidDestinationException("Don't understand null destinations"); 072 } else if (dest.isTemporary()) { 073 String physicalName = dest.getPhysicalName(); 074 075 if (physicalName == null) { 076 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 077 } 078 079 String connectionID = connection.getConnectionInfo().getConnectionId().getValue(); 080 081 if (physicalName.indexOf(connectionID) < 0) { 082 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); 083 } 084 085 if (connection.isDeleted(dest)) { 086 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); 087 } 088 } 089 090 if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1"); 091 this.timeout = timeout; 092 093 this.info = new ConsumerInfo(consumerId); 094 this.info.setSubscriptionName(name); 095 096 if (selector != null && selector.trim().length() != 0) { 097 selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) "; 098 } else { 099 selector = "JMSType='org.apache.activemq.Stream'"; 100 } 101 102 SelectorParser.parse(selector); 103 this.info.setSelector(selector); 104 105 this.info.setPrefetchSize(prefetch); 106 this.info.setNoLocal(noLocal); 107 this.info.setBrowser(false); 108 this.info.setDispatchAsync(false); 109 110 // Allows the options on the destination to configure the consumerInfo 111 if (dest.getOptions() != null) { 112 Map<String, String> options = new HashMap<String, String>(dest.getOptions()); 113 IntrospectionSupport.setProperties(this.info, options, "consumer."); 114 } 115 116 this.info.setDestination(dest); 117 118 this.connection.addInputStream(this); 119 this.connection.addDispatcher(info.getConsumerId(), this); 120 this.connection.syncSendPacket(info); 121 unconsumedMessages.start(); 122 } 123 124 @Override 125 public void close() throws IOException { 126 if (!unconsumedMessages.isClosed()) { 127 try { 128 if (lastDelivered != null) { 129 MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 130 connection.asyncSendPacket(ack); 131 } 132 dispose(); 133 this.connection.syncSendPacket(info.createRemoveCommand()); 134 } catch (JMSException e) { 135 throw IOExceptionSupport.create(e); 136 } 137 } 138 } 139 140 public void dispose() { 141 if (!unconsumedMessages.isClosed()) { 142 unconsumedMessages.close(); 143 this.connection.removeDispatcher(info.getConsumerId()); 144 this.connection.removeInputStream(this); 145 } 146 } 147 148 /** 149 * Return the JMS Properties which where used to send the InputStream 150 * 151 * @return jmsProperties 152 * @throws IOException 153 */ 154 public Map<String, Object> getJMSProperties() throws IOException { 155 if (jmsProperties == null) { 156 fillBuffer(); 157 } 158 return jmsProperties; 159 } 160 161 /** 162 * This method allows the client to receive the Stream data as unaltered ActiveMQMessage 163 * object which is how the split stream data is sent. Each message will contains one 164 * chunk of the written bytes as well as a valid message group sequence id. The EOS 165 * message will have a message group sequence id of -1. 166 * 167 * This method is useful for testing, but should never be mixed with calls to the 168 * normal stream receive methods as it will break the normal stream processing flow 169 * and can lead to loss of data. 170 * 171 * @return an ActiveMQMessage object that either contains byte data or an end of strem 172 * marker. 173 * @throws JMSException 174 * @throws ReadTimeoutException 175 */ 176 public ActiveMQMessage receive() throws JMSException, ReadTimeoutException { 177 checkClosed(); 178 MessageDispatch md; 179 try { 180 if (firstReceived || timeout == -1) { 181 md = unconsumedMessages.dequeue(-1); 182 firstReceived = true; 183 } else { 184 md = unconsumedMessages.dequeue(timeout); 185 if (md == null) throw new ReadTimeoutException(); 186 } 187 } catch (InterruptedException e) { 188 Thread.currentThread().interrupt(); 189 throw JMSExceptionSupport.create(e); 190 } 191 192 if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) { 193 return null; 194 } 195 196 deliveredCounter++; 197 if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) { 198 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 199 connection.asyncSendPacket(ack); 200 deliveredCounter = 0; 201 lastDelivered = null; 202 } else { 203 lastDelivered = md; 204 } 205 206 return (ActiveMQMessage)md.getMessage(); 207 } 208 209 /** 210 * @throws IllegalStateException 211 */ 212 protected void checkClosed() throws IllegalStateException { 213 if (unconsumedMessages.isClosed()) { 214 throw new IllegalStateException("The Consumer is closed"); 215 } 216 } 217 218 /** 219 * 220 * @see InputStream#read() 221 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout 222 */ 223 @Override 224 public int read() throws IOException { 225 fillBuffer(); 226 if (eosReached || buffer.length == 0) { 227 return -1; 228 } 229 230 return buffer[pos++] & 0xff; 231 } 232 233 /** 234 * 235 * @see InputStream#read(byte[], int, int) 236 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout 237 */ 238 @Override 239 public int read(byte[] b, int off, int len) throws IOException { 240 fillBuffer(); 241 if (eosReached || buffer.length == 0) { 242 return -1; 243 } 244 245 int max = Math.min(len, buffer.length - pos); 246 System.arraycopy(buffer, pos, b, off, max); 247 248 pos += max; 249 return max; 250 } 251 252 private void fillBuffer() throws IOException { 253 if (eosReached || (buffer != null && buffer.length > pos)) { 254 return; 255 } 256 try { 257 while (true) { 258 ActiveMQMessage m = receive(); 259 if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) { 260 // First message. 261 long producerSequenceId = m.getMessageId().getProducerSequenceId(); 262 if (producerId == null) { 263 // We have to start a stream at sequence id = 0 264 if (producerSequenceId != 0) { 265 continue; 266 } 267 nextSequenceId++; 268 producerId = m.getMessageId().getProducerId(); 269 } else { 270 // Verify it's the next message of the sequence. 271 if (!m.getMessageId().getProducerId().equals(producerId)) { 272 throw new IOException("Received an unexpected message: invalid producer: " + m); 273 } 274 if (producerSequenceId != nextSequenceId++) { 275 throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m); 276 } 277 } 278 279 // Read the buffer in. 280 ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m; 281 buffer = new byte[(int)bm.getBodyLength()]; 282 bm.readBytes(buffer); 283 pos = 0; 284 if (jmsProperties == null) { 285 jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties())); 286 } 287 } else { 288 eosReached = true; 289 if (jmsProperties == null) { 290 // no properties found 291 jmsProperties = Collections.emptyMap(); 292 } 293 } 294 return; 295 } 296 } catch (JMSException e) { 297 eosReached = true; 298 if (jmsProperties == null) { 299 // no properties found 300 jmsProperties = Collections.emptyMap(); 301 } 302 throw IOExceptionSupport.create(e); 303 } 304 } 305 306 @Override 307 public void dispatch(MessageDispatch md) { 308 unconsumedMessages.enqueue(md); 309 } 310 311 @Override 312 public String toString() { 313 return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }"; 314 } 315 316 /** 317 * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout 318 */ 319 public class ReadTimeoutException extends IOException { 320 private static final long serialVersionUID = -3217758894326719909L; 321 322 public ReadTimeoutException() { 323 super(); 324 } 325 } 326}