package org.jboss.messaging.core.impl.clusterconnection;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.transaction.TransactionManager;
import org.jboss.jms.client.JBossSession;
import org.jboss.jms.client.container.ClientConsumer;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.client.state.ConsumerState;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Queue;

/* loaded from: input_file:org/jboss/messaging/core/impl/clusterconnection/MessageSucker.class */
/**
 * JMS {@link MessageListener} that moves messages from a source session to a local session.
 *
 * <p>On {@link #start()} a consumer is created on the source session and a producer on the
 * local session, both for a {@link JBossQueue} carrying the local queue's name. Every message
 * delivered to {@link #onMessage(Message)} is tagged with a couple of headers, acknowledged on
 * the source side and then sent through the local producer.
 *
 * <p>{@code start()}, {@code stop()} and {@code setConsuming(boolean)} are synchronized on this
 * instance; {@code onMessage} is not, so it works on snapshots of the fields that
 * {@code stop()} clears.
 */
public class MessageSucker implements MessageListener {
    private static final Logger log = Logger.getLogger(MessageSucker.class);

    /** Trace flag, sampled once when the instance is created. */
    private final boolean trace = log.isTraceEnabled();

    private final Queue localQueue;
    private final boolean preserveOrdering;
    private final long sourceChannelID;
    /** Destination used for both the source consumer and the local producer. */
    private final JBossQueue jbq;

    /** Cleared by {@link #stop()}. */
    private Session sourceSession;
    /** Cleared by {@link #stop()}; also the monitor that serialises sends in {@code onMessage}. */
    private Session localSession;
    private ProducerDelegate producer;
    private ClientConsumerDelegate consumer;
    private ClientConsumer clientConsumer;

    private volatile boolean started;
    /** True while the client consumer has been resumed; guarded by {@code this}. */
    private boolean consuming;

    @Override
    public String toString() {
        return "MessageSucker:" + System.identityHashCode(this) + " queue:" + this.localQueue.getName();
    }

    /**
     * Creates a sucker that is not yet started.
     *
     * <p>Decompiler note: this constructor was package-private in the original class.
     *
     * @param queue    local queue; its name is used for the JMS destination and it is the queue
     *                 this sucker registers with on {@link #start()}
     * @param session  source session the consumer is created on
     * @param session2 local session the producer is created on
     * @param z        when true, each forwarded message gets the {@code CLUSTER_SUCKED} header
     * @param j        value stored in the {@code SOURCE_CHANNEL_ID} header of each forwarded message
     */
    public MessageSucker(Queue queue, Session session, Session session2, boolean z, long j) {
        if (this.trace) {
            log.trace("Creating message sucker, localQueue:" + queue + " preserveOrdering:" + z);
        }
        this.jbq = new JBossQueue(queue.getName(), true);
        this.localQueue = queue;
        this.sourceSession = session;
        this.localSession = session2;
        this.preserveOrdering = z;
        this.sourceChannelID = j;
    }

    /**
     * Creates the producer and consumer, installs this object as the consumer's listener and
     * registers with the local queue. Does nothing if already started.
     *
     * <p>If any step fails, whatever was created so far is closed again before the exception is
     * rethrown, so a failed start does not leave an open producer or consumer behind.
     *
     * <p>Decompiler note: this method was package-private in the original class.
     *
     * @throws Exception whatever the delegate creation, listener installation or registration throws
     */
    public synchronized void start() throws Exception {
        if (this.started) {
            return;
        }
        if (this.trace) {
            log.trace(this + " starting");
        }
        try {
            this.producer = ((JBossSession) this.localSession).getDelegate().createProducerDelegate(this.jbq);
            this.consumer = (ClientConsumerDelegate) ((JBossSession) this.sourceSession).getDelegate()
                    .createConsumerDelegate(this.jbq, null, false, null, false, false);
            this.clientConsumer = ((ConsumerState) this.consumer.getState()).getClientConsumer();
            this.consumer.setMessageListener(this);
            if (this.trace) {
                log.trace(this + " Registering sucker");
            }
            this.localQueue.registerSucker(this);
        } catch (Exception e) {
            // NOTE(review): the queue is not unregistered here because it is unknown whether a
            // failing registerSucker leaves a registration behind - confirm against Queue.
            closeConsumerQuietly();
            closeProducerQuietly();
            this.consumer = null;
            this.clientConsumer = null;
            this.producer = null;
            throw e;
        }
        this.started = true;
        if (this.trace) {
            log.trace(this + " Registered sucker");
        }
    }

    /**
     * Pauses consumption, unregisters from the local queue, closes the consumer and producer on a
     * best-effort basis and drops all session references. Does nothing if not started.
     *
     * <p>Decompiler note: this method was package-private in the original class.
     */
    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        setConsuming(false);
        this.localQueue.unregisterSucker(this);
        closeConsumerQuietly();
        closeProducerQuietly();
        this.sourceSession = null;
        this.localSession = null;
        this.consumer = null;
        this.clientConsumer = null;
        this.producer = null;
        this.started = false;
    }

    /** @return the name of the local queue */
    public String getQueueName() {
        return this.localQueue.getName();
    }

    /**
     * Resumes or pauses the underlying client consumer, tracking the current state so that
     * repeated calls with the same value are no-ops.
     *
     * <p>A failure to resume is not propagated; the sucker simply stays in the non-consuming
     * state. Calling this with {@code true} while no client consumer exists (before
     * {@link #start()} or after {@link #stop()}) is also a no-op.
     *
     * @param z true to resume consumption, false to pause it
     */
    public synchronized void setConsuming(boolean z) {
        if (this.trace) {
            log.trace(this + " setConsuming " + z);
        }
        if (z) {
            if (this.consuming) {
                return;
            }
            if (this.clientConsumer == null) {
                if (this.trace) {
                    log.trace(this + " has no client consumer, not resuming");
                }
                return;
            }
            try {
                if (this.trace) {
                    log.trace(this + " resuming client consumer");
                }
                this.clientConsumer.resume();
                this.consuming = true;
            } catch (Exception e) {
                // Deliberately not propagated; recorded so the failure is at least traceable.
                if (this.trace) {
                    log.trace(this + " failed to resume client consumer", e);
                }
            }
            return;
        }
        if (this.consuming) {
            if (this.trace) {
                log.trace(this + " pausing client consumer");
            }
            this.clientConsumer.pause();
            this.consuming = false;
        }
    }

    /**
     * Tags the delivered message, acknowledges it on the source side and sends it through the
     * local producer. Any exception is logged and not rethrown.
     *
     * <p>If the sucker has already been stopped the message is left untouched and
     * unacknowledged instead of being acknowledged and then failing to send.
     *
     * @param message message delivered by the source consumer; must be a {@link MessageProxy}
     */
    public void onMessage(Message message) {
        // stop() clears these fields and is not synchronized with this callback, so work on
        // snapshots and bail out before acknowledging when there is nowhere to forward to.
        // NOTE(review): assumes the source session returns or redelivers unacknowledged
        // messages once the consumer is closed - confirm.
        final Session forwardSession = this.localSession;
        final ProducerDelegate forwardProducer = this.producer;
        if (forwardSession == null || forwardProducer == null) {
            if (this.trace) {
                log.trace(this + " is stopped, leaving message unacknowledged");
            }
            return;
        }
        try {
            Destination origDestination = message.getJMSDestination();
            if (this.trace) {
                log.trace(this + " sucked message " + message + " JMSDestination - " + origDestination);
            }
            JBossMessage message2 = ((MessageProxy) message).getMessage();
            if (this.preserveOrdering) {
                message2.putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
            }
            message2.putHeader(org.jboss.messaging.core.contract.Message.SOURCE_CHANNEL_ID, Long.valueOf(this.sourceChannelID));
            long timeToLive = remainingTimeToLive(message.getJMSExpiration());
            // NOTE(review): the message is acknowledged before it is sent, so a failing send
            // below is only logged - confirm this ordering is intended.
            message.acknowledge();
            if (this.trace) {
                log.trace("Acknowledged message");
            }
            message2.getHeaders().put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, origDestination);
            synchronized (forwardSession) {
                forwardProducer.send(null, message, message.getJMSDeliveryMode(), message.getJMSPriority(), timeToLive, true);
                if (this.trace) {
                    log.trace(this + " forwarded message to queue");
                }
            }
        } catch (Exception e) {
            log.error("Failed to forward message", e);
        }
    }

    /**
     * Converts an expiration timestamp into the time left from now.
     *
     * @param expiration expiration as reported by the message; 0 is passed through unchanged
     * @return 0 when {@code expiration} is 0, otherwise the milliseconds remaining, never less than 1
     */
    private static long remainingTimeToLive(long expiration) {
        if (expiration == 0) {
            return 0;
        }
        long remaining = expiration - System.currentTimeMillis();
        return remaining <= 0 ? 1 : remaining;
    }

    /** Runs the consumer's closing/close sequence, tracing instead of propagating any failure. */
    private void closeConsumerQuietly() {
        if (this.consumer == null) {
            return;
        }
        try {
            this.consumer.closing(-1L);
        } catch (Throwable ignored) {
            traceIgnored("consumer closing", ignored);
        }
        try {
            this.consumer.close();
        } catch (Throwable ignored) {
            traceIgnored("consumer close", ignored);
        }
    }

    /** Closes the producer, tracing instead of propagating any failure. */
    private void closeProducerQuietly() {
        if (this.producer == null) {
            return;
        }
        try {
            this.producer.close();
        } catch (Throwable ignored) {
            traceIgnored("producer close", ignored);
        }
    }

    /** Records a deliberately ignored cleanup failure when trace logging is on. */
    private void traceIgnored(String step, Throwable failure) {
        if (this.trace) {
            log.trace(this + " ignoring failure during " + step, failure);
        }
    }
}
