package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Charsets;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.feature.SettableFeature;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.LogReader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.feature.CoreFeatureKeys;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryReader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.FailpointUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/TestRollLogSegments.class */
public class TestRollLogSegments extends TestDistributedLogBase {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TestRollLogSegments.class);

    /**
     * Asserts that exactly one entry of the given log segment list is still in progress.
     *
     * @param list log segment metadata entries to inspect
     * @throws Exception declared for callers; the body itself only fails through the JUnit assertion
     */
    private static void ensureOnlyOneInprogressLogSegments(List<LogSegmentMetadata> list) throws Exception {
        int inProgressCount = 0;
        for (LogSegmentMetadata segment : list) {
            if (segment.isInProgress()) {
                inProgressCount++;
            }
        }
        Assert.assertEquals(1L, inProgressCount);
    }

    /**
     * Writes 100 records with a 40-byte segment size limit while the
     * {@code DISABLE_LOGSEGMENT_ROLLING} feature is switched on, then asserts that the
     * stream still consists of a single log segment.
     *
     * @throws Exception if the stream cannot be set up, written or closed
     */
    @Test(timeout = 60000)
    public void testDisableRollingLogSegments() throws Exception {
        final int numRecords = 100;

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        confLocal.setMaxLogSegmentBytes(40L);

        BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-disable-rolling-log-segments");
        BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();

        // Switch rolling off through the feature flag before any record is written.
        SettableFeature disableRolling = (SettableFeature) dlm.getFeatureProvider()
                .getFeature(CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase());
        disableRolling.set(true);

        final CountDownLatch latch = new CountDownLatch(numRecords);
        for (int i = 1; i <= numRecords; i++) {
            final int entryId = i;
            writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {
                @Override
                public void onSuccess(DLSN dlsn) {
                    logger.info("Completed entry {} : {}.", entryId, dlsn);
                    latch.countDown();
                }

                @Override
                public void onFailure(Throwable cause) {
                    // A failed write never counts the latch down, so the test ends on its
                    // timeout; log the cause so that the timeout can be diagnosed.
                    logger.error("Failed to write entry {} : ", entryId, cause);
                }
            });
        }
        latch.await();

        writer.closeAndComplete();
        Assert.assertEquals(1L, dlm.getLogSegments().size());
        dlm.close();
    }

    /**
     * Writes 100 records with a 40-byte segment size limit, tracks the highest acknowledged
     * DLSN per log segment sequence number, and asserts that every log segment's recorded
     * last DLSN equals the highest DLSN seen for that segment.
     *
     * @throws Exception if the stream cannot be set up, written or closed
     */
    @Test(timeout = 600000)
    public void testLastDLSNInRollingLogSegments() throws Exception {
        final int numRecords = 100;
        // Highest acknowledged DLSN per log segment sequence number; guarded by its own monitor
        // because write callbacks may run concurrently.
        final HashMap<Long, DLSN> lastDLSNs = new HashMap<>();

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        confLocal.setMaxLogSegmentBytes(40L);

        BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-lastdlsn-in-rolling-log-segments");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        final CountDownLatch latch = new CountDownLatch(numRecords);
        for (int i = 1; i <= numRecords; i++) {
            final int entryId = i;
            CompletableFuture<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId))
                    .whenComplete(new FutureEventListener<DLSN>() {
                        @Override
                        public void onSuccess(DLSN dlsn) {
                            logger.info("Completed entry {} : {}.", entryId, dlsn);
                            synchronized (lastDLSNs) {
                                DLSN known = lastDLSNs.get(dlsn.getLogSegmentSequenceNo());
                                if (null == known || known.compareTo(dlsn) < 0) {
                                    lastDLSNs.put(dlsn.getLogSegmentSequenceNo(), dlsn);
                                }
                            }
                            latch.countDown();
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            // A failed write never counts the latch down, so the test ends on its
                            // timeout; log the cause so that the timeout can be diagnosed.
                            logger.error("Failed to write entry {} : ", entryId, cause);
                        }
                    });
            if (i == 1) {
                // Wait for the very first write to complete before issuing the rest.
                Utils.ioResult(writeFuture);
            }
        }
        latch.await();

        writer.closeAndComplete();

        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        logger.info("lastDLSNs after writes {} {}", lastDLSNs.size(), lastDLSNs);
        logger.info("segments after writes {} {}", segments.size(), segments);
        Assert.assertTrue(segments.size() >= 2);
        Assert.assertTrue(lastDLSNs.size() >= 2);
        Assert.assertEquals(lastDLSNs.size(), segments.size());
        for (LogSegmentMetadata segment : segments) {
            DLSN dlsnInMetadata = segment.getLastDLSN();
            DLSN dlsnSeen = lastDLSNs.get(segment.getLogSegmentSequenceNumber());
            Assert.assertNotNull(dlsnInMetadata);
            Assert.assertNotNull(dlsnSeen);
            if (dlsnInMetadata.compareTo(dlsnSeen) != 0) {
                logger.error("Last dlsn recorded in log segment {} is different from the one already seen {}.",
                        dlsnInMetadata, dlsnSeen);
            }
            Assert.assertEquals(0L, dlsnInMetadata.compareTo(dlsnSeen));
        }

        dlm.close();
    }

    /* JADX WARN: Type inference failed for: r0v15, types: [org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.BKAsyncLogWriter] */
    @Test(timeout = 60000)
    public void testUnableToRollLogSegments() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setLogSegmentRollingIntervalMinutes(0);
        distributedLogConfiguration.setMaxLogSegmentBytes(1L);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-unable-to-roll-log-segments");
        ?? r0 = (BKAsyncLogWriter) createNewDLM.startAsyncLogSegmentNonPartitioned();
        long j = 1;
        Utils.ioResult(r0.write(DLMTestUtil.getLogRecordInstance(1L)));
        FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate, FailpointUtils.FailPointActions.FailPointAction_Throw);
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(10);
            for (int i = 0; i < 10; i++) {
                long j2 = j + 1;
                j = r0;
                r0.write(DLMTestUtil.getLogRecordInstance(j2)).whenComplete((BiConsumer<? super DLSN, ? super Throwable>) new FutureEventListener<DLSN>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestRollLogSegments.3
                    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureEventListener
                    public void onSuccess(DLSN dlsn) {
                        TestRollLogSegments.logger.info("Completed entry : {}.", dlsn);
                        countDownLatch.countDown();
                    }

                    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureEventListener
                    public void onFailure(Throwable th) {
                        TestRollLogSegments.logger.error("Failed to write entries : ", th);
                    }
                });
            }
            countDownLatch.await();
            r0.close();
            logger.info("LogSegments: {}", createNewDLM.getLogSegments());
            Assert.assertEquals(1L, r0.size());
            long j3 = 1;
            LogReader inputStream = createNewDLM.getInputStream(DLSN.InitialDLSN);
            for (LogRecordWithDLSN readNext = inputStream.readNext(false); null != readNext; readNext = inputStream.readNext(false)) {
                DLMTestUtil.verifyLogRecord(readNext);
                long j4 = j3;
                j3 = j4 + 1;
                Assert.assertEquals(j4, readNext.getTransactionId());
                Assert.assertEquals(readNext.getTransactionId() - 1, readNext.getSequenceId());
            }
            Assert.assertEquals(12L, j3);
            inputStream.close();
            createNewDLM.close();
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
        } catch (Throwable th) {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
            throw th;
        }
    }

    /**
     * Writes ten records asynchronously with a 1-byte segment size limit and unbounded rolling
     * concurrency, then ten more records one at a time, checking after each phase that at most
     * one log segment is in progress and that the segment count matches the expectation.
     *
     * @throws Exception if the stream cannot be set up, written or closed
     */
    @Test(timeout = 60000)
    public void testRollingLogSegments() throws Exception {
        logger.info("start testRollingLogSegments");
        final int numRecords = 10;

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        confLocal.setMaxLogSegmentBytes(1L);
        confLocal.setLogSegmentRollingConcurrency(Integer.MAX_VALUE);

        BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-rolling-logsegments-hightraffic");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        final CountDownLatch latch = new CountDownLatch(numRecords);
        long startTime = System.currentTimeMillis();
        for (int i = 1; i <= numRecords; i++) {
            final int entryId = i;
            CompletableFuture<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId))
                    .whenComplete(new FutureEventListener<DLSN>() {
                        @Override
                        public void onSuccess(DLSN dlsn) {
                            logger.info("Completed entry {} : {}.", entryId, dlsn);
                            latch.countDown();
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            // No "{}" here: the throwable is passed as the exception argument,
                            // so a placeholder would be printed literally.
                            logger.error("Failed to write entries : ", cause);
                        }
                    });
            if (i == 1) {
                // Wait for the very first write to complete before issuing the rest.
                Utils.ioResult(writeFuture);
            }
        }
        latch.await();
        logger.info("Took {} ms to completed all requests.", System.currentTimeMillis() - startTime);

        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        logger.info("LogSegments : {}", segments);
        Assert.assertTrue(segments.size() >= 2);
        ensureOnlyOneInprogressLogSegments(segments);

        int numSegmentsAfterAsyncWrites = segments.size();

        // Second phase: each write is awaited before the next one is issued.
        for (int i = 1; i <= numRecords; i++) {
            DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(numRecords + i)));
            logger.info("Completed entry {} : {}", numRecords + i, dlsn);
        }

        segments = dlm.getLogSegments();
        logger.info("LogSegments : {}", segments);
        // NOTE(review): the test expects one additional segment per two sequential writes;
        // the reason is not visible in this file - confirm against the rolling logic.
        Assert.assertEquals(numSegmentsAfterAsyncWrites + numRecords / 2, segments.size());
        ensureOnlyOneInprogressLogSegments(segments);

        writer.close();
        dlm.close();
    }

    /**
     * Polls once per second until the writer's ledger and the reader's read-ahead position have
     * reached the expected entry ids, asserting the exact values once they are reached.
     *
     * @param bKLogSegmentWriter segment writer whose ledger handle's last-add-confirmed is polled
     * @param j expected last-add-confirmed of the writer's ledger handle
     * @param bKAsyncLogReader reader whose read-ahead next entry position is polled
     * @param j2 expected entry id of the reader's next read-ahead position
     * @param ledgerHandle separately opened ledger handle queried via {@code readLastConfirmed()}
     * @param j3 expected result of {@code ledgerHandle.readLastConfirmed()}
     * @throws Exception if reading a position fails or the waiting thread is interrupted
     */
    private void checkAndWaitWriterReaderPosition(BKLogSegmentWriter bKLogSegmentWriter, long j, BKAsyncLogReader bKAsyncLogReader, long j2, LedgerHandle ledgerHandle, long j3) throws Exception {
        final long expectedWriterLac = j;
        final long expectedReaderEntryId = j2;
        final long expectedReadLac = j3;

        // Wait for the writer's ledger to confirm up to the expected entry.
        for (;;) {
            if (getLedgerHandle(bKLogSegmentWriter).getLastAddConfirmed() >= expectedWriterLac) {
                break;
            }
            Thread.sleep(1000L);
        }
        Assert.assertEquals(expectedWriterLac, getLedgerHandle(bKLogSegmentWriter).getLastAddConfirmed());
        Assert.assertEquals(expectedReadLac, ledgerHandle.readLastConfirmed());

        // Wait for the read-ahead position to advance to the expected entry.
        EntryPosition readPosition;
        for (;;) {
            readPosition = bKAsyncLogReader.getReadAheadReader().getNextEntryPosition();
            logger.info("ReadAhead moved read position {} : ", readPosition);
            if (readPosition.getEntryId() >= expectedReaderEntryId) {
                break;
            }
            Thread.sleep(1000L);
        }
        Assert.assertEquals(expectedReaderEntryId, readPosition.getEntryId());
    }

    /**
     * Exercises a reader that has caught up with the writer while the log segment is completed
     * underneath it.
     *
     * <p>Steps: write and read five records; write records 6 and 7 without committing and check
     * the writer/reader positions after each; wait until the reader has an outstanding long
     * poll; complete the in-progress segment through the write handler; write record 8 into a
     * new segment; finally assert that the reader delivers records 6 through 8 in order.
     *
     * @throws Exception if any stream operation fails or the waiting thread is interrupted
     */
    @Test(timeout = 60000)
    public void testCaughtUpReaderOnLogSegmentRolling() throws Exception {
        final String streamName = "distrlog-caughtup-reader-on-logsegment-rolling";

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(4194304);
        confLocal.setTraceReadAheadMetadataChanges(true);
        confLocal.setEnsembleSize(1);
        confLocal.setWriteQuorumSize(1);
        confLocal.setAckQuorumSize(1);
        confLocal.setReadLACLongPollTimeout(99999999);
        confLocal.setReaderIdleWarnThresholdMillis(199999999);
        confLocal.setBKClientReadTimeout(100000000);

        BKDistributedLogManager writeDlm = createNewDLM(confLocal, streamName);
        BKSyncLogWriter writer = (BKSyncLogWriter) writeDlm.startLogSegmentNonPartitioned();
        for (int txId = 1; txId <= 5; txId++) {
            writer.write(DLMTestUtil.getLogRecordInstance(txId));
            writer.flush();
            writer.commit();
        }

        BKDistributedLogManager readDlm = createNewDLM(confLocal, streamName);
        BKAsyncLogReader reader = (BKAsyncLogReader) readDlm.getAsyncLogReader(DLSN.InitialDLSN);
        for (long expectedTxId = 1; expectedTxId <= 5; expectedTxId++) {
            LogRecordWithDLSN record = (LogRecordWithDLSN) Utils.ioResult(reader.readNext());
            DLMTestUtil.verifyLogRecord(record);
            Assert.assertEquals(expectedTxId, record.getTransactionId());
            Assert.assertEquals(record.getTransactionId() - 1, record.getSequenceId());
        }

        BKLogSegmentWriter segmentWriter = writer.segmentWriter;
        // Open the writer's ledger without recovery so its last-confirmed can be observed.
        BookKeeper bookKeeper = DLMTestUtil.getBookKeeperClient(readDlm).get();
        long ledgerId = getLedgerHandle(segmentWriter).getId();
        byte[] digestPassword = conf.getBKDigestPW().getBytes(Charsets.UTF_8);
        LedgerHandle readLh = bookKeeper.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, digestPassword);

        // NOTE(review): the expected entry ids (9/10/11) presumably follow from each
        // flush+commit pair producing two ledger entries - confirm against the writer.
        checkAndWaitWriterReaderPosition(segmentWriter, 9L, reader, 9L, readLh, 8L);

        writer.write(DLMTestUtil.getLogRecordInstance(6L));
        writer.flush();
        checkAndWaitWriterReaderPosition(segmentWriter, 10L, reader, 10L, readLh, 9L);

        writer.write(DLMTestUtil.getLogRecordInstance(7L));
        writer.flush();
        checkAndWaitWriterReaderPosition(segmentWriter, 11L, reader, 11L, readLh, 10L);

        // Poll once per second until the current segment's entry reader reports a long poll.
        boolean longPollOutstanding;
        do {
            BKLogSegmentEntryReader entryReader =
                    (BKLogSegmentEntryReader) reader.getReadAheadReader().getCurrentSegmentReader().getEntryReader();
            longPollOutstanding = null != entryReader && null != entryReader.getOutstandingLongPoll();
            if (!longPollOutstanding) {
                Thread.sleep(1000L);
            }
        } while (!longPollOutstanding);
        logger.info("Waiting for long poll getting interrupted with metadata changed");

        // Complete the in-progress segment directly through the write handler.
        BKLogWriteHandler writeHandler = writer.getCachedWriteHandler();
        String inprogressZnodeName = writeHandler.inprogressZNodeName(
                segmentWriter.getLogSegmentId(),
                segmentWriter.getStartTxId(),
                segmentWriter.getLogSegmentSequenceNumber());
        writeHandler.completeAndCloseLogSegment(
                inprogressZnodeName,
                segmentWriter.getLogSegmentSequenceNumber(),
                segmentWriter.getLogSegmentId(),
                segmentWriter.getStartTxId(),
                segmentWriter.getLastTxId(),
                segmentWriter.getPositionWithinLogSegment() - 1,
                9L,
                0L);

        BKSyncLogWriter anotherWriter = (BKSyncLogWriter) writeDlm.startLogSegmentNonPartitioned();
        anotherWriter.write(DLMTestUtil.getLogRecordInstance(8L));
        anotherWriter.flush();
        anotherWriter.commit();
        anotherWriter.closeAndComplete();

        for (long expectedTxId = 6; expectedTxId <= 8; expectedTxId++) {
            LogRecordWithDLSN record = (LogRecordWithDLSN) Utils.ioResult(reader.readNext());
            DLMTestUtil.verifyLogRecord(record);
            Assert.assertEquals(expectedTxId, record.getTransactionId());
        }

        Utils.close(reader);
        readDlm.close();
    }
}
