001    /**
002     * 
003     * Copyright 2005 LogicBlaze, Inc.
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.jencks;
019    
020    import org.apache.commons.logging.Log;
021    import org.apache.commons.logging.LogFactory;
022    
023    import javax.jms.Message;
024    import javax.jms.MessageListener;
025    import javax.resource.ResourceException;
026    import javax.resource.spi.endpoint.MessageEndpoint;
027    import javax.transaction.HeuristicMixedException;
028    import javax.transaction.HeuristicRollbackException;
029    import javax.transaction.NotSupportedException;
030    import javax.transaction.RollbackException;
031    import javax.transaction.SystemException;
032    import javax.transaction.Transaction;
033    import javax.transaction.TransactionManager;
034    import javax.transaction.xa.XAResource;
035    import java.lang.reflect.Method;
036    
037    /**
038     * An XA based Endpoint which uses an XA transaction per message delivery.
039     *
040     * @version $Revision: 1.1.1.1 $
041     */
042    public class XAEndpoint implements MessageEndpoint, MessageListener {
043        private static final Log log = LogFactory.getLog(XAEndpoint.class);
044    
045        private MessageListener messageListener;
046        private XAResource xaResource;
047        private TransactionManager transactionManager;
048        private Transaction transaction;
049        private boolean beforeDeliveryCompleted;
050        private boolean messageDelivered;
051    
052        public XAEndpoint(MessageListener messageListener, XAResource xaResource, TransactionManager transactionManager) {
053            this.messageListener = messageListener;
054            this.xaResource = xaResource;
055            this.transactionManager = transactionManager;
056        }
057    
058        public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
059            if (transaction != null) {
060                throw new IllegalStateException("Transaction still in progress");
061            }
062            beforeDeliveryCompleted = false;
063            try {
064                transactionManager.begin();
065                transaction = transactionManager.getTransaction();
066    
067                transaction.enlistResource(xaResource);
068                beforeDeliveryCompleted = true;
069    
070                log.trace("Transaction started and resource enlisted");
071            }
072            catch (NotSupportedException e) {
073                System.out.println("Caught: " + e);
074                throw new ResourceException(e);
075            }
076            catch (SystemException e) {
077                System.out.println("Caught: " + e);
078                throw new ResourceException(e);
079            }
080            catch (RollbackException e) {
081                System.out.println("Caught: " + e);
082                throw new ResourceException(e);
083            }
084            catch (Throwable e) {
085                System.out.println("Caught: " + e);
086                e.printStackTrace();
087                throw new ResourceException(e);
088            }
089        }
090    
091        public void afterDelivery() throws ResourceException {
092            if (transaction == null) {
093                throw new IllegalStateException("Transaction not in progress");
094            }
095            if (beforeDeliveryCompleted && messageDelivered) {
096                try {
097                    transaction.delistResource(xaResource, XAResource.TMSUSPEND);
098                }
099                catch (SystemException e) {
100                    throw new ResourceException(e);
101                }
102                try {
103                    transaction.commit();
104                    log.trace("Transaction committed");
105                }
106                catch (RollbackException e) {
107                    throw new ResourceException(e);
108                }
109                catch (HeuristicMixedException e) {
110                    doRollback(e);
111                }
112                catch (HeuristicRollbackException e) {
113                    doRollback(e);
114                }
115                catch (SystemException e) {
116                    doRollback(e);
117                }
118                finally {
119                    transaction = null;
120                }
121            }
122        }
123    
124        public void onMessage(Message message) {
125            messageDelivered = false;
126            messageListener.onMessage(message);
127            messageDelivered = true;
128        }
129    
130        public void release() {
131            if (transaction != null) {
132                try {
133                    transaction.rollback();
134                }
135                catch (SystemException e) {
136                    log.warn("Failed to rollback transaction: " + e, e);
137                }
138            }
139        }
140    
141        protected void doRollback(Exception e) throws ResourceException {
142            try {
143                transaction.rollback();
144                log.trace("Transaction rolled back");
145            }
146            catch (SystemException e1) {
147                log.warn("Caught exception while rolling back: " + e1, e1);
148            }
149            transaction = null;
150            throw new ResourceException(e);
151        }
152    }