/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.testng.Assert;
import org.testng.annotations.Test;

public class PersistentMessageFinderTest
extends MockedBookKeeperTestCase {
    public static byte[] createMessageWrittenToLedger(String msg) throws Exception {
        PulsarApi.MessageMetadata.Builder messageMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
        messageMetadataBuilder.setPublishTime(System.currentTimeMillis());
        messageMetadataBuilder.setProducerName("createMessageWrittenToLedger");
        messageMetadataBuilder.setSequenceId(1L);
        PulsarApi.MessageMetadata messageMetadata = messageMetadataBuilder.build();
        ByteBuf data = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes());
        int msgMetadataSize = messageMetadata.getSerializedSize();
        int payloadSize = data.readableBytes();
        int totalSize = 4 + msgMetadataSize + payloadSize;
        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get((ByteBuf)headers);
        headers.writeInt(msgMetadataSize);
        messageMetadata.writeTo(outStream);
        ByteBuf headersAndPayload = ByteBufPair.coalesce((ByteBufPair)ByteBufPair.get((ByteBuf)headers, (ByteBuf)data));
        byte[] byteMessage = headersAndPayload.nioBuffer().array();
        headersAndPayload.release();
        return byteMessage;
    }

    CompletableFuture<Void> findMessage(final Result result, ManagedCursor c1, long timestamp) {
        PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1);
        final CompletableFuture<Void> future = new CompletableFuture<Void>();
        messageFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback(){

            public void findEntryComplete(Position position, Object ctx) {
                result.position = position;
                future.complete(null);
            }

            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                future.completeExceptionally((Throwable)exception);
            }
        });
        return future;
    }

    @Test
    void testPersistentMessageFinder() throws Exception {
        String ledgerAndCursorName = "testPersistentMessageFinder";
        int entriesPerLedger = 2;
        long beginTimestamp = System.currentTimeMillis();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(10L);
        config.setMaxEntriesPerLedger(entriesPerLedger);
        config.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger ledger = this.factory.open("testPersistentMessageFinder", config);
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("testPersistentMessageFinder");
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("retained1"));
        Thread.sleep(100L);
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("retained2"));
        Thread.sleep(100L);
        Position newPosition = ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("retained3"));
        Thread.sleep(100L);
        long timestamp = System.currentTimeMillis();
        Thread.sleep(10L);
        ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("afterresetposition"));
        Position lastPosition = ledger.addEntry(PersistentMessageFinderTest.createMessageWrittenToLedger("not-read"));
        List entries = c1.readEntries(3);
        c1.markDelete(((Entry)entries.get(2)).getPosition());
        c1.close();
        ledger.close();
        entries.forEach(e -> e.release());
        Thread.sleep(1000L);
        ledger = this.factory.open("testPersistentMessageFinder", config);
        c1 = (ManagedCursorImpl)ledger.openCursor("testPersistentMessageFinder");
        long endTimestamp = System.currentTimeMillis();
        Result result = new Result();
        CompletableFuture<Void> future = this.findMessage(result, (ManagedCursor)c1, timestamp);
        future.get();
        Assert.assertNull((Object)result.exception);
        Assert.assertNotNull((Object)result.position);
        Assert.assertEquals((Object)result.position, (Object)newPosition);
        result.reset();
        future = this.findMessage(result, (ManagedCursor)c1, beginTimestamp);
        future.get();
        Assert.assertNull((Object)result.exception);
        Assert.assertEquals((Object)result.position, (Object)c1.getFirstPosition());
        result.reset();
        future = this.findMessage(result, (ManagedCursor)c1, endTimestamp);
        future.get();
        Assert.assertNull((Object)result.exception);
        Assert.assertNotEquals((Object)result.position, null);
        Assert.assertEquals((Object)result.position, (Object)lastPosition);
        PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", (ManagedCursor)c1);
        final AtomicBoolean ex = new AtomicBoolean(false);
        messageFinder.findEntryFailed(new ManagedLedgerException("failed"), (Object)new AsyncCallbacks.FindEntryCallback(){

            public void findEntryComplete(Position position, Object ctx) {
            }

            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
                ex.set(true);
            }
        });
        Assert.assertTrue((boolean)ex.get());
        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), (ManagedCursor)c1);
        monitor.findEntryFailed((ManagedLedgerException)new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null);
        Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
        field.setAccessible(true);
        Assert.assertEquals((Object)0, (Object)field.get(monitor));
        result.reset();
        c1.close();
        ledger.close();
        this.factory.shutdown();
    }

    class Result {
        ManagedLedgerException exception = null;
        Position position = null;

        Result() {
        }

        void reset() {
            this.exception = null;
            this.position = null;
        }
    }
}

