package org.apache.qpid.server.queue;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.util.MessageQueue;

/* loaded from: input_file:org/apache/qpid/server/queue/SubscriptionImpl.class */
/**
 * {@link Subscription} implementation that delivers queue entries to a consumer
 * identified by a channel, a protocol session and a consumer tag.
 *
 * <p>The subscription may operate as a browser (messages are written to the
 * client without being dequeued or tracked for acknowledgement), may require
 * acknowledgements, may filter messages (through a {@link FilterManager} and/or
 * the no-local flag), and may auto-close once its pre-delivery queue is empty.
 *
 * <p>The closed state is held in an {@link AtomicBoolean} which is also handed
 * out by {@link #getSendLock()}; {@link #close()} synchronizes on that object
 * while flipping it.
 */
public class SubscriptionImpl implements Subscription {
    private static final Logger _suspensionlogger = Logger.getLogger("Suspension");
    private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);

    /** Key under which a client's instance value is looked up in its client properties. */
    private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();

    public final AMQChannel channel;
    public final AMQProtocolSession protocolSession;
    public final AMQShortString consumerTag;

    private final Object _sessionKey;

    /** Pre-delivery queue; non-null only when {@link #filtersMessages()} is true. */
    private final MessageQueue<QueueEntry> _messages;

    /** Lazily created by {@link #getResendQueue()} and reset to null by {@link #requeue()}. */
    private Queue<QueueEntry> _resendQueue;

    private final boolean _noLocal;
    private final boolean _acks;
    private final FilterManager _filters;
    private final boolean _isBrowser;
    private final Boolean _autoClose;

    /** Set once the auto-close confirmation has been written to the client. */
    private boolean _sentClose = false;

    private final AMQQueue _queue;

    /** True once {@link #close()} has been called; also exposed via {@link #getSendLock()}. */
    private final AtomicBoolean _sendLock = new AtomicBoolean(false);

    /** Identity-hash based identifier used only in log output. */
    private final String id = String.valueOf(System.identityHashCode(this));

    /** {@link SubscriptionFactory} that produces {@link SubscriptionImpl} instances. */
    public static class Factory implements SubscriptionFactory {
        /**
         * Creates a fully configured subscription; arguments are passed straight to
         * {@link SubscriptionImpl#SubscriptionImpl(int, AMQProtocolSession, AMQShortString, boolean, FieldTable, boolean, AMQQueue)}.
         */
        @Override // org.apache.qpid.server.queue.SubscriptionFactory
        public Subscription createSubscription(int channelId, AMQProtocolSession session, AMQShortString tag, boolean acks, FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException {
            return new SubscriptionImpl(channelId, session, tag, acks, filters, noLocal, queue);
        }

        /**
         * Creates a subscription with acknowledgements off, no filter table,
         * no-local off and no queue.
         */
        @Override // org.apache.qpid.server.queue.SubscriptionFactory
        public SubscriptionImpl createSubscription(int channelId, AMQProtocolSession session, AMQShortString tag) throws AMQException {
            return new SubscriptionImpl(channelId, session, tag, false, null, false, null);
        }
    }

    /**
     * Creates a subscription with the given acknowledgement mode, no filter table,
     * no-local off and no queue.
     *
     * @throws AMQException if the channel cannot be found in the session
     */
    public SubscriptionImpl(int channelId, AMQProtocolSession session, AMQShortString tag, boolean acks) throws AMQException {
        this(channelId, session, tag, acks, null, false, null);
    }

    /**
     * Creates a subscription.
     *
     * @param channelId id used to look the channel up in {@code session}
     * @param session   protocol session that owns the channel
     * @param tag       consumer tag used when delivering to the client
     * @param acks      whether delivered messages are tracked as unacknowledged
     * @param filters   filter arguments handed to {@link FilterManagerFactory}; may be null
     * @param noLocal   whether messages published by the same client are excluded
     * @param queue     queue used for requeuing pending resends on close; may be null
     * @throws AMQException with {@link AMQConstant#NOT_FOUND} if the session has no such channel,
     *                      or as declared by {@link FilterManagerFactory#createManager}
     */
    public SubscriptionImpl(int channelId, AMQProtocolSession session, AMQShortString tag, boolean acks, FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException {
        AMQChannel channel = session.getChannel(channelId);
        if (channel == null) {
            throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
        }
        this.channel = channel;
        this.protocolSession = session;
        this.consumerTag = tag;
        this._sessionKey = session.getKey();
        this._acks = acks;
        this._noLocal = noLocal;
        this._queue = queue;
        this._filters = FilterManagerFactory.createManager(filters);

        // The browser / auto-close arguments are only consulted when a filter
        // manager was actually created from the table.
        final boolean hasFilterManager = this._filters != null;
        this._isBrowser = hasFilterManager && asBoolean(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()));
        this._autoClose = hasFilterManager && asBoolean(filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue()));

        // A pre-delivery queue is only needed when this subscription filters messages.
        this._messages = filtersMessages() ? new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>() : null;
    }

    /**
     * Creates a subscription with acknowledgements off, no filter table,
     * no-local off and no queue.
     *
     * @throws AMQException if the channel cannot be found in the session
     */
    public SubscriptionImpl(int channelId, AMQProtocolSession session, AMQShortString tag) throws AMQException {
        this(channelId, session, tag, false);
    }

    /**
     * Interprets an optional filter-table value as a boolean flag.
     *
     * @return false when {@code value} is null, otherwise the value cast to {@link Boolean}
     * @throws ClassCastException if {@code value} is non-null and not a {@link Boolean}
     */
    private static boolean asBoolean(Object value) {
        return value != null && ((Boolean) value).booleanValue();
    }

    /**
     * Two subscriptions are equal when they have equal session keys, the same
     * channel instance and equal consumer tags.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SubscriptionImpl)) {
            return false;
        }
        SubscriptionImpl other = (SubscriptionImpl) obj;
        return this._sessionKey.equals(other._sessionKey)
                && other.channel == this.channel
                && other.consumerTag.equals(this.consumerTag);
    }

    /** Hash of the session key only; consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return this._sessionKey.hashCode();
    }

    /** Describes the channel, consumer tag, session key and resend-queue state. */
    @Override
    public String toString() {
        // Read the field once: requeue() resets it to null, so a second read
        // between the null check and size() could otherwise fail.
        final Queue<QueueEntry> resend = this._resendQueue;
        StringBuilder text = new StringBuilder();
        text.append("[channel=").append(this.channel)
                .append(", consumerTag=").append(this.consumerTag)
                .append(", session=").append(this.protocolSession.getKey())
                .append(", resendQueue=").append(resend != null);
        if (resend != null) {
            text.append(", resendSize=").append(resend.size());
        }
        return text.append("]").toString();
    }

    /**
     * Delivers an entry to the client, as a browse or a consuming delivery
     * depending on {@link #isBrowser()}. A null entry is logged and ignored.
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public void send(QueueEntry queueEntry, AMQQueue aMQQueue) throws AMQException {
        if (queueEntry == null) {
            _logger.error("Attempt to send Null message", new NullPointerException());
            return;
        }
        if (this._isBrowser) {
            sendToBrowser(queueEntry, aMQQueue);
        } else {
            sendToConsumer(this.channel.getStoreContext(), queueEntry, aMQQueue);
        }
    }

    /**
     * Writes the message to the client without dequeuing it or recording it as
     * unacknowledged. If the subscription is already closed this is logged as
     * an error, but the delivery is still written.
     */
    private void sendToBrowser(QueueEntry queueEntry, AMQQueue aMQQueue) throws AMQException {
        synchronized (this.channel) {
            long deliveryTag = this.channel.getNextDeliveryTag();
            if (this._sendLock.get()) {
                _logger.error("Sending " + queueEntry + " when subscriber(" + this + ") is closed!");
            }
            this.protocolSession.getProtocolOutputConverter().writeDeliver(queueEntry.getMessage(), this.channel.getChannelId(), deliveryTag, this.consumerTag);
        }
    }

    /**
     * Delivers the entry to a consuming client.
     *
     * <p>Without acknowledgements the entry is dequeued before delivery and the
     * message reference is decremented afterwards; with acknowledgements the
     * entry is registered on the channel as unacknowledged under the delivery
     * tag. The entry is marked as delivered to a consumer even if delivery throws.
     */
    private void sendToConsumer(StoreContext storeContext, QueueEntry queueEntry, AMQQueue aMQQueue) throws AMQException {
        try {
            if (!this._acks) {
                if (_logger.isDebugEnabled()) {
                    _logger.debug("No ack mode so dequeuing message immediately: " + queueEntry.getMessage().getMessageId());
                }
                aMQQueue.dequeue(storeContext, queueEntry);
            }
            // Tag allocation, unacked bookkeeping and the write happen under the channel lock.
            synchronized (this.channel) {
                long deliveryTag = this.channel.getNextDeliveryTag();
                if (this._acks) {
                    this.channel.addUnacknowledgedMessage(queueEntry, deliveryTag, this.consumerTag);
                }
                this.protocolSession.getProtocolOutputConverter().writeDeliver(queueEntry.getMessage(), this.channel.getChannelId(), deliveryTag, this.consumerTag);
            }
            if (!this._acks) {
                queueEntry.getMessage().decrementReference(storeContext);
            }
        } finally {
            queueEntry.setDeliveredToConsumer();
        }
    }

    /** @return true if the channel is suspended or this subscription has been closed */
    @Override // org.apache.qpid.server.queue.Subscription
    public boolean isSuspended() {
        return this.channel.isSuspended() || this._sendLock.get();
    }

    /** Forwards the queue-deleted notification to the channel. */
    @Override // org.apache.qpid.server.queue.Subscription
    public void queueDeleted(AMQQueue aMQQueue) throws AMQException {
        this.channel.queueDeleted(aMQQueue);
    }

    /** @return true if a filter manager is present or the no-local flag is set */
    @Override // org.apache.qpid.server.queue.Subscription
    public boolean filtersMessages() {
        return this._filters != null || this._noLocal;
    }

    /**
     * Decides whether this subscription wants the given entry.
     *
     * <p>An entry previously rejected by this subscription is only logged (at
     * debug level); it does not by itself make the entry uninteresting. With
     * no-local set, entries whose publisher is the same client are refused.
     * Otherwise the decision is delegated to the filters.
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public boolean hasInterest(QueueEntry queueEntry) {
        if (queueEntry.isRejectedBy(this) && _logger.isDebugEnabled()) {
            _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + queueEntry.debugIdentity());
        }
        if (this._noLocal) {
            AMQProtocolSession publisher = queueEntry.getMessage().getPublisher();
            if (publisher != null && isSameClient(publisher)) {
                return false;
            }
        }
        return checkFilters(queueEntry);
    }

    /**
     * Tells whether {@code publisher} is the same client as this subscription's session.
     *
     * <p>If this session has an instance value in its client properties, the
     * publisher must have an equal one. Otherwise the two sessions' client
     * identifiers are compared (two null identifiers count as the same client).
     */
    private boolean isSameClient(AMQProtocolSession publisher) {
        Object ownInstance = null;
        if (this.protocolSession.getClientProperties() != null) {
            ownInstance = this.protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE);
        }
        if (ownInstance == null) {
            Object ownIdentifier = this.protocolSession.getClientIdentifier();
            Object publisherIdentifier = publisher.getClientIdentifier();
            return ownIdentifier == publisherIdentifier
                    || (ownIdentifier != null && ownIdentifier.equals(publisherIdentifier));
        }
        if (publisher.getClientProperties() == null) {
            return false;
        }
        Object publisherInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE);
        return publisherInstance != null
                && (ownInstance == publisherInstance || ownInstance.equals(publisherInstance));
    }

    /** @return the identifier used for this subscription in log output */
    private String debugIdentity() {
        return this.id;
    }

    /** @return true if there is no filter manager or all filters allow the entry's message */
    private boolean checkFilters(QueueEntry queueEntry) {
        return this._filters == null || this._filters.allAllow(queueEntry.getMessage());
    }

    /** @return the pre-delivery queue, or null if this subscription does not filter messages */
    @Override // org.apache.qpid.server.queue.Subscription
    public Queue<QueueEntry> getPreDeliveryQueue() {
        return this._messages;
    }

    /**
     * Adds an entry to the pre-delivery queue, if there is one.
     *
     * @param z true to push the entry at the head, false to offer it at the tail
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public void enqueueForPreDelivery(QueueEntry queueEntry, boolean z) {
        if (this._messages == null) {
            return;
        }
        if (z) {
            this._messages.pushHead(queueEntry);
        } else {
            this._messages.offer(queueEntry);
        }
    }

    private boolean isAutoClose() {
        return this._autoClose.booleanValue();
    }

    /**
     * Marks the subscription closed, requeues any pending resends and clears
     * the pre-delivery queue. Only the first call does the clean-up; later
     * calls just log at debug level.
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public void close() {
        boolean alreadyClosed;
        // The same object is handed out by getSendLock(), so the flag is
        // flipped while holding its monitor.
        synchronized (this._sendLock) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Setting SendLock true:" + debugIdentity());
            }
            alreadyClosed = this._sendLock.getAndSet(true);
        }
        if (alreadyClosed) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Called close() on a closed subscription");
            }
            return;
        }
        if (_logger.isInfoEnabled()) {
            _logger.info("Closing subscription (" + debugIdentity() + "):" + this);
        }
        if (this._resendQueue != null && !this._resendQueue.isEmpty()) {
            if (_logger.isInfoEnabled()) {
                _logger.info("Requeuing closing subscription (" + debugIdentity() + "):" + this);
            }
            requeue();
        }
        if (this._messages != null) {
            if (_logger.isInfoEnabled()) {
                _logger.info("Clearing PDQ (" + debugIdentity() + "):" + this);
            }
            this._messages.clear();
        }
    }

    /**
     * Closes the subscription and, for an auto-close subscription that has not
     * yet confirmed, unsubscribes the consumer from the channel and writes the
     * auto-close confirmation to the client. The confirmation is only written
     * when the unsubscribe reports success.
     */
    private void autoclose() {
        close();
        if (!isAutoClose() || this._sentClose) {
            return;
        }
        _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
        boolean unsubscribed = false;
        try {
            unsubscribed = this.channel.unsubscribeConsumer(this.protocolSession, this.consumerTag);
        } catch (AMQException e) {
            // Keep the cause in the log so the failure can be diagnosed.
            _logger.info("Unable to UnsubscribeConsumer :" + this.consumerTag + " so not going to send CancelOK.", e);
        }
        if (unsubscribed) {
            this.protocolSession.getProtocolOutputConverter().confirmConsumerAutoClose(this.channel.getChannelId(), this.consumerTag);
            this._sentClose = true;
        }
    }

    /**
     * Drains the resend queue, releasing each entry and handing it back to the
     * channel's transactional context for delivery, then discards the resend
     * queue. Without a queue nothing can be redelivered; that is logged if
     * entries were pending.
     */
    private void requeue() {
        // Work on a snapshot of the reference; the field is reset at the end.
        final Queue<QueueEntry> pending = this._resendQueue;
        if (this._queue != null) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Requeuing :" + pending.size() + " messages");
            }
            // poll() returns null once the queue is drained (including by
            // another thread), so the entry is never dereferenced when absent.
            QueueEntry entry;
            while ((entry = pending.poll()) != null) {
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Removed for resending:" + entry.debugIdentity());
                }
                entry.release();
                this._queue.subscriberHasPendingResend(false, this, entry);
                try {
                    this.channel.getTransactionalContext().deliver(entry, true);
                } catch (AMQException e) {
                    _logger.error("MESSAGE LOSS : Unable to re-deliver messages", e);
                }
            }
            // Anything present now was added after the drain finished and is
            // about to be dropped together with the queue.
            if (!pending.isEmpty()) {
                _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null.");
            }
            this._queue.subscriberHasPendingResend(false, this, null);
        } else if (!pending.isEmpty()) {
            _logger.error("Unable to re-deliver messages as queue is null.");
        }
        this._resendQueue = null;
    }

    /** @return true once {@link #close()} has been called */
    @Override // org.apache.qpid.server.queue.Subscription
    public boolean isClosed() {
        return this._sendLock.get();
    }

    @Override // org.apache.qpid.server.queue.Subscription
    public boolean isBrowser() {
        return this._isBrowser;
    }

    /**
     * @return true if acknowledgements are on and the channel reports that
     *         the entry's message would suspend it
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public boolean wouldSuspend(QueueEntry queueEntry) {
        return this._acks && this.channel.wouldSuspend(queueEntry.getMessage());
    }

    /** @return the resend queue, creating it on first use */
    @Override // org.apache.qpid.server.queue.Subscription
    public Queue<QueueEntry> getResendQueue() {
        if (this._resendQueue == null) {
            this._resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>();
        }
        return this._resendQueue;
    }

    /**
     * Selects the queue the next delivery should be taken from.
     *
     * @param queue the queue to use when this subscription neither has pending
     *              resends nor filters messages
     * @return the resend queue if it has entries; otherwise {@code queue} for a
     *         non-filtering subscription; otherwise the pre-delivery queue, or
     *         null after auto-closing because that queue was empty
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> queue) {
        if (this._resendQueue != null && !this._resendQueue.isEmpty()) {
            return this._resendQueue;
        }
        if (!filtersMessages()) {
            return queue;
        }
        if (isAutoClose() && this._messages.isEmpty()) {
            autoclose();
            return null;
        }
        return this._messages;
    }

    /**
     * Adds an entry to the resend queue and, when a queue is present, tells it
     * that this subscriber has a pending resend. Without a queue the entry is
     * still stored but an error is logged.
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public void addToResendQueue(QueueEntry queueEntry) {
        getResendQueue().add(queueEntry);
        if (this._queue == null) {
            _logger.error("Queue is null won't be able to resend messages");
        } else {
            this._queue.subscriberHasPendingResend(true, this, queueEntry);
        }
    }

    /** @return the object {@link #close()} synchronizes on while marking the subscription closed */
    @Override // org.apache.qpid.server.queue.Subscription
    public Object getSendLock() {
        return this._sendLock;
    }

    @Override // org.apache.qpid.server.queue.Subscription
    public AMQChannel getChannel() {
        return this.channel;
    }

    /**
     * Auto-closes immediately when this is a filtering, auto-close subscription
     * whose pre-delivery queue is empty; otherwise does nothing.
     */
    @Override // org.apache.qpid.server.queue.Subscription
    public void start() {
        if (filtersMessages() && isAutoClose() && this._messages.isEmpty()) {
            autoclose();
        }
    }
}
