1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import org.codehaus.activemq.broker.BrokerClient;
21 import org.codehaus.activemq.broker.impl.BrokerClientImpl;
22 import org.codehaus.activemq.message.ActiveMQMessage;
23 import org.codehaus.activemq.message.DefaultWireFormat;
24 import org.codehaus.activemq.message.MessageAck;
25 import org.codehaus.activemq.message.Packet;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.service.TransactionTask;
28 import org.codehaus.activemq.util.JMSExceptionHelper;
29
30 import javax.jms.JMSException;
31 import java.io.Externalizable;
32 import java.io.IOException;
33 import java.io.ObjectInput;
34 import java.io.ObjectOutput;
35
36 /***
37 * @version $Revision: 1.2 $
38 */
39 public abstract class PacketTransactionTask implements TransactionTask, Externalizable {
40 private static final long serialVersionUID = -5754338187296859149L;
41 private static transient final WireFormat wireFormat = new DefaultWireFormat();
42
43 private BrokerClient brokerClient;
44 private Packet packet;
45
46
47 public static TransactionTask fromBytes(byte[] data) throws IOException {
48 Packet packet = wireFormat.fromBytes(data);
49 return createTask(packet);
50 }
51
52 public byte[] toBytes() throws JMSException, IOException {
53 return wireFormat.toBytes(packet);
54 }
55
56 public static TransactionTask readTask(ObjectInput in) throws IOException {
57 Packet packet = readPacket(in);
58 return createTask(packet);
59 }
60
61 public static TransactionTask createTask(Packet packet) throws IOException {
62 if (packet instanceof MessageAck) {
63 return new MessageAckTransactionTask(null, (MessageAck) packet);
64 }
65 else if (packet instanceof ActiveMQMessage) {
66 return new SendMessageTransactionTask(null, (ActiveMQMessage) packet);
67 }
68 else {
69 throw new IOException("Unexpected packet type: " + packet);
70 }
71 }
72
73 public static void writeTask(TransactionTask task, ObjectOutput out) throws IOException {
74 if (task instanceof PacketTransactionTask) {
75 PacketTransactionTask packetTask = (PacketTransactionTask) task;
76 writePacket(packetTask.getPacket(), out);
77 }
78 else {
79 out.writeObject(task);
80 }
81 }
82
83 protected PacketTransactionTask(BrokerClient brokerClient, Packet packet) {
84 this.brokerClient = brokerClient;
85 this.packet = packet;
86 }
87
88 public Packet getPacket() {
89 return packet;
90 }
91
92 public void writeExternal(ObjectOutput out) throws IOException {
93 writePacket(packet, out);
94 }
95
96 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
97 packet = readPacket(in);
98 }
99
100
101
102
103 protected BrokerClient createBrokerClient(String consumerId) throws JMSException {
104 BrokerClientImpl answer = new BrokerClientImpl();
105 return answer;
106 }
107
108 protected BrokerClient getBrokerClient(String consumerId) throws JMSException {
109
110 brokerClient = createBrokerClient(consumerId);
111
112
113
114
115
116 return brokerClient;
117 }
118
119 protected static void writePacket(Packet packet, ObjectOutput out) throws IOException {
120 try {
121 wireFormat.writePacket(packet, out);
122 }
123 catch (JMSException e) {
124 throw JMSExceptionHelper.newIOException(e);
125 }
126 }
127
128 protected static Packet readPacket(ObjectInput in) throws IOException {
129 return wireFormat.readPacket(in);
130 }
131 }