package org.apache.qpid.server.txn;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;

/* loaded from: input_file:org/apache/qpid/server/txn/NonTransactionalContext.class */
public class NonTransactionalContext implements TransactionalContext {
    private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
    private final AMQChannel _channel;
    private final List<RequiredDeliveryException> _returnMessages;
    private final Set<Long> _browsedAcks;
    private final MessageStore _messageStore;
    private final StoreContext _storeContext;
    private boolean _inTran;

    public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel aMQChannel, List<RequiredDeliveryException> list, Set<Long> set) {
        this._channel = aMQChannel;
        this._storeContext = storeContext;
        this._returnMessages = list;
        this._messageStore = messageStore;
        this._browsedAcks = set;
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public StoreContext getStoreContext() {
        return this._storeContext;
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void beginTranIfNecessary() throws AMQException {
        if (this._inTran) {
            return;
        }
        this._messageStore.beginTran(this._storeContext);
        this._inTran = true;
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void commit() throws AMQException {
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void rollback() throws AMQException {
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void deliver(QueueEntry queueEntry, boolean z) throws AMQException {
        try {
            queueEntry.process(this._storeContext, z);
            queueEntry.checkDeliveredToConsumer();
        } catch (NoConsumersException e) {
            this._returnMessages.add(e);
        }
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void acknowledgeMessage(final long j, long j2, boolean z, final UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException {
        if (!z) {
            UnacknowledgedMessage remove = unacknowledgedMessageMap.remove(j);
            if (remove == null) {
                _log.info("Single ack on delivery tag " + j + " not known for channel:" + this._channel.getChannelId());
                throw new AMQException("Single ack on delivery tag " + j + " not known for channel:" + this._channel.getChannelId());
            }
            if (this._browsedAcks.contains(Long.valueOf(j))) {
                this._browsedAcks.remove(Long.valueOf(j));
            } else {
                if (_log.isDebugEnabled()) {
                    _log.debug("Discarding message: " + remove.getMessage().getMessageId());
                }
                remove.discard(this._storeContext);
            }
            if (_log.isDebugEnabled()) {
                _log.debug("Received non-multiple ack for messaging with delivery tag " + j + " msg id " + remove.getMessage().getMessageId());
                return;
            }
            return;
        }
        if (j == 0) {
            _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + unacknowledgedMessageMap.size());
            unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { // from class: org.apache.qpid.server.txn.NonTransactionalContext.1
                @Override // org.apache.qpid.server.ack.UnacknowledgedMessageMap.Visitor
                public boolean callback(UnacknowledgedMessage unacknowledgedMessage) throws AMQException {
                    if (NonTransactionalContext.this._browsedAcks.contains(Long.valueOf(j))) {
                        NonTransactionalContext.this._browsedAcks.remove(Long.valueOf(j));
                        return false;
                    }
                    if (NonTransactionalContext._log.isDebugEnabled()) {
                        NonTransactionalContext._log.debug("Discarding message: " + unacknowledgedMessage.getMessage().getMessageId());
                    }
                    unacknowledgedMessage.discard(NonTransactionalContext.this._storeContext);
                    return false;
                }

                @Override // org.apache.qpid.server.ack.UnacknowledgedMessageMap.Visitor
                public void visitComplete() {
                    unacknowledgedMessageMap.clear();
                }
            });
            return;
        }
        if (!unacknowledgedMessageMap.contains(j)) {
            throw new AMQException("Multiple ack on delivery tag " + j + " not known for channel");
        }
        LinkedList linkedList = new LinkedList();
        unacknowledgedMessageMap.drainTo(linkedList, j);
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            UnacknowledgedMessage unacknowledgedMessage = (UnacknowledgedMessage) it.next();
            if (this._browsedAcks.contains(Long.valueOf(j))) {
                this._browsedAcks.remove(Long.valueOf(j));
            } else {
                if (_log.isDebugEnabled()) {
                    _log.debug("Discarding message: " + unacknowledgedMessage.getMessage().getMessageId());
                }
                unacknowledgedMessage.discard(this._storeContext);
            }
        }
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void messageFullyReceived(boolean z) throws AMQException {
        if (z) {
            this._messageStore.commitTran(this._storeContext);
            this._inTran = false;
        }
    }

    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void messageProcessed(AMQProtocolSession aMQProtocolSession) throws AMQException {
        this._channel.processReturns(aMQProtocolSession);
    }
}
