/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatcher for {@code Key_Shared} subscriptions (see {@link #getType()}).
 *
 * <p>Each batch of entries is grouped by a hash of the entry's sticky key (modulo the
 * selector's range size); every group is then offered to the consumer that the
 * {@link StickyKeyConsumerSelector} returns for that hash slot.
 */
public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDispatcherMultipleConsumers {
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /** Maps a sticky-key hash slot to the consumer that should receive entries for it. */
    private final StickyKeyConsumerSelector selector;

    /**
     * Creates the dispatcher.
     *
     * @param topic        forwarded to the parent dispatcher
     * @param cursor       forwarded to the parent dispatcher
     * @param subscription forwarded to the parent dispatcher
     * @param selector     selector used to pick a consumer for each hash slot
     */
    PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, StickyKeyConsumerSelector selector) {
        super(topic, cursor, subscription);
        this.selector = selector;
    }

    /**
     * Registers the consumer with the parent dispatcher and then with the selector.
     *
     * @param consumer consumer to add
     * @throws BrokerServiceException propagated from the parent dispatcher or the selector
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        this.selector.addConsumer(consumer);
    }

    /**
     * Unregisters the consumer from the parent dispatcher and then from the selector.
     *
     * @param consumer consumer to remove
     * @throws BrokerServiceException propagated from the parent dispatcher or the selector
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        super.removeConsumer(consumer);
        this.selector.removeConsumer(consumer);
    }

    /**
     * Dispatches a batch of entries, keeping entries with the same key hash on the same consumer.
     *
     * <ul>
     *   <li>An empty batch just triggers {@code readMoreEntries()}.</li>
     *   <li>Entries are grouped by key hash; groups are processed while the dispatcher has
     *       permits left and at least one consumer is available.</li>
     *   <li>If the selector returns no consumer for a group, every entry that has not been
     *       handed to a consumer is released, the cursor is rewound and the method returns.</li>
     *   <li>A writable consumer receives up to its available permits; a non-writable one
     *       receives at most one entry.</li>
     *   <li>Entries that could not be dispatched are recorded in {@code messagesToRedeliver}
     *       and released.</li>
     * </ul>
     *
     * @param readType whether the entries come from a normal read or a replay; replayed
     *                 entries are removed from {@code messagesToRedeliver} before being sent
     * @param entries  entries to dispatch
     */
    @Override
    protected void sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> entries) {
        if (entries.isEmpty()) {
            this.readMoreEntries();
            return;
        }

        long totalMessagesSent = 0L;
        long totalBytesSent = 0L;

        Map<Integer, List<Entry>> groupedEntries = this.groupByKeyHash(entries);
        Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
        // NOTE(review): the counter starts at the number of key groups and is decremented only by
        // successful sends, so if any group is skipped in this pass the listener below never calls
        // readMoreEntries() — confirm that another path resumes reading in that case.
        AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());

        while (iterator.hasNext() && this.totalAvailablePermits > 0 && this.isAtleastOneConsumerAvailable()) {
            Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
            List<Entry> pending = entriesWithSameKey.getValue();

            Consumer consumer = this.selector.selectByIndex(entriesWithSameKey.getKey());
            if (consumer == null) {
                log.info("[{}] rewind because no available consumer found for key {} from total {}", this.name, entriesWithSameKey.getKey(), this.consumerList.size());
                // Everything still held in groupedEntries (this group and all the others) has not
                // been handed to a consumer; release all of it, not only the current group,
                // otherwise the remaining entries are never released.
                groupedEntries.values().forEach(group -> group.forEach(Entry::release));
                this.cursor.rewind();
                return;
            }

            boolean writable = consumer.isWritable();
            int availablePermits = writable ? consumer.getAvailablePermits() : 1;
            if (!writable && log.isDebugEnabled()) {
                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", this.topic.getName(), this.name, consumer);
            }

            int messagesForC = Math.min(pending.size(), availablePermits);
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}", this.name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
            }
            if (messagesForC <= 0) {
                continue;
            }

            // Independent copy of the first messagesForC entries; the group list is trimmed below.
            List<Entry> subList = new ArrayList<>(pending.subList(0, messagesForC));
            if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                subList.forEach(replayed -> this.messagesToRedeliver.remove(replayed.getLedgerId(), replayed.getEntryId()));
            }

            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size());
            this.filterEntriesForConsumer(subList, batchSizes, sendMessageInfo);
            consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), this.getRedeliveryTracker()).addListener(future -> {
                if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                    this.readMoreEntries();
                }
            });

            // Drop the dispatched prefix by position rather than by equality against subList:
            // subList has been passed through filterEntriesForConsumer, which presumably may
            // alter its contents (TODO confirm), and a positional clear is linear.
            pending.subList(0, messagesForC).clear();

            this.totalAvailablePermits -= sendMessageInfo.getTotalMessages();
            totalMessagesSent += (long)sendMessageInfo.getTotalMessages();
            totalBytesSent += sendMessageInfo.getTotalBytes();

            if (pending.isEmpty()) {
                iterator.remove();
            }
        }

        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                ((DispatchRateLimiter)this.dispatchRateLimiter.get()).tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
        }

        if (!groupedEntries.isEmpty()) {
            // Whatever is left was not dispatched: remember the positions for replay and release.
            int laterReplay = 0;
            for (List<Entry> entryList : groupedEntries.values()) {
                laterReplay += entryList.size();
                entryList.forEach(leftover -> {
                    this.messagesToRedeliver.add(leftover.getLedgerId(), leftover.getEntryId());
                    leftover.release();
                });
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", this.name, laterReplay);
            }
        }
    }

    /** Groups entries by the hash of their sticky key modulo the selector's range size. */
    private Map<Integer, List<Entry>> groupByKeyHash(List<Entry> entries) {
        Map<Integer, List<Entry>> grouped = new HashMap<>();
        for (Entry current : entries) {
            int key = Murmur3_32Hash.getInstance().makeHash(this.peekStickyKey(current.getDataBuffer())) % this.selector.getRangeSize();
            // computeIfAbsent only allocates a list the first time a key is seen.
            grouped.computeIfAbsent(key, unused -> new ArrayList<>()).add(current);
        }
        return grouped;
    }

    /**
     * @return {@code Key_Shared}, the subscription type served by this dispatcher
     */
    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Key_Shared;
    }

    /**
     * Asks the cursor to replay the given positions, with this dispatcher as the read callback
     * and {@code ReadType.Replay} as the callback context.
     *
     * @param positions positions to replay
     * @return whatever the cursor's {@code asyncReplayEntries} call returns
     */
    @Override
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
        // NOTE(review): the meaning of the trailing boolean flag is not visible here — confirm against ManagedCursor.
        return this.cursor.asyncReplayEntries(positions, (AsyncCallbacks.ReadEntriesCallback)this, (Object)PersistentDispatcherMultipleConsumers.ReadType.Replay, true);
    }
}

