/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class CompactedTopicTest
extends MockedPulsarServiceBaseTest {
    private final Random r = new Random(0L);

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT));
        this.admin.tenants().createTenant("my-property", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"use"})));
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
    }

    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    private Triple<Long, List<Pair<PulsarApi.MessageIdData, Long>>, List<Pair<PulsarApi.MessageIdData, Long>>> buildCompactedLedger(BookKeeper bk, int count) throws Exception {
        LedgerHandle lh = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        ArrayList positions = new ArrayList();
        ArrayList idsInGaps = new ArrayList();
        AtomicLong ledgerIds = new AtomicLong(10L);
        AtomicLong entryIds = new AtomicLong(0L);
        CompletableFuture.allOf((CompletableFuture[])IntStream.range(0, count).mapToObj(i -> {
            long delta;
            ArrayList<PulsarApi.MessageIdData> idsInGap = new ArrayList<PulsarApi.MessageIdData>();
            if (this.r.nextInt(10) == 1) {
                delta = this.r.nextInt(10) + 1;
                idsInGap.add(PulsarApi.MessageIdData.newBuilder().setLedgerId(ledgerIds.get()).setEntryId(entryIds.get() + 1L).build());
                ledgerIds.addAndGet(delta);
                entryIds.set(0L);
            }
            if ((delta = (long)this.r.nextInt(5)) != 0L) {
                idsInGap.add(PulsarApi.MessageIdData.newBuilder().setLedgerId(ledgerIds.get()).setEntryId(entryIds.get() + 1L).build());
            }
            PulsarApi.MessageIdData id = PulsarApi.MessageIdData.newBuilder().setLedgerId(ledgerIds.get()).setEntryId(entryIds.addAndGet(delta + 1L)).build();
            RawMessageImpl m = new RawMessageImpl(id, Unpooled.EMPTY_BUFFER);
            try {
                CompletableFuture f = new CompletableFuture();
                ByteBuf buffer = m.serialize();
                lh.asyncAddEntry(buffer, (rc, ledger, eid, ctx) -> {
                    if (rc != 0) {
                        f.completeExceptionally(BKException.create((int)rc));
                    } else {
                        positions.add(Pair.of((Object)id, (Object)eid));
                        idsInGap.forEach(gid -> idsInGaps.add(Pair.of((Object)gid, (Object)eid)));
                        f.complete(null);
                    }
                }, null);
                CompletableFuture completableFuture = f;
                return completableFuture;
            }
            finally {
                if (Collections.singletonList(m).get(0) != null) {
                    m.close();
                }
            }
        }).toArray(CompletableFuture[]::new)).get();
        lh.close();
        return Triple.of((Object)lh.getId(), positions, idsInGaps);
    }

    @Test
    public void testEntryLookup() throws Exception {
        PositionImpl pos;
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, Optional.empty(), null);
        Triple<Long, List<Pair<PulsarApi.MessageIdData, Long>>, List<Pair<PulsarApi.MessageIdData, Long>>> compactedLedgerData = this.buildCompactedLedger(bk, 500);
        List positions = (List)compactedLedgerData.getMiddle();
        List idsInGaps = (List)compactedLedgerData.getRight();
        LedgerHandle lh = bk.openLedger(((Long)compactedLedgerData.getLeft()).longValue(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        long lastEntryId = lh.getLastAddConfirmed();
        AsyncLoadingCache cache = CompactedTopicImpl.createCache((LedgerHandle)lh, (long)50L);
        PulsarApi.MessageIdData firstPositionId = (PulsarApi.MessageIdData)((Pair)positions.get(0)).getLeft();
        Pair lastPosition = (Pair)positions.get(positions.size() - 1);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(0L, 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)0L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(Long.MAX_VALUE, 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)-4276948922L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(firstPositionId.getLedgerId(), 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)0L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(((PulsarApi.MessageIdData)lastPosition.getLeft()).getLedgerId(), ((PulsarApi.MessageIdData)lastPosition.getLeft()).getEntryId() + 1L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)-4276948922L);
        Collections.shuffle(positions, this.r);
        Collections.shuffle(idsInGaps, this.r);
        for (Pair p : positions) {
            pos = new PositionImpl(((PulsarApi.MessageIdData)p.getLeft()).getLedgerId(), ((PulsarApi.MessageIdData)p.getLeft()).getEntryId());
            Long got = (Long)CompactedTopicImpl.findStartPoint((PositionImpl)pos, (long)lastEntryId, (AsyncLoadingCache)cache).get();
            Assert.assertEquals((Object)got, (Object)((Long)p.getRight()));
        }
        for (Pair gap : idsInGaps) {
            pos = new PositionImpl(((PulsarApi.MessageIdData)gap.getLeft()).getLedgerId(), ((PulsarApi.MessageIdData)gap.getLeft()).getEntryId());
            Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)pos, (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)((Long)gap.getRight()));
        }
    }

    @Test
    public void testCleanupOldCompactedTopicLedger() throws Exception {
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, Optional.empty(), null);
        LedgerHandle oldCompactedLedger = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        oldCompactedLedger.close();
        LedgerHandle newCompactedLedger = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        newCompactedLedger.close();
        CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
        compactedTopic.newCompactedLedger((Position)new PositionImpl(1L, 2L), oldCompactedLedger.getId()).get();
        bk.openLedger(oldCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
        bk.openLedger(newCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
        compactedTopic.newCompactedLedger((Position)new PositionImpl(1L, 2L), newCompactedLedger.getId()).get();
        try {
            bk.openLedger(oldCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
            Assert.fail((String)"Should have failed to open old ledger");
        }
        catch (BKException.BKNoSuchLedgerExistsException bKNoSuchLedgerExistsException) {
            // empty catch block
        }
        bk.openLedger(newCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
    }
}

