package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.class */
public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers implements StreamingDispatcher {
    private static final Logger log = LoggerFactory.getLogger(PersistentStreamingDispatcherMultipleConsumers.class);
    private final StreamingEntryReader streamingEntryReader;

    public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription) {
        super(persistentTopic, managedCursor, subscription);
        this.streamingEntryReader = new StreamingEntryReader(this.cursor, this, this.topic);
    }

    @Override // org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher
    public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest pendingReadEntryRequest) {
        PersistentDispatcherMultipleConsumers.ReadType readType = (PersistentDispatcherMultipleConsumers.ReadType) pendingReadEntryRequest.ctx;
        if (pendingReadEntryRequest.isLast()) {
            this.readFailureBackoff.reduceToHalf();
            if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) {
                this.havePendingRead = false;
            } else {
                this.havePendingReplayRead = false;
            }
        }
        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            int min = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Increasing read batch size from {} to {}", new Object[]{this.name, Integer.valueOf(this.readBatchSize), Integer.valueOf(min)});
            }
            this.readBatchSize = min;
        }
        if (this.shouldRewindBeforeReadingOrReplaying && readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) {
            entry.release();
            this.cursor.rewind();
            this.shouldRewindBeforeReadingOrReplaying = false;
            readMoreEntries();
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Distributing a messages to {} consumers", this.name, Integer.valueOf(this.consumerList.size()));
        }
        this.cursor.seek(this.cursor.getManagedLedger().getNextValidPosition(entry.getPosition()));
        long length = entry.getLength();
        updatePendingBytesToDispatch(length);
        if (this.serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
            this.sendInProgress = true;
            this.dispatchMessagesThread.execute(SafeRun.safeRun(() -> {
                if (sendMessagesToConsumers(readType, Lists.newArrayList(new Entry[]{entry}))) {
                    readMoreEntries();
                } else {
                    updatePendingBytesToDispatch(-length);
                }
            }));
        } else if (sendMessagesToConsumers(readType, Lists.newArrayList(new Entry[]{entry}))) {
            readMoreEntriesAsync();
        } else {
            updatePendingBytesToDispatch(-length);
        }
        pendingReadEntryRequest.recycle();
    }

    @Override // org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher
    public void canReadMoreEntries(boolean z) {
        this.havePendingRead = false;
        this.topic.getBrokerService().executor().schedule(() -> {
            this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topic.getName(), SafeRun.safeRun(() -> {
                synchronized (this) {
                    if (this.havePendingRead) {
                        log.info("[{}] Skipping read since we have pendingRead", this.name);
                    } else {
                        log.info("[{}] Scheduling read operation", this.name);
                        readMoreEntries();
                    }
                }
            }));
        }, z ? this.readFailureBackoff.next() : 0L, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher
    public void notifyConsumersEndOfTopic() {
        if (this.cursor.getNumberOfEntriesInBacklog(false) == 0) {
            this.consumerList.forEach((v0) -> {
                v0.reachedEndOfTopic();
            });
        }
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers
    protected void cancelPendingRead() {
        if (this.havePendingRead && this.streamingEntryReader.cancelReadRequests()) {
            this.havePendingRead = false;
        }
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    public synchronized void readMoreEntries() {
        if (this.sendInProgress) {
            return;
        }
        int i = this.totalAvailablePermits;
        if (i <= 0 || !isAtleastOneConsumerAvailable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", this.name);
                return;
            }
            return;
        }
        Pair<Integer, Long> calculateToRead = calculateToRead(i);
        int intValue = ((Integer) calculateToRead.getLeft()).intValue();
        long longValue = ((Long) calculateToRead.getRight()).longValue();
        if (-1 == intValue || longValue == -1) {
            return;
        }
        Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(intValue);
        if (!messagesToReplayNow.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule replay of {} messages for {} consumers", new Object[]{this.name, Integer.valueOf(messagesToReplayNow.size()), Integer.valueOf(this.consumerList.size())});
            }
            this.havePendingReplayRead = true;
            Set<? extends Position> asyncReplayEntriesInOrder = this.topic.isDelayedDeliveryEnabled() ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
            asyncReplayEntriesInOrder.forEach(position -> {
                this.redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId());
            });
            if (messagesToReplayNow.size() - asyncReplayEntriesInOrder.size() == 0) {
                this.havePendingReplayRead = false;
                this.topic.getBrokerService().executor().execute(SafeRun.safeRun(this::readMoreEntries));
                return;
            }
            return;
        }
        if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", new Object[]{this.name, Integer.valueOf(this.totalUnackedMessages), Integer.valueOf(this.topic.getMaxUnackedMessagesOnSubscription())});
            return;
        }
        if (this.havePendingRead) {
            log.debug("[{}] Cannot schedule next read until previous one is done", this.name);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule read of {} messages for {} consumers", new Object[]{this.name, Integer.valueOf(intValue), Integer.valueOf(this.consumerList.size())});
        }
        this.havePendingRead = true;
        this.streamingEntryReader.asyncReadEntries(intValue, longValue, PersistentDispatcherMultipleConsumers.ReadType.Normal);
    }
}
