View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message.util;
20  import java.io.File;
21  import java.io.IOException;
22  import java.util.List;
23  import javax.jms.JMSException;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.codehaus.activemq.message.DefaultWireFormat;
27  import org.codehaus.activemq.message.Packet;
28  import org.codehaus.activemq.message.WireFormat;
29  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
30  
31  /***
32   * Implements a controlled thread safe queue, with Packets being spooled to disk for reading asynchronously.
33   */
34  public  class SpooledBoundedPacketQueue implements BoundedPacketQueue {
35      private String name;
36      private DataContainer container;
37      private WireFormat wireFormat;
38      private long maxDataLength;
39      private boolean closed;
40      private boolean stopped;
41      private SynchronizedInt size = new SynchronizedInt(0);
42      private Object inLock = new Object();
43      private Object outLock = new Object();
44      private static int WAIT_TIMEOUT = 250;
45      private static final Log log = LogFactory.getLog(SpooledBoundedPacketQueue.class);
46  
47      /***
48       * Constructor for SpooledBoundedPacketQueue
49       * 
50       * @param dir
51       * @param name
52       * @param maxDataLength
53       * @param maxBlockSize
54       * @throws IOException
55       */
56      public SpooledBoundedPacketQueue(File dir, String name, long maxDataLength, int maxBlockSize) throws IOException {
57          //ensure name can be used as a file name
58          char[] chars = name.toCharArray();
59          for (int i = 0;i < chars.length;i++) {
60              if (!Character.isLetterOrDigit(chars[i])) {
61                  chars[i] = '_';
62              }
63          }
64          this.name = new String(chars);
65          this.maxDataLength = maxDataLength;
66          this.wireFormat = new DefaultWireFormat();
67          this.container = new DataContainer(dir, this.name, maxBlockSize);
68          //as the DataContainer is temporary, clean-up any old files
69          this.container.deleteAll();
70      }
71      
72      /***
73       * Constructor for SpooledBoundedPacketQueue
74       * @param dir
75       * @param name
76       * @throws IOException
77       */
78      public SpooledBoundedPacketQueue(File dir,String name) throws IOException{
79          this(dir,name,1024 * 1024 * 64,8192);
80      }
81  
82      /***
83       * Place a Packet at the head of the Queue
84       * 
85       * @param packet
86       * @throws JMSException
87       */
88      public void enqueue(Packet packet) throws JMSException {
89          if (!isFull()) {
90              enqueueNoBlock(packet);
91          }
92          else {
93              synchronized (inLock) {
94                  try {
95                      while (isFull()) {
96                          inLock.wait(WAIT_TIMEOUT);
97                      }
98                  }
99                  catch (InterruptedException ie) {
100                 }
101             }
102             enqueueNoBlock(packet);
103         }
104     }
105 
106     /***
107      * Enqueue a Packet without checking usage limits
108      * 
109      * @param packet
110      * @throws JMSException
111      */
112     public void enqueueNoBlock(Packet packet) throws JMSException {
113         byte[] data;
114         try {
115             data = wireFormat.toBytes(packet);
116             size.increment();
117             container.write(data);
118         }
119         catch (IOException e) {
120             JMSException jmsEx = new JMSException("toBytes failed");
121             jmsEx.setLinkedException(e);
122             throw jmsEx;
123         }
124         synchronized (outLock) {
125             outLock.notify();
126         }
127     }
128 
129     /***
130      * @return the first dequeued Packet or blocks until one is available
131      * @throws JMSException
132      * @throws InterruptedException
133      */
134     public Packet dequeue() throws JMSException, InterruptedException {
135         Packet result = null;
136         synchronized (outLock) {
137             while ((result = dequeueNoWait()) == null) {
138                 outLock.wait(WAIT_TIMEOUT);
139             }
140         }
141         return result;
142     }
143 
144     /***
145      * @return the Packet from the head of the Queue or null if the Queue is empty
146      * @param timeInMillis maximum time to wait to dequeue a Packet
147      * @throws JMSException
148      * @throws InterruptedException
149      */
150     public Packet dequeue(long timeInMillis) throws JMSException, InterruptedException {
151         Packet result = dequeueNoWait();
152         if (result == null) {
153             synchronized (outLock) {
154                 outLock.wait(timeInMillis);
155                 result = dequeueNoWait();
156             }
157         }
158         return result;
159     }
160 
161     /***
162      * @return the Packet from the head of the Queue or null if the Queue is empty
163      * @throws JMSException
164      * @throws InterruptedException
165      */
166     public Packet dequeueNoWait() throws JMSException, InterruptedException {
167         Packet result = null;
168         if (stopped) {
169             synchronized (outLock) {
170                 while (stopped && !closed) {
171                     outLock.wait(WAIT_TIMEOUT);
172                 }
173             }
174         }
175         byte[] data;
176         try {
177             data = container.read();
178             if (data != null) {
179                 result = wireFormat.fromBytes(data);
180                 size.decrement();
181             }
182         }
183         catch (IOException e) {
184             JMSException jmsEx = new JMSException("fromBytes failed");
185             jmsEx.setLinkedException(e);
186             throw jmsEx;
187         }
188         if (result != null && !isFull()) {
189             synchronized (inLock) {
190                 inLock.notify();
191             }
192         }
193         return result;
194     }
195 
196     /***
197      * @return true if this queue has reached it's data length limit
198      */
199     public boolean isFull() {
200         return container.length() >= maxDataLength;
201     }
202 
203     /***
204      * close this queue
205      */
206     public void close() {
207         try {
208             closed = true;
209             container.close();
210         }
211         catch (IOException ioe) {
212             log.warn("Couldn't close queue", ioe);
213         }
214     }
215 
216     /***
217      * @return the name of this BoundedPacketQueue
218      */
219     public String getName() {
220         return name;
221     }
222 
223     /***
224      * @return number of Packets held by this queue
225      */
226     public int size() {
227         return size.get();
228     }
229 
230     /***
231      * @return true if the queue is enabled for dequeing (default = true)
232      */
233     public boolean isStarted() {
234         return stopped == false;
235     }
236 
237     /***
238      * disable dequeueing
239      */
240     public void stop() {
241         synchronized (outLock) {
242             stopped = true;
243         }
244     }
245 
246     /***
247      * enable dequeueing
248      */
249     public void start() {
250         stopped = false;
251         synchronized (outLock) {
252             outLock.notifyAll();
253         }
254         synchronized (inLock) {
255             inLock.notifyAll();
256         }
257     }
258 
259     /***
260      * @return true if this queue is empty
261      */
262     public boolean isEmpty() {
263         return size.get() == 0;
264     }
265     
266     /***
267      * clear the queue
268      */
269 
270     public void clear() {
271         
272     }
273 
274    /***
275     * @return a copy of the contents
276     */
277     public List getContents() {
278         return null;
279     }
280 }