001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024
025import javax.jms.InvalidDestinationException;
026import javax.jms.JMSException;
027
028import org.apache.activemq.command.ActiveMQBytesMessage;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQMessage;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.ProducerId;
033import org.apache.activemq.command.ProducerInfo;
034import org.apache.activemq.util.IOExceptionSupport;
035import org.apache.activemq.util.IntrospectionSupport;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039@Deprecated
040public class ActiveMQOutputStream extends OutputStream implements Disposable {
041
042    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
043
044    protected int count;
045
046    final byte buffer[];
047
048    private final ActiveMQConnection connection;
049    private final Map<String, Object> properties;
050    private final ProducerInfo info;
051
052    private long messageSequence;
053    private boolean closed;
054    private final int deliveryMode;
055    private final int priority;
056    private final long timeToLive;
057    private boolean alwaysSyncSend = false;
058    private boolean addPropertiesOnFirstMsgOnly = false;
059
060    /**
061     * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
062     */
063    public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
064
065    public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
066                                long timeToLive) throws JMSException {
067        this.connection = connection;
068        this.deliveryMode = deliveryMode;
069        this.priority = priority;
070        this.timeToLive = timeToLive;
071        this.properties = properties == null ? null : new HashMap<String, Object>(properties);
072
073        Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
074        if (chunkSize == null) {
075            chunkSize = 64 * 1024;
076        } else {
077            if (chunkSize < 1) {
078                throw new IllegalArgumentException("Chunk size must be greater then 0");
079            } else {
080                chunkSize *= 1024;
081            }
082        }
083
084        buffer = new byte[chunkSize];
085
086        if (destination == null) {
087            throw new InvalidDestinationException("Don't understand null destinations");
088        }
089
090        this.info = new ProducerInfo(producerId);
091
092        // Allows the options on the destination to configure the stream
093        if (destination.getOptions() != null) {
094            Map<String, String> options = new HashMap<String, String>(destination.getOptions());
095            IntrospectionSupport.setProperties(this, options, "producer.");
096            IntrospectionSupport.setProperties(this.info, options, "producer.");
097            if (options.size() > 0) {
098                String msg = "There are " + options.size()
099                    + " producer options that couldn't be set on the producer."
100                    + " Check the options are spelled correctly."
101                    + " Unknown parameters=[" + options + "]."
102                    + " This producer cannot be started.";
103                LOG.warn(msg);
104                throw new ConfigurationException(msg);
105            }
106        }
107
108        this.info.setDestination(destination);
109
110        this.connection.addOutputStream(this);
111        this.connection.asyncSendPacket(info);
112    }
113
114    @Override
115    public void close() throws IOException {
116        if (!closed) {
117            flushBuffer();
118            try {
119                // Send an EOS style empty message to signal EOS.
120                send(new ActiveMQMessage(), true);
121                dispose();
122                this.connection.asyncSendPacket(info.createRemoveCommand());
123            } catch (JMSException e) {
124                IOExceptionSupport.create(e);
125            }
126        }
127    }
128
129    @Override
130    public void dispose() {
131        if (!closed) {
132            this.connection.removeOutputStream(this);
133            closed = true;
134        }
135    }
136
137    @Override
138    public synchronized void write(int b) throws IOException {
139        buffer[count++] = (byte) b;
140        if (count == buffer.length) {
141            flushBuffer();
142        }
143    }
144
145    @Override
146    public synchronized void write(byte b[], int off, int len) throws IOException {
147        while (len > 0) {
148            int max = Math.min(len, buffer.length - count);
149            System.arraycopy(b, off, buffer, count, max);
150
151            len -= max;
152            count += max;
153            off += max;
154
155            if (count == buffer.length) {
156                flushBuffer();
157            }
158        }
159    }
160
161    @Override
162    public synchronized void flush() throws IOException {
163        flushBuffer();
164    }
165
166    private void flushBuffer() throws IOException {
167        if (count != 0) {
168            try {
169                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
170                msg.writeBytes(buffer, 0, count);
171                send(msg, false);
172            } catch (JMSException e) {
173                throw IOExceptionSupport.create(e);
174            }
175            count = 0;
176        }
177    }
178
179    /**
180     * @param msg
181     * @throws JMSException
182     */
183    private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
184        if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) {
185            for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
186                String key = iter.next();
187                Object value = properties.get(key);
188                msg.setObjectProperty(key, value);
189            }
190        }
191        msg.setType("org.apache.activemq.Stream");
192        msg.setGroupID(info.getProducerId().toString());
193        if (eosMessage) {
194            msg.setGroupSequence(-1);
195        } else {
196            msg.setGroupSequence((int) messageSequence);
197        }
198        MessageId id = new MessageId(info.getProducerId(), messageSequence++);
199        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
200    }
201
202    @Override
203    public String toString() {
204        return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
205    }
206
207    public boolean isAlwaysSyncSend() {
208        return alwaysSyncSend;
209    }
210
211    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
212        this.alwaysSyncSend = alwaysSyncSend;
213    }
214
215    public boolean isAddPropertiesOnFirstMsgOnly() {
216        return addPropertiesOnFirstMsgOnly;
217    }
218
219    public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
220        this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
221    }
222}