1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.management.JMSConsumerStatsImpl;
23 import org.codehaus.activemq.management.StatsCapable;
24 import org.codehaus.activemq.message.ActiveMQDestination;
25 import org.codehaus.activemq.message.ActiveMQMessage;
26 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
27 import org.codehaus.activemq.selector.SelectorParser;
28
29 import javax.jms.IllegalStateException;
30 import javax.jms.InvalidDestinationException;
31 import javax.jms.JMSException;
32 import javax.jms.Message;
33 import javax.jms.MessageConsumer;
34 import javax.jms.MessageListener;
35 import javax.management.j2ee.statistics.Stats;
36
37 /***
38 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
39 * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
40 * creation method supplied by a session.
41 * <P>
42 * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
43 * <P>
44 * A message consumer can be created with a message selector. A message selector allows the client to restrict the
45 * messages delivered to the message consumer to those that match the selector.
46 * <P>
47 * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
48 * them as they arrive.
49 * <P>
50 * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
51 * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
52 * for the next message.
53 * <P>
54 * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
55 * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
56 * onMessage</CODE> method.
57 * <P>
58 * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
59 *
60 * @version $Revision: 1.35 $
61 * @see javax.jms.MessageConsumer
62 * @see javax.jms.QueueReceiver
63 * @see javax.jms.TopicSubscriber
64 * @see javax.jms.Session
65 */
66 public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable {
67 private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
68 protected ActiveMQSession session;
69 protected String consumerId;
70 protected MemoryBoundedQueue messageQueue;
71 protected String messageSelector;
72 private MessageListener messageListener;
73 protected String consumerName;
74 protected ActiveMQDestination destination;
75 private boolean closed;
76 protected int consumerNumber;
77 protected int prefetchNumber;
78 protected long startTime;
79 protected boolean noLocal;
80 protected boolean browser;
81 private Thread accessThread;
82 private Object messageListenerGuard;
83 private JMSConsumerStatsImpl stats;
84
85 /***
86 * Create a MessageConsumer
87 *
88 * @param theSession
89 * @param dest
90 * @param name
91 * @param selector
92 * @param cnum
93 * @param prefetch
94 * @param noLocalValue
95 * @param browserValue
96 * @throws JMSException
97 */
98 protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
99 String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
100 if (dest == null) {
101 throw new InvalidDestinationException("Do not understand a null destination");
102 }
103 if (dest.isTemporary()) {
104
105 String physicalName = dest.getPhysicalName();
106 if (physicalName == null) {
107 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
108 }
109 String clientID = theSession.connection.getInitializedClientID();
110 if (physicalName.indexOf(clientID) < 0) {
111 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
112 }
113 }
114 if (selector != null) {
115 selector = selector.trim();
116 if (selector.length() > 0) {
117
118 new SelectorParser().parse(selector);
119 }
120 }
121 this.session = theSession;
122 this.destination = dest;
123 this.consumerName = name;
124 this.messageSelector = selector;
125
126 this.consumerNumber = cnum;
127 this.prefetchNumber = prefetch;
128 this.noLocal = noLocalValue;
129 this.browser = browserValue;
130 this.startTime = System.currentTimeMillis();
131 this.messageListenerGuard = new Object();
132 String queueName = theSession.connection.clientID + ":" + name;
133 queueName += ":" + cnum;
134 this.messageQueue = theSession.connection.getMemoryBoundedQueue(queueName);
135 this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
136 this.session.addConsumer(this);
137 }
138
139 /***
140 * @return Stats for this MessageConsumer
141 */
142 public Stats getStats() {
143 return stats;
144 }
145
146 /***
147 * @return Stats for this MessageConsumer
148 */
149 public JMSConsumerStatsImpl getConsumerStats() {
150 return stats;
151 }
152
153 /***
154 * @return pretty print of this consumer
155 */
156 public String toString() {
157 return "MessageConsumer: " + consumerId;
158 }
159
160 /***
161 * @return Returns the prefetchNumber.
162 */
163 public int getPrefetchNumber() {
164 return prefetchNumber;
165 }
166
167 /***
168 * @param prefetchNumber The prefetchNumber to set.
169 */
170 public void setPrefetchNumber(int prefetchNumber) {
171 this.prefetchNumber = prefetchNumber;
172 }
173
174 /***
175 * Gets this message consumer's message selector expression.
176 *
177 * @return this message consumer's message selector, or null if no message selector exists for the message consumer
178 * (that is, if the message selector was not set or was set to null or the empty string)
179 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
180 */
181 public String getMessageSelector() throws JMSException {
182 checkClosed();
183 return this.messageSelector;
184 }
185
186 /***
187 * Gets the message consumer's <CODE>MessageListener</CODE>.
188 *
189 * @return the listener for the message consumer, or null if no listener is set
190 * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
191 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
192 */
193 public MessageListener getMessageListener() throws JMSException {
194 checkClosed();
195 return this.messageListener;
196 }
197
198 /***
199 * Sets the message consumer's <CODE>MessageListener</CODE>.
200 * <P>
201 * Setting the message listener to null is the equivalent of unsetting the message listener for the message
202 * consumer.
203 * <P>
204 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
205 * existing listener or the consumer is being used to consume messages synchronously is undefined.
206 *
207 * @param listener the listener to which the messages are to be delivered
208 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
209 * @see javax.jms.MessageConsumer#getMessageListener()
210 */
211 public void setMessageListener(MessageListener listener) throws JMSException {
212 checkClosed();
213 this.messageListener = listener;
214 if (listener != null) {
215 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC);
216 }
217 }
218
219 /***
220 * Receives the next message produced for this message consumer.
221 * <P>
222 * This call blocks indefinitely until a message is produced or until this message consumer is closed.
223 * <P>
224 * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
225 * transaction commits.
226 *
227 * @return the next message produced for this message consumer, or null if this message consumer is concurrently
228 * closed
229 * @throws JMSException
230 */
231 public Message receive() throws JMSException {
232 checkClosed();
233 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
234 try {
235 this.accessThread = Thread.currentThread();
236 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
237 this.accessThread = null;
238 if (message != null) {
239 messageDelivered(message, true);
240 message = message.shallowCopy();
241 }
242 return message;
243 }
244 catch (InterruptedException ioe) {
245 return null;
246 }
247 }
248
249 /***
250 * Receives the next message that arrives within the specified timeout interval.
251 * <P>
252 * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
253 * timeout</CODE> of zero never expires, and the call blocks indefinitely.
254 *
255 * @param timeout the timeout value (in milliseconds)
256 * @return the next message produced for this message consumer, or null if the timeout expires or this message
257 * consumer is concurrently closed
258 * @throws JMSException
259 */
260 public Message receive(long timeout) throws JMSException {
261 checkClosed();
262 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
263 try {
264 if (timeout == 0) {
265 return this.receive();
266 }
267 this.accessThread = Thread.currentThread();
268 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
269 this.accessThread = null;
270 if (message != null) {
271 messageDelivered(message, true);
272 message = message.shallowCopy();
273 }
274 return message;
275 }
276 catch (InterruptedException ioe) {
277 return null;
278 }
279 }
280
281 /***
282 * Receives the next message if one is immediately available.
283 *
284 * @return the next message produced for this message consumer, or null if one is not available
285 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
286 */
287 public Message receiveNoWait() throws JMSException {
288 checkClosed();
289 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
290 try {
291 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeueNoWait();
292 if (message != null) {
293 messageDelivered(message, true);
294 return message.shallowCopy();
295 }
296 }
297 catch (InterruptedException ioe) {
298 throw new JMSException("Queue is interrupted: " + ioe.getMessage());
299 }
300 return null;
301 }
302
303 /***
304 * Closes the message consumer.
305 * <P>
306 * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
307 * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
308 * reclaim these resources may not be timely enough.
309 * <P>
310 * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message
311 * consumer <CODE>receive</CODE> call returns null when this message consumer is closed.
312 *
313 * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
314 */
315 public void close() throws JMSException {
316 try {
317 this.accessThread.interrupt();
318 }
319 catch (NullPointerException npe) {
320 }
321 catch (SecurityException se) {
322 }
323 this.session.removeConsumer(this);
324 messageQueue.close();
325 closed = true;
326 }
327
328 /***
329 * @return true if this is a durable topic subscriber
330 */
331 public boolean isDurableSubscriber() {
332 return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
333 }
334
335 /***
336 * @throws IllegalStateException
337 */
338 protected void checkClosed() throws IllegalStateException {
339 if (closed) {
340 throw new IllegalStateException("The Consumer is closed");
341 }
342 }
343
344 /***
345 * Process a Message - passing either to the queue or message listener
346 *
347 * @param message
348 */
349 protected void processMessage(ActiveMQMessage message) {
350 message.setConsumerId(this.consumerId);
351 MessageListener listener = null;
352 synchronized (messageListenerGuard) {
353 listener = this.messageListener;
354 }
355 try {
356 if (!closed) {
357 if (listener != null) {
358 listener.onMessage(message.shallowCopy());
359 messageDelivered(message, true);
360 }
361 else {
362 this.messageQueue.enqueue(message);
363 }
364 }
365 else {
366 messageDelivered(message, false);
367 }
368 }
369 catch (Exception e) {
370 log.warn("could not process message: " + message, e);
371
372 messageDelivered(message, false);
373
374
375 }
376 }
377
378 /***
379 * @return Returns the consumerId.
380 */
381 protected String getConsumerId() {
382 return consumerId;
383 }
384
385 /***
386 * @param consumerId The consumerId to set.
387 */
388 protected void setConsumerId(String consumerId) {
389 this.consumerId = consumerId;
390 }
391
392 /***
393 * @return the consumer name - used for durable consumers
394 */
395 protected String getConsumerName() {
396 return this.consumerName;
397 }
398
399 /***
400 * Set the name of the Consumer - used for durable subscribers
401 *
402 * @param value
403 */
404 protected void setConsumerName(String value) {
405 this.consumerName = value;
406 }
407
408 /***
409 * @return the locally unique Consumer Number
410 */
411 protected int getConsumerNumber() {
412 return this.consumerNumber;
413 }
414
415 /***
416 * Set the locally unique consumer number
417 *
418 * @param value
419 */
420 protected void setConsumerNumber(int value) {
421 this.consumerNumber = value;
422 }
423
424 /***
425 * @return true if this consumer does not accept locally produced messages
426 */
427 protected boolean isNoLocal() {
428 return this.noLocal;
429 }
430
431 /***
432 * Retrive is a browser
433 *
434 * @return true if a browser
435 */
436 protected boolean isBrowser() {
437 return this.browser;
438 }
439
440 /***
441 * Set true if only a Browser
442 *
443 * @param value
444 * @see ActiveMQQueueBrowser
445 */
446 protected void setBrowser(boolean value) {
447 this.browser = value;
448 }
449
450 /***
451 * @return ActiveMQDestination
452 */
453 protected ActiveMQDestination getDestination() {
454 return this.destination;
455 }
456
457 /***
458 * @return the startTime
459 */
460 protected long getStartTime() {
461 return startTime;
462 }
463
464 protected void clearMessagesInProgress() {
465 messageQueue.clear();
466 }
467
468 private void messageDelivered(ActiveMQMessage message, boolean messageRead) {
469 boolean read = browser ? false : messageRead;
470 if (message != null) {
471 message.setTransientConsumed(!isDurableSubscriber() && message.getJMSActiveMQDestination().isTopic());
472 this.session.messageDelivered((isDurableSubscriber() || destination.isQueue()), message, read);
473 if (messageRead) {
474 stats.onMessage(message);
475 }
476 }
477 }
478 }