/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractBaseDispatcher
implements Dispatcher {
    private static final Logger log = LoggerFactory.getLogger(AbstractBaseDispatcher.class);
    protected final Subscription subscription;
    protected final ServiceConfiguration serviceConfig;
    public static final String NONE_KEY = "NONE_KEY";

    protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
        this.subscription = subscription;
        this.serviceConfig = serviceConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
        int totalMessages = 0;
        long totalBytes = 0L;
        int totalChunkedMessages = 0;
        boolean isAfterTxnCommitMarker = false;
        int entriesSize = entries.size();
        for (int i = 0; i < entriesSize; ++i) {
            Entry entry = entries.get(i);
            if (entry == null) continue;
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            PulsarApi.MessageMetadata msgMetadata = Commands.peekMessageMetadata((ByteBuf)metadataAndPayload, (String)this.subscription.toString(), (long)-1L);
            try {
                BrokerInterceptor interceptor;
                if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
                    if (Markers.isTxnCommitMarker((PulsarApi.MessageMetadata)msgMetadata)) {
                        this.handleTxnCommitMarker(entry);
                        if (!isAfterTxnCommitMarker) {
                            isAfterTxnCommitMarker = true;
                        }
                    } else if (Markers.isTxnAbortMarker((PulsarApi.MessageMetadata)msgMetadata)) {
                        this.handleTxnAbortMarker(entry);
                    }
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                if (msgMetadata == null || Markers.isServerOnlyMarker((PulsarApi.MessageMetadata)msgMetadata)) {
                    PositionImpl pos = (PositionImpl)entry.getPosition();
                    if (Markers.isReplicatedSubscriptionSnapshotMarker((PulsarApi.MessageMetadata)msgMetadata)) {
                        this.processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
                    }
                    entries.set(i, null);
                    entry.release();
                    this.subscription.acknowledgeMessage(Collections.singletonList(pos), PulsarApi.CommandAck.AckType.Individual, Collections.emptyMap());
                    continue;
                }
                if (msgMetadata.hasDeliverAtTime() && this.trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                if (isAfterTxnCommitMarker) {
                    this.addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                int batchSize = msgMetadata.getNumMessagesInBatch();
                totalMessages += batchSize;
                totalBytes += (long)metadataAndPayload.readableBytes();
                totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
                batchSizes.setBatchSize(i, batchSize);
                long[] ackSet = null;
                if (indexesAcks != null && cursor != null) {
                    ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get((long)entry.getLedgerId(), (long)entry.getEntryId()));
                    if (ackSet != null) {
                        indexesAcks.setIndexesAcks(i, (Pair<Integer, long[]>)Pair.of((Object)batchSize, (Object)ackSet));
                    } else {
                        indexesAcks.setIndexesAcks(i, null);
                    }
                }
                if (null == (interceptor = this.subscription.interceptor())) continue;
                interceptor.beforeSendMessage(this.subscription, entry, ackSet, msgMetadata);
                continue;
            }
            finally {
                msgMetadata.recycle();
            }
        }
        sendMessageInfo.setTotalMessages(totalMessages);
        sendMessageInfo.setTotalBytes(totalBytes);
        sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
    }

    private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
        Commands.skipMessageMetadata((ByteBuf)headersAndPayload);
        try {
            PulsarMarkers.ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot((ByteBuf)headersAndPayload);
            this.subscription.processReplicatedSubscriptionSnapshot(snapshot);
        }
        catch (Throwable t) {
            log.warn("Failed to process replicated subscription snapshot at {} -- {}", new Object[]{pos, t.getMessage(), t});
            return;
        }
    }

    @Override
    public void resetCloseFuture() {
    }

    protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
        metadataAndPayload.markReaderIndex();
        PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata((ByteBuf)metadataAndPayload);
        metadataAndPayload.resetReaderIndex();
        byte[] key = NONE_KEY.getBytes();
        if (metadata.hasOrderingKey()) {
            return metadata.getOrderingKey().toByteArray();
        }
        if (metadata.hasPartitionKey()) {
            return metadata.getPartitionKey().getBytes();
        }
        metadata.recycle();
        return key;
    }

    protected boolean addMessageToReplay(long ledgerId, long entryId) {
        return false;
    }

    private void handleTxnCommitMarker(Entry entry) {
        ByteBuf byteBuf = entry.getDataBuffer();
        Commands.skipMessageMetadata((ByteBuf)byteBuf);
        try {
            PulsarMarkers.TxnCommitMarker commitMarker = Markers.parseCommitMarker((ByteBuf)byteBuf);
            for (PulsarMarkers.MessageIdData messageIdData : commitMarker.getMessageIdList()) {
                this.addMessageToReplay(messageIdData.getLedgerId(), messageIdData.getEntryId());
            }
        }
        catch (IOException e) {
            log.error("Failed to parse commit marker.", (Throwable)e);
        }
    }

    private void handleTxnAbortMarker(Entry entry) {
        ((PersistentTopic)this.subscription.getTopic()).getBrokerService().getPulsar().getOrderedExecutor().execute(() -> {
            ByteBuf byteBuf = entry.getDataBuffer();
            Commands.skipMessageMetadata((ByteBuf)byteBuf);
            try {
                ArrayList<Position> positionList = new ArrayList<Position>();
                PulsarMarkers.TxnCommitMarker abortMarker = Markers.parseCommitMarker((ByteBuf)byteBuf);
                for (PulsarMarkers.MessageIdData messageIdData : abortMarker.getMessageIdList()) {
                    positionList.add((Position)PositionImpl.get((long)messageIdData.getLedgerId(), (long)messageIdData.getEntryId()));
                }
                this.subscription.acknowledgeMessage(positionList, PulsarApi.CommandAck.AckType.Individual, Collections.emptyMap());
            }
            catch (IOException e) {
                log.error("Failed to parse abort marker.", (Throwable)e);
            }
        });
    }

    protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg, long bytesToRead, long availablePermitsOnByte) {
        if (availablePermitsOnMsg > 0) {
            messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
        }
        if (availablePermitsOnByte > 0L) {
            bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
        }
        return Pair.of((Object)messagesToRead, (Object)bytesToRead);
    }
}

