/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.EntryWrapper;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for {@link Dispatcher} implementations.
 *
 * <p>Holds the {@link Subscription} and {@link ServiceConfiguration} the dispatcher was created with and
 * provides shared helpers for inspecting and filtering entries read from a cursor before they are handed
 * to consumers, plus helpers for consumer-limit checks and read-limit computation.
 */
public abstract class AbstractBaseDispatcher implements Dispatcher {
    private static final Logger log = LoggerFactory.getLogger(AbstractBaseDispatcher.class);

    /** Subscription used for acknowledging filtered entries, snapshot processing and interception. */
    protected final Subscription subscription;

    /** Broker configuration supplied at construction time. */
    protected final ServiceConfiguration serviceConfig;

    /**
     * Creates the dispatcher base.
     *
     * @param subscription  subscription the dispatcher operates on
     * @param serviceConfig broker configuration
     */
    protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
        this.subscription = subscription;
        this.serviceConfig = serviceConfig;
    }

    /**
     * Peeks the metadata of every non-null entry, stores an {@link EntryWrapper} for it at the same index of
     * {@code entryWrappers}, and sums up the number of messages contained in the entries.
     *
     * <p>Entries whose metadata cannot be read (the peek yields {@code null}) still get a wrapper but do not
     * contribute to the returned count; {@link #filterEntriesForConsumer} drops such entries anyway.
     *
     * @param entryWrappers output array, indexed like {@code entries}; must be at least as long as
     *                      {@code entries}
     * @param entries       entries to inspect; {@code null} elements are skipped
     * @return the sum of the batch sizes of all entries whose metadata could be read
     */
    protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<Entry> entries) {
        int totalMessages = 0;
        for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
            Entry entry = entries.get(i);
            if (entry == null) {
                continue;
            }
            MessageMetadata msgMetadata =
                    Commands.peekMessageMetadata(entry.getDataBuffer(), this.subscription.toString(), -1L);
            entryWrappers[i] = EntryWrapper.get(entry, msgMetadata);
            // The peek may yield null (the filtering code below checks for it), so guard the dereference.
            if (msgMetadata != null) {
                totalMessages += msgMetadata.getNumMessagesInBatch();
            }
        }
        return totalMessages;
    }

    /**
     * Filters {@code entries} without pre-computed metadata wrappers.
     *
     * @see #filterEntriesForConsumer(Optional, int, List, EntryBatchSizes, SendMessageInfo,
     *      EntryBatchIndexesAcks, ManagedCursor, boolean)
     */
    public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
            SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
            boolean isReplayRead) {
        this.filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks,
                cursor, isReplayRead);
    }

    /**
     * Filters the entries that should not be sent to a consumer and records per-entry and total statistics
     * for the remaining ones.
     *
     * <p>A filtered entry is replaced by {@code null} in {@code entries} and released. For each non-null
     * entry the following applies, in order:
     * <ul>
     *   <li>Not a replay read and the metadata carries a transaction id: transaction markers and messages
     *       of aborted transactions are acknowledged and filtered. Any other transactional message is kept
     *       and is <em>not</em> subjected to the two checks below.</li>
     *   <li>Otherwise, missing metadata or a server-only marker: a replicated-subscription snapshot marker
     *       is processed first, then the entry is filtered and acknowledged.</li>
     *   <li>Otherwise, the message has a deliver-at time and {@code trackDelayedDelivery} returns
     *       {@code true}: the entry is filtered without being acknowledged.</li>
     * </ul>
     * Acknowledgement is skipped when the subscription is a {@link CompactorSubscription}.
     *
     * @param entryWrapper       optional pre-computed wrappers providing the metadata of the entries
     * @param entryWrapperOffset offset added to an entry's index to locate its wrapper
     * @param entries            entries to filter; modified in place
     * @param batchSizes         receives the batch size of every kept entry, indexed like {@code entries}
     * @param sendMessageInfo    receives the total messages, bytes and chunked messages of the kept entries
     * @param indexesAcks        receives the batch-index ack set of every kept entry; may be {@code null}
     * @param cursor             source of the batch-index ack sets; may be {@code null}
     * @param isReplayRead       whether the entries come from a replay read; disables transaction filtering
     */
    public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
            List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
            EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
        int totalMessages = 0;
        long totalBytes = 0L;
        int totalChunkedMessages = 0;

        for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
            Entry entry = entries.get(i);
            if (entry == null) {
                continue;
            }

            ByteBuf metadataAndPayload = entry.getDataBuffer();

            // Prefer the metadata already parsed into the wrapper; fall back to peeking it from the buffer.
            MessageMetadata msgMetadata = null;
            if (entryWrapper.isPresent()) {
                EntryWrapper wrapper = entryWrapper.get()[i + entryWrapperOffset];
                if (wrapper != null) {
                    msgMetadata = wrapper.getMetadata();
                }
            }
            if (msgMetadata == null) {
                msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, this.subscription.toString(), -1L);
            }

            if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
                    && msgMetadata.hasTxnidLeastBits()) {
                // The aborted-transaction lookup is only evaluated when the entry is not a transaction marker.
                if (Markers.isTxnMarker(msgMetadata)
                        || ((PersistentTopic) this.subscription.getTopic()).isTxnAborted(
                                new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
                    this.individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                    dropEntry(entries, i, entry);
                    continue;
                }
            } else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
                // Capture the position before the entry is released.
                PositionImpl pos = (PositionImpl) entry.getPosition();
                if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
                    this.processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
                }
                dropEntry(entries, i, entry);
                this.individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
                continue;
            } else if (msgMetadata.hasDeliverAtTime()
                    && this.trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                dropEntry(entries, i, entry);
                continue;
            }

            int batchSize = msgMetadata.getNumMessagesInBatch();
            totalMessages += batchSize;
            totalBytes += metadataAndPayload.readableBytes();
            if (msgMetadata.hasChunkId()) {
                totalChunkedMessages++;
            }
            batchSizes.setBatchSize(i, batchSize);

            long[] ackSet = null;
            if (indexesAcks != null && cursor != null) {
                ackSet = cursor.getDeletedBatchIndexesAsLongArray(
                        PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
                if (ackSet != null) {
                    indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
                } else {
                    indexesAcks.setIndexesAcks(i, null);
                }
            }

            BrokerInterceptor interceptor = this.subscription.interceptor();
            if (interceptor != null) {
                interceptor.beforeSendMessage(this.subscription, entry, ackSet, msgMetadata);
            }
        }

        sendMessageInfo.setTotalMessages(totalMessages);
        sendMessageInfo.setTotalBytes(totalBytes);
        sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
    }

    /** Clears the slot of a filtered entry in {@code entries} and releases the entry. */
    private static void dropEntry(List<Entry> entries, int index, Entry entry) {
        entries.set(index, null);
        entry.release();
    }

    /** Individually acknowledges {@code position} unless the subscription is a {@link CompactorSubscription}. */
    private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> properties) {
        if (this.subscription instanceof CompactorSubscription) {
            return;
        }
        this.subscription.acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Individual,
                properties);
    }

    /**
     * Tells whether the subscription has reached its consumer limit.
     *
     * @return {@code true} if the limit is reached
     */
    protected abstract boolean isConsumersExceededOnSubscription();

    /**
     * Checks {@code consumerSize} against the effective max-consumers-per-subscription limit.
     *
     * <p>The limit is resolved in this order: the topic policies, then the cached namespace policies (only
     * when set and non-negative), then the broker configuration. A failure while looking up policies is
     * logged at debug level and treated as "no policy found". A resolved limit that is not positive means
     * no limit.
     *
     * @param brokerService broker service used to look up policies and configuration
     * @param topic         full topic name
     * @param consumerSize  current number of consumers
     * @return {@code true} if a positive limit applies and {@code consumerSize} has reached it
     */
    protected boolean isConsumersExceededOnSubscription(BrokerService brokerService, String topic,
            int consumerSize) {
        Integer maxConsumersPerSubscription = null;
        Policies policies = null;
        try {
            TopicName topicName = TopicName.get(topic);
            maxConsumersPerSubscription = brokerService.getTopicPolicies(topicName)
                    .map(TopicPolicies::getMaxConsumersPerSubscription)
                    .orElse(null);
            if (maxConsumersPerSubscription == null) {
                policies = (Policies) brokerService.pulsar().getConfigurationCache().policiesCache()
                        .getDataIfPresent(AdminResource.path("policies", topicName.getNamespace()));
            }
        } catch (Exception e) {
            // Best effort: fall through to the namespace/broker defaults below.
            log.debug("Get topic or namespace policies fail", e);
        }

        if (maxConsumersPerSubscription == null) {
            if (policies != null && policies.max_consumers_per_subscription != null
                    && policies.max_consumers_per_subscription >= 0) {
                maxConsumersPerSubscription = policies.max_consumers_per_subscription;
            } else {
                maxConsumersPerSubscription = brokerService.pulsar().getConfiguration()
                        .getMaxConsumersPerSubscription();
            }
        }
        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
    }

    /**
     * Skips the message metadata in {@code headersAndPayload}, parses the remaining bytes as a replicated
     * subscription snapshot and hands it to the subscription. Parsing/processing failures are logged as a
     * warning and otherwise ignored.
     */
    private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
        Commands.skipMessageMetadata(headersAndPayload);
        try {
            ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
            this.subscription.processReplicatedSubscriptionSnapshot(snapshot);
        } catch (Throwable t) {
            log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
        }
    }

    /** No-op in this base class. */
    @Override
    public void resetCloseFuture() {
    }

    /**
     * Caps the requested read amounts by the available permits.
     *
     * @param messagesToRead        requested number of messages
     * @param availablePermitsOnMsg available message permits; ignored unless positive
     * @param bytesToRead           requested number of bytes
     * @param availablePermitsOnByte available byte permits; ignored unless positive
     * @return pair of (messages to read, bytes to read) after applying the positive permit limits
     */
    protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
            long bytesToRead, long availablePermitsOnByte) {
        int messageLimit = availablePermitsOnMsg > 0
                ? Math.min(messagesToRead, availablePermitsOnMsg)
                : messagesToRead;
        long byteLimit = availablePermitsOnByte > 0L
                ? Math.min(bytesToRead, availablePermitsOnByte)
                : bytesToRead;
        return Pair.of(messageLimit, byteLimit);
    }

    /**
     * Peeks the sticky key of a message via {@code Commands.peekStickyKey}, passing the subscription's
     * topic name and subscription name along.
     *
     * @param metadataAndPayload buffer holding the message metadata and payload
     * @return the sticky key bytes as returned by {@code Commands.peekStickyKey}
     */
    protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
        return Commands.peekStickyKey(metadataAndPayload, this.subscription.getTopicName(),
                this.subscription.getName());
    }
}

