1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message;
20
21 import java.util.Iterator;
22 import org.codehaus.activemq.util.BitArray;
23 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
24
25 /***
26 * Abstract class for a transportable Packet
27 *
28 * @version $Revision: 1.19 $
29 */
30 public abstract class AbstractPacket implements Packet {
31
32 /***
33 * Message flag indexes (used for writing/reading to/from a Stream
34 */
35 static final int RECEIPT_REQUIRED_INDEX = 0;
36 static final int BROKERS_VISITED_INDEX =1;
37 private String id;
38 protected BitArray bitArray;
39 private boolean receiptRequired;
40 private transient int memoryUsage = 2048;
41 private transient int memoryUsageReferenceCount;
42
43 private CopyOnWriteArraySet brokersVisited;
44
45 protected AbstractPacket(){
46 this.bitArray = new BitArray();
47 }
48
49 /***
50 * @return the unique id for this Packet
51 */
52 public String getId() {
53 return this.id;
54 }
55
56 /***
57 * Set the unique id for this Packet
58 *
59 * @param newId
60 */
61 public void setId(String newId) {
62 this.id = newId;
63 }
64
65 /***
66 * @return true if a Recipt is required
67 */
68 public boolean isReceiptRequired() {
69 return this.receiptRequired;
70 }
71
72 /***
73 * @return false since most packets are not receipt packets
74 */
75 public boolean isReceipt() {
76 return false;
77 }
78
79 /***
80 * Set if a Recipt if required on receiving this Packet
81 *
82 * @param value
83 */
84 public void setReceiptRequired(boolean value) {
85 this.receiptRequired = value;
86 }
87
88 /***
89 * Retrieve if a JMS Message type or not
90 *
91 * @return true if it is a JMS Message
92 */
93 public boolean isJMSMessage() {
94 return false;
95 }
96
97 /***
98 * Tests equality with another instance
99 *
100 * @param obj - the other instance to test equality with
101 * @return Returns true if the objects are equilvant
102 */
103 public boolean equals(Object obj) {
104 boolean result = this == obj;
105 if (!result && obj != null && obj instanceof AbstractPacket) {
106 AbstractPacket other = (AbstractPacket) obj;
107 result = other.getId().equals(this.getId());
108 }
109 return result;
110 }
111
112 /***
113 * @return Returns hash code for this instance
114 */
115 public int hashCode() {
116 return this.id != null ? this.id.hashCode() : super.hashCode();
117 }
118
119 /***
120 * Get a hint about how much memory this Packet is consuming
121 *
122 * @return an aproximation of the current memory used by this instance
123 */
124 public int getMemoryUsage() {
125 return memoryUsage;
126 }
127
128 /***
129 * Set a hint about how mujch memory this packet is consuming
130 *
131 * @param newMemoryUsage
132 */
133 public void setMemoryUsage(int newMemoryUsage) {
134 this.memoryUsage = newMemoryUsage;
135 }
136
137 /***
138 * Increment reference count for bounded memory collections
139 *
140 * @return the incremented reference value
141 * @see org.codehaus.activemq.message.util.MemoryBoundedQueue
142 */
143 public synchronized int incrementMemoryReferenceCount() {
144 return ++memoryUsageReferenceCount;
145 }
146
147 /***
148 * Decrement reference count for bounded memory collections
149 *
150 * @return the decremented reference value
151 * @see org.codehaus.activemq.message.util.MemoryBoundedQueue
152 */
153 public synchronized int decrementMemoryReferenceCount() {
154 return --memoryUsageReferenceCount;
155 }
156
157 /***
158 * @return the current reference count for bounded memory collections
159 * @see org.codehaus.activemq.message.util.MemoryBoundedQueue
160 */
161 public synchronized int getMemoryUsageReferenceCount() {
162 return memoryUsageReferenceCount;
163 }
164
165 /***
166 * As the packet passes through the broker add the broker to the visited list
167 *
168 * @param brokerName the name of the broker
169 */
170 public void addBrokerVisited(String brokerName) {
171 initializeBrokersVisited();
172 brokersVisited.add(brokerName);
173 }
174
175 /***
176 * test to see if the named broker has already seen this packet
177 *
178 * @param brokerName the name of the broker
179 * @return true if the packet has visited the broker
180 */
181 public boolean hasVisited(String brokerName) {
182 if (brokersVisited == null){
183 return false;
184 }
185 return brokersVisited.contains(brokerName);
186 }
187
188 /***
189 * @return Returns the brokersVisited.
190 */
191 public String getBrokersVisitedAsString() {
192 String result = "";
193 if (brokersVisited != null && !brokersVisited.isEmpty()){
194 for (Iterator i = brokersVisited.iterator(); i.hasNext();){
195 result += i.next().toString() + ",";
196 }
197 }
198 return result;
199 }
200
201
202 /***
203 * @return pretty print of this Packet
204 */
205 public String toString() {
206 return getPacketTypeAsString(getPacketType()) + ": " + getId();
207 }
208
209
210 protected static String getPacketTypeAsString(int type) {
211 String packetTypeStr = "";
212 switch (type) {
213 case ACTIVEMQ_MESSAGE:
214 packetTypeStr = "ACTIVEMQ_MESSAGE";
215 break;
216 case ACTIVEMQ_TEXT_MESSAGE:
217 packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
218 break;
219 case ACTIVEMQ_OBJECT_MESSAGE:
220 packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
221 break;
222 case ACTIVEMQ_BYTES_MESSAGE:
223 packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
224 break;
225 case ACTIVEMQ_STREAM_MESSAGE:
226 packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
227 break;
228 case ACTIVEMQ_MAP_MESSAGE:
229 packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
230 break;
231 case ACTIVEMQ_MSG_ACK:
232 packetTypeStr = "ACTIVEMQ_MSG_ACK";
233 break;
234 case RECEIPT_INFO:
235 packetTypeStr = "RECEIPT_INFO";
236 break;
237 case CONSUMER_INFO:
238 packetTypeStr = "CONSUMER_INFO";
239 break;
240 case PRODUCER_INFO:
241 packetTypeStr = "PRODUCER_INFO";
242 break;
243 case TRANSACTION_INFO:
244 packetTypeStr = "TRANSACTION_INFO";
245 break;
246 case XA_TRANSACTION_INFO:
247 packetTypeStr = "XA_TRANSACTION_INFO";
248 break;
249 case ACTIVEMQ_BROKER_INFO:
250 packetTypeStr = "ACTIVEMQ_BROKER_INFO";
251 break;
252 case ACTIVEMQ_CONNECTION_INFO:
253 packetTypeStr = "ACTIVEMQ_CONNECTION_INFO";
254 break;
255 case SESSION_INFO:
256 packetTypeStr = "SESSION_INFO";
257 break;
258 default :
259 packetTypeStr = "UNKNOWN PACKET TYPE: " + type;
260 }
261 return packetTypeStr;
262 }
263
264 /***
265 * A helper method used when implementing equals() which returns true if the objects are identical or equal handling
266 * nulls properly
267 * @param left
268 * @param right
269 *
270 * @return true if the objects are the same or equal or both null
271 */
272 protected boolean equals(Object left, Object right) {
273 return left == right || (left != null && left.equals(right));
274 }
275
276 /***
277 * Initializes another message with current values from this instance
278 *
279 * @param other the other ActiveMQMessage to initialize
280 */
281 protected void initializeOther(AbstractPacket other) {
282 initializeBrokersVisited();
283 other.id = this.id;
284 other.receiptRequired = this.receiptRequired;
285 CopyOnWriteArraySet set = this.brokersVisited;
286 if (set != null && !set.isEmpty()){
287 other.brokersVisited = new CopyOnWriteArraySet(set);
288 }
289 }
290
291 synchronized void initializeBrokersVisited(){
292 if (this.brokersVisited == null){
293 this.brokersVisited = new CopyOnWriteArraySet();
294 }
295 }
296
297 /***
298 * @return Returns the brokersVisited.
299 */
300 Object[] getBrokersVisited() {
301 if (brokersVisited == null || brokersVisited.isEmpty()){
302 return null;
303 }
304 return brokersVisited.toArray();
305 }
306
307 /***
308 * @return Returns the bitArray.
309 */
310 public BitArray getBitArray() {
311 return bitArray;
312 }
313 /***
314 * @param bitArray The bitArray to set.
315 */
316 public void setBitArray(BitArray bitArray) {
317 this.bitArray = bitArray;
318 }
319 }