package org.apache.qpid.server;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;

/* loaded from: input_file:org/apache/qpid/server/AMQChannel.class */
public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder {
    /** Value passed to the {@link UnacknowledgedMessageMapImpl} constructor for every channel. */
    public static final int DEFAULT_PREFETCH = 4096;
    /** Channel number supplied to the constructor and returned by {@code getChannelId()}. */
    private final int _channelId;
    private AMQQueue _defaultQueue;
    /** Counter behind {@code getNextConsumerTag()}; incremented on each call. */
    private int _consumerTag;
    /** Message currently being assembled from publish/header/body frames; reset to null once delivered or on error. */
    private IncomingMessage _currentMessage;
    private final MessageStore _messageStore;
    /** Active transaction: an AsyncAutoCommitTransaction from the constructor, replaced by setLocalTransactional(). */
    private ServerTransaction _transaction;
    private final AMQProtocolSession _session;
    private LogActor _actor;
    /** Read by MessageAcknowledgeAction.onRollback() to decide whether acked entries are parked in _resendList. */
    private volatile boolean _rollingBack;
    private final ClientDeliveryMethod _clientDeliveryMethod;
    private static final Logger _logger = Logger.getLogger(AMQChannel.class);
    private static final boolean MSG_AUTH = ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
    /** No-op task. */
    private static final Runnable NULL_TASK = new Runnable() { // from class: org.apache.qpid.server.AMQChannel.1
        @Override // java.lang.Runnable
        public void run() {
        }
    };
    /** Reply text sent with NO_CONSUMERS when an immediate message is returned to the publisher. */
    private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
    /** Credit manager shared by every subscription created through subscribeToQueue(). */
    private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0, 0);
    /** Counter behind {@code getNextDeliveryTag()}. */
    private long _deliveryTag = 0;
    /** Consumer tag to subscription; a plain HashMap, so no synchronization is provided by the map itself. */
    private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap();
    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<>();
    /** Delivered-but-unacknowledged entries, keyed by delivery tag. */
    private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
    /** Cleared by MessageAcknowledgeAction after commit, and after rollback when not rolling back the channel. */
    private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet();
    private final AtomicBoolean _suspended = new AtomicBoolean(false);
    private final AtomicLong _txnStarts = new AtomicLong(0);
    private final AtomicLong _txnCommits = new AtomicLong(0);
    private final AtomicLong _txnRejects = new AtomicLong(0);
    /** Toggled between 0 and 1 by the increment/decrementOutstandingTxnsIfNecessary helpers. */
    private final AtomicLong _txnCount = new AtomicLong(0);
    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
    /** Set once by close(); also makes isSuspended() report true. */
    private AtomicBoolean _closing = new AtomicBoolean(false);
    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet());
    private final AtomicBoolean _blocking = new AtomicBoolean(false);
    /** Entries collected by MessageAcknowledgeAction.onRollback() while _rollingBack is true. */
    private List<QueueEntry> _resendList = new ArrayList();
    private long _createTime = System.currentTimeMillis();
    /** Identity-hash suffix appended to the channel id by debugIdentity(). */
    private final String id = "(" + System.identityHashCode(this) + ")";
    /** Records each delivery in the unacknowledged-message map via addUnacknowledgedMessage(). */
    private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { // from class: org.apache.qpid.server.AMQChannel.3
        @Override // org.apache.qpid.server.subscription.RecordDeliveryMethod
        public void recordMessageDelivery(Subscription subscription, QueueEntry queueEntry, long j) {
            AMQChannel.this.addUnacknowledgedMessage(queueEntry, j, subscription);
        }
    };
    // NOTE(review): the two initializers below run before the constructor body assigns _session and
    // _channelId. getConfigStore() goes through getVirtualHost(); if that depends on _session this
    // fails at construction time. Possibly a decompiler hoist out of the constructor - confirm.
    private LogSubject _logSubject = new ChannelLogSubject(this);
    private final UUID _qmfId = getConfigStore().createId();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/AMQChannel$AsyncCommand.class */
    /**
     * Pairs a store future with the transaction action whose {@code postCommit()} must run
     * once that future has completed.
     */
    public static class AsyncCommand {
        private final StoreFuture _future;
        private ServerTransaction.Action _action;

        /**
         * @param storeFuture future to wait on before the action is completed
         * @param action      action whose {@code postCommit()} is invoked by {@link #complete()}
         */
        public AsyncCommand(StoreFuture storeFuture, ServerTransaction.Action action) {
            _future = storeFuture;
            _action = action;
        }

        /** @return whether the underlying future reports completion */
        boolean isReadyForCompletion() {
            return _future.isComplete();
        }

        /** Blocks until the underlying future has completed. */
        void awaitReadyForCompletion() {
            _future.waitForCompletion();
        }

        /**
         * Waits for the future if it has not completed yet, runs the action's
         * {@code postCommit()} and then drops the reference to the action.
         */
        void complete() {
            final boolean alreadyDone = _future.isComplete();
            if (!alreadyDone) {
                _future.waitForCompletion();
            }
            _action.postCommit();
            _action = null;
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/AMQChannel$MessageAcknowledgeAction.class */
    /**
     * Transaction action applied to a batch of acknowledged queue entries: entries are
     * discarded on commit and released (or parked for resend) on rollback.
     */
    private class MessageAcknowledgeAction implements ServerTransaction.Action {
        private final Collection<QueueEntry> _ackedMessages;

        /** @param collection the acknowledged entries this action operates on */
        public MessageAcknowledgeAction(Collection<QueueEntry> collection) {
            _ackedMessages = collection;
        }

        /** Discards every acknowledged entry, then clears the channel's acknowledged set even on failure. */
        @Override // org.apache.qpid.server.txn.ServerTransaction.Action
        public void postCommit() {
            try {
                for (QueueEntry acked : _ackedMessages) {
                    acked.discard();
                }
            } finally {
                AMQChannel.this._acknowledgedMessages.clear();
            }
        }

        /**
         * While the channel is rolling back, the entries are only added to the resend list.
         * Otherwise each entry is released and the acknowledged set is cleared.
         */
        @Override // org.apache.qpid.server.txn.ServerTransaction.Action
        public void onRollback() {
            if (!AMQChannel.this._rollingBack) {
                try {
                    for (QueueEntry acked : _ackedMessages) {
                        acked.release();
                    }
                } finally {
                    AMQChannel.this._acknowledgedMessages.clear();
                }
            } else {
                AMQChannel.this._resendList.addAll(_ackedMessages);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/AMQChannel$MessageDeliveryAction.class */
    /**
     * Post-commit action that enqueues an incoming message onto each of its destination queues.
     */
    public class MessageDeliveryAction implements ServerTransaction.Action {
        private IncomingMessage _incommingMessage;
        private List<? extends BaseQueue> _destinationQueues;

        /**
         * Enqueue callback used for "immediate" messages: if the entry was not delivered to a
         * consumer and can be acquired, it is dequeued in its own local transaction and the
         * message is returned to the publisher with NO_CONSUMERS.
         */
        private class ImmediateAction implements BaseQueue.PostEnqueueAction {
            private final BaseQueue _queue;

            public ImmediateAction(BaseQueue baseQueue) {
                _queue = baseQueue;
            }

            @Override // org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction
            public void onEnqueue(QueueEntry queueEntry) {
                // acquire() is only attempted when the entry has not been delivered to a consumer.
                final boolean undeliveredAndAcquired = !queueEntry.getDeliveredToConsumer() && queueEntry.acquire();
                if (!undeliveredAndAcquired) {
                    return;
                }
                final LocalTransaction returnTxn = new LocalTransaction(AMQChannel.this._messageStore);
                final ArrayList<QueueEntry> returned = new ArrayList<QueueEntry>(1);
                returned.add(queueEntry);
                final AMQMessage undelivered = (AMQMessage) queueEntry.getMessage();
                returnTxn.dequeue(_queue, queueEntry.getMessage(), new MessageAcknowledgeAction(returned) { // from class: org.apache.qpid.server.AMQChannel.MessageDeliveryAction.ImmediateAction.1
                    @Override // org.apache.qpid.server.AMQChannel.MessageAcknowledgeAction, org.apache.qpid.server.txn.ServerTransaction.Action
                    public void postCommit() {
                        try {
                            // Tell the publisher first, then let the base action discard the entry.
                            AMQChannel.this._session.getProtocolOutputConverter().writeReturn(
                                    undelivered.getMessagePublishInfo(),
                                    undelivered.getContentHeaderBody(),
                                    undelivered,
                                    AMQChannel.this._channelId,
                                    AMQConstant.NO_CONSUMERS.getCode(),
                                    AMQChannel.IMMEDIATE_DELIVERY_REPLY_TEXT);
                            super.postCommit();
                        } catch (AMQException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
                returnTxn.commit();
            }
        }

        /**
         * @param incomingMessage the received message to deliver
         * @param list            queues the message is enqueued on after commit
         */
        public MessageDeliveryAction(IncomingMessage incomingMessage, List<? extends BaseQueue> list) {
            _incommingMessage = incomingMessage;
            _destinationQueues = list;
        }

        /**
         * Builds the AMQMessage, enqueues it on every destination queue (checking capacity on
         * AMQQueue instances), flushes the stored message and releases the temporary reference.
         * Any AMQException is rethrown wrapped in a RuntimeException.
         */
        @Override // org.apache.qpid.server.txn.ServerTransaction.Action
        public void postCommit() {
            try {
                final boolean immediate = _incommingMessage.isImmediate();
                final AMQMessage message = AMQChannel.this.createAMQMessage(_incommingMessage);
                // Hold a reference for the duration of the enqueue loop.
                final MessageReference reference = message.newReference();
                for (int index = 0; index < _destinationQueues.size(); index++) {
                    final BaseQueue target = _destinationQueues.get(index);
                    final boolean transactional = AMQChannel.this.isTransactional();
                    BaseQueue.PostEnqueueAction onEnqueue = null;
                    if (immediate) {
                        onEnqueue = new ImmediateAction(target);
                    }
                    target.enqueue(message, transactional, onEnqueue);
                    if (target instanceof AMQQueue) {
                        ((AMQQueue) target).checkCapacity(AMQChannel.this);
                    }
                }
                _incommingMessage.getStoredMessage().flushToStore();
                reference.release();
            } catch (AMQException e) {
                throw new RuntimeException(e);
            }
        }

        /** Nothing to undo here on rollback. */
        @Override // org.apache.qpid.server.txn.ServerTransaction.Action
        public void onRollback() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/AMQChannel$WriteReturnAction.class */
    /**
     * Post-commit action that returns a message to its publisher with a given error code and
     * description.
     */
    public class WriteReturnAction implements ServerTransaction.Action {
        private final AMQConstant _errorCode;
        private final IncomingMessage _message;
        private final String _description;

        /**
         * @param aMQConstant     error code sent with the return
         * @param str             human-readable reply text
         * @param incomingMessage the message being returned
         */
        public WriteReturnAction(AMQConstant aMQConstant, String str, IncomingMessage incomingMessage) {
            _errorCode = aMQConstant;
            _description = str;
            _message = incomingMessage;
        }

        /** Writes the return via the session's output converter; AMQException is wrapped in RuntimeException. */
        @Override // org.apache.qpid.server.txn.ServerTransaction.Action
        public void postCommit() {
            try {
                AMQChannel.this._session.getProtocolOutputConverter().writeReturn(
                        _message.getMessagePublishInfo(),
                        _message.getContentHeader(),
                        _message,
                        AMQChannel.this._channelId,
                        _errorCode.getCode(),
                        new AMQShortString(_description));
            } catch (AMQException e) {
                throw new RuntimeException(e);
            }
        }

        /** Nothing to undo on rollback. */
        @Override // org.apache.qpid.server.txn.ServerTransaction.Action
        public void onRollback() {
        }
    }

    /**
     * Creates a channel on the given protocol session. The channel starts out using an
     * {@link AsyncAutoCommitTransaction}; call {@code setLocalTransactional()} to switch to a
     * {@link LocalTransaction}.
     *
     * @param aMQProtocolSession session that owns this channel
     * @param i                  channel number
     * @param messageStore       store used for messages and transactions on this channel
     * @throws AMQException declared by the signature; which call raises it is not visible here
     */
    public AMQChannel(AMQProtocolSession aMQProtocolSession, int i, MessageStore messageStore) throws AMQException {
        // _session and _channelId must be set first: the actor and config-store calls below use "this".
        this._session = aMQProtocolSession;
        this._channelId = i;
        this._actor = new AMQPChannelActor(this, aMQProtocolSession.getLogActor().getRootMessageLogger());
        this._actor.message(ChannelMessages.CREATE());
        // Registered with the config store before _messageStore/_transaction are assigned.
        getConfigStore().addConfiguredObject(this);
        this._messageStore = messageStore;
        this._transaction = new AsyncAutoCommitTransaction(this._messageStore, this);
        this._clientDeliveryMethod = aMQProtocolSession.createDeliveryMethod(this._channelId);
    }

    /**
     * Looks up the configuration store through this channel's virtual host.
     *
     * @return the store reported by {@code getVirtualHost().getConfigStore()}
     */
    public ConfigStore getConfigStore() {
        final ConfigStore store = getVirtualHost().getConfigStore();
        return store;
    }

    /**
     * Switches the channel to a fresh {@link LocalTransaction} backed by the channel's
     * message store and counts one more transaction start.
     */
    public void setLocalTransactional() {
        final LocalTransaction localTransaction = new LocalTransaction(_messageStore);
        _transaction = localTransaction;
        _txnStarts.incrementAndGet();
    }

    /** @return whatever the channel's current transaction object reports as transactional */
    @Override // org.apache.qpid.server.configuration.SessionConfig
    public boolean isTransactional() {
        final ServerTransaction current = _transaction;
        return current.isTransactional();
    }

    /** Delegates to {@code sync()}. */
    public void receivedComplete() {
        this.sync();
    }

    /**
     * @return {@code true} only when the channel is transactional, the transaction update time
     *         is positive and the transaction reports a positive start time
     */
    public boolean inTransaction() {
        if (!isTransactional()) {
            return false;
        }
        if (_txnUpdateTime.get() <= 0) {
            return false;
        }
        return _transaction.getTransactionStartTime() > 0;
    }

    /** On a transactional channel, moves the outstanding-transaction count from 0 to 1 (no-op otherwise). */
    private void incrementOutstandingTxnsIfNecessary() {
        if (!isTransactional()) {
            return;
        }
        _txnCount.compareAndSet(0L, 1L);
    }

    /** On a transactional channel, moves the outstanding-transaction count from 1 back to 0 (no-op otherwise). */
    private void decrementOutstandingTxnsIfNecessary() {
        if (!isTransactional()) {
            return;
        }
        _txnCount.compareAndSet(1L, 0L);
    }

    /** @return current value of the transaction-start counter */
    @Override // org.apache.qpid.server.configuration.SessionConfig
    public Long getTxnStarts() {
        final long starts = _txnStarts.get();
        return Long.valueOf(starts);
    }

    /** @return current value of the transaction-commit counter */
    @Override // org.apache.qpid.server.configuration.SessionConfig, org.apache.qpid.server.protocol.AMQSessionModel
    public Long getTxnCommits() {
        final long commits = _txnCommits.get();
        return Long.valueOf(commits);
    }

    /** @return current value of the transaction-reject counter */
    @Override // org.apache.qpid.server.configuration.SessionConfig, org.apache.qpid.server.protocol.AMQSessionModel
    public Long getTxnRejects() {
        final long rejects = _txnRejects.get();
        return Long.valueOf(rejects);
    }

    /** @return current value of the outstanding-transaction counter */
    @Override // org.apache.qpid.server.configuration.SessionConfig, org.apache.qpid.server.protocol.AMQSessionModel
    public Long getTxnCount() {
        final long outstanding = _txnCount.get();
        return Long.valueOf(outstanding);
    }

    /** @return the same transaction-start counter value that {@code getTxnStarts()} exposes */
    @Override // org.apache.qpid.server.protocol.AMQSessionModel
    public Long getTxnStart() {
        final long starts = _txnStarts.get();
        return Long.valueOf(starts);
    }

    /** @return the channel number this channel was constructed with */
    @Override // org.apache.qpid.server.protocol.AMQSessionModel
    public int getChannelId() {
        final int channelNumber = _channelId;
        return channelNumber;
    }

    /**
     * Starts a new incoming message for a publish frame, after checking that publishing to the
     * exchange is authorised.
     *
     * @param messagePublishInfo publish details (immediate flag, routing key)
     * @param exchange           target exchange
     * @throws AMQSecurityException if the virtual host's security manager refuses the publish
     */
    public void setPublishFrame(MessagePublishInfo messagePublishInfo, Exchange exchange) throws AMQSecurityException {
        final boolean permitted = getVirtualHost().getSecurityManager().authorisePublish(
                messagePublishInfo.isImmediate(),
                messagePublishInfo.getRoutingKey() == null ? null : messagePublishInfo.getRoutingKey().asString(),
                exchange.getName());
        if (!permitted) {
            throw new AMQSecurityException("Permission denied: " + exchange.getName());
        }
        _currentMessage = new IncomingMessage(messagePublishInfo, getProtocolSession().getReference());
        _currentMessage.setExchange(exchange);
    }

    /**
     * Attaches the content header to the message started by {@code setPublishFrame}, routes it
     * and delivers it straight away if no further content is expected.
     *
     * @param contentHeaderBody header frame for the current message
     * @throws AMQException if no publish frame preceded this header, or delivery fails
     */
    public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException {
        final boolean publishSeen = _currentMessage != null;
        if (!publishSeen) {
            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
        }

        if (_logger.isDebugEnabled()) {
            _logger.debug("Content header received on channel " + _channelId);
        }

        _currentMessage.setContentHeaderBody(contentHeaderBody);
        _currentMessage.setExpiration();
        _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
        _currentMessage.route();

        deliverCurrentMessageIfComplete();
    }

    /**
     * Delivers the message currently being assembled once all of its content has arrived.
     * <ul>
     *   <li>If the user-id check fails, an ACCESS_REFUSED return is queued.</li>
     *   <li>If there are destination queues, the message is added to the store, its body chunks
     *       are written at consecutive offsets, and it is enqueued through the transaction.</li>
     *   <li>Otherwise a mandatory/immediate message gets a NO_ROUTE return; any other message is
     *       logged as discarded.</li>
     * </ul>
     * Whatever the outcome, the session is told about the received message and
     * {@code _currentMessage} is reset to {@code null}.
     *
     * @throws AMQException declared by the signature for failures while storing or enqueuing
     */
    private void deliverCurrentMessageIfComplete() throws AMQException {
        if (!this._currentMessage.allContentReceived()) {
            return;
        }
        try {
            final List<? extends BaseQueue> destinationQueues = this._currentMessage.getDestinationQueues();
            if (!checkMessageUserId(this._currentMessage.getContentHeader())) {
                this._transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", this._currentMessage));
            } else if (destinationQueues != null && !destinationQueues.isEmpty()) {
                final StoredMessage<MessageMetaData> storedMessage = this._messageStore.addMessage(this._currentMessage.getMessageMetaData());
                this._currentMessage.setStoredMessage(storedMessage);
                final int bodyCount = this._currentMessage.getBodyCount();
                long offset = 0;
                for (int i = 0; i < bodyCount; i++) {
                    storedMessage.addContent((int) offset, ByteBuffer.wrap(this._currentMessage.getContentChunk(i).getData()));
                    // Advance by the size of the chunk just written. The chunk is fetched again
                    // so that no type outside this file's imports has to be named.
                    offset += this._currentMessage.getContentChunk(i).getSize();
                }
                // Make sure the stored message is removed again if the transaction rolls back.
                this._transaction.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.AMQChannel.2
                    @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                    public void postCommit() {
                    }

                    @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                    public void onRollback() {
                        storedMessage.remove();
                    }
                });
                this._transaction.enqueue(destinationQueues, this._currentMessage, new MessageDeliveryAction(this._currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
                incrementOutstandingTxnsIfNecessary();
                updateTransactionalActivity();
                this._currentMessage.getStoredMessage().flushToStore();
            } else if (this._currentMessage.isMandatory() || this._currentMessage.isImmediate()) {
                this._transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", this._currentMessage));
            } else {
                this._actor.message(ExchangeMessages.DISCARDMSG(this._currentMessage.getExchange().asString(), this._currentMessage.getRoutingKey()));
            }
        } finally {
            this._session.registerMessageReceived(this._currentMessage.getSize(), this._currentMessage.getContentHeader().getProperties().getTimestamp());
            this._currentMessage = null;
        }
    }

    /**
     * Adds a body frame to the message in progress and delivers the message if it is now
     * complete. On any RuntimeException or AMQException the partially assembled message is
     * dropped before the exception is rethrown.
     *
     * @param contentBody body frame to append
     * @throws AMQException if no message is in progress, or conversion/delivery fails
     */
    public void publishContentBody(ContentBody contentBody) throws AMQException {
        if (_currentMessage == null) {
            throw new AMQException("Received content body without previously receiving a JmsPublishBody");
        }

        if (_logger.isDebugEnabled()) {
            _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
        }

        try {
            _currentMessage.addContentBodyFrame(
                    _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody));
            deliverCurrentMessageIfComplete();
        } catch (RuntimeException | AMQException failure) {
            _currentMessage = null;
            throw failure;
        }
    }

    /**
     * Advances the channel's delivery-tag counter and returns the new value.
     *
     * <p>Reconstructed from the bytecode outline left by the decompiler (load
     * {@code _deliveryTag}, add 1, duplicate the result beneath the object reference, store it
     * back, return it), which is a pre-increment of the field. The increment is a plain
     * read-modify-write on a non-volatile {@code long}.
     *
     * @return the incremented delivery tag
     */
    public long getNextDeliveryTag() {
        return ++this._deliveryTag;
    }

    /**
     * Advances the consumer-tag counter.
     *
     * @return the counter value after incrementing
     */
    public int getNextConsumerTag() {
        return ++_consumerTag;
    }

    /**
     * @param aMQShortString consumer tag to look up
     * @return the subscription registered under that tag, or {@code null} if the map has none
     */
    public Subscription getSubscription(AMQShortString aMQShortString) {
        final Subscription registered = _tag2SubscriptionMap.get(aMQShortString);
        return registered;
    }

    /**
     * Creates a subscription on this channel and registers it with the queue.
     *
     * @param aMQShortString consumer tag; when {@code null} a tag of the form {@code sgen_<n>} is generated
     * @param aMQQueue       queue to register the subscription with
     * @param z              passed through to the subscription factory
     * @param fieldTable     passed through to the subscription factory
     * @param z2             passed through to the subscription factory
     * @param z3             passed through to {@code registerSubscription}
     * @return the consumer tag actually used
     * @throws AMQException if the tag is already in use, or registration fails (the tag mapping
     *                      is removed again before the failure propagates)
     */
    public AMQShortString subscribeToQueue(AMQShortString aMQShortString, AMQQueue aMQQueue, boolean z, FieldTable fieldTable, boolean z2, boolean z3) throws AMQException {
        final AMQShortString tag = (aMQShortString != null)
                ? aMQShortString
                : new AMQShortString("sgen_" + getNextConsumerTag());

        if (_tag2SubscriptionMap.containsKey(tag)) {
            throw new AMQException("Consumer already exists with same tag: " + tag);
        }

        final Subscription subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, z, fieldTable, z2, _creditManager);
        _tag2SubscriptionMap.put(tag, subscription);
        try {
            aMQQueue.registerSubscription(subscription, z3);
        } catch (RuntimeException | AMQException failure) {
            // Undo the tag registration so a failed subscribe leaves no stale entry behind.
            _tag2SubscriptionMap.remove(tag);
            throw failure;
        }
        return tag;
    }

    /**
     * Removes the subscription registered under the given consumer tag and unregisters it from
     * its queue while holding the subscription's send lock.
     *
     * @param aMQShortString consumer tag to remove
     * @return {@code true} if a subscription was found and unregistered; {@code false} (with a
     *         warning logged) if the tag was not registered
     * @throws AMQException if unregistering from the queue fails
     */
    public boolean unsubscribeConsumer(AMQShortString aMQShortString) throws AMQException {
        final Subscription subscription = this._tag2SubscriptionMap.remove(aMQShortString);
        if (subscription == null) {
            _logger.warn("Attempt to unsubscribe consumer with tag '" + aMQShortString + "' which is not registered.");
            return false;
        }
        try {
            subscription.getSendLock();
            subscription.mo221getQueue().unregisterSubscription(subscription);
        } finally {
            // Released exactly once on every exit path.
            subscription.releaseSendLock();
        }
        return true;
    }

    /**
     * Closes the channel once: unsubscribes all consumers, rolls back the current transaction,
     * requeues unacknowledged messages (failures there are only logged) and removes the channel
     * from the config store. Later calls are no-ops.
     *
     * @throws AMQException if unsubscribing or the rollback fails
     */
    @Override // org.apache.qpid.server.protocol.AMQSessionModel
    public void close() throws AMQException {
        final boolean firstClose = _closing.compareAndSet(false, true);
        if (!firstClose) {
            return;
        }

        CurrentActor.get().message(_logSubject, ChannelMessages.CLOSE());
        unsubscribeAllConsumers();
        _transaction.rollback();

        try {
            requeue();
        } catch (TransportException transportFailure) {
            _logger.error("Caught TransportException whilst attempting to requeue:" + transportFailure);
        } catch (AMQException amqFailure) {
            _logger.error("Caught AMQException whilst attempting to requeue:" + amqFailure);
        }

        getConfigStore().removeConfiguredObject(this);
    }

    /**
     * Unregisters every subscription on this channel from its queue, holding each
     * subscription's send lock while doing so, and then empties the tag map.
     *
     * @throws AMQException if unregistering a subscription fails; the tag map is not cleared in that case
     */
    private void unsubscribeAllConsumers() throws AMQException {
        if (_logger.isInfoEnabled()) {
            if (this._tag2SubscriptionMap.isEmpty()) {
                _logger.info("No consumers to unsubscribe on channel " + toString());
            } else {
                _logger.info("Unsubscribing all consumers on channel " + toString());
            }
        }
        for (Map.Entry<AMQShortString, Subscription> entry : this._tag2SubscriptionMap.entrySet()) {
            if (_logger.isInfoEnabled()) {
                _logger.info("Unsubscribing consumer '" + entry.getKey() + "' on channel " + toString());
            }
            final Subscription subscription = entry.getValue();
            try {
                subscription.getSendLock();
                subscription.mo221getQueue().unregisterSubscription(subscription);
            } finally {
                // Released exactly once on every exit path.
                subscription.releaseSendLock();
            }
        }
        this._tag2SubscriptionMap.clear();
    }

    /**
     * Records a delivered entry as unacknowledged under its delivery tag.
     *
     * @param queueEntry   the delivered entry
     * @param j            delivery tag to file the entry under
     * @param subscription subscription the entry was delivered to (used for debug logging only)
     */
    public void addUnacknowledgedMessage(QueueEntry queueEntry, long j, Subscription subscription) {
        if (_logger.isDebugEnabled()) {
            if (queueEntry.getQueue() == null) {
                _logger.debug("Adding unacked message with a null queue:" + queueEntry);
            } else {
                _logger.debug(debugIdentity() + " Adding unacked message(" + queueEntry.getMessage().toString() + " DT:" + j + ") with a queue(" + queueEntry.getQueue() + ") for " + subscription);
            }
        }
        this._unacknowledgedMessageMap.add(j, queueEntry);
    }

    /** @return the channel id immediately followed by the per-instance identity suffix */
    public String debugIdentity() {
        final StringBuilder identity = new StringBuilder();
        identity.append(_channelId).append(id);
        return identity.toString();
    }

    /**
     * Cancels every unacknowledged message on the channel. Entries whose queue has been deleted
     * are discarded; all others are flagged as redelivered and released.
     *
     * @throws AMQException declared by the signature
     */
    public void requeue() throws AMQException {
        final Collection<QueueEntry> cancelled = _unacknowledgedMessageMap.cancelAllMessages();

        if (!cancelled.isEmpty() && _logger.isInfoEnabled()) {
            _logger.info("Requeuing " + cancelled.size() + " unacked messages. for " + toString());
        }

        for (QueueEntry unacked : cancelled) {
            if (!unacked.isQueueDeleted()) {
                unacked.setRedelivered();
                unacked.release();
            } else {
                unacked.discard();
            }
        }
    }

    /**
     * Requeues the single unacknowledged message with the given delivery tag. The entry is
     * flagged as redelivered; it is then released, or discarded (with a warning) when its queue
     * has been deleted. An unknown tag only produces a warning.
     *
     * @param j delivery tag of the message to requeue
     * @throws AMQException declared by the signature
     */
    public void requeue(long j) throws AMQException {
        final QueueEntry unacked = _unacknowledgedMessageMap.remove(j);
        if (unacked == null) {
            _logger.warn("Requested requeue of message:" + j + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
            return;
        }

        unacked.setRedelivered();
        if (unacked.isQueueDeleted()) {
            _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked + "):" + j + " but no queue defined and no DeadLetter queue so DROPPING message.");
            unacked.discard();
        } else {
            unacked.release();
        }
    }

    /**
     * @param j delivery tag
     * @return {@code true} when the tag is known and its queue has a maximum delivery count above zero
     */
    public boolean isMaxDeliveryCountEnabled(long j) {
        final QueueEntry unacked = _unacknowledgedMessageMap.get(j);
        if (unacked == null) {
            return false;
        }
        return unacked.getQueue().getMaximumDeliveryCount() > 0;
    }

    /**
     * @param j delivery tag
     * @return {@code true} when the tag is known, its queue's maximum delivery count is non-zero
     *         and the entry's delivery count has reached that maximum
     */
    public boolean isDeliveredTooManyTimes(long j) {
        final QueueEntry unacked = _unacknowledgedMessageMap.get(j);
        if (unacked == null) {
            return false;
        }
        final int limit = unacked.getQueue().getMaximumDeliveryCount();
        if (limit == 0) {
            return false;
        }
        return unacked.getDeliveryCount() >= limit;
    }

    /**
     * Sorts the unacknowledged messages into "resend" and "requeue" sets via
     * {@code ExtractResendAndRequeue}, tries to resend each candidate to the subscription it was
     * delivered to, and requeues everything that could not be resent.
     *
     * @param z passed through to {@code ExtractResendAndRequeue}
     * @throws AMQException declared by the signature
     */
    public void resend(boolean z) throws AMQException {
        final LinkedHashMap<Long, QueueEntry> toRequeue = new LinkedHashMap<>();
        final LinkedHashMap<Long, QueueEntry> toResend = new LinkedHashMap<>();

        if (_logger.isDebugEnabled()) {
            _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
        }

        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, toRequeue, toResend, z, _messageStore));

        if (_logger.isDebugEnabled()) {
            if (!toResend.isEmpty()) {
                _logger.debug("Preparing (" + toResend.size() + ") message to resend.");
            } else {
                _logger.debug("No message to resend.");
            }
        }

        for (Map.Entry<Long, QueueEntry> candidate : toResend.entrySet()) {
            final QueueEntry message = candidate.getValue();
            final long deliveryTag = candidate.getKey();

            message.decrementDeliveryCount();
            final AMQQueue queue = message.getQueue();
            message.setRedelivered();

            final Subscription subscription = message.getDeliveredSubscription();
            if (subscription != null) {
                // Fall back to requeueing when the queue declines the resend.
                if (!queue.resend(message, subscription)) {
                    toRequeue.put(deliveryTag, message);
                }
            } else {
                if (_logger.isInfoEnabled()) {
                    _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
                }
                toRequeue.put(deliveryTag, message);
            }
        }

        if (_logger.isInfoEnabled() && !toRequeue.isEmpty()) {
            _logger.info("Preparing (" + toRequeue.size() + ") message to requeue to.");
        }

        for (Map.Entry<Long, QueueEntry> pending : toRequeue.entrySet()) {
            final QueueEntry message = pending.getValue();
            final long deliveryTag = pending.getKey();

            message.decrementDeliveryCount();
            _unacknowledgedMessageMap.remove(deliveryTag);
            message.setRedelivered();
            message.release();
        }
    }

    /**
     * Acknowledges deliveries held in the unacknowledged-message map and dequeues them
     * through the channel's current transaction.
     *
     * @param j delivery tag handed to the unacknowledged-message map
     * @param z flag forwarded unchanged to the map's {@code acknowledge} call
     *          (presumably "multiple" -- confirm against the map implementation)
     * @throws AMQException as declared; propagated from the calls made here
     */
    public void acknowledgeMessage(long j, boolean z) throws AMQException {
        final Collection<QueueEntry> acked = getAckedMessages(j, z);
        final MessageAcknowledgeAction onDequeue = new MessageAcknowledgeAction(acked);
        _transaction.dequeue(acked, onDequeue);
        // Record the activity time when the channel is transactional.
        updateTransactionalActivity();
    }

    /**
     * Delegates to the unacknowledged-message map's {@code acknowledge} call.
     *
     * @param j delivery tag
     * @param z flag forwarded unchanged to the map
     * @return the entries returned by the map for this acknowledgement
     */
    private Collection<QueueEntry> getAckedMessages(long j, boolean z) {
        final Collection<QueueEntry> acked = _unacknowledgedMessageMap.acknowledge(j, z);
        return acked;
    }

    /**
     * @return the map held in {@code _unacknowledgedMessageMap} (the live instance, not a copy)
     */
    public UnacknowledgedMessageMap getUnacknowledgedMessageMap() {
        return _unacknowledgedMessageMap;
    }

    /**
     * Sets the channel's suspended flag and, when the value actually changes, performs the
     * matching transition work.
     *
     * <ul>
     *   <li>Resuming (flag goes from {@code true} to {@code false}): logs {@code FLOW("Started")}
     *       and asks every subscription's queue to deliver asynchronously.</li>
     *   <li>Suspending (flag goes from {@code false} to {@code true}): acquires and releases
     *       each subscription's send lock, then logs {@code FLOW("Stopped")}.</li>
     * </ul>
     *
     * If the flag already has the requested value, nothing happens.
     *
     * @param z the new suspended state
     */
    public void setSuspended(boolean z) {
        final boolean wasSuspended = _suspended.getAndSet(z);
        if (wasSuspended == z) {
            return;
        }

        if (z) {
            // Cycle each send lock; presumably this waits out any send that started before
            // the flag flipped -- confirm against Subscription's locking contract.
            for (Subscription subscription : _tag2SubscriptionMap.values()) {
                try {
                    subscription.getSendLock();
                } finally {
                    // Release exactly once, whether or not the acquire succeeded.
                    subscription.releaseSendLock();
                }
            }
            _actor.message(_logSubject, ChannelMessages.FLOW("Stopped"));
        } else {
            _actor.message(_logSubject, ChannelMessages.FLOW("Started"));
            for (Subscription subscription : _tag2SubscriptionMap.values()) {
                subscription.mo221getQueue().deliverAsync(subscription);
            }
        }
    }

    /**
     * @return {@code true} if the suspended flag is set, the channel's closing flag is set,
     *         or the owning session reports that it is closing; checked in that order
     */
    public boolean isSuspended() {
        if (_suspended.get()) {
            return true;
        }
        if (_closing.get()) {
            return true;
        }
        return _session.isClosing();
    }

    /**
     * Commits the current transaction, passing {@code null} as the runnable to
     * {@link #commit(Runnable)}.
     *
     * @throws AMQException if the channel is not transactional, or as propagated from the commit
     */
    public void commit() throws AMQException {
        commit((Runnable) null);
    }

    /**
     * Commits the current transaction and updates the transaction counters.
     *
     * @param runnable handed unchanged to the transaction's {@code commit} call; may be
     *                 {@code null} (the no-argument overload passes {@code null})
     * @throws AMQException if the channel is not transactional, or as propagated from the commit
     */
    public void commit(Runnable runnable) throws AMQException {
        final boolean transactional = isTransactional();
        if (!transactional) {
            throw new AMQException("Fatal error: commit called on non-transactional channel");
        }

        _transaction.commit(runnable);

        // One commit completed; a start is counted as well.
        _txnCommits.incrementAndGet();
        _txnStarts.incrementAndGet();
        decrementOutstandingTxnsIfNecessary();
    }

    /**
     * Rolls back the current transaction, using {@code NULL_TASK} as the post-rollback task.
     *
     * @throws AMQException if the channel is not transactional, or as propagated from the rollback
     */
    public void rollback() throws AMQException {
        final Runnable noOp = NULL_TASK;
        rollback(noOp);
    }

    /**
     * Rolls back the current transaction, runs the supplied task, and then resends or
     * releases every entry in {@code _resendList}.
     *
     * <p>Sequence:
     * <ol>
     *   <li>Marks the channel as rolling back and suspends it if it was not already suspended.</li>
     *   <li>Acquires and releases each subscription's send lock.</li>
     *   <li>Rolls back the transaction; the rolling-back flag and the transaction counters are
     *       updated exactly once, whether or not the rollback throws.</li>
     *   <li>Runs {@code runnable}.</li>
     *   <li>For each entry in {@code _resendList}: releases it if its delivered subscription is
     *       missing or closed, otherwise resends it through that subscription's queue. The list
     *       is then cleared.</li>
     *   <li>If this call suspended the channel, clears the suspended flag and asks each
     *       subscription's queue to deliver asynchronously.</li>
     * </ol>
     *
     * @param runnable task run after the transaction has been rolled back; must not be {@code null}
     * @throws AMQException if the channel is not transactional, or as propagated from the rollback
     */
    public void rollback(Runnable runnable) throws AMQException {
        if (!isTransactional()) {
            throw new AMQException("Fatal error: rollback called on non-transactional channel");
        }

        _rollingBack = true;
        // True only when this call flipped the flag, so it is only undone in that case.
        final boolean suspendedHere = _suspended.compareAndSet(false, true);

        // Cycle each send lock; presumably so every subscription has observed the state
        // change before the rollback proceeds -- confirm against Subscription's contract.
        for (Subscription subscription : _tag2SubscriptionMap.values()) {
            subscription.getSendLock();
            subscription.releaseSendLock();
        }

        try {
            _transaction.rollback();
        } finally {
            // Scoped to the rollback call only, so that a failure in the post-rollback work
            // below cannot run this bookkeeping a second time.
            _rollingBack = false;
            _txnRejects.incrementAndGet();
            _txnStarts.incrementAndGet();
            decrementOutstandingTxnsIfNecessary();
        }

        runnable.run();

        for (QueueEntry queueEntry : _resendList) {
            final Subscription deliveredSubscription = queueEntry.getDeliveredSubscription();
            if (deliveredSubscription == null || deliveredSubscription.isClosed()) {
                queueEntry.release();
            } else {
                deliveredSubscription.mo221getQueue().resend(queueEntry, deliveredSubscription);
            }
        }
        _resendList.clear();

        if (suspendedHere) {
            _suspended.set(false);
            for (Subscription subscription : _tag2SubscriptionMap.values()) {
                subscription.mo221getQueue().deliverAsync(subscription);
            }
        }
    }

    /**
     * When the channel is transactional, stores the protocol session's last-received time in
     * {@code _txnUpdateTime}; otherwise does nothing.
     */
    private void updateTransactionalActivity() {
        if (!isTransactional()) {
            return;
        }
        _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
    }

    /**
     * @return {@code "[" + session + ":" + channelId + "]"}
     */
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append("[");
        text.append(_session.toString());
        text.append(":");
        text.append(_channelId);
        text.append("]");
        return text.toString();
    }

    /**
     * Stores the given queue as this channel's default queue.
     *
     * @param aMQQueue the queue to store; no validation is performed here
     */
    public void setDefaultQueue(AMQQueue aMQQueue) {
        _defaultQueue = aMQQueue;
    }

    /**
     * @return the queue last stored via {@link #setDefaultQueue(AMQQueue)}
     */
    public AMQQueue getDefaultQueue() {
        return _defaultQueue;
    }

    /**
     * @return the current value of the channel's {@code _closing} flag
     */
    public boolean isClosing() {
        final boolean closing = _closing.get();
        return closing;
    }

    /**
     * @return the protocol session held in {@code _session}
     */
    public AMQProtocolSession getProtocolSession() {
        return _session;
    }

    /**
     * @return the credit manager held in {@code _creditManager}
     */
    public FlowCreditManager getCreditManager() {
        return _creditManager;
    }

    /**
     * Logs a {@code PREFETCH_SIZE} channel message and applies the limits to the credit manager.
     *
     * @param j first limit; logged and passed as the first argument of {@code setCreditLimits}
     * @param i second limit; logged and passed as the second argument of {@code setCreditLimits}
     */
    public void setCredit(long j, int i) {
        final Long boxedSize = Long.valueOf(j);
        final Integer boxedCount = Integer.valueOf(i);
        _actor.message(ChannelMessages.PREFETCH_SIZE(boxedSize, boxedCount));
        _creditManager.setCreditLimits(j, i);
    }

    /**
     * @return the message store held in {@code _messageStore}
     */
    public MessageStore getMessageStore() {
        return _messageStore;
    }

    /**
     * @return the delivery method held in {@code _clientDeliveryMethod}
     */
    public ClientDeliveryMethod getClientDeliveryMethod() {
        return _clientDeliveryMethod;
    }

    /**
     * @return the record-delivery method held in {@code _recordDeliveryMethod}
     */
    public RecordDeliveryMethod getRecordDeliveryMethod() {
        return _recordDeliveryMethod;
    }

    /**
     * Builds an {@code AMQMessage} around the incoming message's stored message, copying its
     * expiration and tagging it with this session's reference as the connection identifier.
     *
     * @param incomingMessage source of the stored message and the expiration value
     * @return the newly created message
     * @throws AMQException as declared; propagated from the calls made here
     */
    /* JADX INFO: Access modifiers changed from: private */
    public AMQMessage createAMQMessage(IncomingMessage incomingMessage) throws AMQException {
        final AMQMessage created = new AMQMessage(incomingMessage.getStoredMessage());
        created.setExpiration(incomingMessage.getExpiration());
        created.setConnectionIdentifier(_session.getReference());
        return created;
    }

    /**
     * Checks the user id carried in a content header against the session's authorized principal.
     *
     * <p>When {@code MSG_AUTH} is off the check always passes. Otherwise the header's user id
     * (or the empty string when absent) must equal the authorized principal's name.
     *
     * @param contentHeaderBody header whose properties may carry a user id
     * @return {@code true} if the check is disabled or the user id matches
     */
    private boolean checkMessageUserId(ContentHeaderBody contentHeaderBody) {
        AMQShortString userId = null;
        if (contentHeaderBody.getProperties() instanceof BasicContentHeaderProperties) {
            userId = contentHeaderBody.getProperties().getUserId();
        }

        if (!MSG_AUTH) {
            return true;
        }

        final String claimedUser = (userId != null) ? userId.toString() : "";
        return _session.getAuthorizedPrincipal().getName().equals(claimedUser);
    }

    /**
     * Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @return the protocol session held in {@code _session}, as the connection model
     */
    @Override
    public AMQConnectionModel getConnectionModel() {
        return _session;
    }

    /**
     * Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @return the session's context key converted with {@code String.valueOf}
     */
    @Override
    public String getClientID() {
        return String.valueOf(_session.getContextKey());
    }

    /**
     * Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @return the log subject held in {@code _logSubject}
     */
    @Override
    public LogSubject getLogSubject() {
        return _logSubject;
    }

    /**
     * @return the log actor held in {@code _actor}
     */
    public LogActor getLogActor() {
        return _actor;
    }

    /**
     * Registers this channel itself as a blocking entity. If it was not already registered and
     * the blocking flag flips from {@code false} to {@code true}, logs {@code FLOW_ENFORCED}
     * and sends a channel-flow frame with {@code false}.
     *
     * <p>Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     */
    @Override
    public synchronized void block() {
        if (!_blockingEntities.add(this)) {
            return;
        }
        if (!_blocking.compareAndSet(false, true)) {
            return;
        }
        _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
        flow(false);
    }

    /**
     * Removes this channel itself from the blocking entities. If it was registered, no blocking
     * entities remain, and the blocking flag flips from {@code true} to {@code false}, logs
     * {@code FLOW_REMOVED} and sends a channel-flow frame with {@code true}.
     *
     * <p>Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     */
    @Override
    public synchronized void unblock() {
        if (!_blockingEntities.remove(this)) {
            return;
        }
        if (!_blockingEntities.isEmpty()) {
            return;
        }
        if (!_blocking.compareAndSet(true, false)) {
            return;
        }
        _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
        flow(true);
    }

    /**
     * Registers the given queue as a blocking entity. If it was not already registered and the
     * blocking flag flips from {@code false} to {@code true}, logs {@code FLOW_ENFORCED} with
     * the queue's name and sends a channel-flow frame with {@code false}.
     *
     * <p>Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @param aMQQueue the queue that is blocking this channel
     */
    @Override
    public synchronized void block(AMQQueue aMQQueue) {
        if (!_blockingEntities.add(aMQQueue)) {
            return;
        }
        if (!_blocking.compareAndSet(false, true)) {
            return;
        }
        final String queueName = aMQQueue.getNameShortString().toString();
        _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queueName));
        flow(false);
    }

    /**
     * Removes the given queue from the blocking entities. If it was registered, no blocking
     * entities remain, the blocking flag flips from {@code true} to {@code false}, and the
     * channel is not closing, logs {@code FLOW_REMOVED} and sends a channel-flow frame with
     * {@code true}.
     *
     * <p>Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @param aMQQueue the queue that no longer blocks this channel
     */
    @Override
    public synchronized void unblock(AMQQueue aMQQueue) {
        if (!_blockingEntities.remove(aMQQueue)) {
            return;
        }
        if (!_blockingEntities.isEmpty()) {
            return;
        }
        // The flag is flipped before the closing check, so it is cleared even when the
        // channel is closing and no frame is sent.
        if (!_blocking.compareAndSet(true, false)) {
            return;
        }
        if (isClosing()) {
            return;
        }
        _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
        flow(true);
    }

    /**
     * Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @param inboundMessage the message to test
     * @return {@code true} only if the message is an {@code IncomingMessage} whose connection
     *         reference is the same ({@code ==}) as this channel's protocol-session reference
     */
    @Override
    public boolean onSameConnection(InboundMessage inboundMessage) {
        if (!(inboundMessage instanceof IncomingMessage)) {
            return false;
        }
        return getProtocolSession().getReference()
                == ((IncomingMessage) inboundMessage).getConnectionReference();
    }

    /**
     * Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @return the size of the unacknowledged-message map
     */
    @Override
    public int getUnacknowledgedMessageCount() {
        final UnacknowledgedMessageMap unacked = getUnacknowledgedMessageMap();
        return unacked.size();
    }

    /**
     * Creates a channel-flow body with the given flag via the session's method registry,
     * frames it for this channel id and writes it to the session.
     *
     * @param z flag passed to {@code createChannelFlowBody}
     */
    private void flow(boolean z) {
        _session.writeFrame(
                _session.getMethodRegistry()
                        .createChannelFlowBody(z)
                        .generateFrame(_channelId));
    }

    /**
     * Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @return the current value of the {@code _blocking} flag
     */
    @Override
    public boolean getBlocking() {
        final boolean blocking = _blocking.get();
        return blocking;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return the virtual host reported by this channel's protocol session
     */
    @Override
    public VirtualHost getVirtualHost() {
        final AMQProtocolSession session = getProtocolSession();
        return session.getVirtualHost();
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.ConfiguredObject}.
     *
     * @return this channel's virtual host, which serves as its configuration parent
     */
    @Override
    public ConfiguredObject<SessionConfigType, SessionConfig> getParent() {
        final VirtualHost host = getVirtualHost();
        return host;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.ConfiguredObject}.
     *
     * @return the value of {@code SessionConfigType.getInstance()}
     */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override
    public SessionConfigType getConfigType() {
        final SessionConfigType type = SessionConfigType.getInstance();
        return type;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return the same value as {@code getChannelId()}
     */
    @Override
    public int getChannel() {
        final int channelId = getChannelId();
        return channelId;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isAttached() {
        return true;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return always {@code 0}
     */
    @Override
    public long getDetachedLifespan() {
        return 0L;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return the protocol session cast to {@code AMQProtocolEngine}
     * @throws ClassCastException if the protocol session is not an {@code AMQProtocolEngine}
     */
    @Override
    public ConnectionConfig getConnectionConfig() {
        final AMQProtocolSession session = getProtocolSession();
        return (AMQProtocolEngine) session;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return always {@code null}
     */
    @Override
    public Long getExpiryTime() {
        return null;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return always {@code null}
     */
    @Override
    public Long getMaxClientRate() {
        return null;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.ConfiguredObject}.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isDurable() {
        return false;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.ConfiguredObject}.
     *
     * @return the identifier held in {@code _qmfId}
     */
    @Override
    public UUID getQMFId() {
        return _qmfId;
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @return the connection config's address, a {@code "/"}, and the channel id
     */
    @Override
    public String getSessionName() {
        final ConnectionConfig connection = getConnectionConfig();
        return connection.getAddress() + "/" + getChannelId();
    }

    /**
     * Declared by {@code org.apache.qpid.server.configuration.ConfiguredObject}.
     *
     * @return the value held in {@code _createTime}
     */
    @Override
    public long getCreateTime() {
        return _createTime;
    }

    /**
     * Asks the owning session to close this channel via {@code mgmtCloseChannel}.
     *
     * <p>Declared by {@code org.apache.qpid.server.configuration.SessionConfig}.
     *
     * @throws AMQException as declared; propagated from the session call
     */
    @Override
    public void mgmtClose() throws AMQException {
        final int channelId = _channelId;
        _session.mgmtCloseChannel(channelId);
    }

    /**
     * Checks how long the current transaction has been open and idle, logging alerts and
     * closing the connection when the supplied thresholds are exceeded. Does nothing when the
     * channel is not in a transaction. A threshold of zero or less disables that check.
     *
     * <p>Alerts: an idle alert takes precedence over an open alert (at most one is logged).
     * Closing: an idle timeout takes precedence over an open timeout.
     *
     * <p>Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @param j  open-time threshold (ms) above which an "open transaction" alert is logged
     * @param j2 open-time threshold (ms) above which the connection is closed
     * @param j3 idle-time threshold (ms) above which an "idle transaction" alert is logged
     * @param j4 idle-time threshold (ms) above which the connection is closed
     * @throws AMQException as declared; propagated from closing the connection
     */
    @Override
    public void checkTransactionStatus(long j, long j2, long j3, long j4) throws AMQException {
        if (!inTransaction()) {
            return;
        }

        final long now = System.currentTimeMillis();
        final long openFor = now - _transaction.getTransactionStartTime();
        final long idleFor = now - _txnUpdateTime.get();

        // Warning stage.
        final boolean idleAlert = j3 > 0 && idleFor > j3;
        if (idleAlert) {
            CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(Long.valueOf(idleFor)));
            _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleFor + " ms");
        } else if (j > 0 && openFor > j) {
            CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(Long.valueOf(openFor)));
            _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openFor + " ms");
        }

        // Close stage.
        if (j4 > 0 && idleFor > j4) {
            closeConnection("Idle transaction timed out");
        } else if (j2 > 0 && openFor > j2) {
            closeConnection("Open transaction timed out");
        }
    }

    /**
     * Closes the owning session with {@code AMQConstant.RESOURCE_ERROR} while holding the
     * session's received lock.
     *
     * @param str the reason text passed to the session's {@code close} call
     * @throws AMQException as declared; propagated from the session's {@code close} call
     */
    private void closeConnection(String str) throws AMQException {
        final Lock receivedLock = _session.getReceivedLock();
        receivedLock.lock();
        try {
            _session.close(AMQConstant.RESOURCE_ERROR, str);
        } finally {
            // Single release point: runs once on both the normal and the exceptional path.
            receivedLock.unlock();
        }
    }

    /**
     * Removes the delivery with the given tag from the unacknowledged-message map and routes
     * it through the alternate exchange of its queue.
     *
     * <ul>
     *   <li>No entry for the tag: logs a warning and returns without touching the map.</li>
     *   <li>Queue has no alternate exchange: logs {@code DISCARDMSG_NOALTEXCH} and discards
     *       the entry.</li>
     *   <li>Alternate exchange yields no queues: logs {@code DISCARDMSG_NOROUTE} and discards
     *       the entry.</li>
     *   <li>Otherwise: calls {@code routeToAlternate()} on the entry and logs one
     *       {@code DEADLETTERMSG} per destination queue.</li>
     * </ul>
     *
     * @param j delivery tag of the message to dead-letter
     * @throws AMQException as declared; propagated from the calls made here
     */
    public void deadLetter(long j) throws AMQException {
        final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
        final QueueEntry entry = unackedMap.get(j);
        if (entry == null) {
            _logger.warn("No message found, unable to DLQ delivery tag: " + j);
            return;
        }

        // Everything needed from the entry is captured before it is removed from the map.
        final ServerMessage message = entry.getMessage();
        final AMQQueue queue = entry.getQueue();
        final Exchange alternateExchange = queue.getAlternateExchange();
        unackedMap.remove(j);

        if (alternateExchange == null) {
            _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + j);
            _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(Long.valueOf(message.getMessageNumber()), queue.getName(), message.getRoutingKey()));
            entry.discard();
            return;
        }

        final List<? extends BaseQueue> destinations = alternateExchange.route(new InboundMessageAdapter(entry));
        final boolean routable = destinations != null && !destinations.isEmpty();
        if (routable) {
            entry.routeToAlternate();
            for (BaseQueue destination : destinations) {
                _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(Long.valueOf(message.getMessageNumber()), destination.getNameShortString().asString()));
            }
            return;
        }

        _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + j);
        _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(Long.valueOf(message.getMessageNumber()), alternateExchange.getName()));
        entry.discard();
    }

    /**
     * Wraps the future and action in an {@code AsyncCommand} and appends it to
     * {@code _unfinishedCommandsQueue}; {@link #sync()} drains that queue.
     *
     * <p>Declared by {@code org.apache.qpid.server.txn.AsyncAutoCommitTransaction.FutureRecorder}.
     *
     * @param storeFuture the future to record
     * @param action      the action paired with the future
     */
    @Override
    public void recordFuture(StoreFuture storeFuture, ServerTransaction.Action action) {
        final AsyncCommand pending = new AsyncCommand(storeFuture, action);
        _unfinishedCommandsQueue.add(pending);
    }

    /**
     * Drains {@code _unfinishedCommandsQueue}: for each command, in queue order, waits until
     * it is ready for completion and then completes it. Returns once the queue is empty.
     */
    public void sync() {
        AsyncCommand pending;
        while ((pending = _unfinishedCommandsQueue.poll()) != null) {
            pending.awaitReadyForCompletion();
            pending.complete();
        }
    }

    /**
     * Orders sessions by their QMF identifiers.
     *
     * <p>Declared by {@code java.lang.Comparable}.
     *
     * @param aMQSessionModel the session to compare against
     * @return the result of comparing this session's QMF id with the other session's QMF id
     */
    @Override
    public int compareTo(AMQSessionModel aMQSessionModel) {
        final UUID ownId = getQMFId();
        final UUID otherId = aMQSessionModel.getQMFId();
        return ownId.compareTo(otherId);
    }

    /**
     * Declared by {@code org.apache.qpid.server.protocol.AMQSessionModel}.
     *
     * @return the number of entries in {@code _tag2SubscriptionMap}
     */
    @Override
    public int getConsumerCount() {
        final int consumers = _tag2SubscriptionMap.size();
        return consumers;
    }
}
