/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.AbstractSubscription;
import org.activemq.broker.region.MessageReference;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQQueue;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.memory.UsageManager;
import org.activemq.transaction.Synchronization;

/**
 * A subscription that dispatches matched messages to a consumer while keeping
 * the number of outstanding messages below the consumer's prefetch size.
 *
 * <p>The subscription counts as "full" when
 * {@code dispatched - delivered >= info.getPrefetchSize()}. While full, nodes
 * passed to {@link #add(MessageReference)} are parked in {@link #matched} and
 * are dispatched later, in insertion order, once an acknowledgment makes room.
 *
 * <p>Locking: {@link #add(MessageReference)}, {@link #acknowledge} and the
 * after-commit callback registered by {@code acknowledge} all run while holding
 * this object's monitor, so the counters and the {@code matched} list are
 * updated under a single lock. {@link #toString()} reads them without locking
 * and may therefore report a slightly stale snapshot.
 */
public class TopicSubscription
extends AbstractSubscription {
    /** Nodes that arrived while the subscription was full, oldest first. */
    protected final LinkedList matched = new LinkedList();
    /** Queue named "ActiveMQ.DLQ"; not referenced by this class itself. */
    protected final ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
    /** Usage manager supplied at construction; not referenced by this class itself. */
    protected final UsageManager usageManager;
    /** Incremented once per dispatched message, decremented by standard/poison acks. */
    protected int dispatched = 0;
    /** Subtracted from {@code dispatched} when deciding whether the subscription is full. */
    protected int delivered = 0;

    /**
     * Creates the subscription.
     *
     * @param context      connection context, forwarded to the superclass
     * @param info         consumer description, forwarded to the superclass
     * @param usageManager stored in {@link #usageManager}
     * @throws InvalidSelectorException propagated from the superclass constructor
     */
    public TopicSubscription(ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException {
        super(context, info);
        this.usageManager = usageManager;
    }

    /**
     * Accepts a message for this subscription: dispatches it immediately when
     * there is room, otherwise parks it in {@link #matched}.
     *
     * <p>The node's reference count is incremented on entry. It is released
     * again by {@code dispatch}; a parked node keeps its reference until it is
     * eventually dispatched.
     *
     * <p>Synchronized so that the counters and {@code matched} are guarded by
     * the same monitor the after-commit callback in {@link #acknowledge} uses.
     *
     * @param node the message reference to deliver
     * @throws InterruptedException declared for interface compatibility; not thrown here
     * @throws IOException          if dispatching the message fails
     */
    public synchronized void add(MessageReference node) throws InterruptedException, IOException {
        node.incrementReferenceCount();
        if (this.isFull()) {
            this.matched.addLast(node);
            return;
        }
        this.dispatch(node);
    }

    /**
     * Applies an acknowledgment to the counters and, if that took the
     * subscription from full to not full, dispatches parked messages.
     *
     * <ul>
     * <li>Standard or poison ack outside a transaction: {@code dispatched} is
     *     reduced by the ack's message count and {@code delivered} is reduced
     *     by the same amount, clamped at zero.</li>
     * <li>Standard or poison ack inside a transaction: {@code delivered} is
     *     increased right away, and the same reduction of both counters as in
     *     the non-transactional case is deferred to the transaction's
     *     after-commit callback.</li>
     * <li>Delivered ack: {@code delivered} is increased by the message count.</li>
     * </ul>
     *
     * <p>Synchronized on the same monitor as the after-commit callback;
     * previously only the callback took the lock, which protected nothing.
     *
     * @param context the connection context the ack arrived on
     * @param ack     the acknowledgment to apply
     * @throws JMSException if the ack is neither standard, poison nor delivered
     * @throws Throwable    anything raised while registering the synchronization
     *                      or dispatching parked messages
     */
    public synchronized void acknowledge(ConnectionContext context, final MessageAck ack) throws Throwable {
        // Sample the state first: parked messages are only flushed on a full -> not-full transition.
        boolean wasFull = this.isFull();
        if (ack.isStandardAck() || ack.isPoisonAck()) {
            if (context.isInTransaction()) {
                this.delivered += ack.getMessageCount();
                context.getTransaction().addSynchronization(new Synchronization(){

                    public void afterCommit() throws Throwable {
                        // May run on a different thread than acknowledge(); take the
                        // subscription monitor (reentrant if it is already held).
                        synchronized (TopicSubscription.this) {
                            TopicSubscription.this.dispatched -= ack.getMessageCount();
                            TopicSubscription.this.delivered = Math.max(0, TopicSubscription.this.delivered - ack.getMessageCount());
                        }
                    }
                });
            } else {
                this.dispatched -= ack.getMessageCount();
                this.delivered = Math.max(0, this.delivered - ack.getMessageCount());
            }
            if (wasFull && !this.isFull()) {
                this.dispatchMatched();
            }
            return;
        }
        if (ack.isDeliveredAck()) {
            this.delivered += ack.getMessageCount();
            if (wasFull && !this.isFull()) {
                this.dispatchMatched();
            }
            return;
        }
        throw new JMSException("Invalid acknowledgment: " + ack);
    }

    /**
     * @return {@code true} when {@code dispatched - delivered} has reached the
     *         consumer's prefetch size. Callers must hold this object's monitor.
     */
    private boolean isFull() {
        return this.dispatched - this.delivered >= this.info.getPrefetchSize();
    }

    /**
     * Dispatches parked messages, oldest first, until the list is empty or the
     * subscription is full again. Each message is removed from the list before
     * it is dispatched. Callers must hold this object's monitor.
     *
     * @throws IOException if dispatching a message fails
     */
    private void dispatchMatched() throws IOException {
        Iterator iter = this.matched.iterator();
        while (iter.hasNext() && !this.isFull()) {
            MessageReference message = (MessageReference)iter.next();
            iter.remove();
            this.dispatch(message);
        }
    }

    /**
     * Wraps the node in a {@link MessageDispatch} addressed to this consumer
     * and hands it to the connection, synchronously or asynchronously
     * depending on {@code info.isDispatchAsync()}.
     *
     * <p>The reference taken in {@link #add(MessageReference)} is released
     * here. On the synchronous path this happens in a {@code finally} block so
     * that a failing {@code dispatchSync} no longer leaks the reference. On the
     * asynchronous path the release is delegated to the {@code Runnable}
     * attached to the dispatch.
     * NOTE(review): presumably the connection runs that callback once the
     * dispatch has been processed - confirm. If {@code dispatchAsync} itself
     * throws, it is not visible from here whether the callback will still run,
     * so that case is left unchanged.
     *
     * @param node the message reference to dispatch; must also be a {@link Message}
     * @throws IOException if the connection fails to dispatch
     */
    private void dispatch(final MessageReference node) throws IOException {
        Message message = (Message)node;
        MessageDispatch md = new MessageDispatch();
        md.setMessage(message);
        md.setConsumerId(this.info.getConsumerId());
        md.setDestination(node.getRegionDestination().getActiveMQDestination());
        ++this.dispatched;
        if (this.info.isDispatchAsync()) {
            md.setConsumer(new Runnable(){

                public void run() {
                    node.decrementReferenceCount();
                }
            });
            this.context.getConnection().dispatchAsync(md);
        } else {
            try {
                this.context.getConnection().dispatchSync(md);
            } finally {
                // Release the reference even when the synchronous dispatch throws.
                node.decrementReferenceCount();
            }
        }
    }

    /**
     * @return a diagnostic summary: consumer id, destination count, both
     *         counters and the number of parked messages (read without locking)
     */
    public String toString() {
        return "TopicSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", dispatched=" + this.dispatched + ", delivered=" + this.delivered + ", matched=" + this.matched.size();
    }
}

