/*
 * Decompiled with CFR 0.152.
 */
package org.activemq;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.activemq.ActiveMQDispatcher;
import org.activemq.ActiveMQSession;
import org.activemq.MessageDispatchChannel;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQMessage;
import org.activemq.command.ConsumerId;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.command.RedeliveryPolicy;
import org.activemq.management.JMSConsumerStatsImpl;
import org.activemq.management.StatsCapable;
import org.activemq.management.StatsImpl;
import org.activemq.selector.SelectorParser;
import org.activemq.thread.Scheduler;
import org.activemq.transaction.Synchronization;
import org.activemq.util.Callback;
import org.activemq.util.IntrospectionSupport;
import org.activemq.util.JMSExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Client-side JMS {@link MessageConsumer} that belongs to an {@link ActiveMQSession}.
 *
 * <p>Messages handed to {@link #dispatch(MessageDispatch)} are either passed
 * straight to the registered {@link MessageListener} or buffered in an internal
 * {@link MessageDispatchChannel} until one of the {@code receive} methods picks
 * them up. Messages that have been handed to the application but not yet fully
 * acknowledged are tracked in {@code deliveredMessages} (newest first) so they
 * can be acknowledged in bulk or re-enqueued on rollback.
 *
 * <p>NOTE(review): most mutable state here (counters, {@code deliveredMessages},
 * {@code messageListener}) is accessed without synchronization; the class
 * appears to rely on the owning session for serialization of calls - confirm
 * before using it from multiple threads.
 */
public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable, ActiveMQDispatcher {
    private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);

    /*
     * Ack type codes passed to MessageAck. The numeric values are exactly the
     * literals this class has always sent; the names are inferred from how each
     * value is used below.
     * NOTE(review): presumably these mirror ack-type constants declared on
     * MessageAck itself - confirm and switch to those if they exist.
     */
    /** Used for acks that are sent "later" for expired, transacted and client-acknowledged messages. */
    private static final byte ACK_TYPE_DELIVERED = 0;
    /** Used when a rollback exceeds the redelivery policy's maximum redeliveries. */
    private static final byte ACK_TYPE_POISON = 1;
    /** Used for auto-acknowledge, dups-ok and explicit {@link #acknowledge()} acks. */
    private static final byte ACK_TYPE_STANDARD = 2;

    protected final ActiveMQSession session;
    protected final ConsumerInfo info;
    /** Messages dispatched to this consumer that the application has not yet received. */
    private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
    /** Messages handed to the application and not yet cleared by an ack/commit; newest entry first. */
    private final LinkedList deliveredMessages = new LinkedList();
    /** Number of messages counted by {@link #ackLater} since the counters were last reduced. */
    private int deliveredCounter = 0;
    /** Value of {@code deliveredCounter} at the time of the last ack sent by {@link #ackLater}. */
    private int additionalWindowSize = 0;
    /** Number of rollbacks since the last commit (or since the last poison ack). */
    private int rollbackCounter = 0;
    /** Current delay, in the units expected by {@code Scheduler.executeAfterDelay}, before redelivery resumes. */
    private long redeliveryDelay = 0L;
    private MessageListener messageListener;
    private JMSConsumerStatsImpl stats;
    private final String selector;
    /** True while a transaction synchronization registered by {@link #ackLater} is pending. */
    private boolean synchronizationRegistered = false;
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Creates the consumer, registers it with the session and announces it to
     * the broker by synchronously sending its {@link ConsumerInfo}.
     *
     * @param session       owning session
     * @param consumerId    id assigned to this consumer
     * @param dest          destination to consume from; must be non-null and have a physical name
     * @param name          subscription name stored on the {@link ConsumerInfo}
     * @param selector      JMS message selector; {@code null} or blank means no selector
     * @param prefetch      prefetch size stored on the {@link ConsumerInfo}
     * @param noLocal       value for the {@code noLocal} flag of the {@link ConsumerInfo}
     * @param browser       value for the {@code browser} flag of the {@link ConsumerInfo}
     * @param dispatchAsync value for the {@code dispatchAsync} flag of the {@link ConsumerInfo}
     * @throws InvalidDestinationException if {@code dest} is null, has no physical name, or is a
     *         temporary destination that (in J2EE-compliant mode) belongs to another connection
     *         or has been deleted
     * @throws JMSException if the selector cannot be parsed or the broker registration fails
     */
    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, boolean noLocal, boolean browser, boolean dispatchAsync) throws JMSException {
        if (dest == null) {
            throw new InvalidDestinationException("Don't understand null destinations");
        }
        String physicalName = dest.getPhysicalName();
        if (physicalName == null) {
            throw new InvalidDestinationException("The destination object was not given a physical name.");
        }
        if (dest.isTemporary() && session.connection.isJ2EESpecCompliant()) {
            // A temporary destination may only be consumed from the connection that created it;
            // ownership is detected by the connection id being embedded in the physical name.
            String connectionID = session.connection.getConnectionInfo().getConnectionId().getConnectionId();
            if (physicalName.indexOf(connectionID) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }
            if (session.connection.isDeleted(dest)) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
        }

        this.session = session;
        this.selector = selector;

        this.info = new ConsumerInfo(consumerId);
        this.info.setSubcriptionName(name);
        this.info.setPrefetchSize(prefetch);
        this.info.setNoLocal(noLocal);
        this.info.setDispatchAsync(dispatchAsync);
        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
        if (dest.getOptions() != null) {
            // Destination options prefixed with "consumer." are applied onto the ConsumerInfo;
            // they are applied after the explicit setters above and so can override them.
            HashMap options = new HashMap(dest.getOptions());
            IntrospectionSupport.setProperties(this.info, options, "consumer.");
        }
        this.info.setDestination(dest);
        this.info.setBrowser(browser);

        if (selector != null && selector.trim().length() != 0) {
            // Parse eagerly so that an invalid selector fails here, before anything is sent.
            new SelectorParser().parse(selector);
            this.info.setSelector(selector);
        } else {
            this.info.setSelector(null);
        }

        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);

        try {
            this.session.addConsumer(this);
            this.session.syncSendPacket(this.info);
        }
        catch (JMSException e) {
            // Undo the local registration if the broker rejected the consumer.
            this.session.removeConsumer(this);
            throw e;
        }

        if (session.connection.isStarted()) {
            this.start();
        }
    }

    /** @return the statistics object of this consumer */
    public StatsImpl getStats() {
        return this.stats;
    }

    /** @return the statistics object of this consumer, typed as {@link JMSConsumerStatsImpl} */
    public JMSConsumerStatsImpl getConsumerStats() {
        return this.stats;
    }

    /** @return the consumer id held by the {@link ConsumerInfo} */
    protected ConsumerId getConsumerId() {
        return this.info.getConsumerId();
    }

    /** @return the subscription name held by the {@link ConsumerInfo} */
    protected String getConsumerName() {
        return this.info.getSubcriptionName();
    }

    /** @return the {@code noLocal} flag held by the {@link ConsumerInfo} */
    protected boolean isNoLocal() {
        return this.info.isNoLocal();
    }

    /** @return the {@code browser} flag held by the {@link ConsumerInfo} */
    protected boolean isBrowser() {
        return this.info.isBrowser();
    }

    /** @return the destination held by the {@link ConsumerInfo} */
    protected ActiveMQDestination getDestination() {
        return this.info.getDestination();
    }

    /** @return the prefetch size held by the {@link ConsumerInfo} */
    public int getPrefetchNumber() {
        return this.info.getPrefetchSize();
    }

    /** @return always {@code false} in this class */
    public boolean isDurableSubscriber() {
        return false;
    }

    /**
     * @return the selector string exactly as passed to the constructor (may be {@code null} or blank)
     * @throws JMSException if the consumer is closed
     */
    public String getMessageSelector() throws JMSException {
        this.checkClosed();
        return this.selector;
    }

    /**
     * @return the currently registered listener, or {@code null} if none
     * @throws JMSException if the consumer is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        this.checkClosed();
        return this.messageListener;
    }

    /**
     * Registers (or, with {@code null}, removes) the asynchronous listener.
     *
     * <p>When a non-null listener is set, messages already buffered for
     * synchronous receipt are handed back to the session for redispatch. The
     * session is stopped around the redispatch if it was running, and is
     * restarted afterwards even when the redispatch fails, so a failure cannot
     * leave the session stopped.
     *
     * @param listener the listener to install, or {@code null} to clear it
     * @throws JMSException if the consumer is closed or the session operations fail
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        this.checkClosed();
        this.messageListener = listener;
        if (listener == null) {
            return;
        }
        boolean wasRunning = this.session.isRunning();
        if (wasRunning) {
            this.session.stop();
        }
        try {
            this.session.redispatch(this.unconsumedMessages);
        }
        finally {
            if (wasRunning) {
                this.session.start();
            }
        }
    }

    /**
     * Takes the next non-expired message from the unconsumed channel.
     *
     * <p>Expired messages are acknowledged via the normal before/after hooks
     * and skipped. For a positive {@code timeout} the remaining wait is
     * recomputed against a fixed deadline after every empty or expired result.
     *
     * @param timeout value forwarded to {@code MessageDispatchChannel.dequeue};
     *        callers in this class pass {@code -1} for {@link #receive()},
     *        {@code 0} for {@link #receiveNoWait()} and a positive number of
     *        milliseconds for {@link #receive(long)}
     * @return the next live message, or {@code null} if the channel yielded
     *         nothing and either the timeout is not positive or the channel is closed
     * @throws JMSException if the wait is interrupted (the thread's interrupt
     *         flag is restored first) or acknowledging an expired message fails
     */
    private MessageDispatch dequeue(long timeout) throws JMSException {
        try {
            long deadline = 0L;
            if (timeout > 0L) {
                deadline = System.currentTimeMillis() + timeout;
            }
            for (;;) {
                MessageDispatch md = this.unconsumedMessages.dequeue(timeout);
                if (md == null) {
                    if (timeout > 0L && !this.unconsumedMessages.isClosed()) {
                        // Came back empty on an open channel: retry with whatever time is left.
                        timeout = Math.max(deadline - System.currentTimeMillis(), 0L);
                        continue;
                    }
                    return null;
                }
                if (!md.getMessage().isExpired()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Received message: " + md);
                    }
                    return md;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Received expired message: " + md);
                }
                // Run the expired message through the ack bookkeeping, then keep looking.
                this.beforeMessageIsConsumed(md);
                this.afterMessageIsConsumed(md, true);
                if (timeout > 0L) {
                    timeout = Math.max(deadline - System.currentTimeMillis(), 0L);
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw JMSExceptionSupport.create(e);
        }
    }

    /**
     * Shared tail of the synchronous receive methods: runs the before/after
     * consumption hooks and converts the dispatch into the message to return.
     *
     * @param md the dequeued dispatch, or {@code null} if nothing was available
     * @return the message to hand to the application, or {@code null} if {@code md} is {@code null}
     * @throws JMSException if the acknowledgement bookkeeping fails
     */
    private Message consume(MessageDispatch md) throws JMSException {
        if (md == null) {
            return null;
        }
        this.beforeMessageIsConsumed(md);
        this.afterMessageIsConsumed(md, false);
        return this.createActiveMQMessage(md);
    }

    /**
     * Receives the next message, passing {@code -1} as the timeout to the
     * unconsumed channel.
     *
     * @return the next message, or {@code null} if the channel yielded none
     * @throws JMSException if the consumer is closed, a listener is set, or the wait is interrupted
     */
    public Message receive() throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        return this.consume(this.dequeue(-1L));
    }

    /**
     * Extracts the message from a dispatch and, for client-acknowledge
     * sessions, attaches a callback that acknowledges the whole session.
     *
     * @param md the dispatch carrying the message
     * @return the message contained in {@code md}
     */
    private ActiveMQMessage createActiveMQMessage(MessageDispatch md) {
        ActiveMQMessage m = (ActiveMQMessage)md.getMessage();
        if (this.session.isClientAcknowledge()) {
            m.setAcknowledgeCallback(new Callback(){

                public void execute() throws Throwable {
                    ActiveMQMessageConsumer.this.session.checkClosed();
                    ActiveMQMessageConsumer.this.session.acknowledge();
                }
            });
        }
        return m;
    }

    /**
     * Receives the next message, waiting at most {@code timeout} milliseconds.
     *
     * @param timeout maximum wait in milliseconds; {@code 0} delegates to
     *        {@link #receive()}; a negative value returns {@code null} immediately
     * @return the next message, or {@code null} if none arrived in time
     * @throws JMSException if the consumer is closed, a listener is set, or the wait is interrupted
     */
    public Message receive(long timeout) throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        if (timeout == 0L) {
            return this.receive();
        }
        if (timeout < 0L) {
            return null;
        }
        return this.consume(this.dequeue(timeout));
    }

    /**
     * Receives the next message, passing {@code 0} as the timeout to the
     * unconsumed channel.
     *
     * @return the next message, or {@code null} if the channel yielded none
     * @throws JMSException if the consumer is closed or a listener is set
     */
    public Message receiveNoWait() throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        return this.consume(this.dequeue(0L));
    }

    /**
     * Closes the consumer locally via {@link #dispose()} and then synchronously
     * sends the remove command to the broker. Does nothing if already closed.
     *
     * @throws JMSException if disposal or the remove command fails
     */
    public void close() throws JMSException {
        if (!this.unconsumedMessages.isClosed()) {
            this.dispose();
            this.session.syncSendPacket(this.info.createRemoveCommand());
        }
    }

    /**
     * Releases local state without telling the broker: acknowledges pending
     * deliveries for transacted and dups-ok sessions, forgets delivered
     * messages, closes the unconsumed channel and unregisters from the session.
     * Does nothing if already closed.
     *
     * @throws JMSException if the final acknowledgement fails
     */
    public void dispose() throws JMSException {
        if (!this.unconsumedMessages.isClosed()) {
            if (this.session.isTransacted() || this.session.isDupsOkAcknowledge()) {
                this.acknowledge();
            }
            this.deliveredMessages.clear();
            this.unconsumedMessages.close();
            this.session.removeConsumer(this);
        }
    }

    /**
     * @throws IllegalStateException if the unconsumed channel (and therefore the consumer) is closed
     */
    protected void checkClosed() throws IllegalStateException {
        if (this.unconsumedMessages.isClosed()) {
            throw new IllegalStateException("The Consumer is closed");
        }
    }

    /**
     * @throws IllegalStateException if a {@link MessageListener} is registered,
     *         since synchronous receipt is not allowed in that case
     */
    protected void checkMessageListener() throws IllegalStateException {
        if (this.messageListener != null) {
            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
        }
    }

    /**
     * Stamps the dispatch with the session's next delivery id and, unless the
     * session is dups-ok, records it at the head of {@code deliveredMessages}.
     *
     * @param md the dispatch about to be handed to the application
     */
    private void beforeMessageIsConsumed(MessageDispatch md) {
        md.setDeliverySequenceId(this.session.getNextDeliveryId());
        if (!this.session.isDupsOkAcknowledge()) {
            this.deliveredMessages.addFirst(md);
        }
    }

    /**
     * Performs the acknowledgement step appropriate to the session's
     * acknowledge mode once a message has been consumed (or found expired).
     * Does nothing if the consumer has been closed in the meantime.
     *
     * @param md             the dispatch that was consumed
     * @param messageExpired {@code true} if the message was skipped because it had expired;
     *                       such messages are not counted in the statistics
     * @throws JMSException if sending an ack fails or the session reports no known acknowledge mode
     */
    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (this.unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            this.ackLater(md, ACK_TYPE_DELIVERED);
            return;
        }
        this.stats.onMessage();
        if (this.session.isTransacted()) {
            this.ackLater(md, ACK_TYPE_DELIVERED);
        } else if (this.session.isAutoAcknowledge()) {
            if (!this.deliveredMessages.isEmpty()) {
                // Acknowledge everything delivered so far in one ack and forget it.
                MessageAck ack = new MessageAck(md, ACK_TYPE_STANDARD, this.deliveredMessages.size());
                this.session.asyncSendPacket(ack);
                this.deliveredMessages.clear();
            }
        } else if (this.session.isDupsOkAcknowledge()) {
            this.ackLater(md, ACK_TYPE_STANDARD);
        } else if (this.session.isClientAcknowledge()) {
            this.ackLater(md, ACK_TYPE_DELIVERED);
        } else {
            throw new IllegalStateException("Invalid session state.");
        }
    }

    /**
     * Counts a delivery and sends a batched ack once the number of deliveries
     * since the previous batched ack reaches half the prefetch size.
     *
     * <p>For transacted sessions this also starts the transaction and, once per
     * transaction, registers a synchronization that calls
     * {@link #acknowledge()}, {@link #commit()} or {@link #rollback()} at the
     * corresponding transaction events.
     *
     * @param md      the dispatch being accounted for
     * @param ackType the ack type to use if a batched ack is sent now
     * @throws JMSException if starting the transaction or sending the ack fails
     */
    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
        if (this.session.isTransacted()) {
            this.session.doStartTransaction();
            if (!this.synchronizationRegistered) {
                this.synchronizationRegistered = true;
                this.session.getTransactionContext().addSynchronization(new Synchronization(){

                    public void beforeEnd() throws Throwable {
                        ActiveMQMessageConsumer.this.acknowledge();
                        ActiveMQMessageConsumer.this.synchronizationRegistered = false;
                    }

                    public void afterCommit() throws Throwable {
                        ActiveMQMessageConsumer.this.commit();
                        ActiveMQMessageConsumer.this.synchronizationRegistered = false;
                    }

                    public void afterRollback() throws Throwable {
                        ActiveMQMessageConsumer.this.rollback();
                        ActiveMQMessageConsumer.this.synchronizationRegistered = false;
                    }
                });
            }
        }

        ++this.deliveredCounter;
        if (0.5 * (double)this.info.getPrefetchSize() <= (double)(this.deliveredCounter - this.additionalWindowSize)) {
            MessageAck ack = new MessageAck(md, ackType, this.deliveredCounter);
            ack.setTransactionId(this.session.getTransactionContext().getTransactionId());
            this.session.asyncSendPacket(ack);
            // Remember how far the batched acks have got.
            this.additionalWindowSize = this.deliveredCounter;
            if (ackType == ACK_TYPE_STANDARD) {
                // A standard ack leaves nothing outstanding, so both counters start over.
                this.additionalWindowSize = 0;
                this.deliveredCounter = 0;
            }
        }
    }

    /**
     * Sends a single standard ack covering every message currently recorded in
     * {@code deliveredMessages} and reduces the delivery counters accordingly.
     * Does nothing if no messages are recorded.
     *
     * <p>For transacted sessions the ack carries the transaction id and the
     * delivered list is kept (it is cleared later by {@link #commit()} or
     * {@link #rollback()}); otherwise the list is cleared here.
     *
     * @throws JMSException if starting the transaction or sending the ack fails
     */
    public void acknowledge() throws JMSException {
        if (this.deliveredMessages.isEmpty()) {
            return;
        }
        // The list is newest-first, so its head is the most recently delivered message.
        MessageDispatch lastMd = (MessageDispatch)this.deliveredMessages.getFirst();
        int deliveredCount = this.deliveredMessages.size();
        MessageAck ack = new MessageAck(lastMd, ACK_TYPE_STANDARD, deliveredCount);
        if (this.session.isTransacted()) {
            this.session.doStartTransaction();
            ack.setTransactionId(this.session.getTransactionContext().getTransactionId());
        }
        this.session.asyncSendPacket(ack);

        this.deliveredCounter -= deliveredCount;
        this.additionalWindowSize = Math.max(0, this.additionalWindowSize - deliveredCount);
        if (!this.session.isTransacted()) {
            this.deliveredMessages.clear();
        }
    }

    /**
     * Local bookkeeping after a successful transaction commit: forgets the
     * delivered messages and resets the redelivery state.
     *
     * @throws JMSException declared for callers; not thrown by this implementation
     */
    public void commit() throws JMSException {
        this.deliveredMessages.clear();
        this.rollbackCounter = 0;
        this.redeliveryDelay = 0L;
    }

    /**
     * Local bookkeeping after a transaction rollback.
     *
     * <p>If the number of consecutive rollbacks exceeds the redelivery policy's
     * maximum, the delivered messages are poison-acked and the redelivery state
     * is reset. Otherwise the unconsumed channel is paused, the delivered
     * messages are put back at its head (in their original order) with their
     * redelivery counters incremented, and the channel is scheduled to resume
     * after the current redelivery delay. Does nothing if no messages are
     * recorded as delivered.
     *
     * @throws JMSException if sending the poison ack or redispatching fails
     */
    public void rollback() throws JMSException {
        synchronized (this.unconsumedMessages.getMutex()) {
            if (this.deliveredMessages.isEmpty()) {
                return;
            }
            ++this.rollbackCounter;
            RedeliveryPolicy redeliveryPolicy = this.session.connection.getRedeliveryPolicy();
            if (this.rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) {
                MessageDispatch lastMd = (MessageDispatch)this.deliveredMessages.getFirst();
                int deliveredCount = this.deliveredMessages.size();
                MessageAck ack = new MessageAck(lastMd, ACK_TYPE_POISON, deliveredCount);
                this.session.asyncSendPacket(ack);
                this.deliveredCounter -= deliveredCount;
                this.additionalWindowSize = Math.max(0, this.additionalWindowSize - deliveredCount);
                // These messages are done with. Start the next batch with a fresh redelivery
                // budget; without this reset every later rollback would be poison-acked on
                // its first attempt because only commit() cleared the counter.
                this.rollbackCounter = 0;
                this.redeliveryDelay = 0L;
            } else {
                // Hold back delivery until the redelivery delay has elapsed.
                this.unconsumedMessages.stop();
                if (this.redeliveryDelay == 0L) {
                    this.redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                } else if (redeliveryPolicy.isUseExponentialBackOff()) {
                    this.redeliveryDelay *= (long)redeliveryPolicy.getBackOffMultiplier();
                }
                Scheduler.executeAfterDelay(new Runnable(){

                    public void run() {
                        // Only resume if the consumer has not been stopped in the meantime.
                        if (ActiveMQMessageConsumer.this.started.get()) {
                            ActiveMQMessageConsumer.this.unconsumedMessages.start();
                        }
                    }
                }, this.redeliveryDelay);

                // Newest-first iteration combined with enqueueFirst leaves the oldest
                // message at the head of the channel, restoring the original order.
                Iterator iter = this.deliveredMessages.iterator();
                while (iter.hasNext()) {
                    MessageDispatch md = (MessageDispatch)iter.next();
                    md.getMessage().incrementRedeliveryCounter();
                    this.unconsumedMessages.enqueueFirst(md);
                }
            }
            this.deliveredMessages.clear();
        }
        if (this.messageListener != null) {
            this.session.redispatch(this.unconsumedMessages);
        }
    }

    /**
     * Entry point for messages arriving for this consumer. With a listener
     * registered the message is delivered to it immediately; otherwise it is
     * buffered for the {@code receive} methods. Messages arriving after the
     * consumer is closed are dropped.
     *
     * <p>Any exception raised while processing (including one thrown by the
     * listener) is logged at warn level and not propagated.
     *
     * @param md the dispatch to process
     */
    public void dispatch(MessageDispatch md) {
        // Read the field once so a concurrent setMessageListener cannot change it mid-dispatch.
        MessageListener listener = this.messageListener;
        try {
            if (this.unconsumedMessages.isClosed()) {
                return;
            }
            if (listener == null) {
                this.unconsumedMessages.enqueue(md);
                return;
            }
            ActiveMQMessage message = this.createActiveMQMessage(md);
            this.beforeMessageIsConsumed(md);
            listener.onMessage(message);
            this.afterMessageIsConsumed(md, false);
        }
        catch (Exception e) {
            log.warn("could not process message: " + md, e);
        }
    }

    /** @return the number of messages currently buffered in the unconsumed channel */
    public int getMessageSize() {
        return this.unconsumedMessages.size();
    }

    /** Marks the consumer as started and starts the unconsumed channel. */
    public void start() {
        this.started.set(true);
        this.unconsumedMessages.start();
    }

    /** Marks the consumer as stopped and stops the unconsumed channel. */
    public void stop() {
        this.started.set(false);
        this.unconsumedMessages.stop();
    }
}

