001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.io.OutputStream; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.Map; 024 025import javax.jms.InvalidDestinationException; 026import javax.jms.JMSException; 027 028import org.apache.activemq.command.ActiveMQBytesMessage; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.ActiveMQMessage; 031import org.apache.activemq.command.MessageId; 032import org.apache.activemq.command.ProducerId; 033import org.apache.activemq.command.ProducerInfo; 034import org.apache.activemq.util.IOExceptionSupport; 035import org.apache.activemq.util.IntrospectionSupport; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039@Deprecated 040public class ActiveMQOutputStream extends OutputStream implements Disposable { 041 042 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class); 043 044 protected int count; 045 046 final byte buffer[]; 047 048 private final ActiveMQConnection connection; 049 private final Map<String, Object> properties; 050 private final ProducerInfo info; 051 052 private long messageSequence; 053 private boolean closed; 054 private final int deliveryMode; 055 private final int priority; 056 private final long timeToLive; 057 private boolean alwaysSyncSend = false; 058 private boolean addPropertiesOnFirstMsgOnly = false; 059 060 /** 061 * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb 062 */ 063 public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE"; 064 065 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority, 066 long timeToLive) throws JMSException { 067 this.connection = connection; 068 this.deliveryMode = deliveryMode; 069 this.priority = priority; 070 this.timeToLive = timeToLive; 071 this.properties = properties == null ? null : new HashMap<String, Object>(properties); 072 073 Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE); 074 if (chunkSize == null) { 075 chunkSize = 64 * 1024; 076 } else { 077 if (chunkSize < 1) { 078 throw new IllegalArgumentException("Chunk size must be greater then 0"); 079 } else { 080 chunkSize *= 1024; 081 } 082 } 083 084 buffer = new byte[chunkSize]; 085 086 if (destination == null) { 087 throw new InvalidDestinationException("Don't understand null destinations"); 088 } 089 090 this.info = new ProducerInfo(producerId); 091 092 // Allows the options on the destination to configure the stream 093 if (destination.getOptions() != null) { 094 Map<String, String> options = new HashMap<String, String>(destination.getOptions()); 095 IntrospectionSupport.setProperties(this, options, "producer."); 096 IntrospectionSupport.setProperties(this.info, options, "producer."); 097 if (options.size() > 0) { 098 String msg = "There are " + options.size() 099 + " producer options that couldn't be set on the producer." 100 + " Check the options are spelled correctly." 101 + " Unknown parameters=[" + options + "]." 102 + " This producer cannot be started."; 103 LOG.warn(msg); 104 throw new ConfigurationException(msg); 105 } 106 } 107 108 this.info.setDestination(destination); 109 110 this.connection.addOutputStream(this); 111 this.connection.asyncSendPacket(info); 112 } 113 114 @Override 115 public void close() throws IOException { 116 if (!closed) { 117 flushBuffer(); 118 try { 119 // Send an EOS style empty message to signal EOS. 120 send(new ActiveMQMessage(), true); 121 dispose(); 122 this.connection.asyncSendPacket(info.createRemoveCommand()); 123 } catch (JMSException e) { 124 IOExceptionSupport.create(e); 125 } 126 } 127 } 128 129 @Override 130 public void dispose() { 131 if (!closed) { 132 this.connection.removeOutputStream(this); 133 closed = true; 134 } 135 } 136 137 @Override 138 public synchronized void write(int b) throws IOException { 139 buffer[count++] = (byte) b; 140 if (count == buffer.length) { 141 flushBuffer(); 142 } 143 } 144 145 @Override 146 public synchronized void write(byte b[], int off, int len) throws IOException { 147 while (len > 0) { 148 int max = Math.min(len, buffer.length - count); 149 System.arraycopy(b, off, buffer, count, max); 150 151 len -= max; 152 count += max; 153 off += max; 154 155 if (count == buffer.length) { 156 flushBuffer(); 157 } 158 } 159 } 160 161 @Override 162 public synchronized void flush() throws IOException { 163 flushBuffer(); 164 } 165 166 private void flushBuffer() throws IOException { 167 if (count != 0) { 168 try { 169 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 170 msg.writeBytes(buffer, 0, count); 171 send(msg, false); 172 } catch (JMSException e) { 173 throw IOExceptionSupport.create(e); 174 } 175 count = 0; 176 } 177 } 178 179 /** 180 * @param msg 181 * @throws JMSException 182 */ 183 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException { 184 if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) { 185 for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) { 186 String key = iter.next(); 187 Object value = properties.get(key); 188 msg.setObjectProperty(key, value); 189 } 190 } 191 msg.setType("org.apache.activemq.Stream"); 192 msg.setGroupID(info.getProducerId().toString()); 193 if (eosMessage) { 194 msg.setGroupSequence(-1); 195 } else { 196 msg.setGroupSequence((int) messageSequence); 197 } 198 MessageId id = new MessageId(info.getProducerId(), messageSequence++); 199 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend()); 200 } 201 202 @Override 203 public String toString() { 204 return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }"; 205 } 206 207 public boolean isAlwaysSyncSend() { 208 return alwaysSyncSend; 209 } 210 211 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 212 this.alwaysSyncSend = alwaysSyncSend; 213 } 214 215 public boolean isAddPropertiesOnFirstMsgOnly() { 216 return addPropertiesOnFirstMsgOnly; 217 } 218 219 public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) { 220 this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly; 221 } 222}