/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.AbstractSubscription;
import org.activemq.broker.region.MessageReference;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQQueue;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.command.MessageId;
import org.activemq.transaction.Synchronization;

public abstract class PrefetchSubscription
extends AbstractSubscription {
    protected final LinkedList matched = new LinkedList();
    protected final LinkedList dispatched = new LinkedList();
    protected final ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
    protected int delivered = 0;
    int preLoadLimit = 102400;
    int preLoadSize = 0;
    boolean dispatching = false;

    public PrefetchSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
        super(context, info);
    }

    public synchronized void add(MessageReference node) throws Throwable {
        if (!this.isFull()) {
            this.dispatch(node);
        } else {
            this.matched.addLast(node);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void acknowledge(ConnectionContext context, final MessageAck ack) throws Throwable {
        boolean wasFull = this.isFull();
        if (ack.isStandardAck()) {
            int index = 0;
            boolean inAckRange = false;
            Iterator iter = this.dispatched.iterator();
            while (iter.hasNext()) {
                MessageReference node = (MessageReference)iter.next();
                MessageId messageId = node.getMessageId();
                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                    inAckRange = true;
                }
                if (!inAckRange) continue;
                if (!context.isInTransaction()) {
                    iter.remove();
                } else {
                    context.getTransaction().addSynchronization(new Synchronization(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        public void afterCommit() throws Throwable {
                            PrefetchSubscription prefetchSubscription = PrefetchSubscription.this;
                            synchronized (prefetchSubscription) {
                                boolean inAckRange = false;
                                int index = 0;
                                Iterator iter = PrefetchSubscription.this.dispatched.iterator();
                                while (iter.hasNext()) {
                                    MessageReference node = (MessageReference)iter.next();
                                    MessageId messageId = node.getMessageId();
                                    if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                                        inAckRange = true;
                                    }
                                    if (!inAckRange) continue;
                                    ++index;
                                    iter.remove();
                                    if (!ack.getLastMessageId().equals(messageId)) continue;
                                    PrefetchSubscription.this.delivered = Math.max(0, PrefetchSubscription.this.delivered - (index + 1));
                                    return;
                                }
                            }
                        }
                    });
                }
                ++index;
                this.acknowledge(context, ack, node);
                if (!ack.getLastMessageId().equals(messageId)) continue;
                this.delivered = context.isInTransaction() ? Math.max(this.delivered, index + 1) : Math.max(0, this.delivered - (index + 1));
                if (wasFull && !this.isFull()) {
                    this.dispatchMatched();
                }
                return;
            }
            throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
        }
        if (ack.isDeliveredAck()) {
            int index = 0;
            Iterator iter = this.dispatched.iterator();
            while (iter.hasNext()) {
                MessageReference node = (MessageReference)iter.next();
                if (ack.getLastMessageId().equals(node.getMessageId())) {
                    this.delivered = Math.max(this.delivered, index + 1);
                    if (wasFull && !this.isFull()) {
                        this.dispatchMatched();
                    }
                    return;
                }
                ++index;
            }
            throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
        }
        if (ack.isPoisonAck()) {
            if (ack.isInTransaction()) {
                throw new JMSException("Poison ack cannot be transacted: " + ack);
            }
            int index = 0;
            boolean inAckRange = false;
            Iterator iter = this.dispatched.iterator();
            while (iter.hasNext()) {
                MessageId messageId;
                MessageReference node;
                block21: {
                    node = (MessageReference)iter.next();
                    messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (!inAckRange) continue;
                    node.incrementReferenceCount();
                    try {
                        Message message = node.getMessage();
                        if (message == null) break block21;
                        message.setDestination(this.dlqDestination);
                        message.setProducerId(message.getMessageId().getProducerId());
                        message.setTransactionId(null);
                        boolean originalFlowControl = context.isProducerFlowControl();
                        try {
                            context.setProducerFlowControl(false);
                            context.getBroker().send(context, message);
                        }
                        finally {
                            context.setProducerFlowControl(originalFlowControl);
                        }
                    }
                    finally {
                        node.decrementReferenceCount();
                    }
                }
                iter.remove();
                ++index;
                this.acknowledge(context, ack, node);
                if (!ack.getLastMessageId().equals(messageId)) continue;
                this.delivered = Math.max(0, this.delivered - (index + 1));
                if (wasFull && !this.isFull()) {
                    this.dispatchMatched();
                }
                return;
            }
            throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
        }
        throw new JMSException("Invalid acknowledgment: " + ack);
    }

    protected boolean isFull() {
        return this.dispatched.size() - this.delivered >= this.info.getPrefetchSize() || this.preLoadSize > this.preLoadLimit;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void dispatchMatched() throws IOException {
        if (!this.dispatching) {
            this.dispatching = true;
            try {
                Iterator iter = this.matched.iterator();
                while (iter.hasNext() && !this.isFull()) {
                    MessageReference node = (MessageReference)iter.next();
                    iter.remove();
                    this.dispatch(node);
                }
            }
            finally {
                this.dispatching = false;
            }
        }
    }

    private void dispatch(final MessageReference node) throws IOException {
        node.incrementReferenceCount();
        final Message message = node.getMessage();
        if (message == null) {
            return;
        }
        this.incrementPreloadSize(node.getMessage().getSize());
        if (this.canDispatch(node)) {
            MessageDispatch md = this.createMessageDispatch(node, message);
            this.dispatched.addLast(node);
            if (this.info.isDispatchAsync()) {
                md.setConsumer(new Runnable(){

                    public void run() {
                        PrefetchSubscription.this.onDispatch(node, message);
                    }
                });
                this.context.getConnection().dispatchAsync(md);
            } else {
                this.context.getConnection().dispatchSync(md);
                this.onDispatch(node, message);
            }
        }
    }

    private synchronized void onDispatch(MessageReference node, Message message) {
        boolean wasFull = this.isFull();
        this.decrementPreloadSize(message.getSize());
        node.decrementReferenceCount();
        if (node.getRegionDestination() != null) {
            node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
            if (wasFull && !this.isFull()) {
                try {
                    this.dispatchMatched();
                }
                catch (IOException e) {
                    this.context.getConnection().serviceException(e);
                }
            }
        }
    }

    private int incrementPreloadSize(int size) {
        this.preLoadSize += size;
        return this.preLoadSize;
    }

    private int decrementPreloadSize(int size) {
        this.preLoadSize -= size;
        return this.preLoadSize;
    }

    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
        MessageDispatch md = new MessageDispatch();
        md.setConsumerId(this.info.getConsumerId());
        md.setDestination(node.getRegionDestination().getActiveMQDestination());
        md.setMessage(message);
        md.setRedeliveryCounter(node.getRedeliveryCounter());
        return md;
    }

    protected abstract boolean canDispatch(MessageReference var1);

    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
    }
}

