package org.apache.pulsar.broker.service.persistent;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.class */
public final class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements Dispatcher, AsyncCallbacks.ReadEntriesCallback {
    private final PersistentTopic topic;
    private final ManagedCursor cursor;
    private final String name;
    private boolean havePendingRead;
    private static final int MaxReadBatchSize = 100;
    private int readBatchSize;
    private final Backoff readFailureBackoff;
    private final ServiceConfiguration serviceConfig;
    private ScheduledFuture<?> readOnActiveConsumerTask;
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);

    public PersistentDispatcherSingleActiveConsumer(ManagedCursor managedCursor, PulsarApi.CommandSubscribe.SubType subType, int i, PersistentTopic persistentTopic) {
        super(subType, i, persistentTopic.getName());
        this.havePendingRead = false;
        this.readFailureBackoff = new Backoff(15L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
        this.readOnActiveConsumerTask = null;
        this.topic = persistentTopic;
        this.name = String.valueOf(persistentTopic.getName()) + " / " + (managedCursor.getName() != null ? Codec.decode(managedCursor.getName()) : "");
        this.cursor = managedCursor;
        this.readBatchSize = 100;
        this.serviceConfig = persistentTopic.getBrokerService().pulsar().getConfiguration();
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer
    protected void scheduleReadOnActiveConsumer() {
        if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
            this.havePendingRead = false;
        }
        if (this.havePendingRead) {
            return;
        }
        if (this.subscriptionType == PulsarApi.CommandSubscribe.SubType.Failover && this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis() > 0) {
            if (this.readOnActiveConsumerTask != null) {
                return;
            }
            this.readOnActiveConsumerTask = this.topic.getBrokerService().executor().schedule(() -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Rewind cursor and read more entries after {} ms delay", this.name, Integer.valueOf(this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis()));
                }
                this.cursor.rewind();
                readMoreEntries(AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this));
                this.readOnActiveConsumerTask = null;
            }, this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Rewind cursor and read more entries without delay", this.name);
            }
            this.cursor.rewind();
            readMoreEntries(AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this));
        }
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer
    protected void cancelPendingRead() {
        if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
            this.havePendingRead = false;
        }
    }

    public synchronized void readEntriesComplete(List<Entry> list, Object obj) {
        Consumer consumer = (Consumer) obj;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Got messages: {}", new Object[]{this.name, consumer, Integer.valueOf(list.size())});
        }
        this.havePendingRead = false;
        if (this.readBatchSize < 100) {
            int min = Math.min(this.readBatchSize * 2, 100);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Increasing read batch size from {} to {}", new Object[]{this.name, consumer, Integer.valueOf(this.readBatchSize), Integer.valueOf(min)});
            }
            this.readBatchSize = min;
        }
        this.readFailureBackoff.reduceToHalf();
        Consumer consumer2 = AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this);
        if (consumer2 != null && consumer == consumer2) {
            Consumer.SendMessageInfo sendMessages = consumer2.sendMessages(list);
            long totalSentMessages = sendMessages.getTotalSentMessages();
            long totalSentMessageBytes = sendMessages.getTotalSentMessageBytes();
            sendMessages.getChannelPromse().addListener(future -> {
                if (future.isSuccess()) {
                    if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
                        this.topic.getDispatchRateLimiter().tryDispatchPermit(totalSentMessages, totalSentMessageBytes);
                    }
                    ?? r0 = this;
                    synchronized (r0) {
                        Consumer consumer3 = AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this);
                        if (consumer3 != null && !this.havePendingRead) {
                            readMoreEntries(consumer3);
                        } else if (log.isDebugEnabled()) {
                            Logger logger = log;
                            Object[] objArr = new Object[4];
                            objArr[0] = this.name;
                            objArr[1] = consumer3;
                            objArr[2] = Boolean.valueOf(consumer3 != null);
                            objArr[3] = Boolean.valueOf(this.havePendingRead);
                            logger.debug("[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}", objArr);
                        }
                        r0 = r0;
                    }
                }
            });
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] rewind because no available consumer found", this.name);
        }
        list.forEach((v0) -> {
            v0.release();
        });
        this.cursor.rewind();
        if (consumer2 != null) {
            readMoreEntries(consumer2);
        }
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void consumerFlow(Consumer consumer, int i) {
        if (this.havePendingRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", this.name, consumer);
            }
        } else if (AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", this.name, consumer);
            }
        } else if (this.readOnActiveConsumerTask != null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded", this.name, consumer);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Trigger new read after receiving flow control message", this.name, consumer);
            }
            readMoreEntries(consumer);
        }
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
        if (consumer != AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this)) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", this.name, consumer);
            return;
        }
        if (this.readOnActiveConsumerTask != null) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: consumer is waiting for cursor to be rewinded", this.name, consumer);
            return;
        }
        if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
            this.havePendingRead = false;
        }
        if (this.havePendingRead) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", this.name, consumer);
            return;
        }
        this.cursor.rewind();
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", this.name, consumer);
        }
        readMoreEntries(consumer);
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> list) {
        redeliverUnacknowledgedMessages(consumer);
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer
    protected void readMoreEntries(Consumer consumer) {
        if (consumer == null) {
            return;
        }
        int availablePermits = consumer.getAvailablePermits();
        if (availablePermits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Consumer buffer is full, pause reading", this.name, consumer);
                return;
            }
            return;
        }
        if (!consumer.isWritable()) {
            availablePermits = 1;
        }
        int min = Math.min(availablePermits, this.readBatchSize);
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            DispatchRateLimiter dispatchRateLimiter = this.topic.getDispatchRateLimiter();
            if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
                if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded message-rate {}/{}, schedule after a {}", new Object[]{this.name, Long.valueOf(dispatchRateLimiter.getDispatchRateOnMsg()), Long.valueOf(dispatchRateLimiter.getDispatchRateOnByte()), 1000});
                    }
                    this.topic.getBrokerService().executor().schedule(() -> {
                        Consumer consumer2 = AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this);
                        if (consumer2 != null && !this.havePendingRead) {
                            readMoreEntries(consumer2);
                        } else if (log.isDebugEnabled()) {
                            log.info("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}", new Object[]{this.topic.getName(), consumer2, Boolean.valueOf(this.havePendingRead)});
                        }
                    }, 1000L, TimeUnit.MILLISECONDS);
                    return;
                } else {
                    long availableDispatchRateLimitOnMsg = dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg();
                    if (availableDispatchRateLimitOnMsg > 0) {
                        min = Math.min(min, (int) availableDispatchRateLimitOnMsg);
                    }
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Schedule read of {} messages", new Object[]{this.name, consumer, Integer.valueOf(min)});
        }
        this.havePendingRead = true;
        this.cursor.asyncReadEntriesOrWait(min, this, consumer);
    }

    public synchronized void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
        this.havePendingRead = false;
        Consumer consumer = (Consumer) obj;
        long next = this.readFailureBackoff.next();
        if (managedLedgerException instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            if (this.cursor.getNumberOfEntriesInBacklog() == 0) {
                this.consumers.forEach((v0) -> {
                    v0.reachedEndOfTopic();
                });
            }
        } else if (!(managedLedgerException instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", new Object[]{this.name, consumer, this.cursor.getReadPosition(), managedLedgerException.getMessage(), Double.valueOf(next / 1000.0d)});
        } else if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", new Object[]{this.name, consumer, this.cursor.getReadPosition(), managedLedgerException.getMessage(), Double.valueOf(next / 1000.0d)});
        }
        Preconditions.checkNotNull(consumer);
        this.readBatchSize = 1;
        this.topic.getBrokerService().executor().schedule(() -> {
            ?? r0 = this;
            synchronized (r0) {
                Consumer consumer2 = AbstractDispatcherSingleActiveConsumer.ACTIVE_CONSUMER_UPDATER.get(this);
                if (consumer2 == null || this.havePendingRead) {
                    log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", new Object[]{this.name, consumer, consumer2, Boolean.valueOf(this.havePendingRead)});
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}-{}] Retrying read operation", this.name, consumer);
                    }
                    readMoreEntries(consumer2);
                }
                r0 = r0;
            }
        }, next, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void addUnAckedMessages(int i) {
    }
}
