package org.apache.pulsar.broker.delayed.bucket;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Table;
import com.google.common.collect.TreeRangeMap;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedMessageIndexStats;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.class */
public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {
    public static final String DELAYED_BUCKET_KEY_PREFIX = "#pulsar.internal.delayed.bucket";
    static final int AsyncOperationTimeoutSeconds = 60;
    private static final int MAX_MERGE_NUM = 4;
    private final long minIndexCountPerBucket;
    private final long timeStepPerBucketSnapshotSegmentInMillis;
    private final int maxIndexesPerBucketSnapshotSegment;
    private final int maxNumBuckets;
    private volatile long numberDelayedMessages;

    @VisibleForTesting
    private final MutableBucket lastMutableBucket;

    @VisibleForTesting
    private final TripleLongPriorityQueue sharedBucketPriorityQueue;

    @VisibleForTesting
    private final RangeMap<Long, ImmutableBucket> immutableBuckets;
    private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable;
    private final BucketDelayedMessageIndexStats stats;
    private CompletableFuture<Void> pendingLoad;
    private static final Logger log = LoggerFactory.getLogger(BucketDelayedDeliveryTracker.class);
    static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null);
    private static final Long INVALID_BUCKET_ID = -1L;

    public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers, Timer timer, long j, boolean z, BucketSnapshotStorage bucketSnapshotStorage, long j2, long j3, int i, int i2) {
        this(persistentDispatcherMultipleConsumers, timer, j, Clock.systemUTC(), z, bucketSnapshotStorage, j2, j3, i, i2);
    }

    public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers, Timer timer, long j, Clock clock, boolean z, BucketSnapshotStorage bucketSnapshotStorage, long j2, long j3, int i, int i2) {
        super(persistentDispatcherMultipleConsumers, timer, j, clock, z);
        this.pendingLoad = null;
        this.minIndexCountPerBucket = j2;
        this.timeStepPerBucketSnapshotSegmentInMillis = j3;
        this.maxIndexesPerBucketSnapshotSegment = i;
        this.maxNumBuckets = i2;
        this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
        this.immutableBuckets = TreeRangeMap.create();
        this.snapshotSegmentLastIndexTable = HashBasedTable.create();
        this.lastMutableBucket = new MutableBucket(persistentDispatcherMultipleConsumers.getName(), persistentDispatcherMultipleConsumers.getCursor(), FutureUtil.Sequencer.create(), bucketSnapshotStorage);
        this.stats = new BucketDelayedMessageIndexStats();
        this.numberDelayedMessages = recoverBucketSnapshot();
    }

    private synchronized long recoverBucketSnapshot() throws RuntimeException {
        ManagedCursor cursor = this.lastMutableBucket.getCursor();
        FutureUtil.Sequencer<Void> sequencer = this.lastMutableBucket.getSequencer();
        HashMap hashMap = new HashMap();
        cursor.getCursorProperties().keySet().forEach(str -> {
            if (str.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
                String[] split = str.split(ClusterReplicationMetrics.SEPARATOR);
                Preconditions.checkArgument(split.length == 3);
                ImmutableBucket immutableBucket = new ImmutableBucket(this.dispatcher.getName(), cursor, sequencer, this.lastMutableBucket.bucketSnapshotStorage, Long.parseLong(split[1]), Long.parseLong(split[2]));
                putAndCleanOverlapRange(Range.closed(Long.valueOf(immutableBucket.startLedgerId), Long.valueOf(immutableBucket.endLedgerId)), immutableBucket, hashMap);
            }
        });
        Map asMapOfRanges = this.immutableBuckets.asMapOfRanges();
        if (asMapOfRanges.isEmpty()) {
            log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot", this.dispatcher.getName());
            return 0L;
        }
        HashMap hashMap2 = new HashMap(asMapOfRanges.size());
        for (Map.Entry entry : asMapOfRanges.entrySet()) {
            hashMap2.put((Range) entry.getKey(), ((ImmutableBucket) entry.getValue()).asyncRecoverBucketSnapshotEntry(() -> {
                return Long.valueOf(this.getCutoffTime());
            }));
        }
        try {
            FutureUtil.waitForAll(hashMap2.values()).get(120L, TimeUnit.SECONDS);
            for (Map.Entry entry2 : hashMap2.entrySet()) {
                Range range = (Range) entry2.getKey();
                List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> list = (List) ((CompletableFuture) entry2.getValue()).getNow(null);
                ImmutableBucket immutableBucket = (ImmutableBucket) asMapOfRanges.get(range);
                if (CollectionUtils.isEmpty(list)) {
                    hashMap.put(range, immutableBucket);
                } else {
                    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex) list.get(list.size() - 1);
                    this.snapshotSegmentLastIndexTable.put(Long.valueOf(delayedIndex.getLedgerId()), Long.valueOf(delayedIndex.getEntryId()), immutableBucket);
                    for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex2 : list) {
                        this.sharedBucketPriorityQueue.add(delayedIndex2.getTimestamp(), delayedIndex2.getLedgerId(), delayedIndex2.getEntryId());
                    }
                }
            }
            for (Map.Entry entry3 : hashMap.entrySet()) {
                Range range2 = (Range) entry3.getKey();
                ImmutableBucket immutableBucket2 = (ImmutableBucket) entry3.getValue();
                asMapOfRanges.remove(range2);
                immutableBucket2.asyncDeleteBucketSnapshot(this.stats);
            }
            MutableLong mutableLong = new MutableLong(0L);
            asMapOfRanges.values().forEach(immutableBucket3 -> {
                mutableLong.add(immutableBucket3.numberBucketDelayedMessages);
            });
            log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}", new Object[]{this.dispatcher.getName(), Integer.valueOf(asMapOfRanges.size()), mutableLong.getValue()});
            return mutableLong.getValue().longValue();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("[{}] Failed to recover delayed message index bucket snapshot.", this.dispatcher.getName(), e);
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    private synchronized void putAndCleanOverlapRange(Range<Long> range, ImmutableBucket immutableBucket, Map<Range<Long>, ImmutableBucket> map) {
        RangeMap subRangeMap = this.immutableBuckets.subRangeMap(range);
        boolean z = false;
        if (subRangeMap.asMapOfRanges().isEmpty()) {
            z = true;
        } else {
            for (Map.Entry entry : subRangeMap.asMapOfRanges().entrySet()) {
                if (range.encloses((Range) entry.getKey())) {
                    map.put((Range) entry.getKey(), (ImmutableBucket) entry.getValue());
                    z = true;
                }
            }
        }
        if (z) {
            this.immutableBuckets.put(range, immutableBucket);
        }
    }

    @Override // org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker
    public void run(Timeout timeout) throws Exception {
        synchronized (this) {
            if (timeout != null) {
                if (!timeout.isCancelled()) {
                    this.lastMutableBucket.moveScheduledMessageToSharedQueue(getCutoffTime(), this.sharedBucketPriorityQueue);
                    super.run(timeout);
                }
            }
        }
    }

    private Optional<ImmutableBucket> findImmutableBucket(long j) {
        return this.immutableBuckets.asMapOfRanges().isEmpty() ? Optional.empty() : Optional.ofNullable((ImmutableBucket) this.immutableBuckets.get(Long.valueOf(j)));
    }

    private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> pair, long j) {
        if (pair != null) {
            ImmutableBucket immutableBucket = (ImmutableBucket) pair.getLeft();
            this.immutableBuckets.put(Range.closed(Long.valueOf(immutableBucket.startLedgerId), Long.valueOf(immutableBucket.endLedgerId)), immutableBucket);
            DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex) pair.getRight();
            this.snapshotSegmentLastIndexTable.put(Long.valueOf(delayedIndex.getLedgerId()), Long.valueOf(delayedIndex.getEntryId()), immutableBucket);
            immutableBucket.getSnapshotCreateFuture().ifPresent(completableFuture -> {
                immutableBucket.setSnapshotCreateFuture(completableFuture.handle((l, th) -> {
                    if (th == null) {
                        immutableBucket.setSnapshotSegments(null);
                        immutableBucket.asyncUpdateSnapshotLength();
                        log.info("[{}] Create bucket snapshot finish, bucketKey: {}", this.dispatcher.getName(), immutableBucket.bucketKey());
                        this.stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create, System.currentTimeMillis() - j);
                        return l;
                    }
                    log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", new Object[]{this.dispatcher.getName(), immutableBucket.bucketKey(), th});
                    this.stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);
                    synchronized (this) {
                        immutableBucket.getSnapshotSegments().ifPresent(list -> {
                            Iterator it = list.iterator();
                            while (it.hasNext()) {
                                for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex2 : ((DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment) it.next()).getIndexesList()) {
                                    this.sharedBucketPriorityQueue.add(delayedIndex2.getTimestamp(), delayedIndex2.getLedgerId(), delayedIndex2.getEntryId());
                                }
                            }
                            immutableBucket.setSnapshotSegments(null);
                        });
                        immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId);
                        this.immutableBuckets.asMapOfRanges().remove(Range.closed(Long.valueOf(immutableBucket.startLedgerId), Long.valueOf(immutableBucket.endLedgerId)));
                        this.snapshotSegmentLastIndexTable.remove(Long.valueOf(delayedIndex.getLedgerId()), Long.valueOf(delayedIndex.getTimestamp()));
                    }
                    return INVALID_BUCKET_ID;
                }));
            });
        }
    }

    @Override // org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    public synchronized boolean addMessage(long j, long j2, long j3) {
        if (containsMessage(j, j2)) {
            return true;
        }
        if (j3 < 0 || j3 <= getCutoffTime()) {
            return false;
        }
        boolean isPresent = findImmutableBucket(j).isPresent();
        if (!isPresent && j > this.lastMutableBucket.endLedgerId && this.lastMutableBucket.size() >= this.minIndexCountPerBucket && !this.lastMutableBucket.isEmpty()) {
            long currentTimeMillis = System.currentTimeMillis();
            this.stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
            afterCreateImmutableBucket(this.lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis, this.maxIndexesPerBucketSnapshotSegment, this.sharedBucketPriorityQueue), currentTimeMillis);
            this.lastMutableBucket.resetLastMutableBucketRange();
            if (this.maxNumBuckets > 0 && this.immutableBuckets.asMapOfRanges().size() > this.maxNumBuckets) {
                asyncMergeBucketSnapshot();
            }
        }
        if (j < this.lastMutableBucket.startLedgerId || isPresent) {
            this.sharedBucketPriorityQueue.add(j3, j, j2);
        } else {
            Preconditions.checkArgument(j >= this.lastMutableBucket.endLedgerId);
            this.lastMutableBucket.addMessage(j, j2, j3);
        }
        this.numberDelayedMessages++;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", new Object[]{this.dispatcher.getName(), Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3 - this.clock.millis())});
        }
        updateTimer();
        return true;
    }

    private synchronized List<ImmutableBucket> selectMergedBuckets(List<ImmutableBucket> list, int i) {
        Preconditions.checkArgument(i < list.size());
        long j = Long.MAX_VALUE;
        long j2 = Long.MAX_VALUE;
        int i2 = -1;
        for (int i3 = 0; i3 + (i - 1) < list.size(); i3++) {
            List<ImmutableBucket> subList = list.subList(i3, i3 + i);
            if (subList.stream().allMatch(immutableBucket -> {
                return immutableBucket.lastSegmentEntryId > immutableBucket.currentSegmentEntryId && !immutableBucket.merging;
            })) {
                long sum = subList.stream().mapToLong(immutableBucket2 -> {
                    return immutableBucket2.numberBucketDelayedMessages;
                }).sum();
                if (sum <= j) {
                    j = sum;
                    long asLong = subList.stream().mapToLong(immutableBucket3 -> {
                        return immutableBucket3.firstScheduleTimestamps.get(immutableBucket3.currentSegmentEntryId + 1).longValue();
                    }).min().getAsLong();
                    if (asLong < j2) {
                        j2 = asLong;
                        i2 = i3;
                    }
                }
            }
        }
        return i2 >= 0 ? list.subList(i2, i2 + i) : i > 2 ? selectMergedBuckets(list, i - 1) : Collections.emptyList();
    }

    private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
        List<ImmutableBucket> selectMergedBuckets = selectMergedBuckets(this.immutableBuckets.asMapOfRanges().values().stream().toList(), 4);
        if (selectMergedBuckets.isEmpty()) {
            log.warn("[{}] Can't find able merged buckets", this.dispatcher.getName());
            return CompletableFuture.completedFuture(null);
        }
        String replaceAll = ((String) selectMergedBuckets.stream().map((v0) -> {
            return v0.bucketKey();
        }).collect(Collectors.joining(","))).replaceAll("#pulsar.internal.delayed.bucket_", "");
        if (log.isDebugEnabled()) {
            log.info("[{}] Merging bucket snapshot, bucketKeys: {}", this.dispatcher.getName(), replaceAll);
        }
        Iterator<ImmutableBucket> it = selectMergedBuckets.iterator();
        while (it.hasNext()) {
            it.next().merging = true;
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
        return asyncMergeBucketSnapshot(selectMergedBuckets).whenComplete((r12, th) -> {
            synchronized (this) {
                Iterator it2 = selectMergedBuckets.iterator();
                while (it2.hasNext()) {
                    ((ImmutableBucket) it2.next()).merging = false;
                }
            }
            if (th != null) {
                log.error("[{}] Failed to merge bucket snapshot, bucketKeys: {}", new Object[]{this.dispatcher.getName(), replaceAll, th});
                this.stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
            } else {
                log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, bucketNum: {}", new Object[]{this.dispatcher.getName(), replaceAll, Integer.valueOf(this.immutableBuckets.asMapOfRanges().size())});
                this.stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge, System.currentTimeMillis() - currentTimeMillis);
            }
        });
    }

    private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<ImmutableBucket> list) {
        List list2 = list.stream().map(immutableBucket -> {
            return immutableBucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
        }).toList();
        return FutureUtil.waitForAll(list2).thenCompose(r7 -> {
            if (list2.stream().anyMatch(completableFuture -> {
                return INVALID_BUCKET_ID.equals(completableFuture.join());
            })) {
                return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
            }
            List list3 = list.stream().map((v0) -> {
                return v0.getRemainSnapshotSegment();
            }).toList();
            return FutureUtil.waitForAll(list3).thenApply(r4 -> {
                return CombinedSegmentDelayedIndexQueue.wrap(list3.stream().map((v0) -> {
                    return v0.join();
                }).toList());
            }).thenAccept(combinedSegmentDelayedIndexQueue -> {
                synchronized (this) {
                    long currentTimeMillis = System.currentTimeMillis();
                    this.stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
                    Pair<ImmutableBucket, DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> createImmutableBucketAndAsyncPersistent = this.lastMutableBucket.createImmutableBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis, this.maxIndexesPerBucketSnapshotSegment, this.sharedBucketPriorityQueue, combinedSegmentDelayedIndexQueue, ((ImmutableBucket) list.get(0)).startLedgerId, ((ImmutableBucket) list.get(list.size() - 1)).endLedgerId);
                    HashMap hashMap = new HashMap(((ImmutableBucket) list.get(0)).getDelayedIndexBitMap());
                    for (int i = 1; i < list.size(); i++) {
                        ((ImmutableBucket) list.get(i)).delayedIndexBitMap.forEach((l, roaringBitmap) -> {
                            hashMap.compute(l, (l, roaringBitmap) -> {
                                if (roaringBitmap == null) {
                                    return roaringBitmap;
                                }
                                roaringBitmap.or(roaringBitmap);
                                return roaringBitmap;
                            });
                        });
                    }
                    ((ImmutableBucket) createImmutableBucketAndAsyncPersistent.getLeft()).setDelayedIndexBitMap(hashMap);
                    afterCreateImmutableBucket(createImmutableBucketAndAsyncPersistent, currentTimeMillis);
                    ((ImmutableBucket) createImmutableBucketAndAsyncPersistent.getLeft()).getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).thenCompose(l2 -> {
                        return FutureUtil.waitForAll(list.stream().map(immutableBucket2 -> {
                            return immutableBucket2.asyncDeleteBucketSnapshot(this.stats);
                        }).toList());
                    });
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        ImmutableBucket immutableBucket2 = (ImmutableBucket) it.next();
                        this.immutableBuckets.asMapOfRanges().remove(Range.closed(Long.valueOf(immutableBucket2.startLedgerId), Long.valueOf(immutableBucket2.endLedgerId)));
                    }
                }
            });
        });
    }

    @Override // org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    public synchronized boolean hasMessageAvailable() {
        boolean z = getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= getCutoffTime();
        if (!z) {
            updateTimer();
        }
        return z;
    }

    @Override // org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker
    protected long nextDeliveryTime() {
        return (!this.lastMutableBucket.isEmpty() || this.sharedBucketPriorityQueue.isEmpty()) ? (!this.sharedBucketPriorityQueue.isEmpty() || this.lastMutableBucket.isEmpty()) ? Math.min(this.lastMutableBucket.nextDeliveryTime(), this.sharedBucketPriorityQueue.peekN1()) : this.lastMutableBucket.nextDeliveryTime() : this.sharedBucketPriorityQueue.peekN1();
    }

    @Override // org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    public long getNumberOfDelayedMessages() {
        return this.numberDelayedMessages;
    }

    @Override // org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    public long getBufferMemoryUsage() {
        return this.lastMutableBucket.getBufferMemoryUsage() + this.sharedBucketPriorityQueue.bytesCapacity();
    }

    @Override // org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    public synchronized NavigableSet<PositionImpl> getScheduledMessages(int i) {
        if (!checkPendingOpDone()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", this.dispatcher.getName());
            }
            return Collections.emptyNavigableSet();
        }
        long cutoffTime = getCutoffTime();
        this.lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, this.sharedBucketPriorityQueue);
        TreeSet treeSet = new TreeSet();
        int i2 = i;
        while (true) {
            if (i2 <= 0 || this.sharedBucketPriorityQueue.isEmpty() || this.sharedBucketPriorityQueue.peekN1() > cutoffTime) {
                break;
            }
            long peekN2 = this.sharedBucketPriorityQueue.peekN2();
            long peekN3 = this.sharedBucketPriorityQueue.peekN3();
            ImmutableBucket immutableBucket = (ImmutableBucket) this.snapshotSegmentLastIndexTable.get(Long.valueOf(peekN2), Long.valueOf(peekN3));
            if (immutableBucket != null && this.immutableBuckets.asMapOfRanges().containsValue(immutableBucket)) {
                if (!immutableBucket.merging) {
                    int i3 = immutableBucket.currentSegmentEntryId;
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", new Object[]{this.dispatcher.getName(), immutableBucket.bucketKey(), Integer.valueOf(i3 + 1)});
                    }
                    if (!immutableBucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()) {
                        log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", this.dispatcher.getName(), immutableBucket.bucketKey());
                        break;
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    this.stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
                    CompletableFuture<Void> whenComplete = immutableBucket.asyncLoadNextBucketSnapshotEntry().thenAccept(list -> {
                        synchronized (this) {
                            this.snapshotSegmentLastIndexTable.remove(Long.valueOf(peekN2), Long.valueOf(peekN3));
                            if (CollectionUtils.isEmpty(list)) {
                                this.immutableBuckets.asMapOfRanges().remove(Range.closed(Long.valueOf(immutableBucket.startLedgerId), Long.valueOf(immutableBucket.endLedgerId)));
                                immutableBucket.asyncDeleteBucketSnapshot(this.stats);
                                return;
                            }
                            DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex) list.get(list.size() - 1);
                            this.snapshotSegmentLastIndexTable.put(Long.valueOf(delayedIndex.getLedgerId()), Long.valueOf(delayedIndex.getEntryId()), immutableBucket);
                            Iterator it = list.iterator();
                            while (it.hasNext()) {
                                DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex2 = (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex) it.next();
                                this.sharedBucketPriorityQueue.add(delayedIndex2.getTimestamp(), delayedIndex2.getLedgerId(), delayedIndex2.getEntryId());
                            }
                        }
                    }).whenComplete((r13, th) -> {
                        if (th != null) {
                            immutableBucket.setCurrentSegmentEntryId(i3);
                            log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", new Object[]{this.dispatcher.getName(), immutableBucket.bucketKey(), Integer.valueOf(i3 + 1), th});
                            this.stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
                        } else {
                            Logger logger = log;
                            Object[] objArr = new Object[3];
                            objArr[0] = this.dispatcher.getName();
                            objArr[1] = immutableBucket.bucketKey();
                            objArr[2] = i3 == immutableBucket.lastSegmentEntryId ? "-1" : Integer.valueOf(i3 + 1);
                            logger.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", objArr);
                            this.stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, System.currentTimeMillis() - currentTimeMillis);
                        }
                        synchronized (this) {
                            if (this.timeout != null) {
                                this.timeout.cancel();
                            }
                            this.timeout = this.timer.newTimeout(this, this.tickTimeMillis, TimeUnit.MILLISECONDS);
                        }
                    });
                    this.pendingLoad = whenComplete;
                    if (!checkPendingOpDone() || whenComplete.isCompletedExceptionally()) {
                        break;
                    }
                } else {
                    log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}", this.dispatcher.getName(), immutableBucket.bucketKey());
                    break;
                }
            }
            treeSet.add(new PositionImpl(peekN2, peekN3));
            this.sharedBucketPriorityQueue.pop();
            removeIndexBit(peekN2, peekN3);
            i2--;
            this.numberDelayedMessages--;
        }
        updateTimer();
        return treeSet;
    }

    private synchronized boolean checkPendingOpDone() {
        if (this.pendingLoad != null && !this.pendingLoad.isDone()) {
            return false;
        }
        this.pendingLoad = null;
        return true;
    }

    @Override // org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    public boolean shouldPauseAllDeliveries() {
        return false;
    }

    @Override // org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    public synchronized CompletableFuture<Void> clear() {
        CompletableFuture<Void> cleanImmutableBuckets = cleanImmutableBuckets();
        this.sharedBucketPriorityQueue.clear();
        this.lastMutableBucket.clear();
        this.snapshotSegmentLastIndexTable.clear();
        this.numberDelayedMessages = 0L;
        return cleanImmutableBuckets;
    }

    @Override // org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker, org.apache.pulsar.broker.delayed.DelayedDeliveryTracker, java.lang.AutoCloseable
    public synchronized void close() {
        super.close();
        this.lastMutableBucket.close();
        this.sharedBucketPriorityQueue.close();
        try {
            FutureUtil.waitForAll(this.immutableBuckets.asMapOfRanges().values().stream().map(immutableBucket -> {
                return immutableBucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
            }).toList()).get(60L, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.warn("[{}] Failed wait to snapshot generate", this.dispatcher.getName(), e);
        }
    }

    private CompletableFuture<Void> cleanImmutableBuckets() {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.immutableBuckets.asMapOfRanges().values().iterator();
        while (it.hasNext()) {
            ImmutableBucket immutableBucket = (ImmutableBucket) it.next();
            arrayList.add(immutableBucket.clear(this.stats));
            this.numberDelayedMessages -= immutableBucket.getNumberBucketDelayedMessages();
            it.remove();
        }
        return FutureUtil.waitForAll(arrayList);
    }

    private boolean removeIndexBit(long j, long j2) {
        if (this.lastMutableBucket.removeIndexBit(j, j2)) {
            return true;
        }
        return ((Boolean) findImmutableBucket(j).map(immutableBucket -> {
            return Boolean.valueOf(immutableBucket.removeIndexBit(j, j2));
        }).orElse(false)).booleanValue();
    }

    public boolean containsMessage(long j, long j2) {
        if (this.lastMutableBucket.containsMessage(j, j2)) {
            return true;
        }
        return ((Boolean) findImmutableBucket(j).map(immutableBucket -> {
            return Boolean.valueOf(immutableBucket.containsMessage(j, j2));
        }).orElse(false)).booleanValue();
    }

    public Map<String, TopicMetricBean> genTopicMetricMap() {
        this.stats.recordNumOfBuckets(this.immutableBuckets.asMapOfRanges().size() + 1);
        this.stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size());
        MutableLong mutableLong = new MutableLong();
        this.immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> {
            mutableLong.add(immutableBucket.getSnapshotLength());
        });
        this.stats.recordBucketSnapshotSizeBytes(mutableLong.longValue());
        return this.stats.genTopicMetricMap();
    }

    public MutableBucket getLastMutableBucket() {
        return this.lastMutableBucket;
    }

    public TripleLongPriorityQueue getSharedBucketPriorityQueue() {
        return this.sharedBucketPriorityQueue;
    }

    public RangeMap<Long, ImmutableBucket> getImmutableBuckets() {
        return this.immutableBuckets;
    }
}
