package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentMessageFinderTest.class */
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/PersistentMessageFinderTest$Result.class */
    public class Result {
        ManagedLedgerException exception = null;
        Position position = null;

        Result() {
        }

        void reset() {
            this.exception = null;
            this.position = null;
        }
    }

    public static byte[] createMessageWrittenToLedger(String str) throws Exception {
        PulsarApi.MessageMetadata.Builder newBuilder = PulsarApi.MessageMetadata.newBuilder();
        newBuilder.setPublishTime(System.currentTimeMillis());
        newBuilder.setProducerName("createMessageWrittenToLedger");
        newBuilder.setSequenceId(1L);
        PulsarApi.MessageMetadata build = newBuilder.build();
        ByteBuf writeBytes = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(str.getBytes());
        int serializedSize = build.getSerializedSize();
        int readableBytes = 4 + serializedSize + writeBytes.readableBytes();
        ByteBuf heapBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(readableBytes, readableBytes);
        ByteBufCodedOutputStream byteBufCodedOutputStream = ByteBufCodedOutputStream.get(heapBuffer);
        heapBuffer.writeInt(serializedSize);
        build.writeTo(byteBufCodedOutputStream);
        ByteBuf coalesce = ByteBufPair.coalesce(ByteBufPair.get(heapBuffer, writeBytes));
        byte[] array = coalesce.nioBuffer().array();
        coalesce.release();
        return array;
    }

    CompletableFuture<Void> findMessage(final Result result, ManagedCursor managedCursor, long j) {
        PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder("topicname", managedCursor);
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        persistentMessageFinder.findMessages(j, new AsyncCallbacks.FindEntryCallback() { // from class: org.apache.pulsar.broker.service.PersistentMessageFinderTest.1
            public void findEntryComplete(Position position, Object obj) {
                result.position = position;
                completableFuture.complete(null);
            }

            public void findEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
                result.exception = managedLedgerException;
                completableFuture.completeExceptionally(managedLedgerException);
            }
        });
        return completableFuture;
    }

    @Test
    void testPersistentMessageFinder() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setRetentionSizeInMB(10L);
        managedLedgerConfig.setMaxEntriesPerLedger(2);
        managedLedgerConfig.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger open = this.factory.open("testPersistentMessageFinder", managedLedgerConfig);
        ManagedCursorImpl openCursor = open.openCursor("testPersistentMessageFinder");
        open.addEntry(createMessageWrittenToLedger("retained1"));
        Thread.sleep(100L);
        open.addEntry(createMessageWrittenToLedger("retained2"));
        Thread.sleep(100L);
        Position addEntry = open.addEntry(createMessageWrittenToLedger("retained3"));
        Thread.sleep(100L);
        long currentTimeMillis2 = System.currentTimeMillis();
        Thread.sleep(10L);
        open.addEntry(createMessageWrittenToLedger("afterresetposition"));
        Position addEntry2 = open.addEntry(createMessageWrittenToLedger("not-read"));
        List readEntries = openCursor.readEntries(3);
        openCursor.markDelete(((Entry) readEntries.get(2)).getPosition());
        openCursor.close();
        open.close();
        readEntries.forEach(entry -> {
            entry.release();
        });
        Thread.sleep(1000L);
        ManagedLedger open2 = this.factory.open("testPersistentMessageFinder", managedLedgerConfig);
        ManagedCursorImpl openCursor2 = open2.openCursor("testPersistentMessageFinder");
        long currentTimeMillis3 = System.currentTimeMillis();
        Result result = new Result();
        findMessage(result, openCursor2, currentTimeMillis2).get();
        Assert.assertEquals(result.exception, (Object) null);
        Assert.assertTrue(result.position != null);
        Assert.assertEquals(result.position, addEntry);
        result.reset();
        findMessage(result, openCursor2, currentTimeMillis).get();
        Assert.assertEquals(result.exception, (Object) null);
        Assert.assertEquals(result.position, (Object) null);
        result.reset();
        findMessage(result, openCursor2, currentTimeMillis3).get();
        Assert.assertEquals(result.exception, (Object) null);
        Assert.assertNotEquals(result.position, (Object) null);
        Assert.assertEquals(result.position, addEntry2);
        PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder("topicname", openCursor2);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        persistentMessageFinder.findEntryFailed(new ManagedLedgerException("failed"), new AsyncCallbacks.FindEntryCallback() { // from class: org.apache.pulsar.broker.service.PersistentMessageFinderTest.2
            public void findEntryComplete(Position position, Object obj) {
            }

            public void findEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
                atomicBoolean.set(true);
            }
        });
        Assert.assertTrue(atomicBoolean.get());
        PersistentMessageExpiryMonitor persistentMessageExpiryMonitor = new PersistentMessageExpiryMonitor("topicname", openCursor2.getName(), openCursor2);
        persistentMessageExpiryMonitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), (Object) null);
        Field declaredField = persistentMessageExpiryMonitor.getClass().getDeclaredField("expirationCheckInProgress");
        declaredField.setAccessible(true);
        Assert.assertEquals(0, declaredField.get(persistentMessageExpiryMonitor));
        result.reset();
        openCursor2.close();
        open2.close();
        this.factory.shutdown();
    }
}
