/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageDeduplication {
    private final PulsarService pulsar;
    private final PersistentTopic topic;
    private final ManagedLedger managedLedger;
    private ManagedCursor managedCursor;
    private volatile Status status;
    private final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap(16, 1);
    private final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap(16, 1);
    private final int snapshotInterval;
    private int snapshotCounter;
    private final int maxNumberOfProducers;
    private final Map<String, Long> inactiveProducers = new HashMap<String, Long>();
    private final String replicatorPrefix;
    private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class);

    public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
        this.pulsar = pulsar;
        this.topic = topic;
        this.managedLedger = managedLedger;
        this.status = Status.Disabled;
        this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();
        this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
        this.snapshotCounter = 0;
        this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix();
    }

    private CompletableFuture<Void> recoverSequenceIdsMap() {
        this.managedCursor.getProperties().forEach((k, v) -> {
            this.highestSequencedPushed.put(k, v);
            this.highestSequencedPersisted.put(k, v);
        });
        log.info("[{}] Replaying {} entries for deduplication", (Object)this.topic.getName(), (Object)this.managedCursor.getNumberOfEntries());
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.replayCursor(future);
        return future;
    }

    private void replayCursor(final CompletableFuture<Void> future) {
        this.managedCursor.asyncReadEntries(100, new AsyncCallbacks.ReadEntriesCallback(){

            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                for (Entry entry : entries) {
                    ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                    PulsarApi.MessageMetadata md = Commands.parseMessageMetadata((ByteBuf)messageMetadataAndPayload);
                    String producerName = md.getProducerName();
                    long sequenceId = md.getSequenceId();
                    MessageDeduplication.this.highestSequencedPushed.put((Object)producerName, (Object)sequenceId);
                    MessageDeduplication.this.highestSequencedPersisted.put((Object)producerName, (Object)sequenceId);
                    md.recycle();
                    entry.release();
                }
                if (MessageDeduplication.this.managedCursor.hasMoreEntries()) {
                    MessageDeduplication.this.pulsar.getExecutor().submit(() -> MessageDeduplication.this.replayCursor(future));
                } else {
                    future.complete(null);
                }
            }

            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }
        }, null);
    }

    public CompletableFuture<Void> initialize() {
        for (ManagedCursor cursor : this.managedLedger.getCursors()) {
            if (!cursor.getName().equals("pulsar.dedup")) continue;
            this.status = Status.Recovering;
            this.managedCursor = cursor;
            break;
        }
        if (this.status == Status.Recovering) {
            return this.recoverSequenceIdsMap().thenCompose(v -> this.checkStatus());
        }
        return CompletableFuture.completedFuture(null);
    }

    public Status getStatus() {
        return this.status;
    }

    public CompletableFuture<Void> checkStatus() {
        return this.isDeduplicationEnabled().thenCompose(shouldBeEnabled -> {
            MessageDeduplication messageDeduplication = this;
            synchronized (messageDeduplication) {
                if (this.status == Status.Recovering || this.status == Status.Removing) {
                    this.pulsar.getExecutor().schedule(this::checkStatus, 1L, TimeUnit.MINUTES);
                    return CompletableFuture.completedFuture(null);
                }
                if (this.status == Status.Enabled && !shouldBeEnabled.booleanValue()) {
                    final CompletableFuture future = new CompletableFuture();
                    this.status = Status.Removing;
                    this.managedLedger.asyncDeleteCursor("pulsar.dedup", new AsyncCallbacks.DeleteCursorCallback(){

                        public void deleteCursorComplete(Object ctx) {
                            MessageDeduplication.this.status = Status.Disabled;
                            MessageDeduplication.this.managedCursor = null;
                            MessageDeduplication.this.highestSequencedPushed.clear();
                            MessageDeduplication.this.highestSequencedPersisted.clear();
                            future.complete(null);
                            log.info("[{}] Disabled deduplication", (Object)MessageDeduplication.this.topic.getName());
                        }

                        public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                            log.warn("[{}] Failed to disable deduplication: {}", (Object)MessageDeduplication.this.topic.getName(), (Object)exception.getMessage());
                            MessageDeduplication.this.status = Status.Failed;
                            future.completeExceptionally(exception);
                        }
                    }, null);
                    return future;
                }
                if (this.status == Status.Disabled && shouldBeEnabled.booleanValue()) {
                    final CompletableFuture future = new CompletableFuture();
                    this.managedLedger.asyncOpenCursor("pulsar.dedup", new AsyncCallbacks.OpenCursorCallback(){

                        public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                            cursor.setInactive();
                            MessageDeduplication.this.managedCursor = cursor;
                            ((CompletableFuture)MessageDeduplication.this.recoverSequenceIdsMap().thenRun(() -> {
                                MessageDeduplication.this.status = Status.Enabled;
                                future.complete(null);
                                log.info("[{}] Enabled deduplication", (Object)MessageDeduplication.this.topic.getName());
                            })).exceptionally(ex -> {
                                MessageDeduplication.this.status = Status.Failed;
                                log.warn("[{}] Failed to enable deduplication: {}", (Object)MessageDeduplication.this.topic.getName(), (Object)ex.getMessage());
                                future.completeExceptionally((Throwable)ex);
                                return null;
                            });
                        }

                        public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                            log.warn("[{}] Failed to enable deduplication: {}", (Object)MessageDeduplication.this.topic.getName(), (Object)exception.getMessage());
                            future.completeExceptionally(exception);
                        }
                    }, null);
                    return future;
                }
                return CompletableFuture.completedFuture(null);
            }
        });
    }

    public boolean isEnabled() {
        return this.status == Status.Enabled;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean shouldPublishNextMessage(Topic.PublishContext publishContext, ByteBuf headersAndPayload) {
        if (!this.isEnabled()) {
            return true;
        }
        String producerName = publishContext.getProducerName();
        long sequenceId = publishContext.getSequenceId();
        if (producerName.startsWith(this.replicatorPrefix)) {
            int readerIndex = headersAndPayload.readerIndex();
            PulsarApi.MessageMetadata md = Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
            producerName = md.getProducerName();
            sequenceId = md.getSequenceId();
            publishContext.setOriginalProducerName(producerName);
            publishContext.setOriginalSequenceId(sequenceId);
            headersAndPayload.readerIndex(readerIndex);
            md.recycle();
        }
        ConcurrentOpenHashMap<String, Long> concurrentOpenHashMap = this.highestSequencedPushed;
        synchronized (concurrentOpenHashMap) {
            block7: {
                Long lastSequenceIdPushed = (Long)this.highestSequencedPushed.get((Object)producerName);
                if (lastSequenceIdPushed == null || sequenceId > lastSequenceIdPushed) break block7;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}", new Object[]{this.topic.getName(), producerName, sequenceId, lastSequenceIdPushed});
                }
                return false;
            }
            this.highestSequencedPushed.put((Object)producerName, (Object)sequenceId);
        }
        return true;
    }

    public void recordMessagePersisted(Topic.PublishContext publishContext, PositionImpl position) {
        if (!this.isEnabled()) {
            return;
        }
        String producerName = publishContext.getProducerName();
        long sequenceId = publishContext.getSequenceId();
        if (publishContext.getOriginalProducerName() != null) {
            producerName = publishContext.getOriginalProducerName();
            sequenceId = publishContext.getOriginalSequenceId();
        }
        this.highestSequencedPersisted.put((Object)producerName, (Object)sequenceId);
        if (++this.snapshotCounter >= this.snapshotInterval) {
            this.snapshotCounter = 0;
            this.takeSnapshot(position);
        }
    }

    private void takeSnapshot(final PositionImpl position) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Taking snapshot of sequence ids map", (Object)this.topic.getName());
        }
        TreeMap snapshot = new TreeMap();
        this.highestSequencedPersisted.forEach((producerName, sequenceId) -> {
            if (snapshot.size() < this.maxNumberOfProducers) {
                snapshot.put(producerName, sequenceId);
            }
        });
        this.managedCursor.asyncMarkDelete((Position)position, snapshot, new AsyncCallbacks.MarkDeleteCallback(){

            public void markDeleteComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Stored new deduplication snapshot at {}", (Object)MessageDeduplication.this.topic.getName(), (Object)position);
                }
            }

            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("[{}] Failed to store new deduplication snapshot at {}", (Object)MessageDeduplication.this.topic.getName(), (Object)position);
            }
        }, null);
    }

    private CompletableFuture<Boolean> isDeduplicationEnabled() {
        DestinationName name = DestinationName.get((String)this.topic.getName());
        return this.pulsar.getConfigurationCache().policiesCache().getAsync(AdminResource.path("policies", name.getNamespace())).thenApply(policies -> {
            if (policies.isPresent() && ((Policies)policies.get()).deduplicationEnabled != null) {
                return ((Policies)policies.get()).deduplicationEnabled;
            }
            return this.pulsar.getConfiguration().isBrokerDeduplicationEnabled();
        });
    }

    public synchronized void producerAdded(String producerName) {
        this.inactiveProducers.remove(producerName);
    }

    public synchronized void producerRemoved(String producerName) {
        this.inactiveProducers.put(producerName, System.currentTimeMillis());
    }

    public synchronized void purgeInactiveProducers() {
        long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(this.pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
        Iterator<Map.Entry<String, Long>> mapIterator = this.inactiveProducers.entrySet().iterator();
        while (mapIterator.hasNext()) {
            Map.Entry<String, Long> entry = mapIterator.next();
            String producerName = entry.getKey();
            long lastActiveTimestamp = entry.getValue();
            mapIterator.remove();
            if (lastActiveTimestamp >= minimumActiveTimestamp) continue;
            log.info("[{}] Purging dedup information for producer {}", (Object)this.topic.getName(), (Object)producerName);
            this.highestSequencedPushed.remove((Object)producerName);
            this.highestSequencedPersisted.remove((Object)producerName);
        }
    }

    public long getLastPublishedSequenceId(String producerName) {
        Long sequenceId = (Long)this.highestSequencedPushed.get((Object)producerName);
        return sequenceId != null ? sequenceId : -1L;
    }

    static enum Status {
        Disabled,
        Recovering,
        Enabled,
        Removing,
        Failed;

    }
}

