1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq.service.impl; 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 import org.codehaus.activemq.broker.BrokerClient; 23 import org.codehaus.activemq.filter.FilterFactory; 24 import org.codehaus.activemq.filter.FilterFactoryImpl; 25 import org.codehaus.activemq.message.ActiveMQDestination; 26 import org.codehaus.activemq.message.ActiveMQMessage; 27 import org.codehaus.activemq.message.ConsumerInfo; 28 import org.codehaus.activemq.service.Dispatcher; 29 import org.codehaus.activemq.service.MessageContainer; 30 import org.codehaus.activemq.service.Subscription; 31 import org.codehaus.activemq.service.SubscriptionContainer; 32 import org.codehaus.activemq.service.RedeliveryPolicy; 33 import org.codehaus.activemq.store.PersistenceAdapter; 34 35 import javax.jms.DeliveryMode; 36 import javax.jms.JMSException; 37 import java.util.Iterator; 38 import java.util.Set; 39 40 /*** 41 * A default implementation of a Broker of Topic messages for transient consumers 42 * 43 * @version $Revision: 1.6 $ 44 */ 45 public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager { 46 private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class); 47 48 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) { 49 this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy()), new FilterFactoryImpl(), new DispatcherImpl()); 50 } 51 52 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) { 53 super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher); 54 } 55 56 /*** 57 * @param client 58 * @param info 59 * @throws javax.jms.JMSException 60 */ 61 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 62 if (info.getDestination().isTopic()) { 63 doAddMessageConsumer(client, info); 64 } 65 } 66 67 68 /*** 69 * @param client 70 * @param info 71 * @throws javax.jms.JMSException 72 */ 73 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 74 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId()); 75 if (sub != null) { 76 sub.setActive(false); 77 dispatcher.removeActiveSubscription(client, sub); 78 subscriptionContainer.removeSubscription(info.getConsumerId()); 79 sub.clear(); 80 } 81 } 82 83 84 /*** 85 * @param client 86 * @param message 87 * @throws javax.jms.JMSException 88 */ 89 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException { 90 ActiveMQDestination destination = message.getJMSActiveMQDestination(); 91 if (destination != null && destination.isTopic()) { 92 MessageContainer container = null; 93 if (log.isDebugEnabled()) { 94 log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message); 95 } 96 Set subscriptions = subscriptionContainer.getSubscriptions(destination); 97 for (Iterator i = subscriptions.iterator(); i.hasNext();) { 98 Subscription sub = (Subscription) i.next(); 99 if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) { 100 if (container == null) { 101 container = getContainer(message.getJMSDestination().toString()); 102 container.addMessage(message); 103 } 104 sub.addMessage(container, message); 105 } 106 } 107 updateSendStats(client, message); 108 } 109 } 110 111 /*** 112 * Delete a durable subscriber 113 * 114 * @param clientId 115 * @param subscriberName 116 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active 117 */ 118 public void deleteSubscription(String clientId, String subscriberName) throws JMSException { 119 } 120 }