/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.URL;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public class ReplicatorTestBase {
    URL url1;
    URL urlTls1;
    ServiceConfiguration config1 = new ServiceConfiguration();
    PulsarService pulsar1;
    BrokerService ns1;
    PulsarAdmin admin1;
    LocalBookkeeperEnsemble bkEnsemble1;
    URL url2;
    URL urlTls2;
    ServiceConfiguration config2 = new ServiceConfiguration();
    PulsarService pulsar2;
    BrokerService ns2;
    PulsarAdmin admin2;
    LocalBookkeeperEnsemble bkEnsemble2;
    URL url3;
    URL urlTls3;
    ServiceConfiguration config3 = new ServiceConfiguration();
    PulsarService pulsar3;
    BrokerService ns3;
    PulsarAdmin admin3;
    LocalBookkeeperEnsemble bkEnsemble3;
    ZookeeperServerTest globalZkS;
    ExecutorService executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new DefaultThreadFactory("ReplicatorTestBase"));
    static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    protected static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
    protected static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
    private static final Logger log = LoggerFactory.getLogger(ReplicatorTestBase.class);

    public int getBrokerServicePurgeInactiveFrequency() {
        return 60;
    }

    public boolean isBrokerServicePurgeInactiveTopic() {
        return false;
    }

    void setup() throws Exception {
        log.info("--- Starting ReplicatorTestBase::setup ---");
        int globalZKPort = PortManager.nextFreePort();
        this.globalZkS = new ZookeeperServerTest(globalZKPort);
        this.globalZkS.start();
        int zkPort1 = PortManager.nextFreePort();
        this.bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
        this.bkEnsemble1.start();
        int webServicePort1 = PortManager.nextFreePort();
        int webServicePortTls1 = PortManager.nextFreePort();
        this.config1.setClusterName("r1");
        this.config1.setAdvertisedAddress("localhost");
        this.config1.setWebServicePort(Optional.ofNullable(webServicePort1));
        this.config1.setWebServicePortTls(Optional.ofNullable(webServicePortTls1));
        this.config1.setZookeeperServers("127.0.0.1:" + zkPort1);
        this.config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
        this.config1.setBrokerDeleteInactiveTopicsEnabled(this.isBrokerServicePurgeInactiveTopic());
        this.config1.setBrokerServicePurgeInactiveFrequencyInSeconds(this.inSec(this.getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        this.config1.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
        this.config1.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
        this.config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config1.setBacklogQuotaCheckIntervalInSeconds(5);
        this.config1.setDefaultNumberOfNamespaceBundles(1);
        this.config1.setAllowAutoTopicCreationType("non-partitioned");
        this.pulsar1 = new PulsarService(this.config1);
        this.pulsar1.start();
        this.ns1 = this.pulsar1.getBrokerService();
        this.url1 = new URL("http://localhost:" + webServicePort1);
        this.urlTls1 = new URL("https://localhost:" + webServicePortTls1);
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();
        int zkPort2 = PortManager.nextFreePort();
        this.bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
        this.bkEnsemble2.start();
        int webServicePort2 = PortManager.nextFreePort();
        int webServicePortTls2 = PortManager.nextFreePort();
        this.config2.setClusterName("r2");
        this.config2.setAdvertisedAddress("localhost");
        this.config2.setWebServicePort(Optional.ofNullable(webServicePort2));
        this.config2.setWebServicePortTls(Optional.ofNullable(webServicePortTls2));
        this.config2.setZookeeperServers("127.0.0.1:" + zkPort2);
        this.config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
        this.config2.setBrokerDeleteInactiveTopicsEnabled(this.isBrokerServicePurgeInactiveTopic());
        this.config2.setBrokerServicePurgeInactiveFrequencyInSeconds(this.inSec(this.getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        this.config2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
        this.config2.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
        this.config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config2.setBacklogQuotaCheckIntervalInSeconds(5);
        this.config2.setDefaultNumberOfNamespaceBundles(1);
        this.config2.setAllowAutoTopicCreationType("non-partitioned");
        this.pulsar2 = new PulsarService(this.config2);
        this.pulsar2.start();
        this.ns2 = this.pulsar2.getBrokerService();
        this.url2 = new URL("http://localhost:" + webServicePort2);
        this.urlTls2 = new URL("https://localhost:" + webServicePortTls2);
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();
        int zkPort3 = PortManager.nextFreePort();
        this.bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
        this.bkEnsemble3.start();
        int webServicePort3 = PortManager.nextFreePort();
        int webServicePortTls3 = PortManager.nextFreePort();
        this.config3.setClusterName("r3");
        this.config3.setAdvertisedAddress("localhost");
        this.config3.setWebServicePort(Optional.ofNullable(webServicePort3));
        this.config3.setWebServicePortTls(Optional.ofNullable(webServicePortTls3));
        this.config3.setZookeeperServers("127.0.0.1:" + zkPort3);
        this.config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
        this.config3.setBrokerDeleteInactiveTopicsEnabled(this.isBrokerServicePurgeInactiveTopic());
        this.config3.setBrokerServicePurgeInactiveFrequencyInSeconds(this.inSec(this.getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        this.config3.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
        this.config3.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
        this.config3.setTlsEnabled(true);
        this.config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config3.setDefaultNumberOfNamespaceBundles(1);
        this.config3.setAllowAutoTopicCreationType("non-partitioned");
        this.pulsar3 = new PulsarService(this.config3);
        this.pulsar3.start();
        this.ns3 = this.pulsar3.getBrokerService();
        this.url3 = new URL("http://localhost:" + webServicePort3);
        this.urlTls3 = new URL("https://localhost:" + webServicePortTls3);
        this.admin3 = PulsarAdmin.builder().serviceHttpUrl(this.url3.toString()).build();
        this.admin1.clusters().createCluster("r1", new ClusterData(this.url1.toString(), this.urlTls1.toString(), this.pulsar1.getSafeBrokerServiceUrl(), this.pulsar1.getBrokerServiceUrlTls()));
        this.admin1.clusters().createCluster("r2", new ClusterData(this.url2.toString(), this.urlTls2.toString(), this.pulsar2.getSafeBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls()));
        this.admin1.clusters().createCluster("r3", new ClusterData(this.url3.toString(), this.urlTls3.toString(), this.pulsar3.getSafeBrokerServiceUrl(), this.pulsar3.getBrokerServiceUrlTls()));
        this.admin1.tenants().createTenant("pulsar", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2", "appid3"}), (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"})));
        this.admin1.namespaces().createNamespace("pulsar/ns", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        this.admin1.namespaces().createNamespace("pulsar/ns1", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2"}));
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r1").getServiceUrl(), (String)this.url1.toString());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r2").getServiceUrl(), (String)this.url2.toString());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r3").getServiceUrl(), (String)this.url3.toString());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r1").getBrokerServiceUrl(), (String)this.pulsar1.getSafeBrokerServiceUrl());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r2").getBrokerServiceUrl(), (String)this.pulsar2.getSafeBrokerServiceUrl());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r3").getBrokerServiceUrl(), (String)this.pulsar3.getSafeBrokerServiceUrl());
        this.admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
        this.admin1.namespaces().createNamespace("pulsar/global/ns");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        Thread.sleep(100L);
        log.info("--- ReplicatorTestBase::setup completed ---");
    }

    private int inSec(int time, TimeUnit unit) {
        return (int)TimeUnit.SECONDS.convert(time, unit);
    }

    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        this.executor.shutdown();
        this.admin1.close();
        this.admin2.close();
        this.admin3.close();
        this.pulsar3.close();
        this.pulsar2.close();
        this.pulsar1.close();
        this.bkEnsemble1.stop();
        this.bkEnsemble2.stop();
        this.bkEnsemble3.stop();
        this.globalZkS.stop();
    }

    static class MessageConsumer
    implements AutoCloseable {
        final URL url;
        final String namespace;
        final String topicName;
        final PulsarClient client;
        final Consumer<byte[]> consumer;

        MessageConsumer(URL url, TopicName dest) throws Exception {
            this(url, dest, "sub-id");
        }

        MessageConsumer(URL url, TopicName dest, String subId) throws Exception {
            this.url = url;
            this.namespace = dest.getNamespace();
            this.topicName = dest.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                this.consumer = this.client.newConsumer().topic(new String[]{this.topicName}).subscriptionName(subId).subscribe();
            }
            catch (Exception e) {
                this.client.close();
                throw e;
            }
        }

        void receive(int messages) throws Exception {
            log.info("Start receiving messages");
            TreeSet<String> receivedMessages = new TreeSet<String>();
            int i = 0;
            while (i < messages) {
                Message msg = this.consumer.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)msg);
                this.consumer.acknowledge(msg);
                String msgData = new String(msg.getData());
                log.info("Received message {}", (Object)msgData);
                boolean added = receivedMessages.add(msgData);
                if (added) {
                    Assert.assertEquals((String)msgData, (String)("test-" + i));
                    ++i;
                    continue;
                }
                log.info("Ignoring duplicate {}", (Object)msgData);
            }
        }

        boolean drained() throws Exception {
            return this.consumer.receive(0, TimeUnit.MICROSECONDS) == null;
        }

        @Override
        public void close() {
            try {
                this.client.close();
            }
            catch (PulsarClientException e) {
                log.warn("Failed to close client", (Throwable)e);
            }
        }
    }

    static class MessageProducer
    implements AutoCloseable {
        URL url;
        String namespace;
        String topicName;
        PulsarClient client;
        Producer<byte[]> producer;

        MessageProducer(URL url, TopicName dest) throws Exception {
            this.url = url;
            this.namespace = dest.getNamespace();
            this.topicName = dest.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            this.producer = this.client.newProducer().topic(this.topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        }

        MessageProducer(URL url, TopicName dest, boolean batch) throws Exception {
            this.url = url;
            this.namespace = dest.getNamespace();
            this.topicName = dest.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            ProducerBuilder producerBuilder = this.client.newProducer().topic(this.topicName).enableBatching(batch).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).batchingMaxMessages(5);
            this.producer = producerBuilder.create();
        }

        void produceBatch(int messages) throws Exception {
            log.info("Start sending batch messages");
            for (int i = 0; i < messages; ++i) {
                this.producer.sendAsync((Object)("test-" + i).getBytes());
                log.info("queued message {}", (Object)("test-" + i));
            }
            this.producer.flush();
        }

        void produce(int messages) throws Exception {
            log.info("Start sending messages");
            for (int i = 0; i < messages; ++i) {
                this.producer.send((Object)("test-" + i).getBytes());
                log.info("Sent message {}", (Object)("test-" + i));
            }
        }

        TypedMessageBuilder<byte[]> newMessage() {
            return this.producer.newMessage();
        }

        void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
            log.info("Start sending messages");
            for (int i = 0; i < messages; ++i) {
                String m = new String("test-" + i);
                messageBuilder.value((Object)m.getBytes()).send();
                log.info("Sent message {}", (Object)m);
            }
        }

        @Override
        public void close() {
            try {
                this.client.close();
            }
            catch (PulsarClientException e) {
                log.warn("Failed to close client", (Throwable)e);
            }
        }
    }
}

