/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.messaging.core.impl.clusterconnection;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossSession;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.tm.TransactionManagerLocator;

public class MessageSucker
implements MessageListener {
    private static final Logger log = Logger.getLogger(MessageSucker.class);
    private boolean trace = log.isTraceEnabled();
    private JBossConnection sourceConnection;
    private JBossConnection localConnection;
    private Queue localQueue;
    private SessionDelegate sourceSession;
    private SessionDelegate localSession;
    private ProducerDelegate producer;
    private volatile boolean started;
    private boolean xa;
    private TransactionManager tm;
    private boolean consuming;
    private ConsumerDelegate consumer;
    private boolean preserveOrdering;

    public String toString() {
        return "MessageSucker:" + System.identityHashCode(this) + " queue:" + this.localQueue.getName();
    }

    MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection, boolean xa, boolean preserveOrdering) {
        if (this.trace) {
            log.trace("Creating message sucker, localQueue:" + localQueue + " xa:" + xa + " preserveOrdering:" + preserveOrdering);
        }
        this.localQueue = localQueue;
        this.sourceConnection = sourceConnection;
        this.localConnection = localConnection;
        this.xa = xa;
        this.preserveOrdering = preserveOrdering;
        if (xa) {
            this.tm = TransactionManagerLocator.getInstance().locate();
        }
    }

    synchronized void start() throws Exception {
        JBossSession sess;
        if (this.started) {
            return;
        }
        if (this.trace) {
            log.trace(this + " starting");
        }
        if (!this.xa) {
            sess = (JBossSession)this.sourceConnection.createSession(false, 2);
            this.sourceSession = sess.getDelegate();
            sess = (JBossSession)this.localConnection.createSession(false, 1);
            this.localSession = sess.getDelegate();
        } else {
            sess = (JBossSession)this.sourceConnection.createXASession();
            this.sourceSession = sess.getDelegate();
            sess = (JBossSession)this.localConnection.createXASession();
            this.localSession = sess.getDelegate();
        }
        JBossQueue dest = new JBossQueue(this.localQueue.getName(), true);
        this.producer = this.localSession.createProducerDelegate(dest);
        this.consumer = this.sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
        this.consumer.setMessageListener(this);
        if (this.trace) {
            log.trace(this + " Registering sucker");
        }
        this.localQueue.registerSucker(this);
        this.started = true;
        if (this.trace) {
            log.trace(this + " Registered sucker");
        }
    }

    synchronized void stop() {
        if (!this.started) {
            return;
        }
        this.setConsuming(false);
        this.localQueue.unregisterSucker(this);
        try {
            this.sourceSession.close();
        }
        catch (Throwable t) {
            // empty catch block
        }
        try {
            this.localSession.close();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.started = false;
    }

    public String getQueueName() {
        return this.localQueue.getName();
    }

    public synchronized void setConsuming(boolean consume) {
        if (this.trace) {
            log.trace(this + " setConsuming " + consume);
        }
        try {
            if (consume && !this.consuming) {
                this.consumer.changeRate(1.0f);
                if (this.trace) {
                    log.trace(this + " sent changeRate(1) message");
                }
                this.consuming = true;
            } else if (!consume && this.consuming) {
                this.consumer.changeRate(0.0f);
                if (this.trace) {
                    log.trace(this + " sent changeRate(0) message");
                }
                this.consuming = false;
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public void onMessage(Message msg) {
        block17: {
            Transaction tx = null;
            if (this.trace) {
                log.trace(this + " sucked message " + msg);
            }
            try {
                long timeToLive;
                boolean startTx;
                boolean bl = startTx = this.xa && msg.getJMSDeliveryMode() == 2;
                if (startTx) {
                    if (this.trace) {
                        log.trace("Starting JTA transactions");
                    }
                    this.tm.begin();
                    tx = this.tm.getTransaction();
                    tx.enlistResource(this.sourceSession.getXAResource());
                    tx.enlistResource(this.localSession.getXAResource());
                    if (this.trace) {
                        log.trace("Started JTA transaction");
                    }
                }
                if (this.preserveOrdering) {
                    ((MessageProxy)msg).getMessage().putHeader("CLUSTER_SUCKED", "x");
                }
                if ((timeToLive = msg.getJMSExpiration()) != 0L && (timeToLive -= System.currentTimeMillis()) <= 0L) {
                    timeToLive = 1L;
                }
                this.producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
                if (this.trace) {
                    log.trace(this + " forwarded message to queue");
                }
                if (startTx) {
                    if (this.trace) {
                        log.trace("Committing JTA transaction");
                    }
                    tx.delistResource(this.sourceSession.getXAResource(), 0x4000000);
                    tx.delistResource(this.localSession.getXAResource(), 0x4000000);
                    this.tm.commit();
                    if (this.trace) {
                        log.trace("Committed JTA transaction");
                    }
                } else {
                    msg.acknowledge();
                    if (this.trace) {
                        log.trace("Acknowledged message");
                    }
                }
            }
            catch (Exception e) {
                log.error("Failed to forward message", e);
                try {
                    if (tx != null) {
                        this.tm.rollback();
                    }
                }
                catch (Throwable t) {
                    if (!this.trace) break block17;
                    log.trace("Failed to rollback tx", t);
                }
            }
        }
    }
}

