1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activeio.packet;
19
20 import java.util.ArrayList;
21
22 import org.activeio.Packet;
23
24 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
25
26 /***
27 * Provides a simple pool of Packet objects. When the packets that this pool produces are disposed,
28 * they are returned to the pool.
29 *
30 * @version $Revision: 1.1 $
31 */
32 abstract public class PacketPool {
33
34 public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPoolSize", ""+(5)));
35 public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4)));
36
37 private final ArrayList pool = new ArrayList();
38 private final int maxPackets;
39 private int currentPoolSize;
40
41 public class PooledPacket extends FilterPacket {
42 private final SynchronizedInt referenceCounter;
43
44 public PooledPacket(Packet next) {
45 this(next, new SynchronizedInt(0));
46 }
47
48 private PooledPacket(Packet next, SynchronizedInt referenceCounter) {
49 super(next);
50 this.referenceCounter=referenceCounter;
51 this.referenceCounter.increment();
52 }
53
54 public Packet filter(Packet packet) {
55 return new PooledPacket(next, referenceCounter);
56 }
57
58 int getReferenceCounter() {
59 return referenceCounter.get();
60 }
61
62 public void dispose() {
63 if( referenceCounter.decrement()==0 ) {
64 returnPacket(next);
65 }
66 }
67 }
68
69 /***
70 * @param maxPackets the number of buffers that will be in the pool.
71 */
72 public PacketPool(int maxPackets) {
73 this.maxPackets = maxPackets;
74 }
75
76 /***
77 * Blocks until a ByteBuffer can be retreived from the pool.
78 *
79 * @return
80 * @throws InterruptedException
81 */
82 public Packet getPacket() throws InterruptedException {
83 Packet answer=null;
84 synchronized(this) {
85 while(answer==null) {
86 if( pool.size()>0) {
87 answer = (Packet) pool.remove(pool.size()-1);
88 } else if( currentPoolSize < maxPackets ) {
89 answer = allocateNewPacket();
90 currentPoolSize++;
91 }
92 if( answer==null ) {
93 this.wait();
94 }
95 }
96 }
97 return new PooledPacket(answer);
98 }
99
100 /***
101 * Returns a ByteBuffer to the pool.
102 *
103 * @param packet
104 */
105 private void returnPacket(Packet packet) {
106 packet.clear();
107 synchronized(this) {
108 pool.add(packet);
109 this.notify();
110 }
111 }
112
113 /***
114 * @return
115 */
116 abstract protected Packet allocateNewPacket();
117
118 }