1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.impl;
20
21 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22 import org.codehaus.activemq.broker.BrokerClient;
23 import org.codehaus.activemq.service.Dispatcher;
24 import org.codehaus.activemq.service.MessageContainerManager;
25 import org.codehaus.activemq.service.Subscription;
26
27 /***
28 * A dispatcher of messages to some JMS connection.
29 * <p/>
30 * Typically this uses either IO or NIO to shovel the messages down
31 * a socket as fast as possible - in either a push or pull way.
32 *
33 * @version $Revision: 1.2 $
34 */
35 public class DispatcherImpl implements Dispatcher {
36
37 private SynchronizedBoolean started = new SynchronizedBoolean(false);
38 private DispatchWorker worker = new DispatchWorker();
39 private MessageContainerManager containerManager;
40 private Thread runner;
41
42
43 /***
44 * Register the MessageContainerManager for the Dispatcher
45 *
46 * @param mcm
47 */
48 public void register(MessageContainerManager mcm) {
49 this.containerManager = mcm;
50 worker.register(mcm);
51 }
52
53 /***
54 * Called to indicate that there is work to do on a Subscription this will wake up a Dispatch Worker if it is
55 * waiting for messages to dispatch
56 *
57 * @param sub the Subscription that now has messages to dispatch
58 */
59 public void wakeup(Subscription sub) {
60 worker.wakeup();
61 }
62
63 /***
64 * Called to indicate that there is work to do this will wake up a Dispatch Worker if it is
65 * waiting for messages to dispatch
66 */
67 public void wakeup() {
68 worker.wakeup();
69 }
70
71 /***
72 * Add an active subscription
73 *
74 * @param client
75 * @param sub
76 */
77 public void addActiveSubscription(BrokerClient client, Subscription sub) {
78 worker.addActiveSubscription(client, sub);
79 }
80
81 /***
82 * remove an active subscription
83 *
84 * @param client
85 * @param sub
86 */
87 public void removeActiveSubscription(BrokerClient client, Subscription sub) {
88 worker.removeActiveSubscription(client, sub);
89 }
90
91 /***
92 * start the DispatchWorker
93 *
94 * @see org.codehaus.activemq.service.Service#start()
95 */
96 public void start() {
97 if (started.commit(false, true)) {
98 worker.start();
99 runner = new Thread(worker, "Dispatch Worker");
100 runner.setDaemon(true);
101 runner.setPriority(Thread.NORM_PRIORITY + 1);
102 runner.start();
103 }
104 }
105
106 /***
107 * stop the DispatchWorker
108 *
109 * @see org.codehaus.activemq.service.Service#stop()
110 */
111 public void stop() {
112 worker.stop();
113 started.set(false);
114 }
115 }