package org.apache.qpid.server.consumer;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.typedmessage.TypedBytesCodes;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/consumer/AbstractConsumerTarget.class */
/**
 * Base implementation of {@link ConsumerTarget}.
 *
 * <p>The class keeps the list of {@link MessageInstanceConsumer}s attached to the target, the
 * OPEN/CLOSED state, the "notify work desired" flag together with the ticker that logs while the
 * target is suspended, and the unacknowledged message/byte counters. It also implements the loop
 * that pulls the next message from the attached consumers and hands it to {@link #doSend}.
 *
 * @param <T> the concrete target type
 */
public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>> implements ConsumerTarget<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);

    /** Log subject used unless the target has exactly one consumer that is itself a {@link LogSubject}. */
    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = () -> "[(** Multi-Queue **)] ";

    private final boolean _isMultiQueue;
    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;

    /** Position in the consumer list at which the next {@link #sendNextMessage()} call resumes pulling. */
    private volatile Iterator<MessageInstanceConsumer> _pullIterator;
    private volatile boolean _notifyWorkDesired;

    protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
    protected final AtomicLong _unacknowledgedCount = new AtomicLong(0);

    private final AtomicReference<ConsumerTarget.State> _state = new AtomicReference<>(ConsumerTarget.State.OPEN);
    private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
    private final AtomicBoolean _scheduled = new AtomicBoolean();

    /**
     * Creates the target and its suspended-consumer logging ticker.
     *
     * @param isMultiQueue value later reported by {@link #isMultiQueue()}
     * @param connection connection supplying the {@link Consumer#SUSPEND_NOTIFICATION_PERIOD}
     *                   context value and the event logger the ticker writes to
     */
    protected AbstractConsumerTarget(final boolean isMultiQueue, final AMQPConnection<?> connection) {
        _isMultiQueue = isMultiQueue;
        final long suspendNotificationPeriod =
                connection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD);
        _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(suspendNotificationPeriod) {
            @Override
            protected void log(final long period) {
                connection.getEventLogger()
                          .message(AbstractConsumerTarget.this.getLogSubject(), SubscriptionMessages.STATE(period));
            }
        };
    }

    /**
     * Picks the log subject for ticker messages: the sole consumer when there is exactly one and it
     * implements {@link LogSubject}, otherwise the shared multi-queue subject.
     */
    private LogSubject getLogSubject() {
        if (_consumers.size() == 1 && _consumers.get(0) instanceof LogSubject) {
            return (LogSubject) _consumers.get(0);
        }
        return MULTI_QUEUE_LOG_SUBJECT;
    }

    /** No-op in this base class. */
    @Override
    public void acquisitionRemoved(final MessageInstance node) {
    }

    /** @return the flag supplied at construction time */
    @Override
    public boolean isMultiQueue() {
        return _isMultiQueue;
    }

    /** Forwards the notification to the session, passing this target. */
    @Override
    public void notifyWork() {
        getSession().notifyWork(this);
    }

    /**
     * Updates the "notify work desired" flag; does nothing when the value is unchanged.
     *
     * <p>On a change the suspended-consumer ticker is removed from the session (when work becomes
     * desired) or restarted at the current time and added to the session (when it stops being
     * desired). Every attached consumer is then told the new value before the flag itself is stored.
     *
     * @param desired the new value of the flag
     */
    protected final void setNotifyWorkDesired(final boolean desired) {
        if (desired == _notifyWorkDesired) {
            return;
        }
        if (desired) {
            getSession().removeTicker(_suspendedConsumerLoggingTicker);
        } else {
            _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
            getSession().addTicker(_suspendedConsumerLoggingTicker);
        }
        for (final MessageInstanceConsumer consumer : _consumers) {
            consumer.setNotifyWorkDesired(desired);
        }
        _notifyWorkDesired = desired;
    }

    /** @return the current value of the "notify work desired" flag */
    @Override
    public final boolean isNotifyWorkDesired() {
        return _notifyWorkDesired;
    }

    /**
     * Attempts to send one pending message.
     *
     * @return {@code false} when there is no session or the connection reports that the caller is
     *         not on its IO thread; otherwise the result of {@link #sendNextMessage()}
     */
    @Override
    public boolean processPending() {
        if (getSession() == null || !getSession().getAMQPConnection().isIOThread()) {
            return false;
        }
        return sendNextMessage();
    }

    /** Registers {@code consumer} with this target. */
    @Override
    public void consumerAdded(final MessageInstanceConsumer consumer) {
        _consumers.add(consumer);
    }

    /**
     * Removes {@code consumer} from this target.
     *
     * @return the future returned by the connection's {@code doOnIOThreadAsync} for the removal task
     *         when the consumer is currently registered, otherwise an already-completed future
     */
    @Override
    public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer consumer) {
        if (_consumers.contains(consumer)) {
            return doOnIoThreadAsync(() -> consumerRemovedInternal(consumer));
        }
        // Let the return type drive inference so the result is a ListenableFuture<Void>.
        return Futures.immediateFuture(null);
    }

    /** Hands {@code task} to the connection's {@code doOnIOThreadAsync}. */
    private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task) {
        return getSession().getAMQPConnection().doOnIOThreadAsync(task);
    }

    /** Drops {@code consumer} and closes the target once no consumers remain. */
    private void consumerRemovedInternal(final MessageInstanceConsumer consumer) {
        _consumers.remove(consumer);
        if (_consumers.isEmpty()) {
            close();
        }
    }

    /** @return the backing list of attached consumers (not a copy) */
    public List<MessageInstanceConsumer> getConsumers() {
        return _consumers;
    }

    /** @return {@code true} exactly when notify-work is not desired */
    @Override
    public final boolean isSuspended() {
        return !isNotifyWorkDesired();
    }

    /** @return the current state of the target */
    @Override
    public final ConsumerTarget.State getState() {
        return _state.get();
    }

    /**
     * Sends {@code entry} through {@link #doSend}, then updates the connection's last outbound
     * message time and, for an acquiring consumer, makes the acquisition stealable.
     *
     * @param consumer the consumer the message was pulled from
     * @param entry the message instance to send
     * @param batch passed through to {@link #doSend} unchanged
     */
    @Override
    public final void send(final MessageInstanceConsumer consumer, final MessageInstance entry, final boolean batch) {
        doSend(consumer, entry, batch);
        getSession().getAMQPConnection().updateLastMessageOutboundTime();
        if (consumer.acquires()) {
            entry.makeAcquisitionStealable();
        }
    }

    /** @return the current value of the unacknowledged message counter */
    @Override
    public long getUnacknowledgedMessages() {
        return _unacknowledgedCount.longValue();
    }

    /** @return the current value of the unacknowledged byte counter */
    @Override
    public long getUnacknowledgedBytes() {
        return _unacknowledgedBytes.longValue();
    }

    /** Resets both unacknowledged counters to zero. */
    @Override
    public void resetStatistics() {
        _unacknowledgedCount.set(0L);
        _unacknowledgedBytes.set(0L);
    }

    /**
     * Protocol-specific delivery of a message; invoked by {@link #send}.
     *
     * @param consumer the consumer the message was pulled from
     * @param entry the message instance to send
     * @param batch the batch flag given to {@link #send}
     */
    protected abstract void doSend(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);

    /**
     * Pulls the next available message from the attached consumers and sends it.
     *
     * <p>Pulling resumes from where the previous call stopped. When the remembered iterator is
     * exhausted (or absent) a fresh iterator over {@link #getConsumers()} is started at most once
     * per call, so the loop ends as soon as that fresh pass is exhausted without finding a message.
     *
     * <p>If sending fails with a {@link MessageConversionException}, credit is restored and the
     * failure is handled according to the owning resource's conversion-exception policy. The
     * container's message reference, when present, is released on every path.
     *
     * @return {@code true} if a message was pulled (whether it was sent or handled by the
     *         conversion-failure policy), {@code false} if no consumer had a message
     * @throws ConnectionScopedRuntimeException if conversion fails and the owning resource is not a
     *         {@link MessageSource} or its policy is {@code CLOSE}
     * @throws ServerScopedRuntimeException if the policy is not one of the recognised values
     */
    @Override
    public boolean sendNextMessage() {
        MessageContainer messageContainer = null;
        MessageInstanceConsumer consumer = null;
        boolean restartedIteration = false;
        while (messageContainer == null) {
            if (_pullIterator == null || !_pullIterator.hasNext()) {
                if (restartedIteration) {
                    break;
                }
                restartedIteration = true;
                _pullIterator = getConsumers().iterator();
            }
            if (_pullIterator.hasNext()) {
                consumer = _pullIterator.next();
                messageContainer = consumer.pullMessage();
            }
        }

        // A non-null container implies the consumer it was pulled from is non-null as well.
        if (messageContainer == null) {
            return false;
        }

        final MessageInstance entry = messageContainer.getMessageInstance();
        try {
            send(consumer, entry, false);
        } catch (MessageConversionException e) {
            handleMessageConversionFailure(consumer, entry, e);
        } finally {
            if (messageContainer.getMessageReference() != null) {
                messageContainer.getMessageReference().release();
            }
        }
        return true;
    }

    /** Restores credit for an unconvertible message and applies the owning resource's handling policy. */
    private void handleMessageConversionFailure(final MessageInstanceConsumer consumer,
                                                final MessageInstance entry,
                                                final MessageConversionException e) {
        restoreCredit(entry.getMessage());
        final TransactionLogResource owningResource = entry.getOwningResource();
        if (!(owningResource instanceof MessageSource)) {
            throw new ConnectionScopedRuntimeException(String.format("Unable to convert message %s for this consumer", entry.getMessage()), e);
        }

        final MessageSource.MessageConversionExceptionHandlingPolicy handlingPolicy =
                ((MessageSource) owningResource).getMessageConversionExceptionHandlingPolicy();
        switch (handlingPolicy) {
            case CLOSE:
                entry.release(consumer);
                throw new ConnectionScopedRuntimeException(String.format("Unable to convert message %s for this consumer", entry.getMessage()), e);
            case ROUTE_TO_ALTERNATE:
                if (consumer.acquires()) {
                    final int enqueues = entry.routeToAlternate(null, null, null);
                    if (enqueues == 0) {
                        LOGGER.info("Failed to convert message {} for this consumer because '{}'.  Message discarded.", entry.getMessage(), e.getMessage());
                    } else {
                        LOGGER.info("Failed to convert message {} for this consumer because '{}'.  Message routed to alternate.", entry.getMessage(), e.getMessage());
                    }
                } else {
                    // A non-acquiring consumer only has the message skipped; it is not routed.
                    LOGGER.info("Failed to convert message {} for this browser because '{}'.  Message skipped.", entry.getMessage(), e.getMessage());
                }
                break;
            case REJECT:
                entry.reject(consumer);
                entry.release(consumer);
                LOGGER.info("Failed to convert message {} for this consumer because '{}'.  Message skipped.", entry.getMessage(), e.getMessage());
                break;
            default:
                throw new ServerScopedRuntimeException("Unrecognised policy " + handlingPolicy);
        }
    }

    /**
     * Closes the target if it is currently OPEN.
     *
     * <p>On a successful OPEN to CLOSED transition, notify-work is switched off, the consumer list
     * is copied and cleared, every copied consumer is closed and the suspended-consumer ticker is
     * removed from the session.
     *
     * @return {@code true} if this call performed the transition, {@code false} if the target was
     *         not OPEN
     */
    @Override
    public final boolean close() {
        if (!_state.compareAndSet(ConsumerTarget.State.OPEN, ConsumerTarget.State.CLOSED)) {
            return false;
        }
        setNotifyWorkDesired(false);

        // Copy first so each consumer is closed after the target's own list has been emptied.
        final List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers);
        _consumers.clear();
        for (final MessageInstanceConsumer consumer : consumers) {
            consumer.close();
        }

        getSession().removeTicker(_suspendedConsumerLoggingTicker);
        return true;
    }

    /**
     * Atomically marks the target as scheduled.
     *
     * @return {@code true} if the flag was previously clear and has now been set
     */
    public final boolean setScheduled() {
        return _scheduled.compareAndSet(false, true);
    }

    /** Clears the scheduled flag. */
    public final void clearScheduled() {
        _scheduled.set(false);
    }

    /** Treats deletion of the queue as removal of {@code consumer}; the returned future is not awaited. */
    @Override
    public void queueDeleted(final Queue queue, final MessageInstanceConsumer consumer) {
        consumerRemoved(consumer);
    }
}
