/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class BrokerBkEnsemblesTests {
    protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
    protected PulsarService pulsar;
    ServiceConfiguration config;
    URL adminUrl;
    protected PulsarAdmin admin;
    LocalBookkeeperEnsemble bkEnsemble;
    protected int BROKER_WEBSERVICE_PORT;
    private final int numberOfBookies;
    private static final Logger LOG = LoggerFactory.getLogger(BrokerBkEnsemblesTests.class);

    public BrokerBkEnsemblesTests() {
        this(3);
    }

    public BrokerBkEnsemblesTests(int numberOfBookies) {
        this.numberOfBookies = numberOfBookies;
    }

    @BeforeMethod
    protected void setup() throws Exception {
        try {
            int ZOOKEEPER_PORT = PortManager.nextFreePort();
            this.BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
            this.bkEnsemble = new LocalBookkeeperEnsemble(this.numberOfBookies, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
            this.bkEnsemble.start();
            this.config = new ServiceConfiguration();
            this.config.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT);
            this.config.setAdvertisedAddress("localhost");
            this.config.setWebServicePort(Optional.ofNullable(this.BROKER_WEBSERVICE_PORT));
            this.config.setClusterName("usc");
            this.config.setBrokerServicePort(Optional.ofNullable(BROKER_SERVICE_PORT));
            this.config.setAuthorizationEnabled(false);
            this.config.setAuthenticationEnabled(false);
            this.config.setManagedLedgerMaxEntriesPerLedger(5);
            this.config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
            this.config.setAdvertisedAddress("127.0.0.1");
            this.config.setAllowAutoTopicCreationType("non-partitioned");
            this.pulsar = new PulsarService(this.config);
            this.pulsar.start();
            this.adminUrl = new URL("http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT);
            this.admin = PulsarAdmin.builder().serviceHttpUrl(this.adminUrl.toString()).build();
            this.admin.clusters().createCluster("usc", new ClusterData(this.adminUrl.toString()));
            this.admin.tenants().createTenant("prop", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"usc"})));
        }
        catch (Throwable t) {
            LOG.error("Error setting up broker test", t);
            Assert.fail((String)"Broker test setup failed");
        }
    }

    @AfterMethod
    protected void shutdown() throws Exception {
        try {
            this.admin.close();
            this.pulsar.close();
            this.bkEnsemble.stop();
        }
        catch (Throwable t) {
            LOG.warn("Error cleaning up broker test setup state", t);
        }
    }

    @Test
    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
        int i;
        ZooKeeper zk = this.bkEnsemble.getZkClient();
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        String ns1 = "prop/usc/crash-broker";
        this.admin.namespaces().createNamespace("prop/usc/crash-broker");
        String topic1 = "persistent://prop/usc/crash-broker/my-topic";
        Consumer consumer = client.newConsumer().topic(new String[]{"persistent://prop/usc/crash-broker/my-topic"}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = client.newProducer().topic("persistent://prop/usc/crash-broker/my-topic").create();
        for (int i2 = 0; i2 < 10; ++i2) {
            String message = "my-message-" + i2;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        for (int i3 = 0; i3 < 10; ++i3) {
            msg = consumer.receive(1, TimeUnit.SECONDS);
            consumer.acknowledge(msg);
        }
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/usc/crash-broker/my-topic").get();
        ManagedCursorImpl cursor = (ManagedCursorImpl)topic.getManagedLedger().getCursors().iterator().next();
        MockedPulsarServiceBaseTest.retryStrategically(test -> cursor.getState().equals("Open"), 5, 100L);
        long cursorLedgerId = cursor.getCursorLedger();
        String ledgerPath = "/ledgers" + StringUtils.getHybridHierarchicalLedgerPath((long)cursorLedgerId);
        Assert.assertNotNull((Object)zk.exists(ledgerPath, false));
        consumer.close();
        producer.close();
        this.pulsar.getBrokerService().removeTopicFromCache("persistent://prop/usc/crash-broker/my-topic");
        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)this.pulsar.getManagedLedgerFactory();
        Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        field.setAccessible(true);
        ConcurrentHashMap ledgers = (ConcurrentHashMap)field.get(factory);
        ledgers.clear();
        consumer = client.newConsumer().topic(new String[]{"persistent://prop/usc/crash-broker/my-topic"}).subscriptionName("my-subscriber-name").subscribe();
        producer = client.newProducer().topic("persistent://prop/usc/crash-broker/my-topic").create();
        for (i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive(1, TimeUnit.SECONDS);
            consumer.acknowledge(msg);
        }
        topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/usc/crash-broker/my-topic").get();
        ManagedCursorImpl cursor1 = (ManagedCursorImpl)topic.getManagedLedger().getCursors().iterator().next();
        MockedPulsarServiceBaseTest.retryStrategically(test -> cursor1.getState().equals("Open"), 5, 100L);
        long newCursorLedgerId = cursor1.getCursorLedger();
        Assert.assertNotEquals((Object)newCursorLedgerId, (Object)-1);
        Assert.assertNotEquals((Object)cursorLedgerId, (Object)newCursorLedgerId);
        Assert.assertNull((Object)zk.exists(ledgerPath, false));
        producer.close();
        consumer.close();
        client.close();
    }

    @Test
    public void testSkipCorruptDataLedger() throws Exception {
        this.admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        String ns1 = "prop/usc/crash-broker";
        int totalMessages = 100;
        int totalDataLedgers = 5;
        int entriesPerLedger = 20;
        try {
            this.admin.namespaces().createNamespace("prop/usc/crash-broker");
        }
        catch (Exception exception) {
            // empty catch block
        }
        String topic1 = "persistent://prop/usc/crash-broker/my-topic-" + System.currentTimeMillis();
        Consumer consumer = client.newConsumer().topic(new String[]{topic1}).subscriptionName("my-subscriber-name").receiverQueueSize(5).subscribe();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic(topic1).get();
        ManagedLedgerImpl ml = (ManagedLedgerImpl)topic.getManagedLedger();
        ManagedCursorImpl cursor = (ManagedCursorImpl)ml.getCursors().iterator().next();
        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
        configField.setAccessible(true);
        ManagedLedgerConfig config = (ManagedLedgerConfig)configField.get(cursor);
        config.setMaxEntriesPerLedger(20);
        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
        bookKeeperField.setAccessible(true);
        BookKeeper bookKeeper = (BookKeeper)bookKeeperField.get(ml);
        Producer producer = client.newProducer().topic(topic1).create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Assert.assertNotNull((Object)consumer.receive(1, TimeUnit.SECONDS));
        consumer.close();
        NavigableMap ledgerInfo = ml.getLedgersInfo();
        Assert.assertEquals((int)ledgerInfo.size(), (int)5);
        Map.Entry lastLedger = ledgerInfo.lastEntry();
        ledgerInfo.entrySet().forEach(entry -> {
            if (!entry.equals(lastLedger)) {
                Assert.assertEquals((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)entry.getValue()).getEntries(), (long)20L);
                try {
                    bookKeeper.deleteLedger(((Long)entry.getKey()).longValue());
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        producer.close();
        this.pulsar.getBrokerService().removeTopicFromCache(topic1);
        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)this.pulsar.getManagedLedgerFactory();
        Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        field.setAccessible(true);
        ConcurrentHashMap ledgers = (ConcurrentHashMap)field.get(factory);
        ledgers.clear();
        Message msg = null;
        consumer = client.newConsumer().topic(new String[]{topic1}).subscriptionName("my-subscriber-name").subscribe();
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        consumer.close();
        this.admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
        MockedPulsarServiceBaseTest.retryStrategically(test -> config.isAutoSkipNonRecoverableData(), 5, 100L);
        consumer = client.newConsumer().topic(new String[]{topic1}).subscriptionName("my-subscriber-name").subscribe();
        for (int i = 0; i < 20; ++i) {
            msg = consumer.receive();
            System.out.println(i);
            consumer.acknowledge(msg);
        }
        producer.close();
        consumer.close();
        client.close();
    }

    @Test(timeOut=20000L)
    public void testTopicWithWildCardChar() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        String ns1 = "prop/usc/topicWithSpecialChar";
        try {
            this.admin.namespaces().createNamespace("prop/usc/topicWithSpecialChar");
        }
        catch (Exception exception) {
            // empty catch block
        }
        String topic1 = "persistent://prop/usc/topicWithSpecialChar/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524";
        String subName1 = "c1";
        byte[] content = "test".getBytes();
        Consumer consumer = client.newConsumer().topic(new String[]{"persistent://prop/usc/topicWithSpecialChar/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524"}).subscriptionName("c1").subscribe();
        Producer producer = client.newProducer().topic("persistent://prop/usc/topicWithSpecialChar/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524").create();
        producer.send((Object)content);
        Message msg = consumer.receive();
        Assert.assertEquals((byte[])msg.getData(), (byte[])content);
        consumer.close();
        producer.close();
        client.close();
    }
}

