/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentStreamingDispatcherSingleActiveConsumer
extends PersistentDispatcherSingleActiveConsumer
implements StreamingDispatcher {
    private static final Logger log = LoggerFactory.getLogger(PersistentStreamingDispatcherSingleActiveConsumer.class);
    private final StreamingEntryReader streamingEntryReader;
    private final Executor dispatcherExecutor;

    public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor cursor, CommandSubscribe.SubType subscriptionType, int partitionIndex, PersistentTopic topic, Subscription subscription) {
        super(cursor, subscriptionType, partitionIndex, topic, subscription);
        this.streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl)this.cursor, this, this.topic);
        this.dispatcherExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread((Object)this.name);
    }

    @Override
    public void canReadMoreEntries(boolean withBackoff) {
        this.havePendingRead = false;
        this.topic.getBrokerService().executor().schedule(() -> this.topicExecutor.execute(() -> {
            PersistentStreamingDispatcherSingleActiveConsumer persistentStreamingDispatcherSingleActiveConsumer = this;
            synchronized (persistentStreamingDispatcherSingleActiveConsumer) {
                Consumer currentConsumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
                if (currentConsumer != null && !this.havePendingRead) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}-{}] Scheduling read ", (Object)this.name, (Object)currentConsumer);
                    }
                    this.readMoreEntries(currentConsumer);
                } else {
                    log.info("[{}-{}] Skipping read as we still havePendingRead {}", new Object[]{this.name, currentConsumer, this.havePendingRead});
                }
            }
        }), withBackoff ? this.readFailureBackoff.next() : 0L, TimeUnit.MILLISECONDS);
    }

    @Override
    protected void cancelPendingRead() {
        if (this.havePendingRead && this.streamingEntryReader.cancelReadRequests()) {
            this.havePendingRead = false;
        }
    }

    @Override
    public synchronized void notifyConsumersEndOfTopic() {
        if (this.cursor.getNumberOfEntriesInBacklog(false) == 0L) {
            this.checkAndApplyReachedEndOfTopicOrTopicMigration(this.consumers);
        }
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
        this.dispatcherExecutor.execute(() -> this.internalReadEntryComplete(entry, ctx));
    }

    public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
        byte[] key;
        Consumer consumer;
        if (ctx.isLast()) {
            this.readFailureBackoff.reduceToHalf();
            this.havePendingRead = false;
        }
        this.isFirstRead = false;
        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            int newReadBatchSize = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Increasing read batch size from {} to {}", new Object[]{this.name, ((Consumer)ctx.ctx).consumerName(), this.readBatchSize, newReadBatchSize});
            }
            this.readBatchSize = newReadBatchSize;
        }
        Consumer currentConsumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
        if (this.isKeyHashRangeFiltered && ((consumer = this.stickyKeyConsumerSelector.select(key = this.peekStickyKey(entry.getDataBuffer()))) == null || currentConsumer != consumer)) {
            entry.release();
            return;
        }
        Consumer consumer2 = (Consumer)ctx.ctx;
        ctx.recycle();
        if (currentConsumer == null || consumer2 != currentConsumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Rewind because no available consumer found to dispatch message to.", (Object)this.name);
            }
            entry.release();
            this.streamingEntryReader.cancelReadRequests();
            this.havePendingRead = false;
            if (currentConsumer != null) {
                this.notifyActiveConsumerChanged(currentConsumer);
                this.readMoreEntries(currentConsumer);
            }
        } else {
            EntryBatchSizes batchSizes = EntryBatchSizes.get(1);
            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(1);
            this.filterEntriesForConsumer(Lists.newArrayList((Object[])new Entry[]{entry}), batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor, false, consumer2);
            this.cursor.seek((Position)((ManagedLedgerImpl)this.cursor.getManagedLedger()).getNextValidPosition((PositionImpl)entry.getPosition()));
            this.dispatchEntriesToConsumer(currentConsumer, Lists.newArrayList((Object[])new Entry[]{entry}), batchSizes, batchIndexesAcks, sendMessageInfo, -1L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void readMoreEntries(Consumer consumer) {
        if (null == consumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", (Object)this.topic.getName());
            }
            return;
        }
        if (this.havePendingRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", (Object)this.topic.getName());
            }
            return;
        }
        if (consumer.getAvailablePermits() > 0) {
            PersistentStreamingDispatcherSingleActiveConsumer persistentStreamingDispatcherSingleActiveConsumer = this;
            synchronized (persistentStreamingDispatcherSingleActiveConsumer) {
                if (this.havePendingRead) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Skipping read for the topic, Due to we have pending read.", (Object)this.topic.getName());
                    }
                    return;
                }
                Pair<Integer, Long> calculateResult = this.calculateToRead(consumer);
                int messagesToRead = (Integer)calculateResult.getLeft();
                long bytesToRead = (Long)calculateResult.getRight();
                if (-1 == messagesToRead || bytesToRead == -1L) {
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Schedule read of {} messages", new Object[]{this.name, consumer, messagesToRead});
                }
                this.havePendingRead = true;
                if (consumer.readCompacted()) {
                    this.topic.getCompactedTopic().asyncReadEntriesOrWait(this.cursor, messagesToRead, bytesToRead, PositionImpl.LATEST, this.isFirstRead, this, consumer);
                } else {
                    this.streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
                }
            }
        } else if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Consumer buffer is full, pause reading", (Object)this.name, (Object)consumer);
        }
    }
}

