/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatcher for {@code Key_Shared} subscriptions: every entry is routed to the consumer that the
 * configured {@link StickyKeyConsumerSelector} picks for the entry's sticky key.
 *
 * <p>Unless out-of-order delivery is allowed, a consumer that joins while entries are still
 * unacknowledged is tracked in {@code recentlyJoinedConsumers} together with the cursor read
 * position at the time it joined. Such a consumer only receives entries located before that
 * position until the cursor's mark-delete position has caught up with it.
 */
public class PersistentStickyKeyDispatcherMultipleConsumers
        extends PersistentDispatcherMultipleConsumers {
    /** Value of {@code allowOutOfOrderDelivery} taken from the subscription's key-shared metadata. */
    private final boolean allowOutOfOrderDelivery;
    /** Maps a sticky key to the consumer that should receive it. */
    private final StickyKeyConsumerSelector selector;
    /**
     * Set when a dispatch round delivered nothing; makes the next call to
     * {@link #getMessagesToReplayNow(int)} return an empty set exactly once.
     */
    private boolean isDispatcherStuckOnReplays = false;
    /**
     * Consumers that joined while there were unacknowledged entries, mapped to the cursor read
     * position observed when they joined. Always empty when out-of-order delivery is allowed.
     */
    private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
    /** Per-thread scratch map used to group the entries of one dispatch round by target consumer. */
    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries =
            new FastThreadLocal<Map<Consumer, List<Entry>>>() {
                @Override
                protected Map<Consumer, List<Entry>> initialValue() throws Exception {
                    return new HashMap<>();
                }
            };
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /**
     * Creates the dispatcher and chooses the consumer selector from the key-shared mode.
     *
     * @param topic        topic this dispatcher belongs to
     * @param cursor       managed cursor of the subscription
     * @param subscription owning subscription
     * @param conf         broker configuration; consulted for the consistent-hashing settings
     * @param ksm          key-shared metadata supplying the mode and the out-of-order flag
     * @throws IllegalArgumentException if the key-shared mode is neither AUTO_SPLIT nor STICKY
     */
    PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, PulsarApi.KeySharedMeta ksm) {
        super(topic, cursor, subscription);
        this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
        // No tracking is needed when ordering is not guaranteed, so an immutable empty map suffices.
        this.recentlyJoinedConsumers = this.allowOutOfOrderDelivery
                ? Collections.<Consumer, PositionImpl>emptyMap()
                : new HashMap<Consumer, PositionImpl>();
        switch (ksm.getKeySharedMode()) {
            case AUTO_SPLIT: {
                if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
                } else {
                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
                }
                break;
            }
            case STICKY: {
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid key-shared mode: " + ksm.getKeySharedMode());
            }
        }
    }

    /**
     * Registers the consumer with the parent dispatcher and the selector. When ordering must be
     * preserved, other consumers exist and the cursor reports more than one entry since the first
     * unacknowledged message, the consumer is recorded as recently joined at the current read position.
     *
     * @param consumer consumer to add
     * @throws BrokerServiceException if the parent dispatcher or the selector rejects the consumer
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        this.selector.addConsumer(consumer);
        if (!this.allowOutOfOrderDelivery
                && this.consumerList.size() > 1
                && this.cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1L) {
            this.recentlyJoinedConsumers.put(consumer, (PositionImpl) this.cursor.getReadPosition());
        }
    }

    /**
     * Removes the consumer from the parent dispatcher, the selector and the recently-joined tracking.
     *
     * @param consumer consumer to remove
     * @throws BrokerServiceException if the parent dispatcher fails to remove the consumer
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        super.removeConsumer(consumer);
        this.selector.removeConsumer(consumer);
        this.recentlyJoinedConsumers.remove(consumer);
    }

    /**
     * Groups the given entries by the consumer selected for their sticky key and sends each group.
     * Entries that cannot be sent right now are released and added to {@code messagesToRedeliver}.
     *
     * @param readType whether the entries come from a normal read or from a replay
     * @param entries  entries read from the cursor
     */
    @Override
    protected void sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> entries) {
        long totalMessagesSent = 0L;
        long totalBytesSent = 0L;
        if (entries.isEmpty()) {
            this.readMoreEntries();
            return;
        }
        if (this.consumerSet.isEmpty()) {
            // Nobody to deliver to: give the buffers back and rewind the cursor.
            entries.forEach(Entry::release);
            this.cursor.rewind();
            return;
        }

        final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
        groupedEntries.clear();
        for (Entry entry : entries) {
            // The selector may yield null when no consumer covers the key; HashMap accepts a null key
            // and that group is handled below as "no permits".
            Consumer target = this.selector.select(this.peekStickyKey(entry.getDataBuffer()));
            groupedEntries.computeIfAbsent(target, k -> new ArrayList<>()).add(entry);
        }

        // Number of consumer groups whose handling is still outstanding; the listener that brings it
        // to zero triggers the next read.
        final AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
        for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
            final Consumer consumer = current.getKey();
            final List<Entry> entriesWithSameKey = current.getValue();
            final int entriesWithSameKeyCount = entriesWithSameKey.size();
            // Clamp to zero: a missing consumer or negative permits must not produce a negative
            // count, which would index the list out of bounds in the redelivery loop below.
            final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
            final int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
            final int messagesForC = this.getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC);
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}",
                        this.name, consumer == null ? "null" : consumer.consumerName(), messagesForC, readType);
            }

            // Whatever cannot be pushed now is released and remembered for later redelivery.
            for (int i = messagesForC; i < entriesWithSameKeyCount; ++i) {
                Entry undelivered = entriesWithSameKey.get(i);
                this.messagesToRedeliver.add(undelivered.getLedgerId(), undelivered.getEntryId());
                undelivered.release();
                entriesWithSameKey.set(i, null);
            }

            if (messagesForC <= 0) {
                // Nothing is sent for this group, so no listener will ever account for it.
                keyNumbers.decrementAndGet();
                continue;
            }

            if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                // Replayed entries that are being sent now no longer need redelivery.
                for (int i = 0; i < messagesForC; ++i) {
                    Entry replayed = entriesWithSameKey.get(i);
                    this.messagesToRedeliver.remove(replayed.getLedgerId(), replayed.getEntryId());
                }
            }

            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
            this.filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor);
            consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), this.getRedeliveryTracker()).addListener(future -> {
                if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                    this.readMoreEntries();
                }
            });
            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
            totalMessagesSent += (long) sendMessageInfo.getTotalMessages();
            totalBytesSent += sendMessageInfo.getTotalBytes();
        }

        // Charge the topic-level and subscription-level rate limiters for what was dispatched.
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                ((DispatchRateLimiter) this.dispatchRateLimiter.get()).tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
        }

        if (totalMessagesSent == 0L && this.recentlyJoinedConsumers.isEmpty()) {
            // Nothing from this batch could be delivered: skip replays once and read further ahead.
            // This is not done while recently joined consumers exist.
            this.isDispatcherStuckOnReplays = true;
            this.readMoreEntries();
        }
    }

    /**
     * Limits how many of the leading {@code entries} may be sent to {@code consumer}.
     *
     * <p>A consumer that is not tracked as recently joined gets {@code maxMessages}. A tracked
     * consumer is released from tracking once its recorded position is not after the position
     * following the cursor's mark-delete position; until then it only gets the entries located
     * before its recorded position.
     *
     * @param consumer    target consumer; only looked up when {@code maxMessages} is positive
     * @param entries     entries selected for the consumer
     * @param maxMessages upper bound derived from permits and group size
     * @return number of leading entries that may be sent, never negative
     */
    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
        if (maxMessages <= 0) {
            return 0;
        }
        PositionImpl maxReadPosition = this.recentlyJoinedConsumers.get(consumer);
        if (maxReadPosition == null) {
            return maxMessages;
        }
        PositionImpl markDeletePosition = (PositionImpl) this.cursor.getMarkDeletedPosition();
        if (maxReadPosition.compareTo(markDeletePosition.getNext()) <= 0) {
            // Everything before the join position has been acknowledged; lift the restriction.
            this.recentlyJoinedConsumers.remove(consumer);
            return maxMessages;
        }
        for (int i = 0; i < maxMessages; ++i) {
            // Stop at the first entry at or beyond the position recorded when the consumer joined.
            if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                return i;
            }
        }
        return maxMessages;
    }

    /** Triggers another read after an acknowledgement while recently joined consumers are tracked. */
    @Override
    public synchronized void acknowledgementWasProcessed() {
        if (!this.recentlyJoinedConsumers.isEmpty()) {
            this.readMoreEntries();
        }
    }

    /**
     * Returns the positions to replay now. If the previous dispatch round flagged the dispatcher
     * as stuck on replays, the flag is cleared and an empty set is returned for this one call.
     *
     * @param maxMessagesToRead maximum number of positions to return
     * @return positions to replay, possibly empty
     */
    @Override
    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
        if (this.isDispatcherStuckOnReplays) {
            this.isDispatcherStuckOnReplays = false;
            return Collections.emptySet();
        }
        return super.getMessagesToReplayNow(maxMessagesToRead);
    }

    /** @return the {@code Key_Shared} subscription type */
    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Key_Shared;
    }

    /**
     * Replays the given positions through the cursor, passing {@code true} as the final argument
     * (presumably a request for sorted entries; confirm against {@code ManagedCursor}).
     *
     * @param positions positions to replay
     * @return whatever the cursor's {@code asyncReplayEntries} returns for these positions
     */
    @Override
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
        return this.cursor.asyncReplayEntries(positions, this, PersistentDispatcherMultipleConsumers.ReadType.Replay, true);
    }
}

