/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.delayed;

import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryDelayedDeliveryTracker
implements DelayedDeliveryTracker,
TimerTask {
    private static final Logger log = LoggerFactory.getLogger(InMemoryDelayedDeliveryTracker.class);
    private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
    private final PersistentDispatcherMultipleConsumers dispatcher;
    private final Timer timer;
    private Timeout timeout;
    private long currentTimeoutTarget;
    private final long tickTimeMillis;
    private final Clock clock;

    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) {
        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC());
    }

    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, Clock clock) {
        this.dispatcher = dispatcher;
        this.timer = timer;
        this.tickTimeMillis = tickTimeMillis;
        this.clock = clock;
    }

    @Override
    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
        long now = this.clock.millis();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", new Object[]{this.dispatcher.getName(), ledgerId, entryId, deliveryAt - now});
        }
        if (deliveryAt < now + this.tickTimeMillis) {
            return false;
        }
        this.priorityQueue.add(deliveryAt, ledgerId, entryId);
        this.updateTimer();
        return true;
    }

    @Override
    public boolean hasMessageAvailable() {
        return !this.priorityQueue.isEmpty() && this.priorityQueue.peekN1() <= this.clock.millis();
    }

    @Override
    public Set<PositionImpl> getScheduledMessages(int maxMessages) {
        long timestamp;
        TreeSet<PositionImpl> positions = new TreeSet<PositionImpl>();
        long now = this.clock.millis();
        long cutoffTime = now + this.tickTimeMillis;
        for (int n = maxMessages; n > 0 && !this.priorityQueue.isEmpty() && (timestamp = this.priorityQueue.peekN1()) <= cutoffTime; --n) {
            long ledgerId = this.priorityQueue.peekN2();
            long entryId = this.priorityQueue.peekN3();
            positions.add(new PositionImpl(ledgerId, entryId));
            this.priorityQueue.pop();
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Get scheduled messages - found {}", (Object)this.dispatcher.getName(), (Object)positions.size());
        }
        this.updateTimer();
        return positions;
    }

    @Override
    public long getNumberOfDelayedMessages() {
        return this.priorityQueue.size();
    }

    private void updateTimer() {
        if (this.priorityQueue.isEmpty()) {
            if (this.timeout != null) {
                this.currentTimeoutTarget = -1L;
                this.timeout.cancel();
                this.timeout = null;
            }
            return;
        }
        long timestamp = this.priorityQueue.peekN1();
        if (timestamp == this.currentTimeoutTarget) {
            return;
        }
        if (this.timeout != null) {
            this.timeout.cancel();
        }
        long delayMillis = timestamp - this.clock.millis();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Start timer in {} millis", (Object)this.dispatcher.getName(), (Object)delayMillis);
        }
        if (delayMillis < 0L) {
            return;
        }
        this.currentTimeoutTarget = timestamp;
        this.timeout = this.timer.newTimeout((TimerTask)this, delayMillis, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(Timeout timeout) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Timer triggered", (Object)this.dispatcher.getName());
        }
        if (timeout.isCancelled()) {
            return;
        }
        PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = this.dispatcher;
        synchronized (persistentDispatcherMultipleConsumers) {
            this.currentTimeoutTarget = -1L;
            timeout = null;
            this.dispatcher.readMoreEntries();
        }
    }

    @Override
    public void close() {
        this.priorityQueue.close();
        if (this.timeout != null) {
            this.timeout.cancel();
        }
    }
}

