/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.ExtractResendAndRequeue;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AMQChannel
implements AMQSessionModel,
AsyncAutoCommitTransaction.FutureRecorder {
    public static final int DEFAULT_PREFETCH = 4096;
    private static final Logger _logger = Logger.getLogger(AMQChannel.class);
    private static final boolean MSG_AUTH = ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
    private final int _channelId;
    private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0L, 0L);
    private long _deliveryTag = 0L;
    private AMQQueue _defaultQueue;
    private int _consumerTag;
    private IncomingMessage _currentMessage;
    private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
    private final MessageStore _messageStore;
    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList();
    private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(4096);
    private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
    private final AtomicBoolean _suspended = new AtomicBoolean(false);
    private ServerTransaction _transaction;
    private final AtomicLong _txnStarts = new AtomicLong(0L);
    private final AtomicLong _txnCommits = new AtomicLong(0L);
    private final AtomicLong _txnRejects = new AtomicLong(0L);
    private final AtomicLong _txnCount = new AtomicLong(0L);
    private final AtomicLong _txnUpdateTime = new AtomicLong(0L);
    private final AMQProtocolSession _session;
    private AtomicBoolean _closing = new AtomicBoolean(false);
    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet());
    private final AtomicBoolean _blocking = new AtomicBoolean(false);
    private LogActor _actor;
    private LogSubject _logSubject;
    private volatile boolean _rollingBack;
    private static final Runnable NULL_TASK = new Runnable(){

        public void run() {
        }
    };
    private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
    private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
    private long _createTime = System.currentTimeMillis();
    private final ClientDeliveryMethod _clientDeliveryMethod;
    private final TransactionTimeoutHelper _transactionTimeoutHelper;
    private final UUID _id = UUID.randomUUID();
    private final String id = "(" + System.identityHashCode(this) + ")";
    private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod(){

        public void recordMessageDelivery(Subscription sub, QueueEntry entry, long deliveryTag) {
            AMQChannel.this.addUnacknowledgedMessage(entry, deliveryTag, sub);
        }
    };

    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException {
        this._session = session;
        this._channelId = channelId;
        this._actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
        this._logSubject = new ChannelLogSubject(this);
        this._actor.message(ChannelMessages.CREATE());
        this._messageStore = messageStore;
        this._transaction = new AsyncAutoCommitTransaction(this._messageStore, this);
        this._clientDeliveryMethod = session.createDeliveryMethod(this._channelId);
        this._transactionTimeoutHelper = new TransactionTimeoutHelper(this._logSubject);
    }

    public void setLocalTransactional() {
        this._transaction = new LocalTransaction(this._messageStore);
        this._txnStarts.incrementAndGet();
    }

    public boolean isTransactional() {
        return this._transaction.isTransactional();
    }

    public void receivedComplete() {
        this.sync();
    }

    private void incrementOutstandingTxnsIfNecessary() {
        if (this.isTransactional()) {
            this._txnCount.compareAndSet(0L, 1L);
        }
    }

    private void decrementOutstandingTxnsIfNecessary() {
        if (this.isTransactional()) {
            this._txnCount.compareAndSet(1L, 0L);
        }
    }

    public Long getTxnStarts() {
        return this._txnStarts.get();
    }

    @Override
    public Long getTxnCommits() {
        return this._txnCommits.get();
    }

    @Override
    public Long getTxnRejects() {
        return this._txnRejects.get();
    }

    @Override
    public Long getTxnCount() {
        return this._txnCount.get();
    }

    @Override
    public Long getTxnStart() {
        return this._txnStarts.get();
    }

    @Override
    public int getChannelId() {
        return this._channelId;
    }

    public void setPublishFrame(MessagePublishInfo info, Exchange e) throws AMQSecurityException {
        String routingKey;
        String string = routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
        if (!this.getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName())) {
            throw new AMQSecurityException("Permission denied: " + e.getName());
        }
        this._currentMessage = new IncomingMessage(info, this.getProtocolSession().getReference());
        this._currentMessage.setExchange(e);
    }

    public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException {
        if (this._currentMessage == null) {
            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("Content header received on channel " + this._channelId));
        }
        this._currentMessage.setContentHeaderBody(contentHeaderBody);
        this._currentMessage.setExpiration();
        this._currentMessage.headersReceived(this.getProtocolSession().getLastReceivedTime());
        this._currentMessage.route();
        this.deliverCurrentMessageIfComplete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deliverCurrentMessageIfComplete() throws AMQException {
        if (this._currentMessage.allContentReceived()) {
            try {
                List<? extends BaseQueue> destinationQueues = this._currentMessage.getDestinationQueues();
                if (!this.checkMessageUserId(this._currentMessage.getContentHeader())) {
                    this._transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", this._currentMessage));
                } else if (destinationQueues == null || destinationQueues.isEmpty()) {
                    if (this._currentMessage.isMandatory() || this._currentMessage.isImmediate()) {
                        this._transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", this._currentMessage));
                    } else {
                        this._actor.message(ExchangeMessages.DISCARDMSG(this._currentMessage.getExchange().asString(), this._currentMessage.getRoutingKey()));
                    }
                } else {
                    final StoredMessage<MessageMetaData> handle = this._messageStore.addMessage(this._currentMessage.getMessageMetaData());
                    this._currentMessage.setStoredMessage(handle);
                    int bodyCount = this._currentMessage.getBodyCount();
                    if (bodyCount > 0) {
                        long bodyLengthReceived = 0L;
                        for (int i = 0; i < bodyCount; ++i) {
                            ContentChunk contentChunk = this._currentMessage.getContentChunk(i);
                            handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
                            bodyLengthReceived += (long)contentChunk.getSize();
                        }
                    }
                    this._transaction.addPostTransactionAction(new ServerTransaction.Action(){

                        public void postCommit() {
                        }

                        public void onRollback() {
                            handle.remove();
                        }
                    });
                    this._transaction.enqueue(destinationQueues, this._currentMessage, new MessageDeliveryAction(this._currentMessage, destinationQueues), this.getProtocolSession().getLastReceivedTime());
                    this.incrementOutstandingTxnsIfNecessary();
                    this.updateTransactionalActivity();
                    this._currentMessage.getStoredMessage().flushToStore();
                }
            }
            finally {
                long bodySize = this._currentMessage.getSize();
                long timestamp = ((BasicContentHeaderProperties)this._currentMessage.getContentHeader().getProperties()).getTimestamp();
                this._session.registerMessageReceived(bodySize, timestamp);
                this._currentMessage = null;
            }
        }
    }

    public void publishContentBody(ContentBody contentBody) throws AMQException {
        if (this._currentMessage == null) {
            throw new AMQException("Received content body without previously receiving a JmsPublishBody");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)(this.debugIdentity() + "Content body received on channel " + this._channelId));
        }
        try {
            ContentChunk contentChunk = this._session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk((AMQBody)contentBody);
            this._currentMessage.addContentBodyFrame(contentChunk);
            this.deliverCurrentMessageIfComplete();
        }
        catch (AMQException e) {
            this._currentMessage = null;
            throw e;
        }
        catch (RuntimeException e) {
            this._currentMessage = null;
            throw e;
        }
    }

    public long getNextDeliveryTag() {
        return ++this._deliveryTag;
    }

    public int getNextConsumerTag() {
        return ++this._consumerTag;
    }

    public Subscription getSubscription(AMQShortString subscription) {
        return this._tag2SubscriptionMap.get(subscription);
    }

    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException {
        if (tag == null) {
            tag = new AMQShortString("sgen_" + this.getNextConsumerTag());
        }
        if (this._tag2SubscriptionMap.containsKey(tag)) {
            throw new AMQException("Consumer already exists with same tag: " + tag);
        }
        Subscription subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(this._channelId, this._session, tag, acks, filters, noLocal, this._creditManager);
        this._tag2SubscriptionMap.put(tag, subscription);
        try {
            queue.registerSubscription(subscription, exclusive);
        }
        catch (AMQException e) {
            this._tag2SubscriptionMap.remove(tag);
            throw e;
        }
        catch (RuntimeException e) {
            this._tag2SubscriptionMap.remove(tag);
            throw e;
        }
        return tag;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException {
        Subscription sub = this._tag2SubscriptionMap.remove(consumerTag);
        if (sub != null) {
            try {
                sub.getSendLock();
                sub.getQueue().unregisterSubscription(sub);
            }
            finally {
                sub.releaseSendLock();
            }
            return true;
        }
        _logger.warn((Object)("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered."));
        return false;
    }

    @Override
    public void close() throws AMQException {
        if (!this._closing.compareAndSet(false, true)) {
            return;
        }
        CurrentActor.get().message(this._logSubject, ChannelMessages.CLOSE());
        this.unsubscribeAllConsumers();
        this._transaction.rollback();
        try {
            this.requeue();
        }
        catch (AMQException e) {
            _logger.error((Object)("Caught AMQException whilst attempting to requeue:" + (Object)((Object)e)));
        }
        catch (TransportException e) {
            _logger.error((Object)("Caught TransportException whilst attempting to requeue:" + (Object)((Object)e)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unsubscribeAllConsumers() throws AMQException {
        if (_logger.isInfoEnabled()) {
            if (!this._tag2SubscriptionMap.isEmpty()) {
                _logger.info((Object)("Unsubscribing all consumers on channel " + this.toString()));
            } else {
                _logger.info((Object)("No consumers to unsubscribe on channel " + this.toString()));
            }
        }
        for (Map.Entry<AMQShortString, Subscription> me : this._tag2SubscriptionMap.entrySet()) {
            if (_logger.isInfoEnabled()) {
                _logger.info((Object)("Unsubscribing consumer '" + me.getKey() + "' on channel " + this.toString()));
            }
            Subscription sub = me.getValue();
            try {
                sub.getSendLock();
                sub.getQueue().unregisterSubscription(sub);
            }
            finally {
                sub.releaseSendLock();
            }
        }
        this._tag2SubscriptionMap.clear();
    }

    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) {
        if (_logger.isDebugEnabled()) {
            if (entry.getQueue() == null) {
                _logger.debug((Object)("Adding unacked message with a null queue:" + entry));
            } else if (_logger.isDebugEnabled()) {
                _logger.debug((Object)(this.debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + ") with a queue(" + entry.getQueue() + ") for " + subscription));
            }
        }
        this._unacknowledgedMessageMap.add(deliveryTag, entry);
    }

    public String debugIdentity() {
        return this._channelId + this.id;
    }

    public void requeue() throws AMQException {
        Collection<QueueEntry> messagesToBeDelivered = this._unacknowledgedMessageMap.cancelAllMessages();
        if (!messagesToBeDelivered.isEmpty() && _logger.isInfoEnabled()) {
            _logger.info((Object)("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + this.toString()));
        }
        for (QueueEntry unacked : messagesToBeDelivered) {
            if (!unacked.isQueueDeleted()) {
                unacked.setRedelivered();
                unacked.release();
                continue;
            }
            unacked.discard();
        }
    }

    public void requeue(long deliveryTag) throws AMQException {
        QueueEntry unacked = this._unacknowledgedMessageMap.remove(deliveryTag);
        if (unacked != null) {
            unacked.setRedelivered();
            if (!unacked.isQueueDeleted()) {
                unacked.release();
            } else {
                _logger.warn((Object)(System.identityHashCode(this) + " Requested requeue of message(" + unacked + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."));
                unacked.discard();
            }
        } else {
            _logger.warn((Object)("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + this._unacknowledgedMessageMap.size()));
        }
    }

    public boolean isMaxDeliveryCountEnabled(long deliveryTag) {
        QueueEntry queueEntry = this._unacknowledgedMessageMap.get(deliveryTag);
        if (queueEntry != null) {
            int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
            return maximumDeliveryCount > 0;
        }
        return false;
    }

    public boolean isDeliveredTooManyTimes(long deliveryTag) {
        QueueEntry queueEntry = this._unacknowledgedMessageMap.get(deliveryTag);
        if (queueEntry != null) {
            int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
            int numDeliveries = queueEntry.getDeliveryCount();
            return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
        }
        return false;
    }

    public void resend(boolean requeue) throws AMQException {
        long deliveryTag;
        QueueEntry message;
        LinkedHashMap<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
        LinkedHashMap<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("unacked map Size:" + this._unacknowledgedMessageMap.size()));
        }
        this._unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(this._unacknowledgedMessageMap, msgToRequeue, msgToResend, requeue, this._messageStore));
        if (_logger.isDebugEnabled()) {
            if (!msgToResend.isEmpty()) {
                _logger.debug((Object)("Preparing (" + msgToResend.size() + ") message to resend."));
            } else {
                _logger.debug((Object)"No message to resend.");
            }
        }
        for (Map.Entry entry : msgToResend.entrySet()) {
            message = (QueueEntry)entry.getValue();
            deliveryTag = (Long)entry.getKey();
            message.decrementDeliveryCount();
            AMQQueue queue = message.getQueue();
            message.setRedelivered();
            Subscription sub = message.getDeliveredSubscription();
            if (sub != null) {
                if (queue.resend(message, sub)) continue;
                msgToRequeue.put(deliveryTag, message);
                continue;
            }
            if (_logger.isInfoEnabled()) {
                _logger.info((Object)("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss"));
            }
            msgToRequeue.put(deliveryTag, message);
        }
        if (_logger.isInfoEnabled() && !msgToRequeue.isEmpty()) {
            _logger.info((Object)("Preparing (" + msgToRequeue.size() + ") message to requeue to."));
        }
        for (Map.Entry entry : msgToRequeue.entrySet()) {
            message = (QueueEntry)entry.getValue();
            deliveryTag = (Long)entry.getKey();
            message.decrementDeliveryCount();
            this._unacknowledgedMessageMap.remove(deliveryTag);
            message.setRedelivered();
            message.release();
        }
    }

    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException {
        Collection<QueueEntry> ackedMessages = this.getAckedMessages(deliveryTag, multiple);
        this._transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
        this.updateTransactionalActivity();
    }

    private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) {
        return this._unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
    }

    public UnacknowledgedMessageMap getUnacknowledgedMessageMap() {
        return this._unacknowledgedMessageMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setSuspended(boolean suspended) {
        boolean wasSuspended = this._suspended.getAndSet(suspended);
        if (wasSuspended != suspended) {
            if (!suspended) {
                this._actor.message(this._logSubject, ChannelMessages.FLOW("Started"));
            }
            if (wasSuspended) {
                for (Subscription s : this._tag2SubscriptionMap.values()) {
                    s.getQueue().deliverAsync(s);
                }
            }
            if (!wasSuspended) {
                for (Subscription s : this._tag2SubscriptionMap.values()) {
                    try {
                        s.getSendLock();
                    }
                    finally {
                        s.releaseSendLock();
                    }
                }
            }
            if (suspended) {
                this._actor.message(this._logSubject, ChannelMessages.FLOW("Stopped"));
            }
        }
    }

    public boolean isSuspended() {
        return this._suspended.get() || this._closing.get() || this._session.isClosing();
    }

    public void commit() throws AMQException {
        this.commit(null, false);
    }

    public void commit(final Runnable immediateAction, boolean async) throws AMQException {
        if (!this.isTransactional()) {
            throw new AMQException("Fatal error: commit called on non-transactional channel");
        }
        if (async && this._transaction instanceof LocalTransaction) {
            ((LocalTransaction)this._transaction).commitAsync(new Runnable(){

                public void run() {
                    immediateAction.run();
                    AMQChannel.this._txnCommits.incrementAndGet();
                    AMQChannel.this._txnStarts.incrementAndGet();
                    AMQChannel.this.decrementOutstandingTxnsIfNecessary();
                    AMQChannel.this._txnUpdateTime.set(0L);
                }
            });
        } else {
            this._transaction.commit(immediateAction);
            this._txnCommits.incrementAndGet();
            this._txnStarts.incrementAndGet();
            this._txnUpdateTime.set(0L);
            this.decrementOutstandingTxnsIfNecessary();
        }
    }

    public void rollback() throws AMQException {
        this.rollback(NULL_TASK);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void rollback(Runnable postRollbackTask) throws AMQException {
        if (!this.isTransactional()) {
            throw new AMQException("Fatal error: commit called on non-transactional channel");
        }
        this._rollingBack = true;
        boolean requiresSuspend = this._suspended.compareAndSet(false, true);
        for (Subscription sub : this._tag2SubscriptionMap.values()) {
            sub.getSendLock();
            sub.releaseSendLock();
        }
        try {
            this._transaction.rollback();
        }
        finally {
            this._rollingBack = false;
            this._txnRejects.incrementAndGet();
            this._txnStarts.incrementAndGet();
            this._txnUpdateTime.set(0L);
            this.decrementOutstandingTxnsIfNecessary();
        }
        postRollbackTask.run();
        for (QueueEntry entry : this._resendList) {
            Subscription sub = entry.getDeliveredSubscription();
            if (sub == null || sub.isClosed()) {
                entry.release();
                continue;
            }
            sub.getQueue().resend(entry, sub);
        }
        this._resendList.clear();
        if (requiresSuspend) {
            this._suspended.set(false);
            for (Subscription sub : this._tag2SubscriptionMap.values()) {
                sub.getQueue().deliverAsync(sub);
            }
        }
    }

    private void updateTransactionalActivity() {
        if (this.isTransactional()) {
            this._txnUpdateTime.set(this.getProtocolSession().getLastReceivedTime());
        }
    }

    public String toString() {
        return "[" + this._session.toString() + ":" + this._channelId + "]";
    }

    public void setDefaultQueue(AMQQueue queue) {
        this._defaultQueue = queue;
    }

    public AMQQueue getDefaultQueue() {
        return this._defaultQueue;
    }

    public boolean isClosing() {
        return this._closing.get();
    }

    public AMQProtocolSession getProtocolSession() {
        return this._session;
    }

    public FlowCreditManager getCreditManager() {
        return this._creditManager;
    }

    public void setCredit(long prefetchSize, int prefetchCount) {
        this._actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
        this._creditManager.setCreditLimits(prefetchSize, prefetchCount);
    }

    public MessageStore getMessageStore() {
        return this._messageStore;
    }

    public ClientDeliveryMethod getClientDeliveryMethod() {
        return this._clientDeliveryMethod;
    }

    public RecordDeliveryMethod getRecordDeliveryMethod() {
        return this._recordDeliveryMethod;
    }

    private AMQMessage createAMQMessage(IncomingMessage incomingMessage) throws AMQException {
        AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage());
        message.setExpiration(incomingMessage.getExpiration());
        message.setConnectionIdentifier(this._session.getReference());
        return message;
    }

    private boolean checkMessageUserId(ContentHeaderBody header) {
        AMQShortString userID;
        AMQShortString aMQShortString = userID = header.getProperties() instanceof BasicContentHeaderProperties ? ((BasicContentHeaderProperties)header.getProperties()).getUserId() : null;
        return !MSG_AUTH || this._session.getAuthorizedPrincipal().getName().equals(userID == null ? "" : userID.toString());
    }

    @Override
    public UUID getId() {
        return this._id;
    }

    @Override
    public AMQConnectionModel getConnectionModel() {
        return this._session;
    }

    @Override
    public String getClientID() {
        return String.valueOf(this._session.getContextKey());
    }

    @Override
    public LogSubject getLogSubject() {
        return this._logSubject;
    }

    @Override
    public int compareTo(AMQSessionModel o) {
        return this.getId().compareTo(o.getId());
    }

    public LogActor getLogActor() {
        return this._actor;
    }

    @Override
    public synchronized void block() {
        if (this._blockingEntities.add(this) && this._blocking.compareAndSet(false, true)) {
            this._actor.message(this._logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
            this.flow(false);
        }
    }

    @Override
    public synchronized void unblock() {
        if (this._blockingEntities.remove(this) && this._blockingEntities.isEmpty() && this._blocking.compareAndSet(true, false)) {
            this._actor.message(this._logSubject, ChannelMessages.FLOW_REMOVED());
            this.flow(true);
        }
    }

    @Override
    public synchronized void block(AMQQueue queue) {
        if (this._blockingEntities.add(queue) && this._blocking.compareAndSet(false, true)) {
            this._actor.message(this._logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
            this.flow(false);
        }
    }

    @Override
    public synchronized void unblock(AMQQueue queue) {
        if (this._blockingEntities.remove(queue) && this._blockingEntities.isEmpty() && this._blocking.compareAndSet(true, false) && !this.isClosing()) {
            this._actor.message(this._logSubject, ChannelMessages.FLOW_REMOVED());
            this.flow(true);
        }
    }

    @Override
    public boolean onSameConnection(InboundMessage inbound) {
        if (inbound instanceof IncomingMessage) {
            IncomingMessage incoming = (IncomingMessage)inbound;
            return this.getProtocolSession().getReference() == incoming.getConnectionReference();
        }
        return false;
    }

    @Override
    public int getUnacknowledgedMessageCount() {
        return this.getUnacknowledgedMessageMap().size();
    }

    private void flow(boolean flow) {
        MethodRegistry methodRegistry = this._session.getMethodRegistry();
        ChannelFlowBody responseBody = methodRegistry.createChannelFlowBody(flow);
        this._session.writeFrame((AMQDataBlock)responseBody.generateFrame(this._channelId));
    }

    @Override
    public boolean getBlocking() {
        return this._blocking.get();
    }

    public VirtualHost getVirtualHost() {
        return this.getProtocolSession().getVirtualHost();
    }

    public int getChannel() {
        return this.getChannelId();
    }

    public boolean isDurable() {
        return false;
    }

    public long getCreateTime() {
        return this._createTime;
    }

    public void mgmtClose() throws AMQException {
        this._session.mgmtCloseChannel(this._channelId);
    }

    @Override
    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException {
        long transactionStartTime = this._transaction.getTransactionStartTime();
        long transactionUpdateTime = this._txnUpdateTime.get();
        if (this.isTransactional() && transactionUpdateTime > 0L && transactionStartTime > 0L) {
            long currentTime = System.currentTimeMillis();
            long openTime = currentTime - transactionStartTime;
            long idleTime = currentTime - transactionUpdateTime;
            this._transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), "IDLE TRANSACTION ALERT");
            if (this._transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) {
                this.closeConnection("Idle transaction timed out");
                return;
            }
            this._transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), "OPEN TRANSACTION ALERT");
            if (this._transactionTimeoutHelper.isTimedOut(openTime, openClose)) {
                this.closeConnection("Open transaction timed out");
                return;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeConnection(String reason) throws AMQException {
        Lock receivedLock = this._session.getReceivedLock();
        receivedLock.lock();
        try {
            this._session.close(AMQConstant.RESOURCE_ERROR, reason);
        }
        finally {
            receivedLock.unlock();
        }
    }

    public void deadLetter(long deliveryTag) throws AMQException {
        UnacknowledgedMessageMap unackedMap = this.getUnacknowledgedMessageMap();
        QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
        if (rejectedQueueEntry == null) {
            _logger.warn((Object)("No message found, unable to DLQ delivery tag: " + deliveryTag));
            return;
        }
        ServerMessage msg = rejectedQueueEntry.getMessage();
        AMQQueue queue = rejectedQueueEntry.getQueue();
        Exchange altExchange = queue.getAlternateExchange();
        unackedMap.remove(deliveryTag);
        if (altExchange == null) {
            _logger.debug((Object)("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag));
            this._actor.message(this._logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
            rejectedQueueEntry.discard();
            return;
        }
        InboundMessageAdapter m = new InboundMessageAdapter(rejectedQueueEntry);
        List<? extends BaseQueue> destinationQueues = altExchange.route(m);
        if (destinationQueues == null || destinationQueues.isEmpty()) {
            _logger.debug((Object)("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag));
            this._actor.message(this._logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
            rejectedQueueEntry.discard();
            return;
        }
        rejectedQueueEntry.routeToAlternate();
        for (BaseQueue baseQueue : destinationQueues) {
            this._actor.message(this._logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), baseQueue.getNameShortString().asString()));
        }
    }

    @Override
    public void recordFuture(StoreFuture future, ServerTransaction.Action action) {
        this._unfinishedCommandsQueue.add(new AsyncCommand(future, action));
    }

    public void sync() {
        AsyncCommand cmd;
        while ((cmd = this._unfinishedCommandsQueue.poll()) != null) {
            cmd.awaitReadyForCompletion();
            cmd.complete();
        }
        if (this._transaction instanceof LocalTransaction) {
            ((LocalTransaction)this._transaction).sync();
        }
    }

    @Override
    public int getConsumerCount() {
        return this._tag2SubscriptionMap.size();
    }

    private static class AsyncCommand {
        private final StoreFuture _future;
        private ServerTransaction.Action _action;

        public AsyncCommand(StoreFuture future, ServerTransaction.Action action) {
            this._future = future;
            this._action = action;
        }

        void awaitReadyForCompletion() {
            this._future.waitForCompletion();
        }

        void complete() {
            if (!this._future.isComplete()) {
                this._future.waitForCompletion();
            }
            this._action.postCommit();
            this._action = null;
        }

        boolean isReadyForCompletion() {
            return this._future.isComplete();
        }
    }

    private class WriteReturnAction
    implements ServerTransaction.Action {
        private final AMQConstant _errorCode;
        private final IncomingMessage _message;
        private final String _description;

        public WriteReturnAction(AMQConstant errorCode, String description, IncomingMessage message) {
            this._errorCode = errorCode;
            this._message = message;
            this._description = description;
        }

        public void postCommit() {
            try {
                AMQChannel.this._session.getProtocolOutputConverter().writeReturn(this._message.getMessagePublishInfo(), this._message.getContentHeader(), this._message, AMQChannel.this._channelId, this._errorCode.getCode(), new AMQShortString(this._description));
            }
            catch (AMQException e) {
                throw new RuntimeException(e);
            }
        }

        public void onRollback() {
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class MessageAcknowledgeAction
    implements ServerTransaction.Action {
        private final Collection<QueueEntry> _ackedMessages;

        public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) {
            this._ackedMessages = ackedMessages;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void postCommit() {
            try {
                for (QueueEntry entry : this._ackedMessages) {
                    entry.discard();
                }
            }
            finally {
                AMQChannel.this._acknowledgedMessages.clear();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onRollback() {
            if (AMQChannel.this._rollingBack) {
                AMQChannel.this._resendList.addAll(this._ackedMessages);
            } else {
                try {
                    for (QueueEntry entry : this._ackedMessages) {
                        entry.release();
                    }
                }
                finally {
                    AMQChannel.this._acknowledgedMessages.clear();
                }
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class MessageDeliveryAction
    implements ServerTransaction.Action {
        private IncomingMessage _incommingMessage;
        private List<? extends BaseQueue> _destinationQueues;

        public MessageDeliveryAction(IncomingMessage currentMessage, List<? extends BaseQueue> destinationQueues) {
            this._incommingMessage = currentMessage;
            this._destinationQueues = destinationQueues;
        }

        @Override
        public void postCommit() {
            try {
                boolean immediate = this._incommingMessage.isImmediate();
                AMQMessage amqMessage = AMQChannel.this.createAMQMessage(this._incommingMessage);
                MessageReference ref = amqMessage.newReference();
                for (int i = 0; i < this._destinationQueues.size(); ++i) {
                    BaseQueue queue = this._destinationQueues.get(i);
                    ImmediateAction action = immediate ? new ImmediateAction(queue) : null;
                    queue.enqueue(amqMessage, AMQChannel.this.isTransactional(), action);
                    if (!(queue instanceof AMQQueue)) continue;
                    ((AMQQueue)queue).checkCapacity(AMQChannel.this);
                }
                this._incommingMessage.getStoredMessage().flushToStore();
                ref.release();
            }
            catch (AMQException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void onRollback() {
        }

        private class ImmediateAction
        implements BaseQueue.PostEnqueueAction {
            private final BaseQueue _queue;

            public ImmediateAction(BaseQueue queue) {
                this._queue = queue;
            }

            public void onEnqueue(QueueEntry entry) {
                if (!entry.getDeliveredToConsumer() && entry.acquire()) {
                    LocalTransaction txn = new LocalTransaction(AMQChannel.this._messageStore);
                    ArrayList<QueueEntry> entries = new ArrayList<QueueEntry>(1);
                    entries.add(entry);
                    final AMQMessage message = (AMQMessage)entry.getMessage();
                    txn.dequeue(this._queue, entry.getMessage(), new MessageAcknowledgeAction(entries){

                        public void postCommit() {
                            try {
                                ProtocolOutputConverter outputConverter = AMQChannel.this._session.getProtocolOutputConverter();
                                outputConverter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), message, AMQChannel.this._channelId, AMQConstant.NO_CONSUMERS.getCode(), IMMEDIATE_DELIVERY_REPLY_TEXT);
                            }
                            catch (AMQException e) {
                                throw new RuntimeException(e);
                            }
                            super.postCommit();
                        }
                    });
                    txn.commit();
                }
            }
        }
    }
}

