1   package org.jencks;
2   
3   import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
4   import EDU.oswego.cs.dl.util.concurrent.Latch;
5   import junit.framework.TestCase;
6   import org.activemq.ActiveMQConnectionFactory;
7   import org.activemq.message.ActiveMQQueue;
8   import org.activemq.ra.ActiveMQActivationSpec;
9   import org.activemq.ra.ActiveMQResourceAdapter;
10  import org.apache.geronimo.connector.ActivationSpecWrapper;
11  import org.apache.geronimo.connector.ResourceAdapterWrapper;
12  import org.apache.geronimo.connector.work.GeronimoWorkManager;
13  import org.apache.geronimo.transaction.ExtendedTransactionManager;
14  import org.apache.geronimo.transaction.context.TransactionContextManager;
15  import org.apache.geronimo.transaction.log.UnrecoverableLog;
16  import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
17  import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
18  import org.apache.geronimo.transaction.manager.XidImporter;
19  
20  import javax.jms.Connection;
21  import javax.jms.Message;
22  import javax.jms.MessageListener;
23  import javax.jms.MessageProducer;
24  import javax.jms.Queue;
25  import javax.jms.Session;
26  import javax.resource.ResourceException;
27  import javax.resource.spi.UnavailableException;
28  import javax.resource.spi.endpoint.MessageEndpoint;
29  import javax.resource.spi.endpoint.MessageEndpointFactory;
30  import javax.transaction.xa.XAResource;
31  import java.lang.reflect.Method;
32  
33  public class HandWiredJencksTest extends TestCase {
34  
35      public class StubMessageEndpoint implements MessageEndpoint, MessageListener {
36          public int messageCount;
37          private final ExtendedTransactionManager etm;
38          private final XAResource resource;
39  
40          public StubMessageEndpoint(ExtendedTransactionManager etm, XAResource resource) {
41              this.etm = etm;
42              this.resource = new WrapperNamedXAResource(resource, "test");
43          }
44  
45          public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
46              try {
47                  etm.begin();
48                  etm.getTransaction().enlistResource(resource);
49              }
50              catch (Throwable e) {
51                  throw new ResourceException(e);
52              }
53          }
54  
55          public void afterDelivery() throws ResourceException {
56              try {
57                  etm.commit();
58              }
59              catch (Throwable e) {
60                  throw new ResourceException(e);
61              }
62          }
63  
64          public void release() {
65          }
66  
67          public void onMessage(Message message) {
68              messageCount++;
69          }
70      }
71  
72      public void testIt() throws Exception {
73  
74  
75          ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
76          Connection connection = cf.createConnection();
77          connection.start();
78  
79          CopyOnWriteArrayList resourceAdapters = null; //new CopyOnWriteArrayList();
80          TransactionManagerImpl tm = new TransactionManagerImpl(600, new UnrecoverableLog(), resourceAdapters);
81  
82          final ExtendedTransactionManager etm = tm;
83          XidImporter xidImporter = tm;
84          TransactionContextManager manager = new TransactionContextManager(etm, xidImporter);
85  
86          GeronimoWorkManager workManager = new GeronimoWorkManager(1000, manager);
87          workManager.doStart();
88  
89          ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
90          ra.setServerUrl("vm://localhost");
91  
92          ResourceAdapterWrapper raWrapper = new ResourceAdapterWrapper(ra, workManager);
93          raWrapper.doStart();
94  
95          ActiveMQActivationSpec as = new ActiveMQActivationSpec();
96          as.setDestination("TEST");
97          as.setDestinationType(Queue.class.getName());
98          ActivationSpecWrapper asWrapper = new ActivationSpecWrapper(as, raWrapper);
99  
100         final Latch messageDelivered = new Latch();
101         MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
102             public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
103                 return new StubMessageEndpoint(etm, resource) {
104                     public void onMessage(Message message) {
105                         super.onMessage(message);
106                         messageDelivered.release();
107                     }
108 
109                     ;
110                 };
111             }
112 
113             public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
114                 return true;
115             }
116         };
117 
118         asWrapper.activate(messageEndpointFactory);
119 
120         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
121         MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
122         producer.send(session.createTextMessage("Hello"));
123 
124         assertTrue(messageDelivered.attempt(1000 * 5));
125     }
126 
127 }