1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.codehaus.activemq.broker.BrokerClient;
22 import org.codehaus.activemq.message.ActiveMQDestination;
23 import org.codehaus.activemq.message.ActiveMQMessage;
24 import org.codehaus.activemq.service.Dispatcher;
25 import org.codehaus.activemq.service.MessageContainer;
26 import org.codehaus.activemq.service.MessageContainerManager;
27 import org.codehaus.activemq.service.Subscription;
28
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import java.util.Collections;
32 import java.util.Iterator;
33 import java.util.Map;
34
35 /***
36 * @version $Revision: 1.5 $
37 */
38 public abstract class MessageContainerManagerSupport implements MessageContainerManager {
39 protected Dispatcher dispatcher;
40 protected Map messageContainers = new ConcurrentHashMap();
41 private Map destinations = new ConcurrentHashMap();
42 private boolean maintainDestinationStats = true;
43
44 public MessageContainerManagerSupport(Dispatcher dispatcher) {
45 this.dispatcher = dispatcher;
46 dispatcher.register(this);
47 }
48
49 public Map getDestinations() {
50 return Collections.unmodifiableMap(destinations);
51 }
52
53 public void start() throws JMSException {
54 dispatcher.start();
55 }
56
57 public void stop() throws JMSException {
58 dispatcher.stop();
59 JMSException firstException = null;
60 try {
61 dispatcher.stop();
62 }
63 catch (JMSException e) {
64 firstException = e;
65 }
66
67
68 for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
69 MessageContainer container = (MessageContainer) iter.next();
70 try {
71 container.stop();
72 }
73 catch (JMSException e) {
74 if (firstException == null) {
75 firstException = e;
76 }
77 }
78 }
79 if (firstException != null) {
80 throw firstException;
81 }
82
83 }
84
85 public synchronized MessageContainer getContainer(String destinationName) throws JMSException {
86 MessageContainer container = (MessageContainer) messageContainers.get(destinationName);
87 if (container == null) {
88 container = createContainer(destinationName);
89 container.start();
90 messageContainers.put(destinationName, container);
91
92 destinations.put(destinationName, createDestination(destinationName));
93 }
94 return container;
95 }
96
97
98
99
100 public boolean isMaintainDestinationStats() {
101 return maintainDestinationStats;
102 }
103
104 public void setMaintainDestinationStats(boolean maintainDestinationStats) {
105 this.maintainDestinationStats = maintainDestinationStats;
106 }
107
108
109
110
111 /***
112 * Factory method to create a new {@link Destination}
113 */
114 protected abstract Destination createDestination(String destinationName);
115
116 /***
117 * Factory method to create a new {@link MessageContainer}
118 */
119 protected abstract MessageContainer createContainer(String destinationName) throws JMSException;
120
121 /***
122 * Loads the container for the given name and destination on startup
123 */
124 protected void loadContainer(String destinationName, Destination destination) throws JMSException {
125 destinations.put(destinationName, destination);
126
127 MessageContainer container = createContainer(destinationName);
128 container.start();
129 messageContainers.put(destinationName, container);
130 }
131
132 /***
133 * Updates the message acknowledgement stats
134 *
135 * @param client
136 * @param subscription
137 */
138 protected void updateAcknowledgeStats(BrokerClient client, Subscription subscription) {
139 if (isMaintainDestinationStats()) {
140
141 String name = subscription.getDestination().getPhysicalName();
142 ActiveMQDestination destination = (ActiveMQDestination) destinations.get(name);
143 destination.getStats().onMessageAck();
144 }
145 }
146
147 /***
148 * Updates the message sending stats
149 *
150 * @param client
151 * @param message
152 * @throws JMSException
153 */
154 protected void updateSendStats(BrokerClient client, ActiveMQMessage message) throws JMSException {
155 if (isMaintainDestinationStats()) {
156
157 String name = message.getJMSActiveMQDestination().getPhysicalName();
158 ActiveMQDestination destination = (ActiveMQDestination) destinations.get(name);
159 if (destination != null){
160 destination.getStats().onMessageSend(message);
161 }
162 }
163 }
164 }