View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq;
20  
21  import javax.jms.ConnectionConsumer;
22  import javax.jms.IllegalStateException;
23  import javax.jms.JMSException;
24  import javax.jms.ServerSession;
25  import javax.jms.ServerSessionPool;
26  
27  import org.codehaus.activemq.message.ActiveMQMessage;
28  import org.codehaus.activemq.message.ConsumerInfo;
29  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
30  
31  /***
32   * For application servers, <CODE>Connection</CODE> objects provide a special
33   * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
34   * messages it is to consume are specified by a <CODE>Destination</CODE> and
35   * a message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
36   * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
37   * <p/>
38   * <P>
39   * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
40   * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
41   * and starts it. As traffic picks up, messages can back up. If this happens, a
42   * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
43   * with more than one message. This reduces the thread context switches and
44   * minimizes resource use at the expense of some serialization of message
45   * processing.
46   *
47   * @see javax.jms.Connection#createConnectionConsumer
48   * @see javax.jms.Connection#createDurableConnectionConsumer
49   * @see javax.jms.QueueConnection#createConnectionConsumer
50   * @see javax.jms.TopicConnection#createConnectionConsumer
51   * @see javax.jms.TopicConnection#createDurableConnectionConsumer
52   */
53  
54  public class ActiveMQConnectionConsumer implements ConnectionConsumer,
55          ActiveMQMessageDispatcher {
56  
57      private ActiveMQConnection connection;
58  
59      private ServerSessionPool sessionPool;
60  
61      private ConsumerInfo consumerInfo;
62  
63      private boolean closed;
64  
65      private int maximumMessages;
66      
67      protected MemoryBoundedQueue messageQueue;
68      
69  
70      /***
71       * Create a ConnectionConsumer
72       *
73       * @param theConnection
74       * @param theSessionPool
75       * @param theConsumerInfo
76       * @param theMaximumMessages
77       * @throws JMSException
78       */
79      protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection,
80                                           ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo,
81                                           int theMaximumMessages) throws JMSException {
82          this.connection = theConnection;
83          this.sessionPool = theSessionPool;
84          this.consumerInfo = theConsumerInfo;
85          this.maximumMessages = theMaximumMessages;
86          this.connection.addConnectionConsumer(this);
87          this.consumerInfo.setStarted(true);
88          this.connection.syncSendPacket(this.consumerInfo);
89  
90          String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName()+":"+
91  			theConsumerInfo.getConsumerNo();
92          this.messageQueue = connection.getMemoryBoundedQueue(queueName);
93      }
94  
95      /***
96       * Tests to see if the Message Dispatcher is a target for this message
97       *
98       * @param message the message to test
99       * @return true if the Message Dispatcher can dispatch the message
100      */
101     public boolean isTarget(ActiveMQMessage message) {
102         return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
103     }
104 
105     /***
106      * Dispatch an ActiveMQMessage
107      *
108      * @param message
109      */
110     public void dispatch(ActiveMQMessage message) {
111         if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
112             message.setConsumerId(this.consumerInfo.getConsumerId());
113             try {
114             	if( sessionPool != null )
115             		dispatchToSession(message);
116             	else
117             		dispatchToQueue(message);
118             }
119             catch (JMSException jmsEx) {
120                 this.connection.handleAsyncException(jmsEx);
121             }
122         }
123     }
124         
125     /***
126 	 * @param message
127 	 * @throws JMSException
128 	 */
129 	private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
130 		messageQueue.enqueue(message);
131     }
132 
133     /***
134      * Receives the next message that arrives within the specified timeout interval.
135      * @throws JMSException
136      */
137     public ActiveMQMessage receive(long timeout) throws JMSException {
138         try {
139             ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
140             return message;
141         }
142         catch (InterruptedException ioe) {
143             return null;
144         }
145     }
146 	
147 	
148     /***
149 	 * @param message
150 	 * @throws JMSException
151 	 */
152 	private void dispatchToSession(ActiveMQMessage message) throws JMSException {
153 		ServerSession serverSession = sessionPool.getServerSession();
154 		ActiveMQSession session = (ActiveMQSession) serverSession
155 		        .getSession();
156 		session.dispatch(message);
157 		serverSession.start();
158 	}
159 
160 	/***
161      * Gets the server session pool associated with this connection consumer.
162      *
163      * @return the server session pool used by this connection consumer
164      * @throws JMSException if the JMS provider fails to get the server session pool
165      *                      associated with this consumer due to some internal error.
166      */
167 
168     public ServerSessionPool getServerSessionPool() throws JMSException {
169         if (closed) {
170             throw new IllegalStateException("The Connection Consumer is closed");
171         }
172         return this.sessionPool;
173     }
174 
175     /***
176      * Closes the connection consumer.
177      * <p/>
178      * <P>
179      * Since a provider may allocate some resources on behalf of a connection
180      * consumer outside the Java virtual machine, clients should close these
181      * resources when they are not needed. Relying on garbage collection to
182      * eventually reclaim these resources may not be timely enough.
183      *
184      * @throws JMSException
185      */
186 
187     public void close() throws JMSException {
188         if (!closed) {
189             closed = true;
190             this.consumerInfo.setStarted(false);
191             this.connection.asyncSendPacket(this.consumerInfo);
192             this.connection.removeConnectionConsumer(this);
193         }
194 
195     }
196 }