/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Delayed-delivery tracker that keeps every pending {@code (deliverAt, ledgerId, entryId)}
 * tuple in an in-memory {@link TripleLongPriorityQueue} ordered by delivery time.
 *
 * <p>The tracker additionally observes whether messages arrive with non-decreasing delivery
 * times (within one tick of tolerance). That observation feeds
 * {@link #shouldPauseAllDeliveries()}.
 *
 * <p>NOTE(review): no synchronization is performed in this class; callers are presumably
 * expected to serialize access - confirm against the dispatcher.
 */
public class InMemoryDelayedDeliveryTracker
extends AbstractDelayedDeliveryTracker {
    private static final Logger log = LoggerFactory.getLogger(InMemoryDelayedDeliveryTracker.class);

    /** Pending messages stored as (deliverAt, ledgerId, entryId) tuples. */
    protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();

    /**
     * Minimum number of tracked messages required before {@link #shouldPauseAllDeliveries()}
     * may return {@code true}; a value {@code <= 0} disables that check entirely.
     */
    @VisibleForTesting
    private final long fixedDelayDetectionLookahead;

    /** Largest {@code deliverAt} accepted since the detection state was last reset. */
    private long highestDeliveryTimeTracked = 0L;

    /**
     * {@code true} while every message seen since the last reset was accepted and arrived with
     * a delivery time not lower than {@code highestDeliveryTimeTracked - tickTimeMillis}.
     */
    private boolean messagesHaveFixedDelay = true;

    /**
     * Creates a tracker that uses {@link Clock#systemUTC()} as its time source.
     *
     * @param dispatcher dispatcher this tracker belongs to
     * @param timer timer handed to the superclass
     * @param tickTimeMillis tick time in milliseconds, handed to the superclass
     * @param isDelayedDeliveryDeliverAtTimeStrict strictness flag handed to the superclass
     * @param fixedDelayDetectionLookahead see {@link #getFixedDelayDetectionLookahead()}
     */
    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) {
        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
    }

    /**
     * Creates a tracker with an explicit clock.
     *
     * @param dispatcher dispatcher this tracker belongs to
     * @param timer timer handed to the superclass
     * @param tickTimeMillis tick time in milliseconds, handed to the superclass
     * @param clock time source handed to the superclass
     * @param isDelayedDeliveryDeliverAtTimeStrict strictness flag handed to the superclass
     * @param fixedDelayDetectionLookahead see {@link #getFixedDelayDetectionLookahead()}
     */
    public InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) {
        super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
    }

    /**
     * Starts tracking a message for delayed delivery.
     *
     * @param ledgerId ledger id of the message
     * @param entryId entry id of the message
     * @param deliverAt requested delivery timestamp, compared against {@code getCutoffTime()}
     * @return {@code false} (message not tracked) when {@code deliverAt} is negative or not
     *         after the current cutoff time; {@code true} once the message has been queued
     */
    @Override
    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
        if (deliverAt < 0L || deliverAt <= this.getCutoffTime()) {
            // A message that is not delayed breaks the "fixed delay" assumption.
            this.messagesHaveFixedDelay = false;
            return false;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", this.dispatcher.getName(), ledgerId, entryId, deliverAt - this.clock.millis());
        }
        this.priorityQueue.add(deliverAt, ledgerId, entryId);
        this.updateTimer();
        this.checkAndUpdateHighest(deliverAt);
        return true;
    }

    /**
     * Records {@code deliverAt} as the new high-water mark when it is larger, and drops the
     * fixed-delay assumption when it falls more than one tick below the current mark.
     */
    private void checkAndUpdateHighest(long deliverAt) {
        if (deliverAt < this.highestDeliveryTimeTracked - this.tickTimeMillis) {
            this.messagesHaveFixedDelay = false;
        }
        this.highestDeliveryTimeTracked = Math.max(this.highestDeliveryTimeTracked, deliverAt);
    }

    /**
     * Restores the fixed-delay detection state to its initial values. Only meaningful when the
     * queue holds no entries, since the state describes the entries currently tracked.
     */
    private void resetFixedDelayDetection() {
        this.highestDeliveryTimeTracked = 0L;
        this.messagesHaveFixedDelay = true;
    }

    /**
     * Reports whether the earliest tracked message is due.
     *
     * <p>Side effect: when nothing is due, {@code updateTimer()} is invoked.
     *
     * @return {@code true} when the queue is non-empty and its head delivery time is at or
     *         before {@code getCutoffTime()}
     */
    @Override
    public boolean hasMessageAvailable() {
        boolean available = !this.priorityQueue.isEmpty() && this.priorityQueue.peekN1() <= this.getCutoffTime();
        if (!available) {
            this.updateTimer();
        }
        return available;
    }

    /**
     * Removes and returns the positions of messages that are due, up to {@code maxMessages}.
     *
     * <p>If the queue is empty afterwards, the fixed-delay detection state is reset.
     * {@code updateTimer()} is always invoked before returning.
     *
     * @param maxMessages upper bound on the number of queue entries to pop; a value
     *        {@code <= 0} pops nothing
     * @return sorted set of the positions that were removed from the tracker
     */
    @Override
    public NavigableSet<Position> getScheduledMessages(int maxMessages) {
        NavigableSet<Position> positions = new TreeSet<>();
        // Evaluate the cutoff once so every entry in this call is judged against the same instant.
        long cutoffTime = this.getCutoffTime();
        int remaining = maxMessages;
        while (remaining > 0 && !this.priorityQueue.isEmpty() && this.priorityQueue.peekN1() <= cutoffTime) {
            positions.add(PositionFactory.create(this.priorityQueue.peekN2(), this.priorityQueue.peekN3()));
            this.priorityQueue.pop();
            remaining--;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Get scheduled messages - found {}", this.dispatcher.getName(), positions.size());
        }
        if (this.priorityQueue.isEmpty()) {
            this.resetFixedDelayDetection();
        }
        this.updateTimer();
        return positions;
    }

    /**
     * Discards every tracked message and resets the fixed-delay detection state, so that the
     * next {@link #addMessage} is not compared against a high-water mark left behind by
     * entries that no longer exist.
     *
     * @return an already-completed future
     */
    @Override
    public CompletableFuture<Void> clear() {
        this.priorityQueue.clear();
        this.resetFixedDelayDetection();
        return CompletableFuture.completedFuture(null);
    }

    /**
     * @return number of entries currently held in the priority queue
     */
    @Override
    public long getNumberOfDelayedMessages() {
        return this.priorityQueue.size();
    }

    /**
     * @return the value of {@code bytesCapacity()} reported by the priority queue
     */
    @Override
    public long getBufferMemoryUsage() {
        return this.priorityQueue.bytesCapacity();
    }

    /**
     * Closes the superclass first, then the priority queue.
     */
    @Override
    public void close() {
        super.close();
        this.priorityQueue.close();
    }

    /**
     * Tells whether all deliveries should be paused.
     *
     * <p>Note that evaluating the last clause calls {@link #hasMessageAvailable()}, which may
     * invoke {@code updateTimer()}.
     *
     * @return {@code true} only when the lookahead is positive, the fixed-delay assumption
     *         still holds, at least {@code fixedDelayDetectionLookahead} messages are
     *         tracked, and no tracked message is currently due
     */
    @Override
    public boolean shouldPauseAllDeliveries() {
        return this.fixedDelayDetectionLookahead > 0L && this.messagesHaveFixedDelay && this.getNumberOfDelayedMessages() >= this.fixedDelayDetectionLookahead && !this.hasMessageAvailable();
    }

    /**
     * @return delivery time at the head of the priority queue. NOTE(review): no emptiness
     *         check is made here - presumably callers only invoke this when messages are
     *         tracked; confirm against the superclass.
     */
    @Override
    protected long nextDeliveryTime() {
        return this.priorityQueue.peekN1();
    }

    /**
     * @return the lookahead threshold supplied at construction time
     */
    public long getFixedDelayDetectionLookahead() {
        return this.fixedDelayDetectionLookahead;
    }
}

