/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.delayed.bucket;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Table;
import com.google.common.collect.TreeRangeMap;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.Bucket;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedMessageIndexStats;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.CombinedSegmentDelayedIndexQueue;
import org.apache.pulsar.broker.delayed.bucket.DelayedIndexQueue;
import org.apache.pulsar.broker.delayed.bucket.ImmutableBucket;
import org.apache.pulsar.broker.delayed.bucket.MutableBucket;
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class BucketDelayedDeliveryTracker
extends AbstractDelayedDeliveryTracker {
    private static final Logger log = LoggerFactory.getLogger(BucketDelayedDeliveryTracker.class);
    // Prefix of the cursor-property keys that identify persisted bucket snapshots (matched in recoverBucketSnapshot).
    public static final String DELAYED_BUCKET_KEY_PREFIX = "#pulsar.internal.delayed.bucket";
    // Already-completed future holding null; used as a stand-in when a bucket has no snapshot-create future.
    static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null);
    // NOTE(review): not referenced by name in this file; close() waits a literal 60 seconds and recovery waits 300 —
    // presumably this constant was inlined by the compiler. Confirm against the original source.
    static final int AsyncOperationTimeoutSeconds = 60;
    // Sentinel produced by the snapshot-create handler when persisting a bucket failed; checked before merging.
    private static final Long INVALID_BUCKET_ID = -1L;
    // NOTE(review): not referenced by name in this file; the no-arg asyncMergeBucketSnapshot passes a literal 4 —
    // presumably this constant inlined.
    private static final int MAX_MERGE_NUM = 4;
    // The mutable bucket must hold at least this many indexes before addMessage seals it.
    private final long minIndexCountPerBucket;
    // Passed through to the mutable bucket when a snapshot is created (seal and merge paths).
    private final long timeStepPerBucketSnapshotSegmentInMillis;
    // Passed through to the mutable bucket when a snapshot is created (seal and merge paths).
    private final int maxIndexesPerBucketSnapshotSegment;
    // When positive, exceeding this number of immutable buckets after a seal triggers an asynchronous merge.
    private final int maxNumBuckets;
    // Running count: initialised from recovery, incremented by addMessage, decremented by getScheduledMessages.
    private volatile long numberDelayedMessages;
    // The bucket that addMessage writes new indexes into.
    @VisibleForTesting
    private final MutableBucket lastMutableBucket;
    // Queue of (timestamp, ledgerId, entryId) triples that getScheduledMessages drains.
    @VisibleForTesting
    private final TripleLongPriorityQueue sharedBucketPriorityQueue;
    // Immutable buckets keyed by the closed range [startLedgerId, endLedgerId].
    @VisibleForTesting
    private final RangeMap<Long, ImmutableBucket> immutableBuckets;
    // (ledgerId, entryId) of the last index of a bucket's currently loaded segment -> that bucket.
    // getScheduledMessages uses a hit in this table as the signal to load the bucket's next segment.
    private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable;
    private final BucketDelayedMessageIndexStats stats;
    // The in-flight "load next segment" operation, or null when none is pending.
    private CompletableFuture<Void> pendingLoad = null;

    /**
     * Creates a tracker that uses {@link Clock#systemUTC()} as its time source.
     *
     * <p>All other arguments are forwarded unchanged to the clock-accepting constructor.
     *
     * @throws RecoverDelayedDeliveryTrackerException propagated from the delegated constructor
     */
    public BucketDelayedDeliveryTracker(
            PersistentDispatcherMultipleConsumers dispatcher,
            Timer timer,
            long tickTimeMillis,
            boolean isDelayedDeliveryDeliverAtTimeStrict,
            BucketSnapshotStorage bucketSnapshotStorage,
            long minIndexCountPerBucket,
            long timeStepPerBucketSnapshotSegmentInMillis,
            int maxIndexesPerBucketSnapshotSegment,
            int maxNumBuckets) throws RecoverDelayedDeliveryTrackerException {
        this(
                dispatcher,
                timer,
                tickTimeMillis,
                Clock.systemUTC(),
                isDelayedDeliveryDeliverAtTimeStrict,
                bucketSnapshotStorage,
                minIndexCountPerBucket,
                timeStepPerBucketSnapshotSegmentInMillis,
                maxIndexesPerBucketSnapshotSegment,
                maxNumBuckets);
    }

    public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict, BucketSnapshotStorage bucketSnapshotStorage, long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) throws RecoverDelayedDeliveryTrackerException {
        super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
        this.minIndexCountPerBucket = minIndexCountPerBucket;
        this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
        this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment;
        this.maxNumBuckets = maxNumBuckets;
        this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
        this.immutableBuckets = TreeRangeMap.create();
        this.snapshotSegmentLastIndexTable = HashBasedTable.create();
        this.lastMutableBucket = new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), (FutureUtil.Sequencer<Void>)FutureUtil.Sequencer.create(), bucketSnapshotStorage);
        this.stats = new BucketDelayedMessageIndexStats();
        try {
            this.numberDelayedMessages = this.recoverBucketSnapshot();
        }
        catch (RecoverDelayedDeliveryTrackerException e) {
            this.close();
            throw e;
        }
    }

    /**
     * Rebuilds the immutable buckets from the cursor properties and loads their recovered
     * indexes into the shared priority queue.
     *
     * <p>Steps: (1) create an {@link ImmutableBucket} for every cursor property whose key starts with
     * {@link #DELAYED_BUCKET_KEY_PREFIX}; (2) ask every registered bucket to recover its snapshot entry and
     * block until all of them finish; (3) queue the recovered indexes, scheduling buckets that
     * returned nothing (or were displaced by an overlapping range) for deletion.
     *
     * @return the sum of {@code numberBucketDelayedMessages} over the buckets that remain registered
     * @throws RecoverDelayedDeliveryTrackerException if waiting for the recovery futures is interrupted,
     *                                                times out, or one of them fails
     */
    private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryTrackerException {
        ManagedCursor cursor = lastMutableBucket.getCursor();
        Map<String, String> properties = cursor.getCursorProperties();
        if (MapUtils.isEmpty(properties)) {
            log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot", dispatcher.getName());
            return 0L;
        }

        FutureUtil.Sequencer<Void> sequencer = lastMutableBucket.getSequencer();
        Map<Range<Long>, ImmutableBucket> obsoleteBuckets = new HashMap<>();
        for (String propertyKey : properties.keySet()) {
            if (!propertyKey.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
                continue;
            }
            // Key layout: <prefix>_<startLedgerId>_<endLedgerId>
            String[] parts = propertyKey.split("_");
            Preconditions.checkArgument(parts.length == 3);
            ImmutableBucket recovered = new ImmutableBucket(
                    dispatcher.getName(),
                    cursor,
                    sequencer,
                    lastMutableBucket.bucketSnapshotStorage,
                    Long.parseLong(parts[1]),
                    Long.parseLong(parts[2]));
            putAndCleanOverlapRange(
                    Range.closed(recovered.startLedgerId, recovered.endLedgerId), recovered, obsoleteBuckets);
        }

        Map<Range<Long>, ImmutableBucket> registeredBuckets = immutableBuckets.asMapOfRanges();
        if (registeredBuckets.isEmpty()) {
            log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot", dispatcher.getName());
            return 0L;
        }

        Map<Range<Long>, CompletableFuture<List<DelayedIndex>>> recoveries = new HashMap<>(registeredBuckets.size());
        for (Map.Entry<Range<Long>, ImmutableBucket> registered : registeredBuckets.entrySet()) {
            recoveries.put(
                    registered.getKey(),
                    registered.getValue().asyncRecoverBucketSnapshotEntry(() -> this.getCutoffTime()));
        }

        try {
            FutureUtil.waitForAll(recoveries.values()).get(300L, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e);
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RecoverDelayedDeliveryTrackerException(e);
        }

        for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> recovery : recoveries.entrySet()) {
            Range<Long> bucketRange = recovery.getKey();
            // Every future has completed successfully at this point, so getNow never yields the fallback.
            List<DelayedIndex> recoveredIndexes = recovery.getValue().getNow(null);
            ImmutableBucket bucket = registeredBuckets.get(bucketRange);
            if (CollectionUtils.isEmpty(recoveredIndexes)) {
                obsoleteBuckets.put(bucketRange, bucket);
            } else {
                DelayedIndex tail = recoveredIndexes.get(recoveredIndexes.size() - 1);
                snapshotSegmentLastIndexTable.put(tail.getLedgerId(), tail.getEntryId(), bucket);
                for (DelayedIndex recoveredIndex : recoveredIndexes) {
                    sharedBucketPriorityQueue.add(
                            recoveredIndex.getTimestamp(), recoveredIndex.getLedgerId(), recoveredIndex.getEntryId());
                }
            }
        }

        for (Map.Entry<Range<Long>, ImmutableBucket> obsolete : obsoleteBuckets.entrySet()) {
            registeredBuckets.remove(obsolete.getKey());
            obsolete.getValue().asyncDeleteBucketSnapshot(stats);
        }

        long recoveredMessages = 0L;
        for (ImmutableBucket bucket : registeredBuckets.values()) {
            recoveredMessages += bucket.numberBucketDelayedMessages;
        }
        log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}",
                dispatcher.getName(), registeredBuckets.size(), recoveredMessages);
        return recoveredMessages;
    }

    /**
     * Registers {@code immutableBucket} under {@code range} unless it overlaps existing ranges
     * without enclosing any of them.
     *
     * <p>Every overlapping entry that {@code range} encloses is recorded in
     * {@code toBeDeletedBucketMap}; the new bucket is inserted when there was no overlap at all or at
     * least one enclosed entry was found.
     *
     * <p>NOTE(review): Guava documents {@code subRangeMap} as a view clipped to the intersection with
     * {@code range}, so the keys tested here may always be enclosed by {@code range} — confirm that
     * this matches the intended "fully covered" check.
     *
     * @param range                the closed ledger-id range of the new bucket
     * @param immutableBucket      the bucket to register
     * @param toBeDeletedBucketMap receives the enclosed entries that the new bucket displaces
     */
    private synchronized void putAndCleanOverlapRange(Range<Long> range, ImmutableBucket immutableBucket, Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap) {
        Map<Range<Long>, ImmutableBucket> overlapping = immutableBuckets.subRangeMap(range).asMapOfRanges();

        // No overlap at all means the range is free to take.
        boolean insertable = overlapping.isEmpty();
        for (Map.Entry<Range<Long>, ImmutableBucket> existing : overlapping.entrySet()) {
            if (range.encloses(existing.getKey())) {
                toBeDeletedBucketMap.put(existing.getKey(), existing.getValue());
                insertable = true;
            }
        }

        if (insertable) {
            immutableBuckets.put(range, immutableBucket);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Timer callback: moves the mutable bucket's due messages into the shared queue and then
     * delegates to the superclass.
     *
     * <p>Does nothing — including not calling the superclass — when {@code timeout} is null or cancelled.
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        synchronized (this) {
            boolean inactive = timeout == null || timeout.isCancelled();
            if (inactive) {
                return;
            }
            long cutoffTime = getCutoffTime();
            lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
        }
        // Deliberately outside the monitor, as in the original flow.
        super.run(timeout);
    }

    /**
     * Looks up the immutable bucket whose ledger range contains {@code ledgerId}.
     *
     * @return the owning bucket, or an empty Optional when no bucket is registered for that ledger
     */
    private Optional<ImmutableBucket> findImmutableBucket(long ledgerId) {
        boolean noBuckets = immutableBuckets.asMapOfRanges().isEmpty();
        return noBuckets
                ? Optional.empty()
                : Optional.ofNullable(immutableBuckets.get(ledgerId));
    }

    /**
     * Registers a freshly created immutable bucket and attaches a completion handler to its
     * snapshot-create future.
     *
     * <p>The bucket is stored in {@code immutableBuckets} under its closed ledger range, and the last
     * index of its in-memory segment is recorded in {@code snapshotSegmentLastIndexTable}. When the
     * create future succeeds, the cached segments are dropped and stats are recorded. When it fails,
     * the cached indexes are pushed back into the shared queue and both registrations are undone; the
     * replaced future then completes with {@link #INVALID_BUCKET_ID} instead of failing.
     *
     * @param immutableBucketDelayedIndexPair the new bucket and the last index of its first segment;
     *                                        {@code null} is accepted and ignored
     * @param startTime                       wall-clock millis at which creation was triggered, used for
     *                                        the success latency stat
     */
    private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair, long startTime) {
        if (immutableBucketDelayedIndexPair == null) {
            return;
        }
        ImmutableBucket immutableBucket = immutableBucketDelayedIndexPair.getLeft();
        immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId), immutableBucket);

        DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight();
        snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), immutableBucket);

        immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> {
            CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> {
                if (ex == null) {
                    immutableBucket.setSnapshotSegments(null);
                    immutableBucket.asyncUpdateSnapshotLength();
                    log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey());
                    stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create, System.currentTimeMillis() - startTime);
                    return bucketId;
                }

                log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey(), ex);
                stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);

                // Persisting failed: return the indexes to the shared queue and unregister the bucket.
                synchronized (this) {
                    immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
                        for (SnapshotSegment snapshotSegment : snapshotSegments) {
                            for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
                                sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(), delayedIndex.getLedgerId(), delayedIndex.getEntryId());
                            }
                        }
                        immutableBucket.setSnapshotSegments(null);
                    });
                    immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId);
                    immutableBuckets.asMapOfRanges().remove(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId));
                    // The entry was inserted under (ledgerId, entryId); removing by (ledgerId, timestamp)
                    // would miss it and leave a stale reference to the discarded bucket.
                    snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId());
                }
                return INVALID_BUCKET_ID;
            });
            immutableBucket.setSnapshotCreateFuture(future);
        });
    }

    /**
     * Starts tracking a delayed message.
     *
     * <p>Returns {@code true} immediately if the message is already tracked, and {@code false} if
     * {@code deliverAt} is negative or not later than the current cutoff time. Otherwise the mutable
     * bucket is sealed first when the message belongs to a newer ledger and the bucket is large
     * enough, and the index is then stored either in the shared queue (ledger covered by an earlier
     * range) or in the mutable bucket.
     *
     * @return {@code true} if the message is tracked after the call, {@code false} if it was rejected
     */
    @Override
    public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) {
        if (containsMessage(ledgerId, entryId)) {
            return true;
        }
        boolean notDelayable = deliverAt < 0L || deliverAt <= getCutoffTime();
        if (notDelayable) {
            return false;
        }

        final boolean coveredByImmutableBucket = findImmutableBucket(ledgerId).isPresent();
        final boolean shouldSealMutableBucket = !coveredByImmutableBucket
                && ledgerId > lastMutableBucket.endLedgerId
                && lastMutableBucket.size() >= minIndexCountPerBucket
                && !lastMutableBucket.isEmpty();
        if (shouldSealMutableBucket) {
            long sealStartedAt = System.currentTimeMillis();
            stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
            Pair<ImmutableBucket, DelayedIndex> sealed = lastMutableBucket.sealBucketAndAsyncPersistent(
                    timeStepPerBucketSnapshotSegmentInMillis,
                    maxIndexesPerBucketSnapshotSegment,
                    sharedBucketPriorityQueue);
            afterCreateImmutableBucket(sealed, sealStartedAt);
            lastMutableBucket.resetLastMutableBucketRange();

            boolean tooManyBuckets = maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets;
            if (tooManyBuckets) {
                asyncMergeBucketSnapshot();
            }
        }

        boolean belongsToEarlierRange = ledgerId < lastMutableBucket.startLedgerId || coveredByImmutableBucket;
        if (belongsToEarlierRange) {
            // Goes straight to the shared queue; only the index bit is kept in the mutable bucket.
            sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
            lastMutableBucket.putIndexBit(ledgerId, entryId);
        } else {
            Preconditions.checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
            lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
        }
        numberDelayedMessages++;

        if (log.isDebugEnabled()) {
            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, deliverAt - clock.millis());
        }
        updateTimer();
        return true;
    }

    /**
     * Chooses {@code mergeNum} adjacent buckets to merge.
     *
     * <p>Slides a window of {@code mergeNum} buckets over {@code values}. A window qualifies only if
     * every bucket still has unloaded segments and is not already merging. Among qualifying windows
     * whose message total does not exceed the smallest total seen so far, the one with the earliest
     * next-segment schedule timestamp is kept. If nothing qualifies, the search is retried with a
     * smaller window down to a width of 2.
     *
     * @param values   the candidate buckets, in map order
     * @param mergeNum the window width; must be smaller than {@code values.size()}
     * @return a sub-list view of the chosen window, or an empty list if no window qualifies
     * @throws IllegalArgumentException if {@code mergeNum >= values.size()}
     */
    private synchronized List<ImmutableBucket> selectMergedBuckets(List<ImmutableBucket> values, int mergeNum) {
        Preconditions.checkArgument(mergeNum < values.size());

        long fewestMessages = Long.MAX_VALUE;
        long earliestSchedule = Long.MAX_VALUE;
        int bestStart = -1;

        int lastStart = values.size() - mergeNum;
        for (int start = 0; start <= lastStart; start++) {
            List<ImmutableBucket> window = values.subList(start, start + mergeNum);

            boolean mergeable = window.stream()
                    .allMatch(bucket -> bucket.lastSegmentEntryId > bucket.currentSegmentEntryId && !bucket.merging);
            if (!mergeable) {
                continue;
            }

            long windowMessages = window.stream().mapToLong(bucket -> bucket.numberBucketDelayedMessages).sum();
            if (windowMessages > fewestMessages) {
                continue;
            }
            fewestMessages = windowMessages;

            long windowSchedule = window.stream()
                    .mapToLong(bucket -> bucket.firstScheduleTimestamps.get(bucket.currentSegmentEntryId + 1))
                    .min()
                    .getAsLong();
            if (windowSchedule < earliestSchedule) {
                earliestSchedule = windowSchedule;
                bestStart = start;
            }
        }

        if (bestStart >= 0) {
            return values.subList(bestStart, bestStart + mergeNum);
        }
        return mergeNum > 2
                ? selectMergedBuckets(values, mergeNum - 1)
                : Collections.emptyList();
    }

    /**
     * Picks a group of adjacent immutable buckets and merges them into one asynchronously.
     *
     * <p>The selected buckets are flagged as {@code merging} for the duration of the operation and
     * un-flagged when it completes, whether it succeeded or failed. Trigger, success and failure are
     * recorded in the stats.
     *
     * <p>The merge width is capped at {@link #MAX_MERGE_NUM} and additionally clamped below the
     * current bucket count, because {@code selectMergedBuckets} rejects a width that is not strictly
     * smaller than the list size. With fewer than three buckets nothing can be merged and the call
     * completes immediately.
     *
     * @return a future that completes when the merge finishes, or an already-completed future when
     *         no mergeable group exists
     */
    private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
        List<ImmutableBucket> immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList();

        // Clamp so that a small maxNumBuckets setting cannot trip selectMergedBuckets' precondition
        // (which would otherwise surface as an IllegalArgumentException from addMessage).
        int mergeNum = Math.min(MAX_MERGE_NUM, immutableBucketList.size() - 1);
        List<ImmutableBucket> toBeMergeImmutableBuckets = mergeNum >= 2
                ? selectMergedBuckets(immutableBucketList, mergeNum)
                : Collections.emptyList();
        if (toBeMergeImmutableBuckets.isEmpty()) {
            log.warn("[{}] Can't find able merged buckets", dispatcher.getName());
            return CompletableFuture.completedFuture(null);
        }

        // Literal replacement: the prefix contains '.' which replaceAll would treat as a regex wildcard.
        String bucketsStr = toBeMergeImmutableBuckets.stream()
                .map(Bucket::bucketKey)
                .collect(Collectors.joining(","))
                .replace(DELAYED_BUCKET_KEY_PREFIX + "_", "");
        if (log.isDebugEnabled()) {
            log.debug("[{}] Merging bucket snapshot, bucketKeys: {}", dispatcher.getName(), bucketsStr);
        }

        for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
            immutableBucket.merging = true;
        }

        long mergeStartTime = System.currentTimeMillis();
        stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
        return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
            int bucketNum;
            synchronized (this) {
                for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
                    immutableBucket.merging = false;
                }
                // Read under the monitor: the range map is mutated by other synchronized paths.
                bucketNum = immutableBuckets.asMapOfRanges().size();
            }
            if (ex != null) {
                log.error("[{}] Failed to merge bucket snapshot, bucketKeys: {}", dispatcher.getName(), bucketsStr, ex);
                stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
            } else {
                log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, bucketNum: {}", dispatcher.getName(), bucketsStr, bucketNum);
                stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge, System.currentTimeMillis() - mergeStartTime);
            }
        });
    }

    /**
     * Merges the given buckets into a single new immutable bucket.
     *
     * <p>Flow: wait for every bucket's snapshot-create future; fail if any of them resolved to
     * {@code INVALID_BUCKET_ID}; fetch each bucket's remaining snapshot segments; then, while holding
     * this tracker's monitor, create one immutable bucket spanning the first bucket's start ledger to
     * the last bucket's end ledger, give it the union of the source buckets' index bitmaps, register
     * it, and remove the source buckets' ranges from {@code immutableBuckets}.
     *
     * @param buckets the buckets to merge; the first and last elements define the merged ledger range
     * @return a future that completes once the merged bucket has been registered (not once the old
     *         snapshots have been deleted)
     */
    private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<ImmutableBucket> buckets) {
        // A bucket without a create future is treated as already created.
        List<CompletableFuture> createFutures = buckets.stream().map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
        return FutureUtil.waitForAll(createFutures).thenCompose(bucketId -> {
            // The create handler maps a failed create to INVALID_BUCKET_ID rather than an exception.
            if (createFutures.stream().anyMatch(future -> INVALID_BUCKET_ID.equals(future.join()))) {
                return FutureUtil.failedFuture((Throwable)new RuntimeException("Can't merge buckets due to bucket create failed"));
            }
            List<CompletableFuture> getRemainFutures = buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();
            return ((CompletableFuture)FutureUtil.waitForAll(getRemainFutures).thenApply(__ -> CombinedSegmentDelayedIndexQueue.wrap(getRemainFutures.stream().map(CompletableFuture::join).toList()))).thenAccept(combinedDelayedIndexQueue -> {
                BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker = this;
                synchronized (bucketDelayedDeliveryTracker) {
                    long createStartTime = System.currentTimeMillis();
                    this.stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
                    // NOTE(review): the pair is dereferenced below without a null check, while
                    // afterCreateImmutableBucket tolerates null — confirm this call cannot return null here.
                    Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair = this.lastMutableBucket.createImmutableBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis, this.maxIndexesPerBucketSnapshotSegment, this.sharedBucketPriorityQueue, (DelayedIndexQueue)combinedDelayedIndexQueue, ((ImmutableBucket)buckets.get((int)0)).startLedgerId, ((ImmutableBucket)buckets.get((int)(buckets.size() - 1))).endLedgerId);
                    // Shallow copy: the first bucket's RoaringBitmap instances are reused and or-ed into in place.
                    HashMap<Long, RoaringBitmap> delayedIndexBitMap = new HashMap<Long, RoaringBitmap>(((ImmutableBucket)buckets.get(0)).getDelayedIndexBitMap());
                    for (int i = 1; i < buckets.size(); ++i) {
                        ((ImmutableBucket)buckets.get((int)i)).delayedIndexBitMap.forEach((ledgerId, bitMapB) -> delayedIndexBitMap.compute((Long)ledgerId, (k, bitMap) -> {
                            if (bitMap == null) {
                                return bitMapB;
                            }
                            bitMap.or(bitMapB);
                            return bitMap;
                        }));
                    }
                    delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
                    ((ImmutableBucket)immutableBucketDelayedIndexPair.getLeft()).setDelayedIndexBitMap(delayedIndexBitMap);
                    this.afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
                    // Old snapshots are deleted only after the merged bucket's create future completes.
                    // NOTE(review): the composed future is discarded, so delete failures are not propagated to the caller.
                    ((ImmutableBucket)immutableBucketDelayedIndexPair.getLeft()).getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
                        List<CompletableFuture> removeFutures = buckets.stream().map(bucket -> bucket.asyncDeleteBucketSnapshot(this.stats)).toList();
                        return FutureUtil.waitForAll(removeFutures);
                    });
                    // Drop the source buckets' own ranges from the range map.
                    for (ImmutableBucket bucket : buckets) {
                        this.immutableBuckets.asMapOfRanges().remove(Range.closed((Comparable)Long.valueOf(bucket.startLedgerId), (Comparable)Long.valueOf(bucket.endLedgerId)));
                    }
                }
            });
        });
    }

    /**
     * Reports whether at least one tracked message is due at the current cutoff time.
     *
     * <p>When nothing is due, the timer is refreshed before returning.
     */
    @Override
    public synchronized boolean hasMessageAvailable() {
        long cutoffTime = getCutoffTime();
        if (getNumberOfDelayedMessages() > 0L && nextDeliveryTime() <= cutoffTime) {
            return true;
        }
        updateTimer();
        return false;
    }

    /**
     * Returns the earliest delivery timestamp across the mutable bucket and the shared queue.
     *
     * <p>If exactly one of the two is empty, only the other is consulted; otherwise both are
     * consulted and the smaller timestamp wins.
     */
    @Override
    protected long nextDeliveryTime() {
        boolean mutableBucketEmpty = lastMutableBucket.isEmpty();
        boolean sharedQueueEmpty = sharedBucketPriorityQueue.isEmpty();

        if (mutableBucketEmpty && !sharedQueueEmpty) {
            return sharedBucketPriorityQueue.peekN1();
        }
        if (sharedQueueEmpty && !mutableBucketEmpty) {
            return lastMutableBucket.nextDeliveryTime();
        }
        return Math.min(lastMutableBucket.nextDeliveryTime(), sharedBucketPriorityQueue.peekN1());
    }

    /** Returns the current value of the tracker's delayed-message counter. */
    @Override
    public long getNumberOfDelayedMessages() {
        return numberDelayedMessages;
    }

    /**
     * Returns the mutable bucket's reported buffer usage plus the byte capacity of the shared queue.
     */
    @Override
    public long getBufferMemoryUsage() {
        long mutableBucketBytes = lastMutableBucket.getBufferMemoryUsage();
        long sharedQueueBytes = sharedBucketPriorityQueue.bytesCapacity();
        return mutableBucketBytes + sharedQueueBytes;
    }

    /**
     * Pops up to {@code maxMessages} due positions from the shared queue.
     *
     * <p>Returns an empty set while a segment load is still pending. Otherwise the mutable bucket's
     * due messages are first moved into the shared queue, then entries whose timestamp is not later
     * than the cutoff time are drained. When the entry at the head is the recorded last index of a
     * registered bucket's loaded segment, the bucket's next segment is loaded asynchronously; the
     * drain stops early if that bucket is merging, its snapshot is still being created, or the load
     * has not completed (or failed) by the time it is checked.
     *
     * @param maxMessages upper bound on the number of positions returned
     * @return the due positions, possibly empty
     */
    @Override
    public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
        long timestamp;
        if (!this.checkPendingLoadDone()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", (Object)this.dispatcher.getName());
            }
            return Collections.emptyNavigableSet();
        }
        long cutoffTime = this.getCutoffTime();
        this.lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, this.sharedBucketPriorityQueue);
        TreeSet<PositionImpl> positions = new TreeSet<PositionImpl>();
        int n = maxMessages;
        while (n > 0 && !this.sharedBucketPriorityQueue.isEmpty() && (timestamp = this.sharedBucketPriorityQueue.peekN1()) <= cutoffTime) {
            long ledgerId = this.sharedBucketPriorityQueue.peekN2();
            long entryId = this.sharedBucketPriorityQueue.peekN3();
            // A table hit means this entry is the last index of the bucket's currently loaded segment.
            ImmutableBucket bucket = (ImmutableBucket)this.snapshotSegmentLastIndexTable.get((Object)ledgerId, (Object)entryId);
            // Table entries for buckets no longer registered in immutableBuckets are ignored.
            if (bucket != null && this.immutableBuckets.asMapOfRanges().containsValue(bucket)) {
                boolean createFutureDone;
                if (bucket.merging) {
                    log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}", (Object)this.dispatcher.getName(), (Object)bucket.bucketKey());
                    break;
                }
                // Remembered so a failed load can restore the bucket's segment cursor.
                int preSegmentEntryId = bucket.currentSegmentEntryId;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", new Object[]{this.dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1});
                }
                if (!(createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone())) {
                    log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", (Object)this.dispatcher.getName(), (Object)bucket.bucketKey());
                    break;
                }
                long loadStartTime = System.currentTimeMillis();
                this.stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
                this.pendingLoad = ((CompletableFuture)bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
                    BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker = this;
                    synchronized (bucketDelayedDeliveryTracker) {
                        this.snapshotSegmentLastIndexTable.remove((Object)ledgerId, (Object)entryId);
                        // An empty result means nothing is left to load: unregister the bucket and delete its snapshot.
                        if (CollectionUtils.isEmpty((Collection)indexList)) {
                            this.immutableBuckets.asMapOfRanges().remove(Range.closed((Comparable)Long.valueOf(bucket.startLedgerId), (Comparable)Long.valueOf(bucket.endLedgerId)));
                            bucket.asyncDeleteBucketSnapshot(this.stats);
                            return;
                        }
                        // Re-arm the table with the last index of the newly loaded segment.
                        DelayedIndex lastDelayedIndex = (DelayedIndex)indexList.get(indexList.size() - 1);
                        this.snapshotSegmentLastIndexTable.put((Object)lastDelayedIndex.getLedgerId(), (Object)lastDelayedIndex.getEntryId(), (Object)bucket);
                        for (DelayedIndex index : indexList) {
                            this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), index.getEntryId());
                        }
                    }
                })).whenComplete((__, ex) -> {
                    if (ex != null) {
                        bucket.setCurrentSegmentEntryId(preSegmentEntryId);
                        log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", new Object[]{this.dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex});
                        this.stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
                    } else {
                        log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", new Object[]{this.dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId == bucket.lastSegmentEntryId ? "-1" : Integer.valueOf(preSegmentEntryId + 1)});
                        this.stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, System.currentTimeMillis() - loadStartTime);
                    }
                    // Success or failure, fire the timer task immediately (zero delay).
                    BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker = this;
                    synchronized (bucketDelayedDeliveryTracker) {
                        if (this.timeout != null) {
                            this.timeout.cancel();
                        }
                        this.timeout = this.timer.newTimeout((TimerTask)this, 0L, TimeUnit.MILLISECONDS);
                    }
                });
                // Captured before checkPendingLoadDone(), which may reset the field to null.
                CompletionStage loadFuture = this.pendingLoad;
                // Stop draining unless the load already finished successfully; the head entry stays queued.
                if (!this.checkPendingLoadDone() || ((CompletableFuture)loadFuture).isCompletedExceptionally()) break;
            }
            positions.add(new PositionImpl(ledgerId, entryId));
            this.sharedBucketPriorityQueue.pop();
            this.removeIndexBit(ledgerId, entryId);
            --n;
            --this.numberDelayedMessages;
        }
        this.updateTimer();
        return positions;
    }

    /**
     * Reports whether no segment load is in flight, clearing {@code pendingLoad} once it is done.
     *
     * @return {@code true} if there is no pending load or it has completed, {@code false} otherwise
     */
    private synchronized boolean checkPendingLoadDone() {
        boolean stillLoading = pendingLoad != null && !pendingLoad.isDone();
        if (stillLoading) {
            return false;
        }
        pendingLoad = null;
        return true;
    }

    /** Always returns {@code false}: this tracker never asks for all deliveries to be paused. */
    @Override
    public boolean shouldPauseAllDeliveries() {
        final boolean pauseAll = false;
        return pauseAll;
    }

    /**
     * Discards all tracked state: immutable buckets, the shared queue, the mutable bucket and the
     * segment last-index table, and resets the message counter to zero.
     *
     * @return a future that completes when every immutable bucket has finished clearing
     */
    @Override
    public synchronized CompletableFuture<Void> clear() {
        // Start clearing the immutable buckets first; the in-memory resets below do not wait for it.
        CompletableFuture<Void> bucketsCleared = cleanImmutableBuckets();

        sharedBucketPriorityQueue.clear();
        lastMutableBucket.clear();
        snapshotSegmentLastIndexTable.clear();
        numberDelayedMessages = 0L;

        return bucketsCleared;
    }

    /**
     * Closes the tracker, the mutable bucket and the shared queue, then waits (best effort, up to
     * {@link #AsyncOperationTimeoutSeconds} seconds) for in-flight snapshot creations to finish.
     *
     * <p>Failures while waiting are logged and not rethrown. If the wait is interrupted, the
     * thread's interrupt flag is restored so callers can still observe the interruption.
     */
    @Override
    public synchronized void close() {
        super.close();
        lastMutableBucket.close();
        sharedBucketPriorityQueue.close();
        try {
            List<CompletableFuture<Long>> completableFutures = immutableBuckets.asMapOfRanges().values().stream()
                    .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE))
                    .toList();
            FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption; same handling as recoverBucketSnapshot.
                Thread.currentThread().interrupt();
            }
            log.warn("[{}] Failed wait to snapshot generate", dispatcher.getName(), e);
        }
    }

    /**
     * Clears and unregisters every immutable bucket, subtracting each bucket's message count from
     * the tracker's counter.
     *
     * @return a future that completes when all bucket clear operations have completed
     */
    private CompletableFuture<Void> cleanImmutableBuckets() {
        List<CompletableFuture<Void>> pendingClears = new ArrayList<>();
        // Explicit iterator so each bucket can be removed from the live map view while iterating.
        for (Iterator<ImmutableBucket> it = immutableBuckets.asMapOfRanges().values().iterator(); it.hasNext(); ) {
            ImmutableBucket bucket = it.next();
            pendingClears.add(bucket.clear(stats));
            numberDelayedMessages -= bucket.getNumberBucketDelayedMessages();
            it.remove();
        }
        return FutureUtil.waitForAll(pendingClears);
    }

    /**
     * Clears the index bit for the given position, trying the mutable bucket first and then the
     * immutable bucket that covers the ledger.
     *
     * @return {@code true} if either bucket reported the bit as removed
     */
    private boolean removeIndexBit(long ledgerId, long entryId) {
        return lastMutableBucket.removeIndexBit(ledgerId, entryId)
                || findImmutableBucket(ledgerId)
                        .map(owner -> owner.removeIndexBit(ledgerId, entryId))
                        .orElse(false);
    }

    /**
     * Reports whether the given position is already tracked, checking the mutable bucket first and
     * then the immutable bucket that covers the ledger.
     */
    public boolean containsMessage(long ledgerId, long entryId) {
        return lastMutableBucket.containsMessage(ledgerId, entryId)
                || findImmutableBucket(ledgerId)
                        .map(owner -> owner.containsMessage(ledgerId, entryId))
                        .orElse(false);
    }

    /**
     * Refreshes the gauge-style stats (bucket count, loaded index count, total snapshot size) and
     * returns the metric map produced by the stats object.
     *
     * <p>The reported bucket count is the number of immutable buckets plus one.
     */
    public Map<String, TopicMetricBean> genTopicMetricMap() {
        stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
        stats.recordDelayedMessageIndexLoaded(sharedBucketPriorityQueue.size() + lastMutableBucket.size());

        long snapshotBytes = 0L;
        for (ImmutableBucket bucket : immutableBuckets.asMapOfRanges().values()) {
            snapshotBytes += bucket.getSnapshotLength();
        }
        stats.recordBucketSnapshotSizeBytes(snapshotBytes);

        return stats.genTopicMetricMap();
    }

    /** Returns the tracker's mutable bucket itself (not a copy). */
    public MutableBucket getLastMutableBucket() {
        return lastMutableBucket;
    }

    /** Returns the tracker's shared priority queue itself (not a copy). */
    public TripleLongPriorityQueue getSharedBucketPriorityQueue() {
        return sharedBucketPriorityQueue;
    }

    /** Returns the tracker's range map of immutable buckets itself (not a copy). */
    public RangeMap<Long, ImmutableBucket> getImmutableBuckets() {
        return immutableBuckets;
    }
}

