/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMultipleConsumers
implements Dispatcher,
AsyncCallbacks.ReadEntriesCallback {
    // Largest read batch size. NOTE(review): the literal 100 used in the constructor and in
    // readEntriesComplete presumably originates from this constant (inlined by the compiler).
    private static final int MaxReadBatchSize = 100;
    // NOTE(review): the literal 20 capping a single consumer's share in readEntriesComplete
    // presumably originates from this constant.
    private static final int MaxRoundRobinBatchSize = 20;
    // Topic this dispatcher belongs to; used here to reach the broker service (config, executor).
    private final PersistentTopic topic;
    // Cursor that all reads, replays and rewinds are issued against.
    private final ManagedCursor cursor;
    // Created by disconnectAllConsumers(); completed once the consumer list is empty.
    private CompletableFuture<Void> closeFuture = null;
    // (ledgerId, entryId) pairs queued for redelivery.
    private ConcurrentLongPairSet messagesToReplay;
    // True from the moment a normal read is issued until its callback runs or it is cancelled.
    private boolean havePendingRead = false;
    // True from the moment a replay read is issued until its callback runs.
    private boolean havePendingReplayRead = false;
    // Set by addConsumer() when the first consumer joins while a read is still in flight;
    // the read callbacks then rewind the cursor and clear the flag.
    private boolean shouldRewindBeforeReadingOrReplaying = false;
    // "<topic name> / <decoded cursor name>", used as the prefix of log messages.
    private final String name;
    // Permit counter: increased by consumerFlow(), decreased as messages are sent or consumers leave.
    private int totalAvailablePermits = 0;
    // Current cap on entries per read: reset to 1 on read failure, doubled (up to 100) on success.
    private int readBatchSize;
    // Supplies the delay before retrying a failed read.
    // NOTE(review): arguments look like "15 s initial, 1 min max" - confirm against Backoff.
    private final Backoff readFailureBackoff = new Backoff(15L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES);
    // Atomic accessor for totalUnackedMessages (bound reflectively by field name).
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
    // Running count of unacknowledged messages, maintained by addUnAckedMessages().
    private volatile int totalUnackedMessages = 0;
    // Per-subscription limit read from broker configuration; values <= 0 make addUnAckedMessages() a no-op.
    private final int maxUnackedMessages;
    // 1 while normal reads are blocked because of unacked messages, 0 otherwise.
    private volatile int blockedDispatcherOnUnackedMsgs = 0;
    // Atomic accessor for blockedDispatcherOnUnackedMsgs (bound reflectively by field name).
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);

    /**
     * Creates a dispatcher bound to the given topic and cursor.
     *
     * <p>The dispatcher name is built as {@code "<topic name> / <decoded cursor name>"}, the read
     * batch size starts at its maximum, and the unacked-message limit is taken from the broker
     * configuration reachable through the topic.
     *
     * @param topic  topic the subscription belongs to
     * @param cursor cursor that reads are issued against
     */
    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
        this.topic = topic;
        this.cursor = cursor;
        this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
        this.readBatchSize = MaxReadBatchSize;
        this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
        this.maxUnackedMessages = topic.getBrokerService()
                .pulsar()
                .getConfiguration()
                .getMaxUnackedMessagesPerSubscription();
    }

    /**
     * Registers a consumer with this dispatcher.
     *
     * <p>If the dispatcher has already been closed the consumer is disconnected instead of
     * being added. When the consumer is the first one to join, the replay set is cleared and the
     * cursor is rewound - immediately if no read is in flight, otherwise the rewind is deferred
     * to the read callback through {@code shouldRewindBeforeReadingOrReplaying}.
     *
     * <p>The consumer list is kept ordered by ascending priority level.
     *
     * @param consumer consumer to add
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) {
        if (AbstractDispatcherMultipleConsumers.IS_CLOSED_UPDATER.get(this) == 1) {
            // The message needs a second placeholder, otherwise the consumer argument is dropped.
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.name, consumer);
            consumer.disconnect();
            return;
        }
        if (this.consumerList.isEmpty()) {
            if (this.havePendingRead || this.havePendingReplayRead) {
                // A read is in flight: let its callback perform the rewind.
                this.shouldRewindBeforeReadingOrReplaying = true;
            } else {
                this.cursor.rewind();
                this.shouldRewindBeforeReadingOrReplaying = false;
            }
            this.messagesToReplay.clear();
        }
        this.consumerList.add(consumer);
        // Integer.compare avoids the overflow that "a - b" can produce for extreme values.
        this.consumerList.sort((c1, c2) -> Integer.compare(c1.getPriorityLevel(), c2.getPriorityLevel()));
        this.consumerSet.add(consumer);
    }

    /**
     * Removes a consumer from this dispatcher.
     *
     * <p>The consumer's unacked-message count is subtracted from the dispatcher total first,
     * regardless of whether the consumer turns out to be registered. If it was the last consumer,
     * any pending normal read is cancelled, the replay set is cleared, a pending close future is
     * completed and the permit counter is reset. Otherwise the consumer's pending acks are queued
     * for replay, its permits are subtracted, and a new read is triggered.
     *
     * @param consumer consumer to remove
     * @throws BrokerServiceException declared by the contract; not thrown by the code in this method
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        addUnAckedMessages(-consumer.getUnackedMessages());

        final boolean wasRegistered = consumerSet.removeAll(consumer) == 1;
        if (!wasRegistered) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Trying to remove a non-connected consumer: {}", name, consumer);
            }
            return;
        }

        consumerList.remove(consumer);
        log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size());

        if (!consumerList.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer are left, reading more entries", name);
            }
            // Hand the departed consumer's in-flight messages back for redelivery.
            consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> messagesToReplay.add(ledgerId, entryId));
            totalAvailablePermits -= consumer.getAvailablePermits();
            readMoreEntries();
            return;
        }

        // That was the last consumer: stop reading and drop dispatcher-side state.
        if (havePendingRead && cursor.cancelPendingReadRequest()) {
            havePendingRead = false;
        }
        messagesToReplay.clear();
        if (closeFuture != null) {
            log.info("[{}] All consumers removed. Subscription is disconnected", name);
            closeFuture.complete(null);
        }
        totalAvailablePermits = 0;
    }

    /**
     * Handles a flow-control request from a consumer.
     *
     * <p>Requests from consumers that are not registered with this dispatcher are ignored.
     * Otherwise the permits are added to the dispatcher-wide total and a read is triggered.
     *
     * @param consumer                  consumer granting the permits
     * @param additionalNumberOfMessages number of permits to add
     */
    @Override
    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        if (consumerSet.contains(consumer)) {
            totalAvailablePermits += additionalNumberOfMessages;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Trigger new read after receiving flow control message", consumer);
            }
            readMoreEntries();
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Ignoring flow control from disconnected consumer {}", name, consumer);
        }
    }

    /**
     * Issues the next cursor read if permits and an available consumer exist.
     *
     * <p>Pending replays take precedence: up to {@code min(totalAvailablePermits, readBatchSize)}
     * positions are taken from the replay set and requested via {@code asyncReplayEntries}.
     * Positions the cursor reports back as deleted are dropped from the replay set; if every
     * requested position was deleted, the replay flag is cleared and the method re-runs.
     * With an empty replay set a normal read is issued, unless the dispatcher is blocked on
     * unacked messages or a normal read is already in flight.
     *
     * <p>The method is {@code synchronized} because it reads and writes the same state
     * ({@code havePendingRead}, {@code havePendingReplayRead}, {@code totalAvailablePermits},
     * the replay set) as the other synchronized methods of this class, yet it is also invoked
     * from executor tasks that do not hold the monitor (the unblock path in
     * {@code addUnAckedMessages}). The monitor is reentrant, so callers that already hold it are
     * unaffected.
     */
    public synchronized void readMoreEntries() {
        if (this.totalAvailablePermits <= 0 || !this.isAtleastOneConsumerAvailable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", this.name);
            }
            return;
        }

        int messagesToRead = Math.min(this.totalAvailablePermits, this.readBatchSize);

        if (!this.messagesToReplay.isEmpty()) {
            if (this.havePendingReplayRead) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Skipping replay while awaiting previous read to complete", this.name);
                }
                return;
            }
            Set<PositionImpl> messagesToReplayNow = this.messagesToReplay.items(messagesToRead).stream()
                    .map(pair -> new PositionImpl(pair.first, pair.second))
                    .collect(Collectors.toSet());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule replay of {} messages for {} consumers", new Object[]{this.name, messagesToReplayNow.size(), this.consumerList.size()});
            }
            this.havePendingReplayRead = true;
            Set<?> deletedMessages = this.cursor.asyncReplayEntries(messagesToReplayNow, this, ReadType.Replay);
            // Positions reported as deleted will never come back through the callback; forget them.
            deletedMessages.forEach(position -> {
                PositionImpl deleted = (PositionImpl) position;
                this.messagesToReplay.remove(deleted.getLedgerId(), deleted.getEntryId());
            });
            if (messagesToReplayNow.size() - deletedMessages.size() == 0) {
                // Nothing was actually requested, so no callback will clear the flag for us.
                this.havePendingReplayRead = false;
                this.readMoreEntries();
            }
        } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", new Object[]{this.name, TOTAL_UNACKED_MESSAGES_UPDATER.get(this), this.maxUnackedMessages});
        } else if (!this.havePendingRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule read of {} messages for {} consumers", new Object[]{this.name, messagesToRead, this.consumerList.size()});
            }
            this.havePendingRead = true;
            this.cursor.asyncReadEntriesOrWait(messagesToRead, this, ReadType.Normal);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Cannot schedule next read until previous one is done", this.name);
        }
    }

    /**
     * Tells whether at least one consumer is currently registered.
     *
     * @return {@code true} if the consumer list is non-empty
     */
    @Override
    public boolean isConsumerConnected() {
        final boolean noConsumers = consumerList.isEmpty();
        return !noConsumers;
    }

    /**
     * Exposes the dispatcher's consumer list.
     *
     * @return the internal list instance itself (not a copy)
     */
    @Override
    public CopyOnWriteArrayList<Consumer> getConsumers() {
        return consumerList;
    }

    /**
     * Checks whether the given consumer may unsubscribe.
     *
     * @param consumer consumer requesting the unsubscribe
     * @return {@code true} only when exactly one consumer is registered and the given consumer
     *         is registered with this dispatcher
     */
    @Override
    public synchronized boolean canUnsubscribe(Consumer consumer) {
        if (consumerList.size() != 1) {
            return false;
        }
        return consumerSet.contains(consumer);
    }

    /**
     * Marks the dispatcher as closed and disconnects every consumer.
     *
     * @return the future produced by {@link #disconnectAllConsumers()}
     */
    @Override
    public CompletableFuture<Void> close() {
        // Flag first, so consumers arriving from now on are rejected by addConsumer().
        IS_CLOSED_UPDATER.set(this, 1);
        final CompletableFuture<Void> disconnected = disconnectAllConsumers();
        return disconnected;
    }

    /**
     * Disconnects all registered consumers.
     *
     * <p>A fresh close future is installed on every call. With no consumers it is completed
     * right away; otherwise each consumer is asked to disconnect and a pending normal read is
     * cancelled. In that case the future is completed by {@code removeConsumer} once the
     * consumer list becomes empty.
     *
     * @return the close future installed by this call
     */
    @Override
    public synchronized CompletableFuture<Void> disconnectAllConsumers() {
        closeFuture = new CompletableFuture<>();
        if (!consumerList.isEmpty()) {
            consumerList.forEach(Consumer::disconnect);
            final boolean cancelled = havePendingRead && cursor.cancelPendingReadRequest();
            if (cancelled) {
                havePendingRead = false;
            }
        } else {
            closeFuture.complete(null);
        }
        return closeFuture;
    }

    /**
     * Clears the closed flag so that {@code addConsumer} accepts consumers again.
     */
    @Override
    public void reset() {
        IS_CLOSED_UPDATER.set(this, 0);
    }

    /**
     * Reports the subscription type served by this dispatcher.
     *
     * @return always {@code SubType.Shared}
     */
    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        final PulsarApi.CommandSubscribe.SubType shared = PulsarApi.CommandSubscribe.SubType.Shared;
        return shared;
    }

    /**
     * Callback invoked when a cursor read (normal or replay) has produced entries.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>clear the pending flag matching the read type;</li>
     *   <li>double the read batch size (capped at the maximum) and halve the failure backoff;</li>
     *   <li>if a rewind was requested and this was a normal read, release all entries, rewind
     *       the cursor, trigger a new read and stop;</li>
     *   <li>hand entries out to consumers in batches of at most the round-robin batch size
     *       while permits and an available consumer remain; if no next consumer is returned,
     *       release the remaining entries, rewind the cursor and stop;</li>
     *   <li>queue whatever could not be dispatched for replay, release it, and trigger the
     *       next read.</li>
     * </ol>
     *
     * @param entries entries returned by the cursor
     * @param ctx     the {@link ReadType} passed when the read was issued
     */
    public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
        final ReadType readType = (ReadType) ctx;
        int offset = 0;
        int remaining = entries.size();

        final boolean normalRead = readType == ReadType.Normal;
        if (normalRead) {
            havePendingRead = false;
        } else {
            havePendingReplayRead = false;
        }

        if (readBatchSize < MaxReadBatchSize) {
            final int grownBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Increasing read batch size from {} to {}", new Object[]{name, readBatchSize, grownBatchSize});
            }
            readBatchSize = grownBatchSize;
        }
        readFailureBackoff.reduceToHalf();

        if (normalRead && shouldRewindBeforeReadingOrReplaying) {
            // These entries were read before the rewind request; discard them and start over.
            entries.forEach(Entry::release);
            cursor.rewind();
            shouldRewindBeforeReadingOrReplaying = false;
            readMoreEntries();
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Distributing {} messages to {} consumers", new Object[]{name, entries.size(), consumerList.size()});
        }

        while (remaining > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
            final Consumer target = getNextConsumer();
            if (target == null) {
                entries.subList(offset, entries.size()).forEach(Entry::release);
                cursor.rewind();
                return;
            }

            final int share = Math.min(Math.min(remaining, target.getAvailablePermits()), MaxRoundRobinBatchSize);
            if (share > 0) {
                final List<Entry> batch = entries.subList(offset, offset + share);
                if (readType == ReadType.Replay) {
                    // About to be redelivered, so they no longer need to sit in the replay set.
                    batch.forEach(entry -> messagesToReplay.remove(entry.getLedgerId(), entry.getEntryId()));
                }
                final int sent = (Integer) target.sendMessages(batch).getRight();
                offset += share;
                remaining -= share;
                totalAvailablePermits -= sent;
            }
        }

        if (remaining > 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name, entries.size() - offset);
            }
            for (Entry leftover : entries.subList(offset, entries.size())) {
                messagesToReplay.add(leftover.getLedgerId(), leftover.getEntryId());
                leftover.release();
            }
        }
        readMoreEntries();
    }

    /**
     * Callback invoked when a cursor read (normal or replay) has failed.
     *
     * <p>Consumers are told the end of the topic was reached when the failure is a
     * {@code NoMoreEntriesToReadException} and the backlog is empty. Other failures are logged
     * (at debug level for {@code TooManyRequestsException}, at error level otherwise). A
     * requested rewind is carried out, the pending flag for the read type is cleared, and for an
     * {@code InvalidReplayPositionException} on a replay read every replay position at or
     * before the mark-delete position is dropped. Finally the batch size is reset to 1 and a
     * retry is scheduled after the backoff delay.
     *
     * @param exception cause of the failure
     * @param ctx       the {@link ReadType} passed when the read was issued
     */
    public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        final ReadType readType = (ReadType) ctx;
        final long waitTimeMillis = readFailureBackoff.next();
        final String retryMessage = "[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds";

        if (exception instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            if (cursor.getNumberOfEntriesInBacklog() == 0L) {
                consumerList.forEach(Consumer::reachedEndOfTopic);
            }
        } else if (exception instanceof ManagedLedgerException.TooManyRequestsException) {
            if (log.isDebugEnabled()) {
                log.debug(retryMessage, new Object[]{name, cursor.getReadPosition(), exception.getMessage(), readType, (double) waitTimeMillis / 1000.0});
            }
        } else {
            log.error(retryMessage, new Object[]{name, cursor.getReadPosition(), exception.getMessage(), readType, (double) waitTimeMillis / 1000.0});
        }

        if (shouldRewindBeforeReadingOrReplaying) {
            shouldRewindBeforeReadingOrReplaying = false;
            cursor.rewind();
        }

        if (readType == ReadType.Normal) {
            havePendingRead = false;
        } else {
            havePendingReplayRead = false;
            if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) {
                final PositionImpl markDeleted = (PositionImpl) cursor.getMarkDeletedPosition();
                // Keep only replay positions strictly after the mark-delete position.
                messagesToReplay.removeIf((ledgerId, entryId) -> ComparisonChain.start()
                        .compare(ledgerId, markDeleted.getLedgerId())
                        .compare(entryId, markDeleted.getEntryId())
                        .result() <= 0);
            }
        }

        readBatchSize = 1;

        final Runnable retry = () -> {
            synchronized (PersistentDispatcherMultipleConsumers.this) {
                if (havePendingRead) {
                    log.info("[{}] Skipping read retry: havePendingRead {}", new Object[]{name, havePendingRead, exception});
                } else {
                    log.info("[{}] Retrying read operation", name);
                    readMoreEntries();
                }
            }
        };
        topic.getBrokerService().executor().schedule(retry, waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Tells whether dispatching is currently possible.
     *
     * @return {@code false} when there are no consumers or the dispatcher is closed; otherwise
     *         {@code true} if {@link #isConsumerAvailable(Consumer)} holds for any consumer
     */
    private boolean isAtleastOneConsumerAvailable() {
        final boolean closed = IS_CLOSED_UPDATER.get(this) == 1;
        if (consumerList.isEmpty() || closed) {
            return false;
        }
        return consumerList.stream().anyMatch(this::isConsumerAvailable);
    }

    /**
     * Checks whether a consumer can receive messages right now.
     *
     * @param consumer consumer to check; may be {@code null}
     * @return {@code true} if the consumer is non-null, not blocked and has at least one permit
     */
    @Override
    public boolean isConsumerAvailable(Consumer consumer) {
        if (consumer == null || consumer.isBlocked()) {
            return false;
        }
        return consumer.getAvailablePermits() > 0;
    }

    /**
     * Queues every pending (unacknowledged) message of the consumer for redelivery and triggers
     * a read.
     *
     * @param consumer consumer whose pending acks are to be redelivered
     */
    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
        consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> this.messagesToReplay.add(ledgerId, entryId));
        if (log.isDebugEnabled()) {
            // Arguments now line up with the placeholders: dispatcher name, consumer, replay set.
            log.debug("[{}] Redelivering unacknowledged messages for consumer {}: {}", new Object[]{this.name, consumer, this.messagesToReplay});
        }
        this.readMoreEntries();
    }

    /**
     * Queues the given positions for redelivery and triggers a read.
     *
     * @param consumer  consumer that requested the redelivery (used for logging only)
     * @param positions positions to redeliver
     */
    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        positions.forEach(position -> this.messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
        if (log.isDebugEnabled()) {
            // Arguments now line up with the placeholders: dispatcher name, consumer, positions.
            log.debug("[{}] Redelivering unacknowledged messages for consumer {}: {}", new Object[]{this.name, consumer, positions});
        }
        this.readMoreEntries();
    }

    /**
     * Adjusts the dispatcher-wide unacked-message count and updates the blocked state.
     *
     * <p>Does nothing when the configured limit is not positive. Otherwise, in this order:
     * <ul>
     *   <li>block the dispatcher when the new total reaches the limit;</li>
     *   <li>else, if broker-level dispatching is blocked and this dispatcher is blocked, unblock
     *       through the broker service once the total is below half of the broker's
     *       per-dispatcher limit;</li>
     *   <li>else, if this dispatcher is blocked and the new total is below half of the
     *       subscription limit, unblock and schedule a read on the broker executor.</li>
     * </ul>
     * The delta is then reported to the broker service.
     *
     * @param numberOfMessages delta to apply; negative values decrease the count
     */
    @Override
    public void addUnAckedMessages(int numberOfMessages) {
        if (maxUnackedMessages <= 0) {
            return;
        }

        final int updatedTotal = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);

        if (updatedTotal >= maxUnackedMessages
                && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 0, 1)) {
            log.info("[{}] Dispatcher is blocked due to unackMessages {} reached to max {}", new Object[]{name, TOTAL_UNACKED_MESSAGES_UPDATER.get(this), maxUnackedMessages});
        } else if (topic.getBrokerService().isBrokerDispatchingBlocked()
                && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            if (TOTAL_UNACKED_MESSAGES_UPDATER.get(this) < topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2
                    && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
                topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(this));
            }
        } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1
                && updatedTotal < maxUnackedMessages / 2
                && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
            log.info("[{}] Dispatcher is unblocked", name);
            topic.getBrokerService().executor().submit(() -> readMoreEntries());
        }

        topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
    }

    /**
     * Tells whether the dispatcher is currently flagged as blocked on unacked messages.
     *
     * @return {@code true} if the blocked flag equals 1
     */
    public boolean isBlockedDispatcherOnUnackedMsgs() {
        // Plain volatile read of the same field the updater accesses.
        return blockedDispatcherOnUnackedMsgs == 1;
    }

    /**
     * Unconditionally sets the blocked-on-unacked-messages flag to 1.
     */
    public void blockDispatcherOnUnackedMsgs() {
        // Plain volatile write to the same field the updater accesses.
        blockedDispatcherOnUnackedMsgs = 1;
    }

    /**
     * Unconditionally resets the blocked-on-unacked-messages flag to 0.
     */
    public void unBlockDispatcherOnUnackedMsgs() {
        // Plain volatile write to the same field the updater accesses.
        blockedDispatcherOnUnackedMsgs = 0;
    }

    /**
     * Returns the current dispatcher-wide count of unacknowledged messages.
     *
     * @return value of the unacked-message counter
     */
    public int getTotalUnackedMessages() {
        // Plain volatile read of the same field the updater accesses.
        return totalUnackedMessages;
    }

    /**
     * Returns the dispatcher name built in the constructor.
     *
     * @return {@code "<topic name> / <decoded cursor name>"}
     */
    public String getName() {
        return name;
    }

    /**
     * Tag passed as the callback context of a cursor read so the callbacks can tell which
     * pending flag the read belongs to.
     */
    enum ReadType {
        /** Read issued through {@code asyncReadEntriesOrWait}. */
        Normal,
        /** Read issued through {@code asyncReplayEntries}. */
        Replay
    }
}

