1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import org.codehaus.activemq.broker.BrokerClient;
21 import org.codehaus.activemq.filter.DestinationFilter;
22 import org.codehaus.activemq.message.ActiveMQDestination;
23 import org.codehaus.activemq.message.ActiveMQMessage;
24 import org.codehaus.activemq.message.ConsumerInfo;
25 import org.codehaus.activemq.service.MessageContainerManager;
26
27 import javax.jms.Destination;
28 import javax.jms.JMSException;
29 import java.util.Iterator;
30 import java.util.Map;
31
32 /***
33 * Implements an initial image service where on subscription
34 * the client will recieve the last image that was previously cached.
35 * This is very useful in financial market data and in rapidly changing
36 * transient event models where you don't want to persist messages
37 * when you are away, but wish to cache the last image, per destination
38 * around so that when a new reliable consumer subscribes you receive the
39 * latest value you may have missed.
40 * <p/>
41 * This is especially true in finance with slow moving markets where you may
42 * have to wait a while for an update (or times when you subscribe after
43 * market close etc).
44 *
45 * @version $Revision: 1.1 $
46 */
47 public class InitialImageMessageContainerManager extends ProxyMessageContainerManager {
48 private Map cache;
49 private boolean topic;
50 private DestinationFilter destinationFilter;
51
52 /***
53 * Creates a topic based initial image message container manager using the given destination filter
54 *
55 * @param delegate
56 * @param cache
57 * @param destinationFilter
58 */
59 public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, DestinationFilter destinationFilter) {
60 this(delegate, cache, true, destinationFilter);
61 }
62
63 public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, boolean topic, DestinationFilter destinationFilter) {
64 super(delegate);
65 this.cache = cache;
66 this.topic = topic;
67 this.destinationFilter = destinationFilter;
68 }
69
70 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
71 super.addMessageConsumer(client, info);
72
73
74 ActiveMQDestination destination = info.getDestination();
75 if (isValid(destination)) {
76 if (destination.isWildcard()) {
77 DestinationFilter filter = DestinationFilter.parseFilter(destination);
78 sendMatchingInitialImages(client, info, filter);
79 }
80 else {
81 ActiveMQMessage message = null;
82 synchronized (cache) {
83 message = (ActiveMQMessage) cache.get(destination);
84 }
85 if (message != null) {
86 sendMessage(client, message);
87 }
88 }
89 }
90 }
91
92 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
93 ActiveMQDestination destination = message.getJMSActiveMQDestination();
94 if (isValid(destination)) {
95 cache.put(destination, message);
96 }
97 super.sendMessage(client, message);
98 }
99
100
101
102 protected void sendMatchingInitialImages(BrokerClient client, ConsumerInfo info, DestinationFilter filter) throws JMSException {
103 synchronized (cache) {
104 for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
105 Map.Entry entry = (Map.Entry) iter.next();
106 Destination destination = (Destination) entry.getKey();
107 if (filter.matches(destination)) {
108 ActiveMQMessage message = (ActiveMQMessage) entry.getValue();
109 sendMessage(client, message);
110 }
111 }
112 }
113 }
114
115
116 /***
117 * Does this message match the destinations on which initial image caching should be used
118 *
119 * @param destination
120 * @return true if the given destination should use initial image caching
121 * which is typically true if the message is a topic which may match
122 * an optional DestinationFilter
123 */
124 protected boolean isValid(ActiveMQDestination destination) {
125 return destination.isTopic() == topic && (destinationFilter == null || destinationFilter.matches(destination));
126 }
127 }