/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.delayed.bucket;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.bucket.Bucket;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.DelayedIndexQueue;
import org.apache.pulsar.broker.delayed.bucket.ImmutableBucket;
import org.apache.pulsar.broker.delayed.bucket.TripleLongPriorityDelayedIndexQueue;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bucket that accumulates delayed-message indexes in memory until it is sealed.
 *
 * <p>Indexes are held in a {@link TripleLongPriorityQueue} as
 * {@code (deliverAt, ledgerId, entryId)} triples. Sealing drains a queue of indexes into
 * snapshot segments, creates an {@link ImmutableBucket} describing them and starts an
 * asynchronous save of the snapshot.
 *
 * <p>A value of {@code -1} in {@code startLedgerId} / {@code endLedgerId} marks the ledger range
 * as unset (see {@link #addMessage} and {@link #resetLastMutableBucketRange}).
 */
class MutableBucket extends Bucket implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(MutableBucket.class);

    /** Pending indexes, stored as (deliverAt, ledgerId, entryId). */
    private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();

    /**
     * Creates an empty mutable bucket.
     *
     * <p>The two trailing {@code -1L} arguments are presumably the initial start/end ledger ids
     * of the superclass (confirm against {@code Bucket}).
     */
    MutableBucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer<Void> sequencer, BucketSnapshotStorage bucketSnapshotStorage) {
        super(dispatcherName, cursor, sequencer, bucketSnapshotStorage, -1L, -1L);
    }

    /**
     * Seals this bucket: drains its own priority queue into a new {@link ImmutableBucket} covering
     * the current {@code [startLedgerId, endLedgerId]} range.
     *
     * @param timeStepPerBucketSnapshotSegment   width of the timestamp window covered by one segment
     * @param maxIndexesPerBucketSnapshotSegment maximum indexes per segment, or {@code -1} for no limit
     * @param sharedQueue                        queue that receives the indexes of the first segment
     * @return see {@link #createImmutableBucketAndAsyncPersistent}
     */
    Pair<ImmutableBucket, DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> sealBucketAndAsyncPersistent(long timeStepPerBucketSnapshotSegment, int maxIndexesPerBucketSnapshotSegment, TripleLongPriorityQueue sharedQueue) {
        return this.createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment, maxIndexesPerBucketSnapshotSegment, sharedQueue, TripleLongPriorityDelayedIndexQueue.wrap(this.priorityQueue), this.startLedgerId, this.endLedgerId);
    }

    /**
     * Drains {@code delayedIndexQueue} into snapshot segments, builds an {@link ImmutableBucket}
     * for them and starts saving the snapshot asynchronously.
     *
     * <p>A segment is closed when the queue is exhausted, when the next index has a timestamp
     * beyond {@code firstTimestampOfSegment + timeStepPerBucketSnapshotSegment - 1}, or when the
     * segment already holds {@code maxIndexesPerBucketSnapshotSegment} indexes (unless that limit
     * is {@code -1}). Indexes belonging to the first segment are additionally copied into
     * {@code sharedQueue}.
     *
     * @param timeStepPerBucketSnapshotSegment   width of the timestamp window covered by one segment
     * @param maxIndexesPerBucketSnapshotSegment maximum indexes per segment, or {@code -1} for no limit
     * @param sharedQueue                        queue that receives the indexes of the first segment
     * @param delayedIndexQueue                  source of indexes; it is emptied by this call
     * @param startLedgerId                      lowest ledger id an index may carry (inclusive)
     * @param endLedgerId                        highest ledger id an index may carry (inclusive)
     * @return {@code null} if {@code delayedIndexQueue} is empty; otherwise the new bucket paired
     *         with the last index of its first segment
     * @throws IllegalArgumentException if an index has a ledger id outside
     *                                  {@code [startLedgerId, endLedgerId]}
     */
    Pair<ImmutableBucket, DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> createImmutableBucketAndAsyncPersistent(long timeStepPerBucketSnapshotSegment, int maxIndexesPerBucketSnapshotSegment, TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, long startLedgerId, long endLedgerId) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", this.dispatcherName, startLedgerId, endLedgerId);
        }
        if (delayedIndexQueue.isEmpty()) {
            return null;
        }
        long numMessages = 0L;
        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
        // Per-ledger bitmap of the entry ids contained in the segment currently being built.
        Map<Long, RoaringBitmap> bitMap = new HashMap<>();
        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.Builder snapshotSegmentBuilder = DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder();
        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.Builder segmentMetadataBuilder = DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder();
        List<Long> firstScheduleTimestamps = new ArrayList<>();
        // 0 is used as the "no segment open" sentinel for the upper limit.
        long currentTimestampUpperLimit = 0L;
        long currentFirstTimestamp = 0L;
        while (!delayedIndexQueue.isEmpty()) {
            DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = delayedIndexQueue.peek();
            long timestamp = delayedIndex.getTimestamp();
            if (currentTimestampUpperLimit == 0L) {
                // This index opens a new segment: remember its timestamp and derive the window end.
                currentFirstTimestamp = timestamp;
                firstScheduleTimestamps.add(currentFirstTimestamp);
                currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1L;
            }
            long ledgerId = delayedIndex.getLedgerId();
            long entryId = delayedIndex.getEntryId();
            Preconditions.checkArgument(ledgerId >= startLedgerId && ledgerId <= endLedgerId);
            // No segment has been completed yet, so this index belongs to the first segment.
            if (segmentMetadataList.isEmpty()) {
                sharedQueue.add(timestamp, ledgerId, entryId);
            }
            delayedIndexQueue.pop();
            ++numMessages;
            bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1L);
            snapshotSegmentBuilder.addIndexes(delayedIndex);

            boolean segmentComplete = delayedIndexQueue.isEmpty()
                    || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
                    || (maxIndexesPerBucketSnapshotSegment != -1
                            && snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment);
            if (!segmentComplete) {
                continue;
            }

            segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
            segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
            currentTimestampUpperLimit = 0L;
            // Serialize each ledger's bitmap into the segment metadata and empty the map so the
            // next segment starts from scratch.
            Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Long, RoaringBitmap> entry = iterator.next();
                RoaringBitmap bitmap = entry.getValue();
                byte[] array = new byte[bitmap.serializedSizeInBytes()];
                bitmap.serialize(ByteBuffer.wrap(array));
                segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), ByteString.copyFrom(array));
                iterator.remove();
            }
            segmentMetadataList.add(segmentMetadataBuilder.build());
            segmentMetadataBuilder.clear();
            bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
            snapshotSegmentBuilder.clear();
        }
        // Validate before slicing or indexing into the segment list below.
        Preconditions.checkArgument(!bucketSnapshotSegments.isEmpty());

        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata bucketSnapshotMetadata = DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().addAllMetadataList(segmentMetadataList).build();
        int lastSegmentEntryId = segmentMetadataList.size();
        ImmutableBucket bucket = new ImmutableBucket(this.dispatcherName, this.cursor, (FutureUtil.Sequencer<Void>)this.sequencer, this.bucketSnapshotStorage, startLedgerId, endLedgerId);
        bucket.setCurrentSegmentEntryId(1);
        bucket.setNumberBucketDelayedMessages(numMessages);
        bucket.setLastSegmentEntryId(lastSegmentEntryId);
        bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);
        // The first segment's indexes were already pushed to sharedQueue, so hand over the rest.
        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());
        bucket.setSnapshotSegments(snapshotSegments);

        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment firstSegment = bucketSnapshotSegments.get(0);
        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex lastDelayedIndex = firstSegment.getIndexes(firstSegment.getIndexesCount() - 1);
        Pair<ImmutableBucket, DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> result = Pair.of(bucket, lastDelayedIndex);
        CompletableFuture<Long> future = this.asyncSaveBucketSnapshot(bucket, bucketSnapshotMetadata, bucketSnapshotSegments);
        bucket.setSnapshotCreateFuture(future);
        return result;
    }

    /**
     * Moves every queued index whose delivery timestamp is {@code <= cutoffTime} from this
     * bucket's queue into {@code sharedBucketPriorityQueue}.
     *
     * @param cutoffTime                latest delivery timestamp (inclusive) to move
     * @param sharedBucketPriorityQueue destination queue
     */
    void moveScheduledMessageToSharedQueue(long cutoffTime, TripleLongPriorityQueue sharedBucketPriorityQueue) {
        while (!this.priorityQueue.isEmpty()) {
            long timestamp = this.priorityQueue.peekN1();
            if (timestamp > cutoffTime) {
                break;
            }
            long ledgerId = this.priorityQueue.peekN2();
            long entryId = this.priorityQueue.peekN3();
            sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId);
            this.priorityQueue.pop();
        }
    }

    /** Marks the ledger range as unset by resetting both bounds to {@code -1}. */
    void resetLastMutableBucketRange() {
        this.startLedgerId = -1L;
        this.endLedgerId = -1L;
    }

    /** Resets the ledger range and clears both the index bitmap and the priority queue. */
    void clear() {
        this.resetLastMutableBucketRange();
        this.delayedIndexBitMap.clear();
        this.priorityQueue.clear();
    }

    /** Closes the underlying priority queue. */
    @Override
    public void close() {
        this.priorityQueue.close();
    }

    /** Returns the byte capacity reported by the underlying priority queue. */
    long getBufferMemoryUsage() {
        return this.priorityQueue.bytesCapacity();
    }

    /** Returns {@code true} if no indexes are queued. */
    boolean isEmpty() {
        return this.priorityQueue.isEmpty();
    }

    /**
     * Returns the delivery timestamp at the head of the queue.
     *
     * <p>NOTE(review): behaviour on an empty queue depends on
     * {@code TripleLongPriorityQueue.peekN1()}; callers presumably check {@link #isEmpty()} first.
     */
    long nextDeliveryTime() {
        return this.priorityQueue.peekN1();
    }

    /** Returns the number of queued indexes. */
    long size() {
        return this.priorityQueue.size();
    }

    /**
     * Queues a delayed-message index and extends the bucket's ledger range to include it.
     *
     * <p>The first ledger id seen after a reset becomes {@code startLedgerId}; every call
     * overwrites {@code endLedgerId} with the given ledger id.
     *
     * @param ledgerId  ledger of the message
     * @param entryId   entry of the message within the ledger
     * @param deliverAt delivery timestamp used as the queue priority
     */
    void addMessage(long ledgerId, long entryId, long deliverAt) {
        this.priorityQueue.add(deliverAt, ledgerId, entryId);
        if (this.startLedgerId == -1L) {
            this.startLedgerId = ledgerId;
        }
        this.endLedgerId = ledgerId;
        this.putIndexBit(ledgerId, entryId);
    }
}

