001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStreamWriter; 024import java.io.PrintWriter; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.Map; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentMap; 031import java.util.concurrent.atomic.AtomicBoolean; 032 033import javax.jms.JMSException; 034 035import org.apache.activemq.ActiveMQPrefetchPolicy; 036import org.apache.activemq.advisory.AdvisorySupport; 037import org.apache.activemq.broker.BrokerContext; 038import org.apache.activemq.broker.BrokerContextAware; 039import org.apache.activemq.command.ActiveMQDestination; 040import org.apache.activemq.command.ActiveMQMessage; 041import org.apache.activemq.command.ActiveMQTempQueue; 042import org.apache.activemq.command.ActiveMQTempTopic; 043import org.apache.activemq.command.Command; 044import org.apache.activemq.command.CommandTypes; 045import org.apache.activemq.command.ConnectionError; 046import org.apache.activemq.command.ConnectionId; 047import org.apache.activemq.command.ConnectionInfo; 048import org.apache.activemq.command.ConsumerControl; 049import org.apache.activemq.command.ConsumerId; 050import org.apache.activemq.command.ConsumerInfo; 051import org.apache.activemq.command.DestinationInfo; 052import org.apache.activemq.command.ExceptionResponse; 053import org.apache.activemq.command.LocalTransactionId; 054import org.apache.activemq.command.MessageAck; 055import org.apache.activemq.command.MessageDispatch; 056import org.apache.activemq.command.MessageId; 057import org.apache.activemq.command.ProducerId; 058import org.apache.activemq.command.ProducerInfo; 059import org.apache.activemq.command.RemoveSubscriptionInfo; 060import org.apache.activemq.command.Response; 061import org.apache.activemq.command.SessionId; 062import org.apache.activemq.command.SessionInfo; 063import org.apache.activemq.command.ShutdownInfo; 064import org.apache.activemq.command.TransactionId; 065import org.apache.activemq.command.TransactionInfo; 066import org.apache.activemq.util.ByteArrayOutputStream; 067import org.apache.activemq.util.FactoryFinder; 068import org.apache.activemq.util.IOExceptionSupport; 069import org.apache.activemq.util.IdGenerator; 070import org.apache.activemq.util.IntrospectionSupport; 071import org.apache.activemq.util.LongSequenceGenerator; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075/** 076 * @author <a href="http://hiramchirino.com">chirino</a> 077 */ 078public class ProtocolConverter { 079 080 private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class); 081 082 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 083 084 private static final String BROKER_VERSION; 085 private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE); 086 087 static { 088 String version = "5.6.0"; 089 try(InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) { 090 if (in != null) { 091 try(InputStreamReader isr = new InputStreamReader(in); 092 BufferedReader reader = new BufferedReader(isr)) { 093 version = reader.readLine(); 094 } 095 } 096 }catch(Exception e) { 097 } 098 099 BROKER_VERSION = version; 100 } 101 102 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 103 private final SessionId sessionId = new SessionId(connectionId, -1); 104 private final ProducerId producerId = new ProducerId(sessionId, 1); 105 106 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 107 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 108 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); 109 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); 110 111 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<>(); 112 private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<>(); 113 private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<>(); 114 private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<>(); 115 private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<>(); 116 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>(); 117 private final StompTransport stompTransport; 118 119 private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>(); 120 private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); 121 122 private final Object commnadIdMutex = new Object(); 123 private int lastCommandId; 124 private final AtomicBoolean connected = new AtomicBoolean(false); 125 private final FrameTranslator frameTranslator = new LegacyFrameTranslator(); 126 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); 127 private final BrokerContext brokerContext; 128 private String version = "1.0"; 129 private long hbReadInterval; 130 private long hbWriteInterval; 131 private float hbGracePeriodMultiplier = 1.0f; 132 private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; 133 134 private static class AckEntry { 135 136 private final String messageId; 137 private final StompSubscription subscription; 138 139 public AckEntry(String messageId, StompSubscription subscription) { 140 this.messageId = messageId; 141 this.subscription = subscription; 142 } 143 144 public MessageAck onMessageAck(TransactionId transactionId) { 145 return subscription.onStompMessageAck(messageId, transactionId); 146 } 147 148 public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException { 149 return subscription.onStompMessageNack(messageId, transactionId); 150 } 151 152 public String getMessageId() { 153 return this.messageId; 154 } 155 156 @SuppressWarnings("unused") 157 public StompSubscription getSubscription() { 158 return this.subscription; 159 } 160 } 161 162 public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) { 163 this.stompTransport = stompTransport; 164 this.brokerContext = brokerContext; 165 } 166 167 protected int generateCommandId() { 168 synchronized (commnadIdMutex) { 169 return lastCommandId++; 170 } 171 } 172 173 protected ResponseHandler createResponseHandler(final StompFrame command) { 174 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 175 if (receiptId != null) { 176 return new ResponseHandler() { 177 @Override 178 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 179 if (response.isException()) { 180 // Generally a command can fail.. but that does not invalidate the connection. 181 // We report back the failure but we don't close the connection. 182 Throwable exception = ((ExceptionResponse)response).getException(); 183 handleException(exception, command); 184 } else { 185 StompFrame sc = new StompFrame(); 186 sc.setAction(Stomp.Responses.RECEIPT); 187 sc.setHeaders(new HashMap<String, String>(1)); 188 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 189 stompTransport.sendToStomp(sc); 190 } 191 } 192 }; 193 } 194 return null; 195 } 196 197 protected void sendToActiveMQ(Command command, ResponseHandler handler) { 198 command.setCommandId(generateCommandId()); 199 if (handler != null) { 200 command.setResponseRequired(true); 201 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 202 } 203 stompTransport.sendToActiveMQ(command); 204 } 205 206 protected void sendToStomp(StompFrame command) throws IOException { 207 stompTransport.sendToStomp(command); 208 } 209 210 protected FrameTranslator findTranslator(String header) { 211 return findTranslator(header, null, false); 212 } 213 214 protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) { 215 FrameTranslator translator = frameTranslator; 216 try { 217 if (header != null) { 218 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header); 219 } else { 220 if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) { 221 translator = new JmsFrameTranslator(); 222 } 223 } 224 } catch (Exception ignore) { 225 // if anything goes wrong use the default translator 226 } 227 228 if (translator instanceof BrokerContextAware) { 229 ((BrokerContextAware)translator).setBrokerContext(brokerContext); 230 } 231 232 return translator; 233 } 234 235 /** 236 * Convert a STOMP command 237 * 238 * @param command 239 */ 240 public void onStompCommand(StompFrame command) throws IOException, JMSException { 241 try { 242 243 if (command.getClass() == StompFrameError.class) { 244 throw ((StompFrameError)command).getException(); 245 } 246 247 String action = command.getAction(); 248 if (action.startsWith(Stomp.Commands.SEND)) { 249 onStompSend(command); 250 } else if (action.startsWith(Stomp.Commands.ACK)) { 251 onStompAck(command); 252 } else if (action.startsWith(Stomp.Commands.NACK)) { 253 onStompNack(command); 254 } else if (action.startsWith(Stomp.Commands.BEGIN)) { 255 onStompBegin(command); 256 } else if (action.startsWith(Stomp.Commands.COMMIT)) { 257 onStompCommit(command); 258 } else if (action.startsWith(Stomp.Commands.ABORT)) { 259 onStompAbort(command); 260 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE_PREFIX)) { 261 onStompSubscribe(command); 262 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE_PREFIX)) { 263 onStompUnsubscribe(command); 264 } else if (action.startsWith(Stomp.Commands.CONNECT) || 265 action.startsWith(Stomp.Commands.STOMP)) { 266 onStompConnect(command); 267 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) { 268 onStompDisconnect(command); 269 } else { 270 throw new ProtocolException("Unknown STOMP action: " + action, true); 271 } 272 273 } catch (ProtocolException e) { 274 handleException(e, command); 275 // Some protocol errors can cause the connection to get closed. 276 if (e.isFatal()) { 277 getStompTransport().onException(e); 278 } 279 } 280 } 281 282 protected void handleException(Throwable exception, StompFrame command) throws IOException { 283 if (command == null) { 284 LOG.warn("Exception occurred while processing a command: {}", exception.toString()); 285 } else { 286 LOG.warn("Exception occurred processing: {} -> {}", safeGetAction(command), exception.toString()); 287 } 288 289 if (LOG.isDebugEnabled()) { 290 LOG.debug("Exception detail", exception); 291 } 292 293 if (command != null && LOG.isTraceEnabled()) { 294 LOG.trace("Command that caused the error: {}", command); 295 } 296 297 // Let the stomp client know about any protocol errors. 298 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 299 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); 300 exception.printStackTrace(stream); 301 stream.close(); 302 303 HashMap<String, String> headers = new HashMap<>(); 304 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); 305 headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain"); 306 307 if (command != null) { 308 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 309 if (receiptId != null) { 310 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 311 } 312 } 313 314 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); 315 sendToStomp(errorMessage); 316 } 317 318 protected void onStompSend(StompFrame command) throws IOException, JMSException { 319 checkConnected(); 320 321 Map<String, String> headers = command.getHeaders(); 322 String destination = headers.get(Stomp.Headers.Send.DESTINATION); 323 if (destination == null) { 324 throw new ProtocolException("SEND received without a Destination specified!"); 325 } 326 327 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 328 headers.remove("transaction"); 329 330 ActiveMQMessage message = convertMessage(command); 331 332 message.setProducerId(producerId); 333 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); 334 message.setMessageId(id); 335 336 if (stompTx != null) { 337 TransactionId activemqTx = transactions.get(stompTx); 338 if (activemqTx == null) { 339 throw new ProtocolException("Invalid transaction id: " + stompTx); 340 } 341 message.setTransactionId(activemqTx); 342 } 343 344 message.onSend(); 345 message.beforeMarshall(null); 346 sendToActiveMQ(message, createResponseHandler(command)); 347 } 348 349 protected void onStompNack(StompFrame command) throws ProtocolException { 350 351 checkConnected(); 352 353 if (this.version.equals(Stomp.V1_0)) { 354 throw new ProtocolException("NACK received but connection is in v1.0 mode."); 355 } 356 357 Map<String, String> headers = command.getHeaders(); 358 359 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 360 if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) { 361 throw new ProtocolException("NACK received without a subscription id for acknowledge!"); 362 } 363 364 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 365 if (messageId == null && !this.version.equals(Stomp.V1_2)) { 366 throw new ProtocolException("NACK received without a message-id to acknowledge!"); 367 } 368 369 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 370 if (ackId == null && this.version.equals(Stomp.V1_2)) { 371 throw new ProtocolException("NACK received without an ack header to acknowledge!"); 372 } 373 374 TransactionId activemqTx = null; 375 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 376 if (stompTx != null) { 377 activemqTx = transactions.get(stompTx); 378 if (activemqTx == null) { 379 throw new ProtocolException("Invalid transaction id: " + stompTx); 380 } 381 } 382 383 boolean nacked = false; 384 385 if (ackId != null) { 386 AckEntry pendingAck = this.pedingAcks.remove(ackId); 387 if (pendingAck != null) { 388 messageId = pendingAck.getMessageId(); 389 MessageAck ack = pendingAck.onMessageNack(activemqTx); 390 if (ack != null) { 391 sendToActiveMQ(ack, createResponseHandler(command)); 392 nacked = true; 393 } 394 } 395 } else if (subscriptionId != null) { 396 StompSubscription sub = this.subscriptions.get(subscriptionId); 397 if (sub != null) { 398 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); 399 if (ack != null) { 400 sendToActiveMQ(ack, createResponseHandler(command)); 401 nacked = true; 402 } 403 } 404 } 405 406 if (!nacked) { 407 throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); 408 } 409 } 410 411 protected void onStompAck(StompFrame command) throws ProtocolException { 412 checkConnected(); 413 414 Map<String, String> headers = command.getHeaders(); 415 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 416 if (messageId == null && !(this.version.equals(Stomp.V1_2))) { 417 throw new ProtocolException("ACK received without a message-id to acknowledge!"); 418 } 419 420 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 421 if (subscriptionId == null && this.version.equals(Stomp.V1_1)) { 422 throw new ProtocolException("ACK received without a subscription id for acknowledge!"); 423 } 424 425 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 426 if (ackId == null && this.version.equals(Stomp.V1_2)) { 427 throw new ProtocolException("ACK received without a ack id for acknowledge!"); 428 } 429 430 TransactionId activemqTx = null; 431 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 432 if (stompTx != null) { 433 activemqTx = transactions.get(stompTx); 434 if (activemqTx == null) { 435 throw new ProtocolException("Invalid transaction id: " + stompTx); 436 } 437 } 438 439 boolean acked = false; 440 441 if (ackId != null) { 442 AckEntry pendingAck = this.pedingAcks.remove(ackId); 443 if (pendingAck != null) { 444 messageId = pendingAck.getMessageId(); 445 MessageAck ack = pendingAck.onMessageAck(activemqTx); 446 if (ack != null) { 447 sendToActiveMQ(ack, createResponseHandler(command)); 448 acked = true; 449 } 450 } 451 452 } else if (subscriptionId != null) { 453 StompSubscription sub = this.subscriptions.get(subscriptionId); 454 if (sub != null) { 455 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 456 if (ack != null) { 457 sendToActiveMQ(ack, createResponseHandler(command)); 458 acked = true; 459 } 460 } 461 } else { 462 // STOMP v1.0: acking with just a message id is very bogus since the same message id 463 // could have been sent to 2 different subscriptions on the same Stomp connection. 464 // For example, when 2 subs are created on the same topic. 465 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 466 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 467 if (ack != null) { 468 sendToActiveMQ(ack, createResponseHandler(command)); 469 acked = true; 470 break; 471 } 472 } 473 } 474 475 if (!acked) { 476 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); 477 } 478 } 479 480 protected void onStompBegin(StompFrame command) throws ProtocolException { 481 checkConnected(); 482 483 Map<String, String> headers = command.getHeaders(); 484 485 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 486 487 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { 488 throw new ProtocolException("Must specify the transaction you are beginning"); 489 } 490 491 if (transactions.get(stompTx) != null) { 492 throw new ProtocolException("The transaction was already started: " + stompTx); 493 } 494 495 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); 496 transactions.put(stompTx, activemqTx); 497 498 TransactionInfo tx = new TransactionInfo(); 499 tx.setConnectionId(connectionId); 500 tx.setTransactionId(activemqTx); 501 tx.setType(TransactionInfo.BEGIN); 502 503 sendToActiveMQ(tx, createResponseHandler(command)); 504 } 505 506 protected void onStompCommit(StompFrame command) throws ProtocolException { 507 checkConnected(); 508 509 Map<String, String> headers = command.getHeaders(); 510 511 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 512 if (stompTx == null) { 513 throw new ProtocolException("Must specify the transaction you are committing"); 514 } 515 516 TransactionId activemqTx = transactions.remove(stompTx); 517 if (activemqTx == null) { 518 throw new ProtocolException("Invalid transaction id: " + stompTx); 519 } 520 521 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 522 sub.onStompCommit(activemqTx); 523 } 524 525 pedingAcks.clear(); 526 527 TransactionInfo tx = new TransactionInfo(); 528 tx.setConnectionId(connectionId); 529 tx.setTransactionId(activemqTx); 530 tx.setType(TransactionInfo.COMMIT_ONE_PHASE); 531 532 sendToActiveMQ(tx, createResponseHandler(command)); 533 } 534 535 protected void onStompAbort(StompFrame command) throws ProtocolException { 536 checkConnected(); 537 Map<String, String> headers = command.getHeaders(); 538 539 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 540 if (stompTx == null) { 541 throw new ProtocolException("Must specify the transaction you are committing"); 542 } 543 544 TransactionId activemqTx = transactions.remove(stompTx); 545 if (activemqTx == null) { 546 throw new ProtocolException("Invalid transaction id: " + stompTx); 547 } 548 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 549 try { 550 sub.onStompAbort(activemqTx); 551 } catch (Exception e) { 552 throw new ProtocolException("Transaction abort failed", false, e); 553 } 554 } 555 556 pedingAcks.clear(); 557 558 TransactionInfo tx = new TransactionInfo(); 559 tx.setConnectionId(connectionId); 560 tx.setTransactionId(activemqTx); 561 tx.setType(TransactionInfo.ROLLBACK); 562 563 sendToActiveMQ(tx, createResponseHandler(command)); 564 } 565 566 protected void onStompSubscribe(StompFrame command) throws ProtocolException { 567 checkConnected(); 568 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)); 569 Map<String, String> headers = command.getHeaders(); 570 571 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); 572 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); 573 574 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 575 throw new ProtocolException("SUBSCRIBE received without a subscription id!"); 576 } 577 578 final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true); 579 580 if (actualDest == null) { 581 throw new ProtocolException("Invalid 'null' Destination."); 582 } 583 584 final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 585 ConsumerInfo consumerInfo = new ConsumerInfo(id); 586 consumerInfo.setPrefetchSize(actualDest.isQueue() ? 587 ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH : 588 headers.containsKey("activemq.subscriptionName") ? 589 ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); 590 consumerInfo.setDispatchAsync(true); 591 592 String browser = headers.get(Stomp.Headers.Subscribe.BROWSER); 593 if (browser != null && browser.equals(Stomp.TRUE)) { 594 595 if (this.version.equals(Stomp.V1_0)) { 596 throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!"); 597 } 598 599 consumerInfo.setBrowser(true); 600 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH); 601 } 602 603 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR); 604 if (selector != null) { 605 consumerInfo.setSelector("convert_string_expressions:" + selector); 606 } 607 608 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); 609 610 if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) { 611 throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!"); 612 } 613 614 consumerInfo.setDestination(actualDest); 615 consumerInfo.setDispatchAsync(true); 616 617 StompSubscription stompSubscription; 618 if (!consumerInfo.isBrowser()) { 619 stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 620 } else { 621 stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 622 } 623 stompSubscription.setDestination(actualDest); 624 625 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); 626 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { 627 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); 628 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { 629 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK); 630 } else { 631 stompSubscription.setAckMode(StompSubscription.AUTO_ACK); 632 } 633 634 subscriptionsByConsumerId.put(id, stompSubscription); 635 // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set. 636 if (subscriptionId != null) { 637 subscriptions.put(subscriptionId, stompSubscription); 638 } 639 640 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 641 if (receiptId != null && consumerInfo.getPrefetchSize() > 0) { 642 643 final StompFrame cmd = command; 644 final int prefetch = consumerInfo.getPrefetchSize(); 645 646 // Since dispatch could beat the receipt we set prefetch to zero to start and then 647 // once we've sent our Receipt we are safe to turn on dispatch if the response isn't 648 // an error message. 649 consumerInfo.setPrefetchSize(0); 650 651 final ResponseHandler handler = new ResponseHandler() { 652 @Override 653 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 654 if (response.isException()) { 655 // Generally a command can fail.. but that does not invalidate the connection. 656 // We report back the failure but we don't close the connection. 657 Throwable exception = ((ExceptionResponse)response).getException(); 658 handleException(exception, cmd); 659 } else { 660 StompFrame sc = new StompFrame(); 661 sc.setAction(Stomp.Responses.RECEIPT); 662 sc.setHeaders(new HashMap<String, String>(1)); 663 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 664 stompTransport.sendToStomp(sc); 665 666 ConsumerControl control = new ConsumerControl(); 667 control.setPrefetch(prefetch); 668 control.setDestination(actualDest); 669 control.setConsumerId(id); 670 671 sendToActiveMQ(control, null); 672 } 673 } 674 }; 675 676 sendToActiveMQ(consumerInfo, handler); 677 } else { 678 sendToActiveMQ(consumerInfo, createResponseHandler(command)); 679 } 680 } 681 682 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { 683 checkConnected(); 684 Map<String, String> headers = command.getHeaders(); 685 686 ActiveMQDestination destination = null; 687 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); 688 if (o != null) { 689 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true); 690 } 691 692 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); 693 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 694 throw new ProtocolException("UNSUBSCRIBE received without a subscription id!"); 695 } 696 697 if (subscriptionId == null && destination == null) { 698 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); 699 } 700 701 // check if it is a durable subscription 702 String durable = command.getHeaders().get("activemq.subscriptionName"); 703 String clientId = durable; 704 if (!this.version.equals(Stomp.V1_0)) { 705 clientId = connectionInfo.getClientId(); 706 } 707 708 if (durable != null) { 709 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 710 info.setClientId(clientId); 711 info.setSubscriptionName(durable); 712 info.setConnectionId(connectionId); 713 sendToActiveMQ(info, createResponseHandler(command)); 714 return; 715 } 716 717 if (subscriptionId != null) { 718 StompSubscription sub = this.subscriptions.remove(subscriptionId); 719 if (sub != null) { 720 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 721 return; 722 } 723 } else { 724 // Unsubscribing using a destination is a bit weird if multiple subscriptions 725 // are created with the same destination. 726 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 727 StompSubscription sub = iter.next(); 728 if (destination != null && destination.equals(sub.getDestination())) { 729 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 730 iter.remove(); 731 return; 732 } 733 } 734 } 735 736 throw new ProtocolException("No subscription matched."); 737 } 738 739 ConnectionInfo connectionInfo = new ConnectionInfo(); 740 741 protected void onStompConnect(final StompFrame command) throws ProtocolException { 742 743 if (connected.get()) { 744 throw new ProtocolException("Already connected."); 745 } 746 747 final Map<String, String> headers = command.getHeaders(); 748 749 // allow anyone to login for now 750 String login = headers.get(Stomp.Headers.Connect.LOGIN); 751 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE); 752 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID); 753 String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT); 754 755 if (heartBeat == null) { 756 heartBeat = defaultHeartBeat; 757 } 758 759 this.version = StompCodec.detectVersion(headers); 760 761 configureInactivityMonitor(heartBeat.trim()); 762 763 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); 764 connectionInfo.setConnectionId(connectionId); 765 if (clientId != null) { 766 connectionInfo.setClientId(clientId); 767 } else { 768 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 769 } 770 771 connectionInfo.setResponseRequired(true); 772 connectionInfo.setUserName(login); 773 connectionInfo.setPassword(passcode); 774 connectionInfo.setTransportContext(command.getTransportContext()); 775 776 sendToActiveMQ(connectionInfo, new ResponseHandler() { 777 @Override 778 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 779 780 if (response.isException()) { 781 // If the connection attempt fails we close the socket. 782 Throwable exception = ((ExceptionResponse)response).getException(); 783 handleException(exception, command); 784 getStompTransport().onException(IOExceptionSupport.create(exception)); 785 return; 786 } 787 788 final SessionInfo sessionInfo = new SessionInfo(sessionId); 789 sendToActiveMQ(sessionInfo, null); 790 791 final ProducerInfo producerInfo = new ProducerInfo(producerId); 792 sendToActiveMQ(producerInfo, new ResponseHandler() { 793 @Override 794 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 795 796 if (response.isException()) { 797 // If the connection attempt fails we close the socket. 798 Throwable exception = ((ExceptionResponse)response).getException(); 799 handleException(exception, command); 800 getStompTransport().onException(IOExceptionSupport.create(exception)); 801 } 802 803 connected.set(true); 804 HashMap<String, String> responseHeaders = new HashMap<>(); 805 806 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); 807 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); 808 if (requestId == null) { 809 // TODO legacy 810 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED); 811 } 812 if (requestId != null) { 813 // TODO legacy 814 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); 815 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId); 816 } 817 818 responseHeaders.put(Stomp.Headers.Connected.VERSION, version); 819 responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT, 820 String.format("%d,%d", hbWriteInterval, hbReadInterval)); 821 responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION); 822 823 StompFrame sc = new StompFrame(); 824 sc.setAction(Stomp.Responses.CONNECTED); 825 sc.setHeaders(responseHeaders); 826 sendToStomp(sc); 827 828 StompWireFormat format = stompTransport.getWireFormat(); 829 if (format != null) { 830 format.setStompVersion(version); 831 } 832 } 833 }); 834 } 835 }); 836 } 837 838 protected void onStompDisconnect(StompFrame command) throws ProtocolException { 839 if (connected.get()) { 840 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); 841 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); 842 connected.set(false); 843 } 844 } 845 846 protected void checkConnected() throws ProtocolException { 847 if (!connected.get()) { 848 throw new ProtocolException("Not connected."); 849 } 850 } 851 852 /** 853 * Dispatch a ActiveMQ command 854 * 855 * @param command 856 * @throws IOException 857 */ 858 public void onActiveMQCommand(Command command) throws IOException, JMSException { 859 if (command.isResponse()) { 860 Response response = (Response)command; 861 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 862 if (rh != null) { 863 rh.onResponse(this, response); 864 } else { 865 // Pass down any unexpected errors. Should this close the connection? 866 if (response.isException()) { 867 Throwable exception = ((ExceptionResponse)response).getException(); 868 handleException(exception, null); 869 } 870 } 871 } else if (command.isMessageDispatch()) { 872 MessageDispatch md = (MessageDispatch)command; 873 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); 874 if (sub != null) { 875 String ackId = null; 876 if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) { 877 AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub); 878 ackId = this.ACK_ID_GENERATOR.generateId(); 879 this.pedingAcks.put(ackId, pendingAck); 880 } 881 try { 882 sub.onMessageDispatch(md, ackId); 883 } catch (Exception ex) { 884 if (ackId != null) { 885 this.pedingAcks.remove(ackId); 886 } 887 } 888 } 889 } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) { 890 stompTransport.sendToStomp(ping); 891 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 892 // Pass down any unexpected async errors. Should this close the connection? 893 Throwable exception = ((ConnectionError)command).getException(); 894 handleException(exception, null); 895 } 896 } 897 898 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { 899 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command); 900 return msg; 901 } 902 903 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { 904 if (ignoreTransformation == true) { 905 return frameTranslator.convertMessage(this, message); 906 } else { 907 FrameTranslator translator = findTranslator( 908 message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory()); 909 return translator.convertMessage(this, message); 910 } 911 } 912 913 public StompTransport getStompTransport() { 914 return stompTransport; 915 } 916 917 public ActiveMQDestination createTempDestination(String name, boolean topic) { 918 ActiveMQDestination rc = tempDestinations.get(name); 919 if( rc == null ) { 920 if (topic) { 921 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); 922 } else { 923 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); 924 } 925 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 926 tempDestinations.put(name, rc); 927 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); 928 } 929 return rc; 930 } 931 932 public String getCreatedTempDestinationName(ActiveMQDestination destination) { 933 return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); 934 } 935 936 public String getDefaultHeartBeat() { 937 return defaultHeartBeat; 938 } 939 940 public void setDefaultHeartBeat(String defaultHeartBeat) { 941 this.defaultHeartBeat = defaultHeartBeat; 942 } 943 944 /** 945 * @return the hbGracePeriodMultiplier 946 */ 947 public float getHbGracePeriodMultiplier() { 948 return hbGracePeriodMultiplier; 949 } 950 951 /** 952 * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set 953 */ 954 public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) { 955 this.hbGracePeriodMultiplier = hbGracePeriodMultiplier; 956 } 957 958 protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException { 959 960 String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA); 961 962 if (keepAliveOpts == null || keepAliveOpts.length != 2) { 963 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 964 } else { 965 966 try { 967 hbReadInterval = (Long.parseLong(keepAliveOpts[0])); 968 hbWriteInterval = Long.parseLong(keepAliveOpts[1]); 969 } catch(NumberFormatException e) { 970 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 971 } 972 973 try { 974 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); 975 monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier)); 976 monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); 977 monitor.setWriteCheckTime(hbWriteInterval); 978 monitor.startMonitoring(); 979 } catch(Exception ex) { 980 hbReadInterval = 0; 981 hbWriteInterval = 0; 982 } 983 984 if (LOG.isDebugEnabled()) { 985 LOG.debug("Stomp Connect heartbeat conf RW[{},{}]", hbReadInterval, hbWriteInterval); 986 } 987 } 988 } 989 990 protected void sendReceipt(StompFrame command) { 991 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 992 if (receiptId != null) { 993 StompFrame sc = new StompFrame(); 994 sc.setAction(Stomp.Responses.RECEIPT); 995 sc.setHeaders(new HashMap<String, String>(1)); 996 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 997 try { 998 sendToStomp(sc); 999 } catch (IOException e) { 1000 LOG.warn("Could not send a receipt for {}", command, e); 1001 } 1002 } 1003 } 1004 1005 /** 1006 * Retrieve the STOMP action value from a frame if the value is valid, otherwise 1007 * return an unknown string to allow for safe log output. 1008 * 1009 * @param command 1010 * The STOMP command to fetch an action from. 1011 * 1012 * @return the command action or a safe string to use in logging. 1013 */ 1014 protected Object safeGetAction(StompFrame command) { 1015 String result = "<Unknown>"; 1016 if (command != null && command.getAction() != null) { 1017 String action = command.getAction().trim(); 1018 1019 if (action != null) { 1020 switch (action) { 1021 case Stomp.Commands.SEND: 1022 case Stomp.Commands.ACK: 1023 case Stomp.Commands.NACK: 1024 case Stomp.Commands.BEGIN: 1025 case Stomp.Commands.COMMIT: 1026 case Stomp.Commands.ABORT: 1027 case Stomp.Commands.SUBSCRIBE: 1028 case Stomp.Commands.UNSUBSCRIBE: 1029 case Stomp.Commands.CONNECT: 1030 case Stomp.Commands.STOMP: 1031 case Stomp.Commands.DISCONNECT: 1032 result = action; 1033 break; 1034 case Stomp.Commands.SUBSCRIBE_PREFIX: 1035 result = Stomp.Commands.SUBSCRIBE; 1036 case Stomp.Commands.UNSUBSCRIBE_PREFIX: 1037 result = Stomp.Commands.UNSUBSCRIBE; 1038 default: 1039 break; 1040 } 1041 } 1042 } 1043 1044 return result; 1045 } 1046 1047 /** 1048 * Remove all pending acknowledgement markers that are batched into the single 1049 * client acknowledge operation. 1050 * 1051 * @param subscription 1052 * The STOMP Subscription that has performed a client acknowledge. 1053 * @param msgIdsToRemove 1054 * List of message IDs that are bound to the subscription that has ack'd 1055 */ 1056 protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) { 1057 int count = 0; 1058 1059 for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){ 1060 AckEntry actEntry = entry.getValue(); 1061 if (msgIdsToRemove.contains(actEntry.messageId)) { 1062 this.pedingAcks.remove(entry.getKey()); 1063 count++; 1064 } 1065 } 1066 1067 LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count); 1068 } 1069}