View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import org.codehaus.activemq.broker.BrokerClient;
21  import org.codehaus.activemq.filter.DestinationFilter;
22  import org.codehaus.activemq.message.ActiveMQDestination;
23  import org.codehaus.activemq.message.ActiveMQMessage;
24  import org.codehaus.activemq.message.ConsumerInfo;
25  import org.codehaus.activemq.service.MessageContainerManager;
26  
27  import javax.jms.Destination;
28  import javax.jms.JMSException;
29  import java.util.Iterator;
30  import java.util.Map;
31  
32  /***
33   * Implements an initial image service where on subscription
34   * the client will recieve the last image that was previously cached.
35   * This is very useful in financial market data and in rapidly changing
36   * transient event models where you don't want to persist messages
37   * when you are away, but wish to cache the last image, per destination
38   * around so that when a new reliable consumer subscribes you receive the
39   * latest value you may have missed.
40   * <p/>
41   * This is especially true in finance with slow moving markets where you may
42   * have to wait a while for an update (or times when you subscribe after
43   * market close etc).
44   *
45   * @version $Revision: 1.1 $
46   */
47  public class InitialImageMessageContainerManager extends ProxyMessageContainerManager {
48      private Map cache;
49      private boolean topic;
50      private DestinationFilter destinationFilter;
51  
52      /***
53       * Creates a topic based initial image message container manager using the given destination filter
54       *
55       * @param delegate
56       * @param cache
57       * @param destinationFilter
58       */
59      public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, DestinationFilter destinationFilter) {
60          this(delegate, cache, true, destinationFilter);
61      }
62  
63      public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, boolean topic, DestinationFilter destinationFilter) {
64          super(delegate);
65          this.cache = cache;
66          this.topic = topic;
67          this.destinationFilter = destinationFilter;
68      }
69  
70      public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
71          super.addMessageConsumer(client, info);
72  
73          // lookup message for destination
74          ActiveMQDestination destination = info.getDestination();
75          if (isValid(destination)) {
76              if (destination.isWildcard()) {
77                  DestinationFilter filter = DestinationFilter.parseFilter(destination);
78                  sendMatchingInitialImages(client, info, filter);
79              }
80              else {
81                  ActiveMQMessage message = null;
82                  synchronized (cache) {
83                      message = (ActiveMQMessage) cache.get(destination);
84                  }
85                  if (message != null) {
86                      sendMessage(client, message);
87                  }
88              }
89          }
90      }
91  
92      public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
93          ActiveMQDestination destination = message.getJMSActiveMQDestination();
94          if (isValid(destination)) {
95              cache.put(destination, message);
96          }
97          super.sendMessage(client, message);
98      }
99  
100     // Implementation methods
101     //-------------------------------------------------------------------------
102     protected void sendMatchingInitialImages(BrokerClient client, ConsumerInfo info, DestinationFilter filter) throws JMSException {
103         synchronized (cache) {
104             for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
105                 Map.Entry entry = (Map.Entry) iter.next();
106                 Destination destination = (Destination) entry.getKey();
107                 if (filter.matches(destination)) {
108                     ActiveMQMessage message = (ActiveMQMessage) entry.getValue();
109                     sendMessage(client, message);
110                 }
111             }
112         }
113     }
114 
115 
116     /***
117      * Does this message match the destinations on which initial image caching should be used
118      *
119      * @param destination
120      * @return true if the given destination should use initial image caching
121      *         which is typically true if the message is a topic which may match
122      *         an optional DestinationFilter
123      */
124     protected boolean isValid(ActiveMQDestination destination) {
125         return destination.isTopic() == topic && (destinationFilter == null || destinationFilter.matches(destination));
126     }
127 }