/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Per-topic deduplication state: tracks, for every producer name, the highest sequence id
 * that has been pushed and the highest that has been persisted, and periodically stores
 * the persisted map as properties of the {@code "pulsar.dedup"} managed cursor.
 */
public class MessageDeduplication {
    // Source of broker configuration values and of the executor used to reschedule work.
    private final PulsarService pulsar;
    // Topic this state belongs to; consulted for its name and deduplication settings.
    private final PersistentTopic topic;
    // Ledger on which the deduplication cursor is opened and deleted.
    private final ManagedLedger managedLedger;
    // Deduplication cursor; assigned when the cursor is opened and set to null when it is deleted.
    private ManagedCursor managedCursor;
    // Publish-context property key: written by isDuplicate, read by recordMessagePersisted.
    private static final String IS_LAST_CHUNK = "isLastChunk";
    // Current lifecycle state; volatile because it is read and written from callbacks.
    private volatile Status status;
    // Producer name -> highest sequence id accepted by isDuplicate (not necessarily persisted yet).
    @VisibleForTesting
    final Map<String, Long> highestSequencedPushed = new ConcurrentHashMap<String, Long>();
    // Producer name -> highest sequence id recorded by recordMessagePersisted or recovered on replay.
    @VisibleForTesting
    final Map<String, Long> highestSequencedPersisted = new ConcurrentHashMap<String, Long>();
    // Number of counted entries after which a snapshot is taken (brokerDeduplicationEntriesInterval).
    private final int snapshotInterval;
    // Entries counted since the counter was last reset; plain int, incremented without synchronization.
    private int snapshotCounter;
    // Wall-clock millis of the last successfully stored snapshot; 0 until the first one.
    private volatile long lastSnapshotTimestamp = 0L;
    // Upper bound on the number of producers written into a snapshot.
    private final int maxNumberOfProducers;
    // Producer name -> wall-clock millis at which producerRemoved was recorded for it.
    private final Map<String, Long> inactiveProducers = new ConcurrentHashMap<String, Long>();
    // Producer names starting with this prefix are resolved to the producer name in the message metadata.
    private final String replicatorPrefix;
    // True while a snapshot is being stored; lets concurrent snapshot requests return immediately.
    private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);
    private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class);

    /**
     * Creates the deduplication state for a topic. The instance starts in
     * {@link Status#Initialized}; nothing is opened or recovered until
     * {@link #checkStatus()} is called.
     *
     * @param pulsar        broker service supplying the configuration and the executor
     * @param topic         topic this deduplication state belongs to
     * @param managedLedger ledger on which the deduplication cursor lives
     */
    public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
        // Collaborators.
        this.managedLedger = managedLedger;
        this.topic = topic;
        this.pulsar = pulsar;

        // Settings captured once from the broker configuration.
        this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix();
        this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
        this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();

        // Initial mutable state.
        this.snapshotCounter = 0;
        this.status = Status.Initialized;
    }

    /**
     * Rebuilds both sequence-id maps: first from the properties stored on the cursor, then
     * from the entries read through {@link #replayCursor(CompletableFuture)}.
     *
     * @return a future that completes once the replay has finished and, when the replay
     *         counted at least {@code snapshotInterval} entries, a snapshot at the last
     *         replayed position has been stored; it fails if the replay or snapshot fails
     */
    private CompletableFuture<Void> recoverSequenceIdsMap() {
        // Seed the maps from the snapshot kept in the cursor properties.
        this.managedCursor.getProperties().forEach((producerName, sequenceId) -> {
            // NOTE(review): producerRemoved returns immediately unless status is Enabled, and
            // checkStatus only sets Enabled after this method's future completes, so this call
            // looks like a no-op during recovery. Confirm whether that is intended.
            producerRemoved(producerName);
            highestSequencedPushed.put(producerName, sequenceId);
            highestSequencedPersisted.put(producerName, sequenceId);
        });

        log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());

        // Apply every entry readable from the cursor on top of the snapshot.
        CompletableFuture<Position> replayDone = new CompletableFuture<>();
        replayCursor(replayDone);
        return replayDone.thenCompose(lastReplayed -> {
            boolean snapshotDue = lastReplayed != null && snapshotCounter >= snapshotInterval;
            if (!snapshotDue) {
                return CompletableFuture.completedFuture(null);
            }
            snapshotCounter = 0;
            return takeSnapshot(lastReplayed);
        });
    }

    /**
     * Reads the deduplication cursor in batches of up to 100 entries and folds each entry's
     * producer name and sequence id into both sequence-id maps. While the cursor reports
     * more entries, the next batch is scheduled on the broker executor.
     *
     * <p>Every entry of a batch is released even when processing fails part-way, and any
     * unexpected runtime failure completes {@code future} exceptionally instead of leaving
     * it pending.
     *
     * @param future completed with the position of the last entry in the final batch
     *               ({@code null} if that batch was empty), or exceptionally on failure
     */
    private void replayCursor(final CompletableFuture<Position> future) {
        this.managedCursor.asyncReadEntries(100, new AsyncCallbacks.ReadEntriesCallback() {

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                Position lastPosition = null;
                try {
                    for (Entry entry : entries) {
                        ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                        MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
                        String producerName = md.getProducerName();
                        long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
                        MessageDeduplication.this.highestSequencedPushed.put(producerName, sequenceId);
                        MessageDeduplication.this.highestSequencedPersisted.put(producerName, sequenceId);
                        MessageDeduplication.this.producerRemoved(producerName);
                        ++MessageDeduplication.this.snapshotCounter;
                        lastPosition = entry.getPosition();
                    }
                    if (MessageDeduplication.this.managedCursor.hasMoreEntries()) {
                        // Hop to the executor so the callback thread is not held across batches.
                        MessageDeduplication.this.pulsar.getExecutor()
                                .execute(() -> MessageDeduplication.this.replayCursor(future));
                    } else {
                        future.complete(lastPosition);
                    }
                } catch (RuntimeException e) {
                    // E.g. unparsable metadata or a rejected task: fail the recovery rather
                    // than leaving the caller waiting forever.
                    future.completeExceptionally(e);
                } finally {
                    // Release the whole batch exactly once, including entries not yet
                    // processed when a failure cut the loop short.
                    for (Entry entry : entries) {
                        entry.release();
                    }
                }
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }
        }, null, PositionFactory.LATEST);
    }

    /**
     * @return the current deduplication lifecycle state
     */
    public Status getStatus() {
        return status;
    }

    /**
     * Reconciles the deduplication state with the topic's current setting.
     *
     * <ul>
     *   <li>{@code Recovering} / {@code Removing}: a transition is in flight; a re-check is
     *       scheduled in one minute and an already-completed future is returned.</li>
     *   <li>{@code Initialized} and deduplication not wanted: the {@code "pulsar.dedup"}
     *       cursor is deleted in the background (not awaited by the returned future).</li>
     *   <li>{@code Enabled} and deduplication not wanted: the cursor is deleted and the
     *       in-memory maps are cleared.</li>
     *   <li>{@code Disabled} / {@code Initialized} and deduplication wanted: the cursor is
     *       opened and the sequence-id maps are recovered from it.</li>
     * </ul>
     *
     * @return a future that completes when the enable/disable transition started by this
     *         call finishes (exceptionally on failure), or an already-completed future when
     *         there is nothing to wait for
     */
    public CompletableFuture<Void> checkStatus() {
        boolean shouldBeEnabled = this.topic.isDeduplicationEnabled();
        synchronized (this) {
            if (this.status == Status.Recovering || this.status == Status.Removing) {
                // A transition is already happening: look again later.
                this.pulsar.getExecutor().schedule(this::checkStatus, 1L, TimeUnit.MINUTES);
                return CompletableFuture.completedFuture(null);
            }
            if (this.status == Status.Initialized && !shouldBeEnabled) {
                // Never enabled on this instance, but a cursor may be left from an earlier run.
                this.status = Status.Removing;
                this.managedLedger.asyncDeleteCursor("pulsar.dedup", new AsyncCallbacks.DeleteCursorCallback() {

                    @Override
                    public void deleteCursorComplete(Object ctx) {
                        MessageDeduplication.this.status = Status.Disabled;
                        log.info("[{}] Deleted deduplication cursor", MessageDeduplication.this.topic.getName());
                    }

                    @Override
                    public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                        if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
                            // Nothing to delete: that is the desired end state.
                            MessageDeduplication.this.status = Status.Disabled;
                        } else {
                            log.error("[{}] Deleted deduplication cursor error",
                                    MessageDeduplication.this.topic.getName(), exception);
                            // Roll back to the state held before this attempt. Leaving the
                            // status at Removing would make every later checkStatus() only
                            // reschedule itself, so the deletion would never be retried and
                            // deduplication could never be enabled on this topic.
                            MessageDeduplication.this.status = Status.Initialized;
                        }
                    }
                }, null);
            }
            if (this.status == Status.Enabled && !shouldBeEnabled) {
                // Disable deduplication: drop the cursor and the in-memory state.
                final CompletableFuture<Void> future = new CompletableFuture<>();
                this.status = Status.Removing;
                this.managedLedger.asyncDeleteCursor("pulsar.dedup", new AsyncCallbacks.DeleteCursorCallback() {

                    @Override
                    public void deleteCursorComplete(Object ctx) {
                        MessageDeduplication.this.status = Status.Disabled;
                        MessageDeduplication.this.managedCursor = null;
                        MessageDeduplication.this.highestSequencedPushed.clear();
                        MessageDeduplication.this.highestSequencedPersisted.clear();
                        future.complete(null);
                        log.info("[{}] Disabled deduplication", MessageDeduplication.this.topic.getName());
                    }

                    @Override
                    public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                        if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
                            // The cursor is already gone: treat as a successful disable.
                            MessageDeduplication.this.status = Status.Disabled;
                            MessageDeduplication.this.managedCursor = null;
                            MessageDeduplication.this.highestSequencedPushed.clear();
                            MessageDeduplication.this.highestSequencedPersisted.clear();
                            future.complete(null);
                        } else {
                            log.warn("[{}] Failed to disable deduplication: {}",
                                    MessageDeduplication.this.topic.getName(), exception.getMessage());
                            MessageDeduplication.this.status = Status.Failed;
                            future.completeExceptionally(exception);
                        }
                    }
                }, null);
                return future;
            }
            if ((this.status == Status.Disabled || this.status == Status.Initialized) && shouldBeEnabled) {
                // Enable deduplication: open the cursor, then rebuild the maps from it.
                final CompletableFuture<Void> future = new CompletableFuture<>();
                this.managedLedger.asyncOpenCursor("pulsar.dedup", new AsyncCallbacks.OpenCursorCallback() {

                    @Override
                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                        cursor.setAlwaysInactive();
                        MessageDeduplication.this.managedCursor = cursor;
                        MessageDeduplication.this.recoverSequenceIdsMap().thenRun(() -> {
                            MessageDeduplication.this.status = Status.Enabled;
                            future.complete(null);
                            log.info("[{}] Enabled deduplication", MessageDeduplication.this.topic.getName());
                        }).exceptionally(ex -> {
                            MessageDeduplication.this.status = Status.Failed;
                            log.warn("[{}] Failed to enable deduplication: {}",
                                    MessageDeduplication.this.topic.getName(), ex.getMessage());
                            future.completeExceptionally(ex);
                            return null;
                        });
                    }

                    @Override
                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                        // Status is left untouched, so a later checkStatus() tries again.
                        log.warn("[{}] Failed to enable deduplication: {}",
                                MessageDeduplication.this.topic.getName(), exception.getMessage());
                        future.completeExceptionally(exception);
                    }
                }, null);
                return future;
            }
            // Current state already matches the desired one (or a background delete was started).
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * @return {@code true} only while the status is {@link Status#Enabled}
     */
    public boolean isEnabled() {
        return Status.Enabled == this.status;
    }

    /**
     * Classifies a message about to be published against the sequence ids already seen for
     * its producer, and records its highest sequence id as pushed when it is accepted.
     *
     * @param publishContext    context of the publish; may be updated with the original
     *                          producer name / sequence ids and with the {@code isLastChunk}
     *                          property
     * @param headersAndPayload message buffer; its reader index is restored after any
     *                          metadata parsing done here
     * @return {@code NotDup} when deduplication is not enabled, for marker messages, for
     *         non-final chunks, and for sequence ids above the highest pushed one;
     *         {@code Dup} when the sequence id is not above the highest persisted one;
     *         {@code Unknown} when it is not above the highest pushed one but is not
     *         covered by the highest persisted one
     */
    public MessageDupStatus isDuplicate(Topic.PublishContext publishContext, ByteBuf headersAndPayload) {
        if (!this.isEnabled() || publishContext.isMarkerMessage()) {
            return MessageDupStatus.NotDup;
        }
        String producerName = publishContext.getProducerName();
        long sequenceId = publishContext.getSequenceId();
        long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
        MessageMetadata md = null;
        if (producerName.startsWith(this.replicatorPrefix)) {
            // Producer name carries the replicator prefix: take the producer name and sequence
            // ids from the message metadata instead, and remember them on the context so
            // recordMessagePersisted can use the same identity.
            int readerIndex = headersAndPayload.readerIndex();
            md = Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
            producerName = md.getProducerName();
            sequenceId = md.getSequenceId();
            highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
            publishContext.setOriginalProducerName(producerName);
            publishContext.setOriginalSequenceId(sequenceId);
            publishContext.setOriginalHighestSequenceId(highestSequenceId);
            // Parsing advanced the reader index; put it back for downstream consumers of the buffer.
            headersAndPayload.readerIndex(readerIndex);
        }
        // -1 means "not a chunked message".
        long chunkID = -1L;
        long totalChunk = -1L;
        if (publishContext.isChunked()) {
            if (md == null) {
                // Metadata was not parsed above; parse it now, again preserving the reader index.
                int readerIndex = headersAndPayload.readerIndex();
                md = Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
                headersAndPayload.readerIndex(readerIndex);
            }
            chunkID = md.getChunkId();
            totalChunk = md.getNumChunksFromMsg();
        }
        if (chunkID != -1L && chunkID != totalChunk - 1L) {
            // Not the last chunk: let it through without touching the sequence-id maps.
            publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
            return MessageDupStatus.NotDup;
        }
        Map<String, Long> map = this.highestSequencedPushed;
        // The check and the update of the pushed map must happen as one step.
        synchronized (map) {
            Long lastSequenceIdPushed = this.highestSequencedPushed.get(producerName);
            if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
                Long lastSequenceIdPersisted;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}", new Object[]{this.topic.getName(), producerName, sequenceId, lastSequenceIdPushed});
                }
                if ((lastSequenceIdPersisted = this.highestSequencedPersisted.get(producerName)) != null && sequenceId <= lastSequenceIdPersisted) {
                    return MessageDupStatus.Dup;
                }
                // Pushed but not (yet) recorded as persisted: cannot decide either way.
                return MessageDupStatus.Unknown;
            }
            this.highestSequencedPushed.put(producerName, highestSequenceId);
        }
        if (chunkID != -1L && chunkID == totalChunk - 1L) {
            // Final chunk of a chunked message that was accepted.
            publishContext.setProperty(IS_LAST_CHUNK, Boolean.TRUE);
        }
        return MessageDupStatus.NotDup;
    }

    /**
     * Records that a published message has been persisted, and takes a snapshot once
     * {@code snapshotInterval} messages have been counted since the counter was last reset.
     * Does nothing when deduplication is not enabled or for marker messages.
     *
     * @param publishContext context of the persisted publish
     * @param position       position at which the message was stored; used as the snapshot
     *                       position when a snapshot is triggered
     */
    public void recordMessagePersisted(Topic.PublishContext publishContext, Position position) {
        if (!isEnabled() || publishContext.isMarkerMessage()) {
            return;
        }

        String effectiveProducer = publishContext.getProducerName();
        long effectiveSequenceId = publishContext.getSequenceId();
        long effectiveHighestSequenceId = publishContext.getHighestSequenceId();
        if (publishContext.getOriginalProducerName() != null) {
            // isDuplicate stored the original identity on the context: use it instead.
            effectiveProducer = publishContext.getOriginalProducerName();
            effectiveSequenceId = publishContext.getOriginalSequenceId();
            effectiveHighestSequenceId = publishContext.getOriginalHighestSequenceId();
        }

        // Only an explicit FALSE (a non-final chunk) suppresses the update; a missing flag
        // means the message is not chunked.
        Boolean lastChunkFlag = (Boolean) publishContext.getProperty(IS_LAST_CHUNK);
        boolean nonFinalChunk = lastChunkFlag != null && !lastChunkFlag.booleanValue();
        if (!nonFinalChunk) {
            long persistedSequenceId = Math.max(effectiveHighestSequenceId, effectiveSequenceId);
            highestSequencedPersisted.put(effectiveProducer, persistedSequenceId);
        }

        snapshotCounter++;
        if (snapshotCounter < snapshotInterval) {
            return;
        }
        snapshotCounter = 0;
        takeSnapshot(position);
    }

    /**
     * Resets the "pushed" sequence ids to the "persisted" ones, discarding every pushed id
     * that was never recorded as persisted. Does nothing when deduplication is not enabled.
     */
    public void resetHighestSequenceIdPushed() {
        if (!this.isEnabled()) {
            return;
        }
        // Same monitor isDuplicate holds for its check-then-update, so a concurrent publish
        // never sees the map between the clear and the refill.
        synchronized (this.highestSequencedPushed) {
            this.highestSequencedPushed.clear();
            // Copy entry by entry. Iterating keySet() and calling get() per key could return
            // null if a producer is purged in between, and ConcurrentHashMap.put rejects null
            // values with a NullPointerException.
            this.highestSequencedPushed.putAll(this.highestSequencedPersisted);
        }
    }

    /**
     * Stores the persisted sequence-id map (at most {@code maxNumberOfProducers} entries) as
     * properties of the deduplication cursor while mark-deleting it at {@code position}.
     * Only one snapshot runs at a time; a request arriving while another is in flight
     * completes immediately without storing anything.
     *
     * @param position position to mark-delete the cursor at
     * @return a future completed when the snapshot is stored or skipped, or exceptionally
     *         when storing it fails
     */
    private CompletableFuture<Void> takeSnapshot(final Position position) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Taking snapshot of sequence ids map", this.topic.getName());
        }
        if (!this.snapshotTaking.compareAndSet(false, true)) {
            // Another snapshot is in flight.
            future.complete(null);
            return future;
        }

        final ManagedCursor cursor = this.getManagedCursor();
        if (cursor == null) {
            // The cursor was dropped (deduplication disabled) after the caller's check:
            // nothing to store. Release the flag so later snapshots are not blocked.
            this.snapshotTaking.set(false);
            future.complete(null);
            return future;
        }

        // Sorted copy, capped at the configured number of producers.
        final Map<String, Long> snapshot = new TreeMap<>();
        this.highestSequencedPersisted.forEach((producerName, sequenceId) -> {
            if (snapshot.size() < this.maxNumberOfProducers) {
                snapshot.put(producerName, sequenceId);
            }
        });

        try {
            cursor.asyncMarkDelete(position, snapshot, new AsyncCallbacks.MarkDeleteCallback() {

                @Override
                public void markDeleteComplete(Object ctx) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Stored new deduplication snapshot at {}",
                                MessageDeduplication.this.topic.getName(), position);
                    }
                    MessageDeduplication.this.lastSnapshotTimestamp = System.currentTimeMillis();
                    MessageDeduplication.this.snapshotTaking.set(false);
                    future.complete(null);
                }

                @Override
                public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                    log.warn("[{}] Failed to store new deduplication snapshot at {}",
                            MessageDeduplication.this.topic.getName(), position, exception);
                    MessageDeduplication.this.snapshotTaking.set(false);
                    future.completeExceptionally(exception);
                }
            }, null);
        } catch (RuntimeException e) {
            // The call failed before any callback could run. Without this, snapshotTaking
            // would stay true forever and every later snapshot would be skipped.
            this.snapshotTaking.set(false);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Removes the producer from the inactive set. Ignored when deduplication is not enabled.
     *
     * @param producerName name of the producer that was added
     */
    public void producerAdded(String producerName) {
        if (isEnabled()) {
            inactiveProducers.remove(producerName);
        }
    }

    /**
     * Marks the producer as inactive as of the current wall-clock time. Ignored when
     * deduplication is not enabled.
     *
     * @param producerName name of the producer that was removed
     */
    public void producerRemoved(String producerName) {
        if (isEnabled()) {
            inactiveProducers.put(producerName, System.currentTimeMillis());
        }
    }

    /**
     * Drops the deduplication state of producers that have been inactive for longer than
     * the configured {@code brokerDeduplicationProducerInactivityTimeoutMinutes}, then
     * stores a snapshot at the cursor's mark-deleted position if anything was dropped.
     * When deduplication is not enabled, the inactive set is simply emptied.
     */
    public synchronized void purgeInactiveProducers() {
        long now = System.currentTimeMillis();
        long timeoutMillis = TimeUnit.MINUTES.toMillis(
                pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
        long cutoff = now - timeoutMillis;

        if (!isEnabled()) {
            if (!inactiveProducers.isEmpty()) {
                inactiveProducers.clear();
            }
            return;
        }

        boolean purgedAny = false;
        for (Iterator<Map.Entry<String, Long>> it = inactiveProducers.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, Long> candidate = it.next();
            String producerName = candidate.getKey();
            long inactiveSince = candidate.getValue();
            if (inactiveSince >= cutoff) {
                // Still within the inactivity window.
                continue;
            }
            log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
            it.remove();
            highestSequencedPushed.remove(producerName);
            highestSequencedPersisted.remove(producerName);
            purgedAny = true;
        }

        // Status may have changed while purging, hence the second check.
        if (purgedAny && isEnabled()) {
            takeSnapshot(getManagedCursor().getMarkDeletedPosition());
        }
    }

    /**
     * @param producerName producer to look up
     * @return the highest sequence id pushed for the producer, or {@code -1} if none is known
     */
    public long getLastPublishedSequenceId(String producerName) {
        return highestSequencedPushed.getOrDefault(producerName, -1L);
    }

    /**
     * Time-based snapshot trigger. Takes a snapshot at the ledger's last confirmed entry
     * when all of the following hold: deduplication is enabled, the topic's snapshot
     * interval (seconds) is set and positive, at least that much time has passed since the
     * last stored snapshot, and the last confirmed entry is beyond the cursor's
     * mark-deleted position.
     */
    public void takeSnapshot() {
        if (!isEnabled()) {
            return;
        }

        Integer interval = (Integer) topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
        long now = System.currentTimeMillis();
        if (interval == null || interval <= 0) {
            // Time-based snapshots are not configured.
            return;
        }
        long sinceLastSnapshot = now - lastSnapshotTimestamp;
        if (sinceLastSnapshot < TimeUnit.SECONDS.toMillis(interval.intValue())) {
            return;
        }

        Position lastConfirmed = managedLedger.getLastConfirmedEntry();
        if (lastConfirmed == null) {
            return;
        }

        Position markDeleted = managedCursor.getMarkDeletedPosition();
        boolean alreadyCovered = markDeleted != null && lastConfirmed.compareTo(markDeleted) <= 0;
        if (alreadyCovered) {
            // Nothing new since the cursor was last advanced.
            return;
        }

        takeSnapshot(lastConfirmed);
    }

    /**
     * @return the deduplication cursor, or {@code null} when none is currently assigned
     */
    @VisibleForTesting
    ManagedCursor getManagedCursor() {
        return managedCursor;
    }

    /**
     * @return the live (not copied) map of inactive producer names to the time they were
     *         marked inactive
     */
    @VisibleForTesting
    Map<String, Long> getInactiveProducers() {
        return inactiveProducers;
    }

    /** Lifecycle states of the deduplication machinery. */
    enum Status {
        /** State assigned by the constructor, before the first status check. */
        Initialized,
        /** The deduplication cursor has been deleted or was not found. */
        Disabled,
        /** Treated by {@code checkStatus} as a transition in flight. */
        Recovering,
        /** The cursor is open and the sequence-id maps have been recovered. */
        Enabled,
        /** Deletion of the deduplication cursor is in flight. */
        Removing,
        /** Recovery failed while enabling, or the cursor could not be deleted while disabling. */
        Failed
    }

    @VisibleForTesting
    public static enum MessageDupStatus {
        Unknown,
        NotDup,
        Dup;

    }

    /**
     * Unchecked exception carrying a fixed message stating that it cannot currently be
     * determined whether a message is a duplicate.
     */
    public static class MessageDupUnknownException extends RuntimeException {
        /** Creates the exception with its fixed message. */
        public MessageDupUnknownException() {
            super("Cannot determine whether the message is a duplicate at this time");
        }
    }
}

