001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStreamWriter;
024import java.io.PrintWriter;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.Map;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.atomic.AtomicBoolean;
032
033import javax.jms.JMSException;
034
035import org.apache.activemq.ActiveMQPrefetchPolicy;
036import org.apache.activemq.advisory.AdvisorySupport;
037import org.apache.activemq.broker.BrokerContext;
038import org.apache.activemq.broker.BrokerContextAware;
039import org.apache.activemq.command.ActiveMQDestination;
040import org.apache.activemq.command.ActiveMQMessage;
041import org.apache.activemq.command.ActiveMQTempQueue;
042import org.apache.activemq.command.ActiveMQTempTopic;
043import org.apache.activemq.command.Command;
044import org.apache.activemq.command.CommandTypes;
045import org.apache.activemq.command.ConnectionError;
046import org.apache.activemq.command.ConnectionId;
047import org.apache.activemq.command.ConnectionInfo;
048import org.apache.activemq.command.ConsumerControl;
049import org.apache.activemq.command.ConsumerId;
050import org.apache.activemq.command.ConsumerInfo;
051import org.apache.activemq.command.DestinationInfo;
052import org.apache.activemq.command.ExceptionResponse;
053import org.apache.activemq.command.LocalTransactionId;
054import org.apache.activemq.command.MessageAck;
055import org.apache.activemq.command.MessageDispatch;
056import org.apache.activemq.command.MessageId;
057import org.apache.activemq.command.ProducerId;
058import org.apache.activemq.command.ProducerInfo;
059import org.apache.activemq.command.RemoveSubscriptionInfo;
060import org.apache.activemq.command.Response;
061import org.apache.activemq.command.SessionId;
062import org.apache.activemq.command.SessionInfo;
063import org.apache.activemq.command.ShutdownInfo;
064import org.apache.activemq.command.TransactionId;
065import org.apache.activemq.command.TransactionInfo;
066import org.apache.activemq.util.ByteArrayOutputStream;
067import org.apache.activemq.util.FactoryFinder;
068import org.apache.activemq.util.IOExceptionSupport;
069import org.apache.activemq.util.IdGenerator;
070import org.apache.activemq.util.IntrospectionSupport;
071import org.apache.activemq.util.LongSequenceGenerator;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075/**
076 * @author <a href="http://hiramchirino.com">chirino</a>
077 */
078public class ProtocolConverter {
079
080    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
081
082    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
083
084    private static final String BROKER_VERSION;
085    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
086
087    static {
088        String version = "5.6.0";
089        try(InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
090            if (in != null) {
091                try(InputStreamReader isr = new InputStreamReader(in);
092                    BufferedReader reader = new BufferedReader(isr)) {
093                    version = reader.readLine();
094                }
095            }
096        }catch(Exception e) {
097        }
098
099        BROKER_VERSION = version;
100    }
101
102    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
103    private final SessionId sessionId = new SessionId(connectionId, -1);
104    private final ProducerId producerId = new ProducerId(sessionId, 1);
105
106    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
107    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
108    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
109    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
110
111    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<>();
112    private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<>();
113    private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<>();
114    private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<>();
115    private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<>();
116    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
117    private final StompTransport stompTransport;
118
119    private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
120    private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
121
122    private final Object commnadIdMutex = new Object();
123    private int lastCommandId;
124    private final AtomicBoolean connected = new AtomicBoolean(false);
125    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
126    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
127    private final BrokerContext brokerContext;
128    private String version = "1.0";
129    private long hbReadInterval;
130    private long hbWriteInterval;
131    private float hbGracePeriodMultiplier = 1.0f;
132    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
133
134    private static class AckEntry {
135
136        private final String messageId;
137        private final StompSubscription subscription;
138
139        public AckEntry(String messageId, StompSubscription subscription) {
140            this.messageId = messageId;
141            this.subscription = subscription;
142        }
143
144        public MessageAck onMessageAck(TransactionId transactionId) {
145            return subscription.onStompMessageAck(messageId, transactionId);
146        }
147
148        public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
149            return subscription.onStompMessageNack(messageId, transactionId);
150        }
151
152        public String getMessageId() {
153            return this.messageId;
154        }
155
156        @SuppressWarnings("unused")
157        public StompSubscription getSubscription() {
158            return this.subscription;
159        }
160    }
161
162    public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
163        this.stompTransport = stompTransport;
164        this.brokerContext = brokerContext;
165    }
166
167    protected int generateCommandId() {
168        synchronized (commnadIdMutex) {
169            return lastCommandId++;
170        }
171    }
172
173    protected ResponseHandler createResponseHandler(final StompFrame command) {
174        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
175        if (receiptId != null) {
176            return new ResponseHandler() {
177                @Override
178                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
179                    if (response.isException()) {
180                        // Generally a command can fail.. but that does not invalidate the connection.
181                        // We report back the failure but we don't close the connection.
182                        Throwable exception = ((ExceptionResponse)response).getException();
183                        handleException(exception, command);
184                    } else {
185                        StompFrame sc = new StompFrame();
186                        sc.setAction(Stomp.Responses.RECEIPT);
187                        sc.setHeaders(new HashMap<String, String>(1));
188                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
189                        stompTransport.sendToStomp(sc);
190                    }
191                }
192            };
193        }
194        return null;
195    }
196
197    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
198        command.setCommandId(generateCommandId());
199        if (handler != null) {
200            command.setResponseRequired(true);
201            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
202        }
203        stompTransport.sendToActiveMQ(command);
204    }
205
206    protected void sendToStomp(StompFrame command) throws IOException {
207        stompTransport.sendToStomp(command);
208    }
209
210    protected FrameTranslator findTranslator(String header) {
211        return findTranslator(header, null, false);
212    }
213
214    protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) {
215        FrameTranslator translator = frameTranslator;
216        try {
217            if (header != null) {
218                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
219            } else {
220                if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) {
221                    translator = new JmsFrameTranslator();
222                }
223            }
224        } catch (Exception ignore) {
225            // if anything goes wrong use the default translator
226        }
227
228        if (translator instanceof BrokerContextAware) {
229            ((BrokerContextAware)translator).setBrokerContext(brokerContext);
230        }
231
232        return translator;
233    }
234
235    /**
236     * Convert a STOMP command
237     *
238     * @param command
239     */
240    public void onStompCommand(StompFrame command) throws IOException, JMSException {
241        try {
242
243            if (command.getClass() == StompFrameError.class) {
244                throw ((StompFrameError)command).getException();
245            }
246
247            String action = command.getAction();
248            if (action.startsWith(Stomp.Commands.SEND)) {
249                onStompSend(command);
250            } else if (action.startsWith(Stomp.Commands.ACK)) {
251                onStompAck(command);
252            } else if (action.startsWith(Stomp.Commands.NACK)) {
253                onStompNack(command);
254            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
255                onStompBegin(command);
256            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
257                onStompCommit(command);
258            } else if (action.startsWith(Stomp.Commands.ABORT)) {
259                onStompAbort(command);
260            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE_PREFIX)) {
261                onStompSubscribe(command);
262            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE_PREFIX)) {
263                onStompUnsubscribe(command);
264            } else if (action.startsWith(Stomp.Commands.CONNECT) ||
265                       action.startsWith(Stomp.Commands.STOMP)) {
266                onStompConnect(command);
267            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
268                onStompDisconnect(command);
269            } else {
270                throw new ProtocolException("Unknown STOMP action: " + action, true);
271            }
272
273        } catch (ProtocolException e) {
274            handleException(e, command);
275            // Some protocol errors can cause the connection to get closed.
276            if (e.isFatal()) {
277               getStompTransport().onException(e);
278            }
279        }
280    }
281
282    protected void handleException(Throwable exception, StompFrame command) throws IOException {
283        if (command == null) {
284            LOG.warn("Exception occurred while processing a command: {}", exception.toString());
285        } else {
286            LOG.warn("Exception occurred processing: {} -> {}", safeGetAction(command), exception.toString());
287        }
288
289        if (LOG.isDebugEnabled()) {
290            LOG.debug("Exception detail", exception);
291        }
292
293        if (command != null && LOG.isTraceEnabled()) {
294            LOG.trace("Command that caused the error: {}", command);
295        }
296
297        // Let the stomp client know about any protocol errors.
298        ByteArrayOutputStream baos = new ByteArrayOutputStream();
299        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
300        exception.printStackTrace(stream);
301        stream.close();
302
303        HashMap<String, String> headers = new HashMap<>();
304        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
305        headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
306
307        if (command != null) {
308            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
309            if (receiptId != null) {
310                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
311            }
312        }
313
314        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
315        sendToStomp(errorMessage);
316    }
317
318    protected void onStompSend(StompFrame command) throws IOException, JMSException {
319        checkConnected();
320
321        Map<String, String> headers = command.getHeaders();
322        String destination = headers.get(Stomp.Headers.Send.DESTINATION);
323        if (destination == null) {
324            throw new ProtocolException("SEND received without a Destination specified!");
325        }
326
327        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
328        headers.remove("transaction");
329
330        ActiveMQMessage message = convertMessage(command);
331
332        message.setProducerId(producerId);
333        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
334        message.setMessageId(id);
335
336        if (stompTx != null) {
337            TransactionId activemqTx = transactions.get(stompTx);
338            if (activemqTx == null) {
339                throw new ProtocolException("Invalid transaction id: " + stompTx);
340            }
341            message.setTransactionId(activemqTx);
342        }
343
344        message.onSend();
345        message.beforeMarshall(null);
346        sendToActiveMQ(message, createResponseHandler(command));
347    }
348
349    protected void onStompNack(StompFrame command) throws ProtocolException {
350
351        checkConnected();
352
353        if (this.version.equals(Stomp.V1_0)) {
354            throw new ProtocolException("NACK received but connection is in v1.0 mode.");
355        }
356
357        Map<String, String> headers = command.getHeaders();
358
359        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
360        if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) {
361            throw new ProtocolException("NACK received without a subscription id for acknowledge!");
362        }
363
364        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
365        if (messageId == null && !this.version.equals(Stomp.V1_2)) {
366            throw new ProtocolException("NACK received without a message-id to acknowledge!");
367        }
368
369        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
370        if (ackId == null && this.version.equals(Stomp.V1_2)) {
371            throw new ProtocolException("NACK received without an ack header to acknowledge!");
372        }
373
374        TransactionId activemqTx = null;
375        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
376        if (stompTx != null) {
377            activemqTx = transactions.get(stompTx);
378            if (activemqTx == null) {
379                throw new ProtocolException("Invalid transaction id: " + stompTx);
380            }
381        }
382
383        boolean nacked = false;
384
385        if (ackId != null) {
386            AckEntry pendingAck = this.pedingAcks.remove(ackId);
387            if (pendingAck != null) {
388                messageId = pendingAck.getMessageId();
389                MessageAck ack = pendingAck.onMessageNack(activemqTx);
390                if (ack != null) {
391                    sendToActiveMQ(ack, createResponseHandler(command));
392                    nacked = true;
393                }
394            }
395        } else if (subscriptionId != null) {
396            StompSubscription sub = this.subscriptions.get(subscriptionId);
397            if (sub != null) {
398                MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
399                if (ack != null) {
400                    sendToActiveMQ(ack, createResponseHandler(command));
401                    nacked = true;
402                }
403            }
404        }
405
406        if (!nacked) {
407            throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
408        }
409    }
410
411    protected void onStompAck(StompFrame command) throws ProtocolException {
412        checkConnected();
413
414        Map<String, String> headers = command.getHeaders();
415        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
416        if (messageId == null && !(this.version.equals(Stomp.V1_2))) {
417            throw new ProtocolException("ACK received without a message-id to acknowledge!");
418        }
419
420        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
421        if (subscriptionId == null && this.version.equals(Stomp.V1_1)) {
422            throw new ProtocolException("ACK received without a subscription id for acknowledge!");
423        }
424
425        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
426        if (ackId == null && this.version.equals(Stomp.V1_2)) {
427            throw new ProtocolException("ACK received without a ack id for acknowledge!");
428        }
429
430        TransactionId activemqTx = null;
431        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
432        if (stompTx != null) {
433            activemqTx = transactions.get(stompTx);
434            if (activemqTx == null) {
435                throw new ProtocolException("Invalid transaction id: " + stompTx);
436            }
437        }
438
439        boolean acked = false;
440
441        if (ackId != null) {
442            AckEntry pendingAck = this.pedingAcks.remove(ackId);
443            if (pendingAck != null) {
444                messageId = pendingAck.getMessageId();
445                MessageAck ack = pendingAck.onMessageAck(activemqTx);
446                if (ack != null) {
447                    sendToActiveMQ(ack, createResponseHandler(command));
448                    acked = true;
449                }
450            }
451
452        } else if (subscriptionId != null) {
453            StompSubscription sub = this.subscriptions.get(subscriptionId);
454            if (sub != null) {
455                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
456                if (ack != null) {
457                    sendToActiveMQ(ack, createResponseHandler(command));
458                    acked = true;
459                }
460            }
461        } else {
462            // STOMP v1.0: acking with just a message id is very bogus since the same message id
463            // could have been sent to 2 different subscriptions on the same Stomp connection.
464            // For example, when 2 subs are created on the same topic.
465            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
466                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
467                if (ack != null) {
468                    sendToActiveMQ(ack, createResponseHandler(command));
469                    acked = true;
470                    break;
471                }
472            }
473        }
474
475        if (!acked) {
476            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
477        }
478    }
479
480    protected void onStompBegin(StompFrame command) throws ProtocolException {
481        checkConnected();
482
483        Map<String, String> headers = command.getHeaders();
484
485        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
486
487        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
488            throw new ProtocolException("Must specify the transaction you are beginning");
489        }
490
491        if (transactions.get(stompTx) != null) {
492            throw new ProtocolException("The transaction was already started: " + stompTx);
493        }
494
495        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
496        transactions.put(stompTx, activemqTx);
497
498        TransactionInfo tx = new TransactionInfo();
499        tx.setConnectionId(connectionId);
500        tx.setTransactionId(activemqTx);
501        tx.setType(TransactionInfo.BEGIN);
502
503        sendToActiveMQ(tx, createResponseHandler(command));
504    }
505
506    protected void onStompCommit(StompFrame command) throws ProtocolException {
507        checkConnected();
508
509        Map<String, String> headers = command.getHeaders();
510
511        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
512        if (stompTx == null) {
513            throw new ProtocolException("Must specify the transaction you are committing");
514        }
515
516        TransactionId activemqTx = transactions.remove(stompTx);
517        if (activemqTx == null) {
518            throw new ProtocolException("Invalid transaction id: " + stompTx);
519        }
520
521        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
522            sub.onStompCommit(activemqTx);
523        }
524
525        pedingAcks.clear();
526
527        TransactionInfo tx = new TransactionInfo();
528        tx.setConnectionId(connectionId);
529        tx.setTransactionId(activemqTx);
530        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
531
532        sendToActiveMQ(tx, createResponseHandler(command));
533    }
534
535    protected void onStompAbort(StompFrame command) throws ProtocolException {
536        checkConnected();
537        Map<String, String> headers = command.getHeaders();
538
539        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
540        if (stompTx == null) {
541            throw new ProtocolException("Must specify the transaction you are committing");
542        }
543
544        TransactionId activemqTx = transactions.remove(stompTx);
545        if (activemqTx == null) {
546            throw new ProtocolException("Invalid transaction id: " + stompTx);
547        }
548        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
549            try {
550                sub.onStompAbort(activemqTx);
551            } catch (Exception e) {
552                throw new ProtocolException("Transaction abort failed", false, e);
553            }
554        }
555
556        pedingAcks.clear();
557
558        TransactionInfo tx = new TransactionInfo();
559        tx.setConnectionId(connectionId);
560        tx.setTransactionId(activemqTx);
561        tx.setType(TransactionInfo.ROLLBACK);
562
563        sendToActiveMQ(tx, createResponseHandler(command));
564    }
565
566    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
567        checkConnected();
568        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
569        Map<String, String> headers = command.getHeaders();
570
571        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
572        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
573
574        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
575            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
576        }
577
578        final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
579
580        if (actualDest == null) {
581            throw new ProtocolException("Invalid 'null' Destination.");
582        }
583
584        final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
585        ConsumerInfo consumerInfo = new ConsumerInfo(id);
586        consumerInfo.setPrefetchSize(actualDest.isQueue() ?
587                ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH :
588                headers.containsKey("activemq.subscriptionName") ?
589                        ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
590        consumerInfo.setDispatchAsync(true);
591
592        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
593        if (browser != null && browser.equals(Stomp.TRUE)) {
594
595            if (this.version.equals(Stomp.V1_0)) {
596                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!");
597            }
598
599            consumerInfo.setBrowser(true);
600            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH);
601        }
602
603        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
604        if (selector != null) {
605            consumerInfo.setSelector("convert_string_expressions:" + selector);
606        }
607
608        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
609
610        if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
611            throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
612        }
613
614        consumerInfo.setDestination(actualDest);
615        consumerInfo.setDispatchAsync(true);
616
617        StompSubscription stompSubscription;
618        if (!consumerInfo.isBrowser()) {
619            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
620        } else {
621            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
622        }
623        stompSubscription.setDestination(actualDest);
624
625        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
626        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
627            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
628        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
629            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
630        } else {
631            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
632        }
633
634        subscriptionsByConsumerId.put(id, stompSubscription);
635        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
636        if (subscriptionId != null) {
637            subscriptions.put(subscriptionId, stompSubscription);
638        }
639
640        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
641        if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
642
643            final StompFrame cmd = command;
644            final int prefetch = consumerInfo.getPrefetchSize();
645
646            // Since dispatch could beat the receipt we set prefetch to zero to start and then
647            // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
648            // an error message.
649            consumerInfo.setPrefetchSize(0);
650
651            final ResponseHandler handler = new ResponseHandler() {
652                @Override
653                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
654                    if (response.isException()) {
655                        // Generally a command can fail.. but that does not invalidate the connection.
656                        // We report back the failure but we don't close the connection.
657                        Throwable exception = ((ExceptionResponse)response).getException();
658                        handleException(exception, cmd);
659                    } else {
660                        StompFrame sc = new StompFrame();
661                        sc.setAction(Stomp.Responses.RECEIPT);
662                        sc.setHeaders(new HashMap<String, String>(1));
663                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
664                        stompTransport.sendToStomp(sc);
665
666                        ConsumerControl control = new ConsumerControl();
667                        control.setPrefetch(prefetch);
668                        control.setDestination(actualDest);
669                        control.setConsumerId(id);
670
671                        sendToActiveMQ(control, null);
672                    }
673                }
674            };
675
676            sendToActiveMQ(consumerInfo, handler);
677        } else {
678            sendToActiveMQ(consumerInfo, createResponseHandler(command));
679        }
680    }
681
682    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
683        checkConnected();
684        Map<String, String> headers = command.getHeaders();
685
686        ActiveMQDestination destination = null;
687        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
688        if (o != null) {
689            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
690        }
691
692        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
693        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
694            throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
695        }
696
697        if (subscriptionId == null && destination == null) {
698            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
699        }
700
701        // check if it is a durable subscription
702        String durable = command.getHeaders().get("activemq.subscriptionName");
703        String clientId = durable;
704        if (!this.version.equals(Stomp.V1_0)) {
705            clientId = connectionInfo.getClientId();
706        }
707
708        if (durable != null) {
709            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
710            info.setClientId(clientId);
711            info.setSubscriptionName(durable);
712            info.setConnectionId(connectionId);
713            sendToActiveMQ(info, createResponseHandler(command));
714            return;
715        }
716
717        if (subscriptionId != null) {
718            StompSubscription sub = this.subscriptions.remove(subscriptionId);
719            if (sub != null) {
720                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
721                return;
722            }
723        } else {
724            // Unsubscribing using a destination is a bit weird if multiple subscriptions
725            // are created with the same destination.
726            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
727                StompSubscription sub = iter.next();
728                if (destination != null && destination.equals(sub.getDestination())) {
729                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
730                    iter.remove();
731                    return;
732                }
733            }
734        }
735
736        throw new ProtocolException("No subscription matched.");
737    }
738
739    ConnectionInfo connectionInfo = new ConnectionInfo();
740
741    protected void onStompConnect(final StompFrame command) throws ProtocolException {
742
743        if (connected.get()) {
744            throw new ProtocolException("Already connected.");
745        }
746
747        final Map<String, String> headers = command.getHeaders();
748
749        // allow anyone to login for now
750        String login = headers.get(Stomp.Headers.Connect.LOGIN);
751        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
752        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
753        String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
754
755        if (heartBeat == null) {
756            heartBeat = defaultHeartBeat;
757        }
758
759        this.version = StompCodec.detectVersion(headers);
760
761        configureInactivityMonitor(heartBeat.trim());
762
763        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
764        connectionInfo.setConnectionId(connectionId);
765        if (clientId != null) {
766            connectionInfo.setClientId(clientId);
767        } else {
768            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
769        }
770
771        connectionInfo.setResponseRequired(true);
772        connectionInfo.setUserName(login);
773        connectionInfo.setPassword(passcode);
774        connectionInfo.setTransportContext(command.getTransportContext());
775
776        sendToActiveMQ(connectionInfo, new ResponseHandler() {
777            @Override
778            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
779
780                if (response.isException()) {
781                    // If the connection attempt fails we close the socket.
782                    Throwable exception = ((ExceptionResponse)response).getException();
783                    handleException(exception, command);
784                    getStompTransport().onException(IOExceptionSupport.create(exception));
785                    return;
786                }
787
788                final SessionInfo sessionInfo = new SessionInfo(sessionId);
789                sendToActiveMQ(sessionInfo, null);
790
791                final ProducerInfo producerInfo = new ProducerInfo(producerId);
792                sendToActiveMQ(producerInfo, new ResponseHandler() {
793                    @Override
794                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
795
796                        if (response.isException()) {
797                            // If the connection attempt fails we close the socket.
798                            Throwable exception = ((ExceptionResponse)response).getException();
799                            handleException(exception, command);
800                            getStompTransport().onException(IOExceptionSupport.create(exception));
801                        }
802
803                        connected.set(true);
804                        HashMap<String, String> responseHeaders = new HashMap<>();
805
806                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
807                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
808                        if (requestId == null) {
809                            // TODO legacy
810                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
811                        }
812                        if (requestId != null) {
813                            // TODO legacy
814                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
815                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
816                        }
817
818                        responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
819                        responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
820                                            String.format("%d,%d", hbWriteInterval, hbReadInterval));
821                        responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
822
823                        StompFrame sc = new StompFrame();
824                        sc.setAction(Stomp.Responses.CONNECTED);
825                        sc.setHeaders(responseHeaders);
826                        sendToStomp(sc);
827
828                        StompWireFormat format = stompTransport.getWireFormat();
829                        if (format != null) {
830                            format.setStompVersion(version);
831                        }
832                    }
833                });
834            }
835        });
836    }
837
838    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
839        if (connected.get()) {
840            sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
841            sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
842            connected.set(false);
843        }
844    }
845
846    protected void checkConnected() throws ProtocolException {
847        if (!connected.get()) {
848            throw new ProtocolException("Not connected.");
849        }
850    }
851
852    /**
853     * Dispatch a ActiveMQ command
854     *
855     * @param command
856     * @throws IOException
857     */
858    public void onActiveMQCommand(Command command) throws IOException, JMSException {
859        if (command.isResponse()) {
860            Response response = (Response)command;
861            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
862            if (rh != null) {
863                rh.onResponse(this, response);
864            } else {
865                // Pass down any unexpected errors. Should this close the connection?
866                if (response.isException()) {
867                    Throwable exception = ((ExceptionResponse)response).getException();
868                    handleException(exception, null);
869                }
870            }
871        } else if (command.isMessageDispatch()) {
872            MessageDispatch md = (MessageDispatch)command;
873            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
874            if (sub != null) {
875                String ackId = null;
876                if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
877                    AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
878                    ackId = this.ACK_ID_GENERATOR.generateId();
879                    this.pedingAcks.put(ackId, pendingAck);
880                }
881                try {
882                    sub.onMessageDispatch(md, ackId);
883                } catch (Exception ex) {
884                    if (ackId != null) {
885                        this.pedingAcks.remove(ackId);
886                    }
887                }
888            }
889        } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
890            stompTransport.sendToStomp(ping);
891        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
892            // Pass down any unexpected async errors. Should this close the connection?
893            Throwable exception = ((ConnectionError)command).getException();
894            handleException(exception, null);
895        }
896    }
897
898    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
899        ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
900        return msg;
901    }
902
903    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
904        if (ignoreTransformation == true) {
905            return frameTranslator.convertMessage(this, message);
906        } else {
907            FrameTranslator translator = findTranslator(
908                message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory());
909            return translator.convertMessage(this, message);
910        }
911    }
912
913    public StompTransport getStompTransport() {
914        return stompTransport;
915    }
916
917    public ActiveMQDestination createTempDestination(String name, boolean topic) {
918        ActiveMQDestination rc = tempDestinations.get(name);
919        if( rc == null ) {
920            if (topic) {
921                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
922            } else {
923                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
924            }
925            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
926            tempDestinations.put(name, rc);
927            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
928        }
929        return rc;
930    }
931
932    public String getCreatedTempDestinationName(ActiveMQDestination destination) {
933        return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
934    }
935
936    public String getDefaultHeartBeat() {
937        return defaultHeartBeat;
938    }
939
940    public void setDefaultHeartBeat(String defaultHeartBeat) {
941        this.defaultHeartBeat = defaultHeartBeat;
942    }
943
944    /**
945     * @return the hbGracePeriodMultiplier
946     */
947    public float getHbGracePeriodMultiplier() {
948        return hbGracePeriodMultiplier;
949    }
950
951    /**
952     * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set
953     */
954    public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) {
955        this.hbGracePeriodMultiplier = hbGracePeriodMultiplier;
956    }
957
958    protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
959
960        String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
961
962        if (keepAliveOpts == null || keepAliveOpts.length != 2) {
963            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
964        } else {
965
966            try {
967                hbReadInterval = (Long.parseLong(keepAliveOpts[0]));
968                hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
969            } catch(NumberFormatException e) {
970                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
971            }
972
973            try {
974                StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
975                monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier));
976                monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
977                monitor.setWriteCheckTime(hbWriteInterval);
978                monitor.startMonitoring();
979            } catch(Exception ex) {
980                hbReadInterval = 0;
981                hbWriteInterval = 0;
982            }
983
984            if (LOG.isDebugEnabled()) {
985                LOG.debug("Stomp Connect heartbeat conf RW[{},{}]", hbReadInterval, hbWriteInterval);
986            }
987        }
988    }
989
990    protected void sendReceipt(StompFrame command) {
991        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
992        if (receiptId != null) {
993            StompFrame sc = new StompFrame();
994            sc.setAction(Stomp.Responses.RECEIPT);
995            sc.setHeaders(new HashMap<String, String>(1));
996            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
997            try {
998                sendToStomp(sc);
999            } catch (IOException e) {
1000                LOG.warn("Could not send a receipt for {}", command, e);
1001            }
1002        }
1003    }
1004
1005    /**
1006     * Retrieve the STOMP action value from a frame if the value is valid, otherwise
1007     * return an unknown string to allow for safe log output.
1008     *
1009     * @param command
1010     *      The STOMP command to fetch an action from.
1011     *
1012     * @return the command action or a safe string to use in logging.
1013     */
1014    protected Object safeGetAction(StompFrame command) {
1015        String result = "<Unknown>";
1016        if (command != null && command.getAction() != null) {
1017            String action = command.getAction().trim();
1018
1019            if (action != null) {
1020                switch (action) {
1021                    case Stomp.Commands.SEND:
1022                    case Stomp.Commands.ACK:
1023                    case Stomp.Commands.NACK:
1024                    case Stomp.Commands.BEGIN:
1025                    case Stomp.Commands.COMMIT:
1026                    case Stomp.Commands.ABORT:
1027                    case Stomp.Commands.SUBSCRIBE:
1028                    case Stomp.Commands.UNSUBSCRIBE:
1029                    case Stomp.Commands.CONNECT:
1030                    case Stomp.Commands.STOMP:
1031                    case Stomp.Commands.DISCONNECT:
1032                        result = action;
1033                        break;
1034                    case Stomp.Commands.SUBSCRIBE_PREFIX:
1035                        result = Stomp.Commands.SUBSCRIBE;
1036                    case Stomp.Commands.UNSUBSCRIBE_PREFIX:
1037                        result = Stomp.Commands.UNSUBSCRIBE;
1038                    default:
1039                        break;
1040                }
1041            }
1042        }
1043
1044        return result;
1045    }
1046
1047    /**
1048     * Remove all pending acknowledgement markers that are batched into the single
1049     * client acknowledge operation.
1050     *
1051     * @param subscription
1052     *      The STOMP Subscription that has performed a client acknowledge.
1053     * @param msgIdsToRemove
1054     *      List of message IDs that are bound to the subscription that has ack'd
1055     */
1056    protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
1057        int count = 0;
1058
1059        for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
1060            AckEntry actEntry = entry.getValue();
1061            if (msgIdsToRemove.contains(actEntry.messageId)) {
1062                this.pedingAcks.remove(entry.getKey());
1063                count++;
1064            }
1065        }
1066
1067        LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
1068    }
1069}