1 package org.jencks;
2
3 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
4 import EDU.oswego.cs.dl.util.concurrent.Latch;
5 import junit.framework.TestCase;
6 import org.activemq.ActiveMQConnectionFactory;
7 import org.activemq.message.ActiveMQQueue;
8 import org.activemq.ra.ActiveMQActivationSpec;
9 import org.activemq.ra.ActiveMQResourceAdapter;
10 import org.apache.geronimo.connector.ActivationSpecWrapper;
11 import org.apache.geronimo.connector.ResourceAdapterWrapper;
12 import org.apache.geronimo.connector.work.GeronimoWorkManager;
13 import org.apache.geronimo.transaction.ExtendedTransactionManager;
14 import org.apache.geronimo.transaction.context.TransactionContextManager;
15 import org.apache.geronimo.transaction.log.UnrecoverableLog;
16 import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
17 import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
18 import org.apache.geronimo.transaction.manager.XidImporter;
19
20 import javax.jms.Connection;
21 import javax.jms.Message;
22 import javax.jms.MessageListener;
23 import javax.jms.MessageProducer;
24 import javax.jms.Queue;
25 import javax.jms.Session;
26 import javax.resource.ResourceException;
27 import javax.resource.spi.UnavailableException;
28 import javax.resource.spi.endpoint.MessageEndpoint;
29 import javax.resource.spi.endpoint.MessageEndpointFactory;
30 import javax.transaction.xa.XAResource;
31 import java.lang.reflect.Method;
32
33 public class HandWiredJencksTest extends TestCase {
34
35 public class StubMessageEndpoint implements MessageEndpoint, MessageListener {
36 public int messageCount;
37 private final ExtendedTransactionManager etm;
38 private final XAResource resource;
39
40 public StubMessageEndpoint(ExtendedTransactionManager etm, XAResource resource) {
41 this.etm = etm;
42 this.resource = new WrapperNamedXAResource(resource, "test");
43 }
44
45 public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
46 try {
47 etm.begin();
48 etm.getTransaction().enlistResource(resource);
49 }
50 catch (Throwable e) {
51 throw new ResourceException(e);
52 }
53 }
54
55 public void afterDelivery() throws ResourceException {
56 try {
57 etm.commit();
58 }
59 catch (Throwable e) {
60 throw new ResourceException(e);
61 }
62 }
63
64 public void release() {
65 }
66
67 public void onMessage(Message message) {
68 messageCount++;
69 }
70 }
71
72 public void testIt() throws Exception {
73
74
75 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
76 Connection connection = cf.createConnection();
77 connection.start();
78
79 CopyOnWriteArrayList resourceAdapters = null;
80 TransactionManagerImpl tm = new TransactionManagerImpl(600, new UnrecoverableLog(), resourceAdapters);
81
82 final ExtendedTransactionManager etm = tm;
83 XidImporter xidImporter = tm;
84 TransactionContextManager manager = new TransactionContextManager(etm, xidImporter);
85
86 GeronimoWorkManager workManager = new GeronimoWorkManager(1000, manager);
87 workManager.doStart();
88
89 ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
90 ra.setServerUrl("vm://localhost");
91
92 ResourceAdapterWrapper raWrapper = new ResourceAdapterWrapper(ra, workManager);
93 raWrapper.doStart();
94
95 ActiveMQActivationSpec as = new ActiveMQActivationSpec();
96 as.setDestination("TEST");
97 as.setDestinationType(Queue.class.getName());
98 ActivationSpecWrapper asWrapper = new ActivationSpecWrapper(as, raWrapper);
99
100 final Latch messageDelivered = new Latch();
101 MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
102 public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
103 return new StubMessageEndpoint(etm, resource) {
104 public void onMessage(Message message) {
105 super.onMessage(message);
106 messageDelivered.release();
107 }
108
109 ;
110 };
111 }
112
113 public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
114 return true;
115 }
116 };
117
118 asWrapper.activate(messageEndpointFactory);
119
120 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
121 MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
122 producer.send(session.createTextMessage("Hello"));
123
124 assertTrue(messageDelivered.attempt(1000 * 5));
125 }
126
127 }