/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.nonpersistent;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Murmur3_32Hash;

public class NonPersistentStickyKeyDispatcherMultipleConsumers
extends NonPersistentDispatcherMultipleConsumers {
    private final StickyKeyConsumerSelector selector;

    public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription, StickyKeyConsumerSelector selector) {
        super(topic, subscription);
        this.selector = selector;
    }

    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        this.selector.addConsumer(consumer);
    }

    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        super.removeConsumer(consumer);
        this.selector.removeConsumer(consumer);
    }

    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Key_Shared;
    }

    @Override
    public void sendMessages(List<Entry> entries) {
        if (entries.size() > 0) {
            HashMap groupedEntries = new HashMap();
            for (Entry entry2 : entries) {
                int key = Murmur3_32Hash.getInstance().makeHash(this.peekStickyKey(entry2.getDataBuffer())) % this.selector.getRangeSize();
                groupedEntries.putIfAbsent(key, new ArrayList());
                ((List)groupedEntries.get(key)).add(entry2);
            }
            for (Map.Entry entriesWithSameKey : groupedEntries.entrySet()) {
                Consumer consumer = this.selector.selectByIndex((Integer)entriesWithSameKey.getKey());
                if (consumer != null) {
                    SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                    EntryBatchSizes batchSizes = EntryBatchSizes.get(((List)entriesWithSameKey.getValue()).size());
                    this.filterEntriesForConsumer((List)entriesWithSameKey.getValue(), batchSizes, sendMessageInfo);
                    consumer.sendMessages((List)entriesWithSameKey.getValue(), batchSizes, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), this.getRedeliveryTracker());
                    TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
                    continue;
                }
                entries.forEach(entry -> {
                    int totalMsgs = Commands.getNumberOfMessagesInBatch((ByteBuf)entry.getDataBuffer(), (String)this.subscription.toString(), (long)-1L);
                    if (totalMsgs > 0) {
                        this.msgDrop.recordEvent((long)totalMsgs);
                    }
                    entry.release();
                });
            }
        }
    }
}

