001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedHashMap;
023import java.util.LinkedList;
024import java.util.Map;
025import java.util.Map.Entry;
026
027import javax.jms.JMSException;
028
029import org.apache.activemq.command.ActiveMQBytesMessage;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQMessage;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageDispatch;
035import org.apache.activemq.command.MessageId;
036import org.apache.activemq.command.TransactionId;
037
038/**
039 * Keeps track of the STOMP subscription so that acking is correctly done.
040 *
041 * @author <a href="http://hiramchirino.com">chirino</a>
042 */
043public class StompSubscription {
044
045    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
046    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
047    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
048
049    protected final ProtocolConverter protocolConverter;
050    protected final String subscriptionId;
051    protected final ConsumerInfo consumerInfo;
052
053    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
054    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
055
056    protected String ackMode = AUTO_ACK;
057    protected ActiveMQDestination destination;
058    protected String transformation;
059
060    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
061        this.protocolConverter = stompTransport;
062        this.subscriptionId = subscriptionId;
063        this.consumerInfo = consumerInfo;
064        this.transformation = transformation;
065    }
066
067    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
068        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
069        if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
070            synchronized (this) {
071                dispatchedMessage.put(message.getMessageId(), md);
072            }
073        } else if (ackMode.equals(AUTO_ACK)) {
074            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
075            protocolConverter.getStompTransport().sendToActiveMQ(ack);
076        }
077
078        boolean ignoreTransformation = false;
079
080        if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
081            message.setReadOnlyProperties(false);
082            message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
083        } else {
084            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
085                ignoreTransformation = true;
086            }
087        }
088
089        StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
090
091        command.setAction(Stomp.Responses.MESSAGE);
092        if (subscriptionId != null) {
093            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
094        }
095
096        if (ackId != null) {
097            command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
098        }
099
100        protocolConverter.getStompTransport().sendToStomp(command);
101    }
102
103    synchronized void onStompAbort(TransactionId transactionId) {
104        unconsumedMessage.clear();
105    }
106
107    void onStompCommit(TransactionId transactionId) {
108        MessageAck ack = null;
109        synchronized (this) {
110            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
111                @SuppressWarnings("rawtypes")
112                Map.Entry entry = (Entry)iter.next();
113                MessageDispatch msg = (MessageDispatch)entry.getValue();
114                if (unconsumedMessage.contains(msg)) {
115                    iter.remove();
116                }
117            }
118
119            // For individual Ack we already sent an Ack that will be applied on commit
120            // we don't send a second standard Ack as that would produce an error.
121            if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
122                ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
123                ack.setTransactionId(transactionId);
124                unconsumedMessage.clear();
125            }
126        }
127        // avoid contention with onMessageDispatch
128        if (ack != null) {
129            protocolConverter.getStompTransport().sendToActiveMQ(ack);
130        }
131    }
132
133    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
134
135        MessageId msgId = new MessageId(messageId);
136
137        if (!dispatchedMessage.containsKey(msgId)) {
138            return null;
139        }
140
141        MessageAck ack = new MessageAck();
142        ack.setDestination(consumerInfo.getDestination());
143        ack.setConsumerId(consumerInfo.getConsumerId());
144
145        final ArrayList<String> acknowledgedMessages = new ArrayList<>();
146
147        if (ackMode == CLIENT_ACK) {
148            if (transactionId == null) {
149                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
150            } else {
151                ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
152            }
153            int count = 0;
154            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
155
156                @SuppressWarnings("rawtypes")
157                Map.Entry entry = (Entry)iter.next();
158                MessageId id = (MessageId)entry.getKey();
159                MessageDispatch msg = (MessageDispatch)entry.getValue();
160
161                if (transactionId != null) {
162                    if (!unconsumedMessage.contains(msg)) {
163                        unconsumedMessage.add(msg);
164                        count++;
165                    }
166                } else {
167                    acknowledgedMessages.add(id.toString());
168                    iter.remove();
169                    count++;
170                }
171
172                if (id.equals(msgId)) {
173                    ack.setLastMessageId(id);
174                    break;
175                }
176            }
177            ack.setMessageCount(count);
178            if (transactionId != null) {
179                ack.setTransactionId(transactionId);
180            }
181
182            this.protocolConverter.afterClientAck(this, acknowledgedMessages);
183        } else if (ackMode == INDIVIDUAL_ACK) {
184            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
185            ack.setMessageID(msgId);
186            ack.setMessageCount(1);
187            if (transactionId != null) {
188                unconsumedMessage.add(dispatchedMessage.get(msgId));
189                ack.setTransactionId(transactionId);
190            } else {
191                dispatchedMessage.remove(msgId);
192            }
193        }
194
195        return ack;
196    }
197
198    public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
199
200        MessageId msgId = new MessageId(messageId);
201
202        if (!dispatchedMessage.containsKey(msgId)) {
203            return null;
204        }
205
206        MessageAck ack = new MessageAck();
207        ack.setDestination(consumerInfo.getDestination());
208        ack.setConsumerId(consumerInfo.getConsumerId());
209        ack.setAckType(MessageAck.POSION_ACK_TYPE);
210        ack.setMessageID(msgId);
211        if (transactionId != null) {
212            unconsumedMessage.add(dispatchedMessage.get(msgId));
213            ack.setTransactionId(transactionId);
214        }
215        dispatchedMessage.remove(msgId);
216
217        return ack;
218    }
219
220    public String getAckMode() {
221        return ackMode;
222    }
223
224    public void setAckMode(String ackMode) {
225        this.ackMode = ackMode;
226    }
227
228    public String getSubscriptionId() {
229        return subscriptionId;
230    }
231
232    public void setDestination(ActiveMQDestination destination) {
233        this.destination = destination;
234    }
235
236    public ActiveMQDestination getDestination() {
237        return destination;
238    }
239
240    public ConsumerInfo getConsumerInfo() {
241        return consumerInfo;
242    }
243}