/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.EntryWrapper;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractBaseDispatcher
implements Dispatcher {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(AbstractBaseDispatcher.class);
    // Subscription served by this dispatcher; set once by the constructor.
    protected final Subscription subscription;
    // Broker configuration handed to the constructor.
    protected final ServiceConfiguration serviceConfig;
    // Cached copy of serviceConfig.isDispatchThrottlingOnBatchMessageEnabled(), read at construction time.
    protected final boolean dispatchThrottlingOnBatchMessageEnabled;
    // Entry filters taken from the topic's broker service at construction time; an empty list when none are
    // configured. Not final, so it may be reassigned after construction.
    protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
    // Context object handed to the entry filters; FilterContext.FILTER_CONTEXT_DISABLED when no filters
    // were configured at construction time.
    protected final FilterContext filterContext;

    protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
        this.subscription = subscription;
        this.serviceConfig = serviceConfig;
        this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
        if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic().getBrokerService().getEntryFilters())) {
            this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
            this.filterContext = new FilterContext();
        } else {
            this.entryFilters = ImmutableList.of();
            this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
        }
    }

    /**
     * Peeks the metadata of every non-null entry, stores an {@link EntryWrapper} for it at the same index
     * of {@code entryWrappers}, and sums the batch sizes.
     *
     * <p>Slots of {@code entryWrappers} that correspond to {@code null} entries are left untouched.
     *
     * @param entryWrappers output array, indexed like {@code entries}; must be at least as long
     * @param entries       entries to inspect; {@code null} elements are skipped
     * @return the sum of {@code getNumMessagesInBatch()} over all entries whose metadata could be read
     */
    protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<Entry> entries) {
        int totalMessages = 0;
        int entriesSize = entries.size();
        for (int i = 0; i < entriesSize; ++i) {
            Entry entry = entries.get(i);
            if (entry == null) {
                continue;
            }
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            MessageMetadata msgMetadata =
                    Commands.peekMessageMetadata(metadataAndPayload, this.subscription.toString(), -1L);
            entryWrappers[i] = EntryWrapper.get(entry, msgMetadata);
            // filterEntriesForConsumer explicitly handles a null result from the same peek call, so guard
            // here as well instead of failing with a NullPointerException. Such an entry contributes no
            // messages to the total.
            if (msgMetadata != null) {
                totalMessages += msgMetadata.getNumMessagesInBatch();
            }
        }
        return totalMessages;
    }

    /**
     * Convenience overload for callers that have no pre-computed {@link EntryWrapper} array: delegates to
     * the full variant with an empty wrapper {@link Optional} and a wrapper offset of {@code 0}.
     *
     * @return whatever the full overload returns for the same arguments
     */
    public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
        final Optional<EntryWrapper[]> noWrappers = Optional.empty();
        final int noOffset = 0;
        return this.filterEntriesForConsumer(noWrappers, noOffset, entries, batchSizes, sendMessageInfo,
                indexesAcks, cursor, isReplayRead, consumer);
    }

    /**
     * Walks {@code entries} and removes everything that must not be sent to {@code consumer}, collecting
     * size statistics for what remains.
     *
     * <p>An entry that is dropped is replaced by {@code null} in {@code entries} and released. Entries are
     * dropped, in this order of checks, when:
     * <ol>
     *   <li>an entry filter returns {@code REJECT} (acknowledged in bulk after the loop) or
     *       {@code RESCHEDULE} (scheduled for redelivery one second later);</li>
     *   <li>on a non-replay read, the metadata carries a transaction id and the entry is a transaction
     *       marker or belongs to an aborted transaction (acknowledged individually);</li>
     *   <li>otherwise, the metadata is {@code null} or denotes a server-only marker (acknowledged
     *       individually; replicated-subscription snapshot markers are processed first);</li>
     *   <li>otherwise, the message has a deliver-at time and {@code trackDelayedDelivery} accepts it.</li>
     * </ol>
     *
     * @param entryWrapper       optional array of wrappers holding already-parsed metadata
     * @param entryWrapperOffset offset added to the entry index when looking into {@code entryWrapper}
     * @param entries            entries to filter; mutated in place (dropped entries become {@code null})
     * @param batchSizes         receives the batch size of every kept entry, at the entry's index
     * @param sendMessageInfo    receives total messages, bytes and chunked-message count of kept entries
     * @param indexesAcks        if non-null (together with {@code cursor}), receives the per-entry ack set
     * @param cursor             cursor queried for already-deleted batch indexes; may be {@code null}
     * @param isReplayRead       when {@code true}, the transaction checks are skipped
     * @param consumer           consumer the entries are destined for
     * @return the number of entries kept
     */
    public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset, List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
        int totalMessages = 0;
        long totalBytes = 0L;
        int totalChunkedMessages = 0;
        int totalEntries = 0;

        // entryFilters is a mutable field: read it once so the collector lists below and the per-entry
        // filtering always agree on whether filtering is active for this call.
        final ImmutableList<EntryFilterWithClassLoader> filters = this.entryFilters;
        final boolean hasFilters = CollectionUtils.isNotEmpty(filters);
        final List<Position> entriesToFiltered = hasFilters ? new ArrayList<>() : null;
        final List<PositionImpl> entriesToRedeliver = hasFilters ? new ArrayList<>() : null;
        final EntryWrapper[] wrappers = entryWrapper.orElse(null);

        int entriesSize = entries.size();
        for (int i = 0; i < entriesSize; ++i) {
            Entry entry = entries.get(i);
            if (entry == null) {
                continue;
            }
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            int entryWrapperIndex = i + entryWrapperOffset;

            // Prefer metadata cached in the wrapper array; fall back to peeking it from the buffer.
            MessageMetadata msgMetadata = wrappers != null && wrappers[entryWrapperIndex] != null
                    ? wrappers[entryWrapperIndex].getMetadata()
                    : null;
            if (msgMetadata == null) {
                msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, this.subscription.toString(), -1L);
            }

            if (hasFilters) {
                this.fillContext(this.filterContext, msgMetadata, this.subscription, consumer);
                EntryFilter.FilterResult filterResult =
                        AbstractBaseDispatcher.getFilterResult(this.filterContext, entry, filters);
                if (filterResult == EntryFilter.FilterResult.REJECT) {
                    // Rejected entries are acknowledged together once the loop has finished.
                    entriesToFiltered.add(entry.getPosition());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
                    entriesToRedeliver.add((PositionImpl) entry.getPosition());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
            }

            if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
                if (Markers.isTxnMarker(msgMetadata)) {
                    this.subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), CommandAck.AckType.Individual, Collections.emptyMap());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                TxnID txnId = new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
                if (((PersistentTopic) this.subscription.getTopic()).isTxnAborted(txnId)) {
                    this.subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), CommandAck.AckType.Individual, Collections.emptyMap());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                // A transactional message that is neither a marker nor aborted falls through and is kept.
            } else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
                // Capture the position and consume the payload before the entry is released.
                PositionImpl pos = (PositionImpl) entry.getPosition();
                if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
                    this.processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
                }
                entries.set(i, null);
                entry.release();
                this.subscription.acknowledgeMessage(Collections.singletonList(pos), CommandAck.AckType.Individual, Collections.emptyMap());
                continue;
            } else if (msgMetadata.hasDeliverAtTime() && this.trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                entries.set(i, null);
                entry.release();
                continue;
            }

            // The entry is kept: account for it and publish its per-entry details.
            ++totalEntries;
            int batchSize = msgMetadata.getNumMessagesInBatch();
            totalMessages += batchSize;
            totalBytes += metadataAndPayload.readableBytes();
            totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
            batchSizes.setBatchSize(i, batchSize);

            long[] ackSet = null;
            if (indexesAcks != null && cursor != null) {
                ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
                if (ackSet != null) {
                    // No (Object) casts here: they would make Pair.of infer Pair<Object, Object>, which
                    // cannot be converted to the Pair<Integer, long[]> this call requires.
                    indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
                } else {
                    indexesAcks.setIndexesAcks(i, null);
                }
            }

            BrokerInterceptor interceptor = this.subscription.interceptor();
            if (interceptor != null) {
                interceptor.beforeSendMessage(this.subscription, entry, ackSet, msgMetadata);
            }
        }

        if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
            this.subscription.acknowledgeMessage(entriesToFiltered, CommandAck.AckType.Individual, Collections.emptyMap());
            int filtered = entriesToFiltered.size();
            Topic topic = this.subscription.getTopic();
            if (topic instanceof AbstractTopic) {
                ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
            }
        }
        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
            // Redelivery is deferred by one second on the broker's executor.
            this.subscription.getTopic().getBrokerService().getPulsar().getExecutor().schedule(() -> this.subscription.redeliverUnacknowledgedMessages(consumer, entriesToRedeliver), 1L, TimeUnit.SECONDS);
        }

        sendMessageInfo.setTotalMessages(totalMessages);
        sendMessageInfo.setTotalBytes(totalBytes);
        sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
        return totalEntries;
    }

    /**
     * Runs {@code entry} through the filters in list order and returns the first verdict that is neither
     * {@code null} nor {@code ACCEPT}. Remaining filters are not consulted once such a verdict is found.
     *
     * @return the first non-accepting verdict, or {@code ACCEPT} when every filter accepts (a {@code null}
     *         verdict counts as accept) or the list is empty
     */
    private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry, ImmutableList<EntryFilterWithClassLoader> entryFilters) {
        for (EntryFilter candidate : entryFilters) {
            final EntryFilter.FilterResult verdict = candidate.filterEntry(entry, filterContext);
            final boolean accepted = verdict == null || verdict == EntryFilter.FilterResult.ACCEPT;
            if (!accepted) {
                return verdict;
            }
        }
        return EntryFilter.FilterResult.ACCEPT;
    }

    /**
     * Resets {@code ctx} and then loads it with the metadata, subscription and consumer that the entry
     * filters should see for the current entry.
     *
     * @param ctx      filter context to reset and populate
     * @param metadata metadata of the entry about to be filtered; passed through as given
     * @param sub      subscription to expose to the filters
     * @param target   consumer to expose to the filters
     */
    private void fillContext(FilterContext ctx, MessageMetadata metadata, Subscription sub, Consumer target) {
        // The reset has to come first so nothing from the previously filtered entry is left behind.
        ctx.reset();
        ctx.setMsgMetadata(metadata);
        ctx.setSubscription(sub);
        ctx.setConsumer(target);
    }

    /**
     * Implemented by concrete dispatchers.
     * NOTE(review): presumably reports whether the subscription already holds its maximum number of
     * consumers, likely by delegating to the (AbstractTopic, int) overload -- confirm against subclasses.
     */
    protected abstract boolean isConsumersExceededOnSubscription();

    /**
     * Compares {@code consumerSize} with the max-consumers-per-subscription value taken from the topic's
     * hierarchy policies.
     *
     * @param topic        topic whose hierarchy policies supply the limit
     * @param consumerSize number of consumers to compare against the limit
     * @return {@code true} when the limit is positive and {@code consumerSize} is at least the limit;
     *         {@code false} when the limit is zero or negative
     */
    protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
        final int limit = (Integer) topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
        if (limit <= 0) {
            // A non-positive value never counts as exceeded.
            return false;
        }
        return consumerSize >= limit;
    }

    /**
     * Skips the message metadata in {@code headersAndPayload}, parses the remainder as a replicated
     * subscriptions snapshot and hands it to the subscription.
     *
     * <p>Any {@link Throwable} raised while parsing or processing is logged at WARN level and swallowed.
     * A failure in the metadata skip itself is not caught here.
     *
     * @param pos               position of the marker entry; used only in the log message
     * @param headersAndPayload buffer holding metadata followed by the snapshot payload
     */
    private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
        Commands.skipMessageMetadata(headersAndPayload);
        try {
            this.subscription.processReplicatedSubscriptionSnapshot(
                    Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload));
        } catch (Throwable failure) {
            log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, failure.getMessage(), failure);
        }
    }

    /**
     * Intentionally a no-op in this base class.
     */
    @Override
    public void resetCloseFuture() {
        // Nothing to reset here.
    }

    /**
     * Implemented by concrete dispatchers. Within this class it is invoked by
     * {@code reachDispatchRateLimit} when rate limiting is enabled and no message dispatch permit is
     * available.
     */
    protected abstract void reScheduleRead();

    /**
     * Checks whether dispatching is currently blocked by {@code dispatchRateLimiter} and, if so,
     * re-schedules the read.
     *
     * @param dispatchRateLimiter limiter to consult
     * @return {@code true} when limiting is enabled and no message dispatch permit is available (in which
     *         case {@link #reScheduleRead()} has been called); {@code false} otherwise
     */
    protected boolean reachDispatchRateLimit(DispatchRateLimiter dispatchRateLimiter) {
        final boolean throttled = dispatchRateLimiter.isDispatchRateLimitingEnabled()
                && !dispatchRateLimiter.hasMessageDispatchPermit();
        if (throttled) {
            this.reScheduleRead();
        }
        return throttled;
    }

    /**
     * Caps the requested read size by the permits currently available on {@code dispatchRateLimiter}.
     *
     * @param dispatchRateLimiter limiter supplying the available message and byte permits
     * @param messagesToRead      number of messages the caller would like to read
     * @param bytesToRead         number of bytes the caller would like to read
     * @return the (messages, bytes) pair produced by {@code computeReadLimits}
     */
    protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter, int messagesToRead, long bytesToRead) {
        // A bare (int) cast keeps only the low 32 bits, so a permit count above Integer.MAX_VALUE could
        // wrap to a small or negative number and distort the limit. Clamp into the int range first;
        // values already inside that range are unchanged.
        long availableOnMsg = dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg();
        int clampedOnMsg = (int) Math.max((long) Integer.MIN_VALUE, Math.min((long) Integer.MAX_VALUE, availableOnMsg));
        return AbstractBaseDispatcher.computeReadLimits(messagesToRead, clampedOnMsg, bytesToRead, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
    }

    /**
     * Limits a requested read size by the available permits.
     *
     * <p>A permit count that is zero or negative imposes no limit on the corresponding dimension.
     *
     * @param messagesToRead         requested number of messages
     * @param availablePermitsOnMsg  available message permits; only applied when positive
     * @param bytesToRead            requested number of bytes
     * @param availablePermitsOnByte available byte permits; only applied when positive
     * @return pair of (messages to read, bytes to read) after applying the positive limits
     */
    protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg, long bytesToRead, long availablePermitsOnByte) {
        if (availablePermitsOnMsg > 0) {
            messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
        }
        if (availablePermitsOnByte > 0L) {
            bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
        }
        // Pass the primitives directly so they box to Integer and Long. Casting them to Object would make
        // Pair.of infer Pair<Object, Object>, which does not match the declared return type.
        return Pair.of(messagesToRead, bytesToRead);
    }

    /**
     * Reads the sticky key from {@code metadataAndPayload} via {@code Commands.peekStickyKey}, passing
     * along this subscription's topic name and subscription name.
     *
     * @param metadataAndPayload buffer holding message metadata followed by the payload
     * @return the value returned by {@code Commands.peekStickyKey}
     */
    protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
        final String topicName = this.subscription.getTopicName();
        final String subscriptionName = this.subscription.getName();
        return Commands.peekStickyKey(metadataAndPayload, topicName, subscriptionName);
    }
}

