/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

/**
 * Tests peer-cluster configuration through the cluster admin API, using the brokers
 * ({@code pulsar1..3}) and admin clients ({@code admin1..3}) inherited from the test
 * base class hierarchy.
 *
 * <p>Every test starts by clearing the peer-cluster list of clusters {@code r1},
 * {@code r2} and {@code r3}, so tests do not depend on the peer configuration left
 * behind by one another.
 */
public class PeerReplicatorTest extends ReplicatorTestBase {

    /** Clusters whose peer-cluster lists are cleared at the start of each test. */
    private static final String[] CLUSTERS = {"r1", "r2", "r3"};

    /** Delegates to the base-class setup; re-declared here to attach the TestNG timeout. */
    @Override
    @BeforeClass(timeOut = 300000L)
    void setup() throws Exception {
        super.setup();
    }

    /** Delegates to the base-class shutdown; re-declared here to attach the TestNG timeout. */
    @Override
    @AfterClass(timeOut = 300000L)
    void shutdown() throws Exception {
        super.shutdown();
    }

    /**
     * Supplies the lookup protocols exercised by {@link #testPeerClusterTopicLookup(String)}.
     *
     * @return one row per protocol: {@code "http"} and {@code "binary"}
     */
    @DataProvider(name = "lookupType")
    public Object[][] codecProvider() {
        return new Object[][]{{"http"}, {"binary"}};
    }

    /**
     * Connects a client to broker {@code pulsar3} and checks producer creation on topics
     * whose namespaces list only {@code r1} (topic1) or only {@code r2} (topic2) as
     * replication clusters.
     *
     * <ol>
     *   <li>With no peers configured for {@code r3}, producer creation must fail for both topics.</li>
     *   <li>After setting {@code r1} as peer of {@code r3}, a producer on topic1 succeeds and the
     *       topic stats fetched through {@code admin1} and {@code admin3} report one publisher.</li>
     *   <li>After setting {@code r2} as peer of {@code r3}, the same holds for topic2 through
     *       {@code admin2} and {@code admin3}.</li>
     * </ol>
     *
     * @param protocol {@code "http"} to connect through the web service address of
     *                 {@code pulsar3}; any other value uses its broker service URL
     * @throws Exception if any admin or client call fails unexpectedly
     */
    @Test(dataProvider = "lookupType", timeOut = 10000L)
    public void testPeerClusterTopicLookup(String protocol) throws Exception {
        resetPeerClusters();

        String serviceUrl = protocol.equalsIgnoreCase("http")
                ? this.pulsar3.getWebServiceAddress()
                : this.pulsar3.getBrokerServiceUrl();
        String namespace1 = "pulsar/global/peer1-" + protocol;
        String namespace2 = "pulsar/global/peer2-" + protocol;
        this.admin1.namespaces().createNamespace(namespace1);
        this.admin1.namespaces().createNamespace(namespace2);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1"));
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r2"));
        this.admin1.clusters().updatePeerClusterNames("r3", null);
        disableTls();

        String topic1 = "persistent://" + namespace1 + "/topic1";
        String topic2 = "persistent://" + namespace2 + "/topic2";

        // try-with-resources so the client is released even when an assertion fails.
        try (PulsarClient client3 = newClient(serviceUrl)) {
            // r3 has no peers yet and is in neither namespace's replication list.
            assertProducerCreationFails(client3, topic1);
            assertProducerCreationFails(client3, topic2);

            // Peer r3 -> r1: topic1 becomes reachable from the r3-connected client.
            this.admin1.clusters().updatePeerClusterNames("r3", peers("r1"));
            try (Producer<byte[]> producer = client3.newProducer().topic(topic1).create()) {
                PersistentTopic topic =
                        (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(topic1).get();
                Assert.assertNotNull(topic);
                this.pulsar1.getBrokerService().updateRates();
                assertSinglePublisher(this.admin1.topics().getStats(topic1));
                assertSinglePublisher(this.admin3.topics().getStats(topic1));
            }

            // Peer r3 -> r2: topic2 becomes reachable from the r3-connected client.
            this.admin2.clusters().updatePeerClusterNames("r3", peers("r2"));
            try (Producer<byte[]> producer = client3.newProducer().topic(topic2).create()) {
                PersistentTopic topic =
                        (PersistentTopic) this.pulsar2.getBrokerService().getOrCreateTopic(topic2).get();
                Assert.assertNotNull(topic);
                this.pulsar2.getBrokerService().updateRates();
                // Mirror the topic1 checks: first through the admin of the cluster in the
                // namespace's replication list (r2), then through r3's admin. Previously
                // admin3 was queried twice and the r2 path was never verified.
                assertSinglePublisher(this.admin2.topics().getStats(topic2));
                assertSinglePublisher(this.admin3.topics().getStats(topic2));
            }
        }
    }

    /**
     * Verifies that a peer-cluster list written with {@code updatePeerClusterNames} is
     * returned unchanged by {@code getPeerClusterNames}.
     *
     * @throws Exception if any admin call fails unexpectedly
     */
    @Test(timeOut = 10000L)
    public void testGetPeerClusters() throws Exception {
        resetPeerClusters();

        final String mainClusterName = "r1";
        Assert.assertNull(this.admin1.clusters().getPeerClusterNames(mainClusterName));

        final LinkedHashSet<String> peerClusters = peers("r2", "r3");
        this.admin1.clusters().updatePeerClusterNames(mainClusterName, peerClusters);

        // Wait until the update is visible. The predicate compares against the expected set
        // (two entries) instead of a hard-coded size of 1, which could never match and made
        // every run exhaust all retries. equals() also tolerates a null result, which would
        // otherwise escape the lambda as a NullPointerException.
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return peerClusters.equals(this.admin1.clusters().getPeerClusterNames(mainClusterName));
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 100L);

        Assert.assertEquals(this.admin1.clusters().getPeerClusterNames(mainClusterName), peerClusters);
    }

    /**
     * Makes {@code r1} and {@code r3} peers of each other, produces to a topic in a namespace
     * whose replication list is {@code r1}, then switches the replication list to {@code r3}
     * and checks that broker {@code pulsar1} no longer reports the namespace bundle as owned
     * and no longer holds the topic in its topic map.
     *
     * @throws Exception if any admin or client call fails unexpectedly
     */
    @Test
    public void testPeerClusterInReplicationClusterListChange() throws Exception {
        resetPeerClusters();

        String serviceUrl = this.pulsar3.getBrokerServiceUrl();
        // nanoTime suffix keeps the namespace name unique across runs.
        String namespace1 = "pulsar/global/peer-change-repl-ns-" + System.nanoTime();
        this.admin1.namespaces().createNamespace(namespace1);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1"));
        this.admin1.clusters().updatePeerClusterNames("r3", null);
        disableTls();

        String topic1 = "persistent://" + namespace1 + "/topic1";

        // try-with-resources so the client is released even when an assertion fails.
        try (PulsarClient client3 = newClient(serviceUrl)) {
            this.admin1.clusters().updatePeerClusterNames("r3", peers("r1"));
            this.admin1.clusters().updatePeerClusterNames("r1", peers("r3"));

            try (Producer<byte[]> producer = client3.newProducer().topic(topic1).create()) {
                PersistentTopic topic =
                        (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(topic1).get();
                Assert.assertNotNull(topic);
                this.pulsar1.getBrokerService().updateRates();
                assertSinglePublisher(this.admin1.topics().getStats(topic1));
                assertSinglePublisher(this.admin3.topics().getStats(topic1));
            }

            // Replace r1 with its peer r3 in the namespace's replication list.
            this.admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r3"));

            NamespaceBundles bundles = this.pulsar1.getNamespaceService()
                    .getNamespaceBundleFactory()
                    .getBundles(NamespaceName.get(namespace1));
            final NamespaceBundle bundle = bundles.getBundles().get(0);

            // Poll until pulsar1 stops reporting the bundle as owned; lookup errors count as
            // "not yet" so the retry continues.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return !this.pulsar1.getNamespaceService().isNamespaceBundleOwned(bundle).get();
                } catch (Exception e) {
                    return false;
                }
            }, 5, 200L);

            Assert.assertFalse(this.pulsar1.getNamespaceService().isNamespaceBundleOwned(bundle).get());
            Assert.assertFalse(this.pulsar1.getBrokerService().getTopics().containsKey(topic1));
        }
    }

    /** Clears the peer-cluster list of every cluster in {@link #CLUSTERS} through {@code admin1}. */
    private void resetPeerClusters() throws PulsarAdminException {
        for (String cluster : CLUSTERS) {
            this.admin1.clusters().updatePeerClusterNames(cluster, null);
        }
    }

    /** Sets {@code tlsEnabled} to {@code false} in the configuration of all three brokers. */
    private void disableTls() {
        this.pulsar1.getConfiguration().setTlsEnabled(false);
        this.pulsar2.getConfiguration().setTlsEnabled(false);
        this.pulsar3.getConfiguration().setTlsEnabled(false);
    }

    /** Builds a client for {@code serviceUrl} with a stats interval of zero seconds. */
    private static PulsarClient newClient(String serviceUrl) throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build();
    }

    /** Returns the given cluster names as an insertion-ordered set, as the admin API expects. */
    private static LinkedHashSet<String> peers(String... names) {
        return Sets.newLinkedHashSet(Lists.newArrayList(names));
    }

    /**
     * Asserts that creating a producer on {@code topic} fails with a
     * {@link PulsarClientException}. If creation unexpectedly succeeds, the producer is
     * closed before the test is failed so it does not leak.
     */
    private static void assertProducerCreationFails(PulsarClient client, String topic)
            throws PulsarClientException {
        Producer<byte[]> unexpected;
        try {
            unexpected = client.newProducer().topic(topic).create();
        } catch (PulsarClientException expected) {
            // Expected outcome: the connected cluster cannot serve this namespace.
            return;
        }
        unexpected.close();
        Assert.fail("should have failed as cluster:r3 doesn't own namespace");
    }

    /** Asserts that {@code stats} is non-null and lists exactly one publisher. */
    private static void assertSinglePublisher(TopicStats stats) {
        Assert.assertNotNull(stats);
        Assert.assertEquals(stats.publishers.size(), 1);
    }
}

