/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

public class ReplicatorTest
extends ReplicatorTestBase {
    /** Name of the test method about to run; assigned in {@code beforeMethod(Method)}. */
    protected String methodName;
    /** SLF4J logger shared by every test in this class. */
    private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

    /**
     * Remembers the name of the test method that is about to execute.
     *
     * @param m the test method handed to this {@code @BeforeMethod} hook
     * @throws Exception declared by the signature; the body itself throws no checked exception
     */
    @BeforeMethod
    public void beforeMethod(Method m) throws Exception {
        final String upcomingTest = m.getName();
        methodName = upcomingTest;
    }

    /**
     * Class-level fixture setup, run by TestNG before the tests of this class
     * with a 300000 ms timeout. Delegates entirely to {@code super.setup()}.
     *
     * @throws Exception if the superclass setup throws
     */
    @Override
    @BeforeClass(timeOut=300000L)
    void setup() throws Exception {
        super.setup();
    }

    /**
     * Class-level fixture teardown, run by TestNG after the tests of this class
     * with a 300000 ms timeout. Delegates entirely to {@code super.shutdown()}.
     *
     * @throws Exception if the superclass shutdown throws
     */
    @Override
    @AfterClass(timeOut=300000L)
    void shutdown() throws Exception {
        super.shutdown();
    }

    /**
     * TestNG data provider named {@code "partitionedTopic"}.
     *
     * @return two single-element rows: {@code Boolean.TRUE} followed by {@code Boolean.FALSE}
     */
    @DataProvider(name="partitionedTopic")
    public Object[][] partitionedTopicProvider() {
        Object[][] rows = new Object[2][];
        rows[0] = new Object[]{Boolean.TRUE};
        rows[1] = new Object[]{Boolean.FALSE};
        return rows;
    }

    /**
     * Checks that replication clients stay registered while the replication-cluster list of
     * {@code pulsar/ns} is changed.
     *
     * <p>Ten tasks are submitted to the {@code executor} field. Each task opens a producer and a
     * consumer on {@code url1} for its own topic ({@code persistent://pulsar/ns/topic-N}), produces
     * two messages and receives two. Once every task has completed, the test asserts that
     * {@code ns1}, {@code ns2} and {@code ns3} each hold a replication client for the two other
     * cluster names, then repeats the same assertions after restricting the namespace to {@code r1}
     * and again after restoring {@code r1, r2, r3}.
     *
     * @throws Exception if an admin call fails or the thread is interrupted while sleeping
     */
    @Test
    public void testConfigChange() throws Exception {
        log.info("--- Starting ReplicatorTest::testConfigChange ---");
        List<Future<?>> results = Lists.newArrayList();
        for (int i = 0; i < 10; ++i) {
            final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/topic-%d", i));
            results.add(executor.submit(new Callable<Void>() {

                /*
                 * Decompiler note (CFR): a self-catching try was removed from this method,
                 * so behaviour may differ from the original source.
                 */
                @Override
                public Void call() throws Exception {
                    ReplicatorTestBase.MessageProducer producer = new ReplicatorTestBase.MessageProducer(url1, dest);
                    try {
                        log.info("--- Starting producer --- " + url1);
                        ReplicatorTestBase.MessageConsumer consumer = new ReplicatorTestBase.MessageConsumer(url1, dest);
                        try {
                            log.info("--- Starting Consumer --- " + url1);
                            producer.produce(2);
                            consumer.receive(2);
                            return null;
                        } finally {
                            // Both locals come straight from `new`, so no null guard is needed.
                            consumer.close();
                        }
                    } finally {
                        producer.close();
                    }
                }
            }));
        }
        for (Future<?> result : results) {
            try {
                result.get();
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                // Pass the exception along so the TestNG report keeps the full stack trace.
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()), e);
            }
        }
        Thread.sleep(1000L);

        // The maps are fetched once; every later assertion re-reads these same instances.
        ConcurrentOpenHashMap replicationClients1 = ns1.getReplicationClients();
        ConcurrentOpenHashMap replicationClients2 = ns2.getReplicationClients();
        ConcurrentOpenHashMap replicationClients3 = ns3.getReplicationClients();
        Assert.assertNotNull(replicationClients1.get("r2"));
        Assert.assertNotNull(replicationClients1.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));

        // Narrow the namespace to r1 only: the clients are expected to remain registered.
        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
        Thread.sleep(1000L);
        Assert.assertNotNull(replicationClients1.get("r2"));
        Assert.assertNotNull(replicationClients1.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));

        // Restore the full cluster list and check once more.
        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
        Thread.sleep(1000L);
        Assert.assertNotNull(replicationClients1.get("r2"));
        Assert.assertNotNull(replicationClients1.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));
    }

    /**
     * Starts the {@code r3} replicator of one topic from five threads at once and verifies that
     * the (spied) replication client was asked to create a producer exactly once.
     *
     * <p>The spy is installed by reflectively writing into {@code BrokerService.replicationClients},
     * and {@code PersistentTopic.startReplicator(String)} is invoked reflectively as well.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if setup, reflection or verification fails
     */
    @Test(timeOut=30000L)
    public void testConcurrentReplicator() throws Exception {
        log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");
        String namespace = "pulsar/concurrent";
        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
        TopicName topicName = TopicName.get(String.format("persistent://%s/topic-%d", namespace, 0));
        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            // Create and immediately close a producer on the topic before looking it up below.
            client1.newProducer()
                    .topic(topicName.toString())
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create()
                    .close();
            PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
            PulsarClientImpl pulsarClient = Mockito.spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
            Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
            startRepl.setAccessible(true);
            Field replClientField = BrokerService.class.getDeclaredField("replicationClients");
            replClientField.setAccessible(true);
            ConcurrentOpenHashMap replicationClients = (ConcurrentOpenHashMap) replClientField.get(pulsar1.getBrokerService());
            replicationClients.put("r3", pulsarClient);
            admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));

            ExecutorService executor = Executors.newFixedThreadPool(5);
            try {
                for (int i = 0; i < 5; ++i) {
                    executor.submit(() -> {
                        try {
                            startRepl.invoke(topic, "r3");
                        } catch (Exception e) {
                            // NOTE(review): this failure is captured by the discarded Future and
                            // never reaches the test thread - confirm whether that is intended.
                            Assert.fail("setting replicator failed", e);
                        }
                    });
                }
                Thread.sleep(3000L);
                Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(
                        (ProducerConfigurationData) Mockito.any(ProducerConfigurationData.class),
                        (Schema) Mockito.any(Schema.class),
                        (ProducerInterceptors) ArgumentMatchers.eq(null));
            } finally {
                // Shut the pool down even when verification throws, so worker threads do not leak.
                executor.shutdown();
            }
        } finally {
            client1.close();
        }
    }

    /**
     * TestNG data provider named {@code "namespace"}.
     *
     * @return two single-element rows: {@code "pulsar/ns"} followed by {@code "pulsar/global/ns"}
     */
    @DataProvider(name="namespace")
    public Object[][] namespaceNameProvider() {
        Object[][] rows = new Object[2][];
        rows[0] = new Object[]{"pulsar/ns"};
        rows[1] = new Object[]{"pulsar/global/ns"};
        return rows;
    }

    /**
     * Produces through each of the three service URLs in turn and expects consumers on all three
     * URLs to receive every message.
     *
     * <p>Sequence: each producer sends two messages and all three consumers receive two; then
     * {@code producer1} and {@code producer2} each send one message and all three consumers
     * receive one message, twice over.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @param namespace namespace (from the {@code "namespace"} data provider) hosting the topic
     * @throws Exception if any producer or consumer operation fails
     */
    @Test(dataProvider="namespace")
    public void testReplication(String namespace) throws Exception {
        log.info("--- Starting ReplicatorTest::testReplication ---");
        final String topicUri = String.format("persistent://%s/repltopic-%d", namespace, System.nanoTime());
        final TopicName dest = TopicName.get(topicUri);

        // Resources are opened one by one and closed in reverse order through nested finally blocks.
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest);
        try {
            log.info("--- Starting producer --- " + url1);
            ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(url2, dest);
            try {
                log.info("--- Starting producer --- " + url2);
                ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(url3, dest);
                try {
                    log.info("--- Starting producer --- " + url3);
                    ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, dest);
                    try {
                        log.info("--- Starting Consumer --- " + url1);
                        ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(url2, dest);
                        try {
                            log.info("--- Starting Consumer --- " + url2);
                            ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(url3, dest);
                            try {
                                log.info("--- Starting Consumer --- " + url3);
                                ReplicatorTestBase.MessageProducer[] producers = {producer1, producer2, producer3};
                                ReplicatorTestBase.MessageConsumer[] consumers = {consumer1, consumer2, consumer3};

                                // Two messages from each producer, read back by every consumer.
                                for (ReplicatorTestBase.MessageProducer source : producers) {
                                    source.produce(2);
                                    for (ReplicatorTestBase.MessageConsumer sink : consumers) {
                                        sink.receive(2);
                                    }
                                }

                                // One message each from producer1 and producer2, then two rounds of single receives.
                                producer1.produce(1);
                                producer2.produce(1);
                                for (int round = 0; round < 2; ++round) {
                                    for (ReplicatorTestBase.MessageConsumer sink : consumers) {
                                        sink.receive(1);
                                    }
                                }
                            } finally {
                                consumer3.close();
                            }
                        } finally {
                            consumer2.close();
                        }
                    } finally {
                        consumer1.close();
                    }
                } finally {
                    producer3.close();
                }
            } finally {
                producer2.close();
            }
        } finally {
            producer1.close();
        }
    }

    /**
     * Exercises per-message replication overrides, ten times over, each time on a fresh topic.
     *
     * <p>Per iteration, all sent through {@code producer1}:
     * <ol>
     *   <li>a message built with {@code disableReplication()} is received by {@code consumer1}
     *       while {@code consumer2} and {@code consumer3} report drained;</li>
     *   <li>a message built with {@code replicationClusters([r1, r3])} is received by
     *       {@code consumer1} and {@code consumer3} while {@code consumer2} reports drained;</li>
     *   <li>a plain message is received by all three consumers, after which all report drained.</li>
     * </ol>
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if any producer or consumer operation fails
     */
    @Test
    public void testReplicationOverrides() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
        for (int iteration = 0; iteration < 10; ++iteration) {
            final String topicUri = String.format("persistent://pulsar/ns/repltopic-%d", System.nanoTime());
            final TopicName dest = TopicName.get(topicUri);

            ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest);
            try {
                log.info("--- Starting producer --- " + url1);
                ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(url2, dest);
                try {
                    log.info("--- Starting producer --- " + url2);
                    ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(url3, dest);
                    try {
                        log.info("--- Starting producer --- " + url3);
                        ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, dest);
                        try {
                            log.info("--- Starting Consumer --- " + url1);
                            ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(url2, dest);
                            try {
                                log.info("--- Starting Consumer --- " + url2);
                                ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(url3, dest);
                                try {
                                    log.info("--- Starting Consumer --- " + url3);

                                    // 1. Replication disabled: only the consumer on url1 sees the message.
                                    producer1.produce(1, (TypedMessageBuilder<byte[]>) producer1.newMessage().disableReplication());
                                    consumer1.receive(1);
                                    Assert.assertTrue(consumer2.drained());
                                    Assert.assertTrue(consumer3.drained());

                                    // 2. Replication restricted to r1 and r3: consumer2 stays empty.
                                    producer1.produce(1, (TypedMessageBuilder<byte[]>) producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));
                                    consumer1.receive(1);
                                    Assert.assertTrue(consumer2.drained());
                                    consumer3.receive(1);

                                    // 3. No override: every consumer receives the message and nothing is left.
                                    producer1.produce(1);
                                    consumer1.receive(1);
                                    consumer2.receive(1);
                                    consumer3.receive(1);
                                    Assert.assertTrue(consumer1.drained());
                                    Assert.assertTrue(consumer2.drained());
                                    Assert.assertTrue(consumer3.drained());
                                } finally {
                                    consumer3.close();
                                }
                            } finally {
                                consumer2.close();
                            }
                        } finally {
                            consumer1.close();
                        }
                    } finally {
                        producer3.close();
                    }
                } finally {
                    producer2.close();
                }
            } finally {
                producer1.close();
            }
        }
    }

    /**
     * Creates (and closes) a consumer on {@code url2} whose third constructor argument is
     * {@code "pulsar.repl."}, tolerating any exception raised along the way.
     *
     * <p>NOTE(review): the test passes whether or not an exception is thrown. If rejection of
     * this argument is the expected outcome, an {@code Assert.fail} after {@code close()} would
     * make the test meaningful - confirm against the broker's behaviour before adding it.
     *
     * @throws Exception declared by the signature; exceptions from the consumer are caught and logged
     */
    @Test
    public void testFailures() throws Exception {
        log.info("--- Starting ReplicatorTest::testFailures ---");
        try {
            TopicName dest = TopicName.get("persistent://pulsar/ns/res-cons-id");
            ReplicatorTestBase.MessageConsumer consumer = new ReplicatorTestBase.MessageConsumer(url2, dest, "pulsar.repl.");
            consumer.close();
        } catch (Exception tolerated) {
            // Deliberately not rethrown, but logged so the outcome is visible in the test output.
            log.info("testFailures: ignoring exception from consumer created with \"pulsar.repl.\"", tolerated);
        }
    }

    /**
     * Produces two messages, skips two on the topic's first replicator, and asserts that peeking
     * the first message afterwards yields {@code null} within 50 ms.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if the topic lookup, skip or peek fails or times out
     */
    @Test(timeOut=30000L)
    public void testReplicatePeekAndSkip() throws Exception {
        final TopicName dest = TopicName.get("persistent://pulsar/ns/peekAndSeekTopic");
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest);
        try {
            ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url3, dest);
            try {
                producer1.produce(2);
                PersistentTopic topic =
                        (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
                // Use whichever replicator is listed first for this topic.
                PersistentReplicator replicator =
                        (PersistentReplicator) topic.getReplicators().get(topic.getReplicators().keys().get(0));
                replicator.skipMessages(2);
                Entry peeked = (Entry) replicator.peekNthMessage(1).get(50L, TimeUnit.MILLISECONDS);
                Assert.assertNull(peeked);
            } finally {
                consumer1.close();
            }
        } finally {
            producer1.close();
        }
    }

    /**
     * Drives a spied replicator through a read failure, {@code clearBacklog()},
     * {@code updateRates()} and {@code expireMessages(1)}, then asserts that the reported
     * replication backlog is zero.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if the topic lookup or any replicator call fails
     */
    @Test(timeOut=30000L)
    public void testReplicatorClearBacklog() throws Exception {
        TopicName dest = TopicName.get("persistent://pulsar/ns/clearBacklogTopic");
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest);
        try {
            ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url3, dest);
            try {
                producer1.produce(2);
                PersistentTopic topic =
                        (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
                // Spy on whichever replicator is listed first for this topic.
                PersistentReplicator replicator = (PersistentReplicator) Mockito.spy(
                        topic.getReplicators().get(topic.getReplicators().keys().get(0)));
                replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
                replicator.clearBacklog().get();
                Thread.sleep(100L);
                replicator.updateRates();
                replicator.expireMessages(1);
                ReplicatorStats status = replicator.getStats();
                Assert.assertEquals((long) status.replicationBacklog, 0L);
            } finally {
                consumer1.close();
            }
        } finally {
            producer1.close();
        }
    }

    /**
     * Produces and consumes two messages on {@code url1}, then resets the {@code "sub-id"} cursor
     * of the topic to the current time through the admin API; the test passes if no call throws.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if producing, consuming or the cursor reset fails
     */
    @Test(enabled=true, timeOut=30000L)
    public void testResetCursorNotFail() throws Exception {
        log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---");
        final String topicUri = String.format("persistent://pulsar/ns/resetrepltopic-%d", System.nanoTime());
        final TopicName dest = TopicName.get(topicUri);
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest);
        try {
            log.info("--- Starting producer --- " + url1);
            ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, dest);
            try {
                log.info("--- Starting Consumer --- " + url1);
                producer1.produce(2);
                consumer1.receive(2);
                admin1.topics().resetCursor(dest.toString(), "sub-id", System.currentTimeMillis());
            } finally {
                consumer1.close();
            }
        } finally {
            producer1.close();
        }
    }

    /**
     * Sends batches of ten messages from {@code producer1} and then {@code producer2} and expects
     * consumers on all three service URLs to receive ten messages after each batch.
     *
     * <p>All three producers are constructed with {@code true} as their third argument;
     * {@code producer3} is opened and closed but never used to produce.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if any producer or consumer operation fails
     */
    @Test
    public void testReplicationForBatchMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");
        final String topicUri = String.format("persistent://pulsar/ns/repltopicbatch-%d", System.nanoTime());
        final TopicName dest = TopicName.get(topicUri);

        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest, true);
        try {
            log.info("--- Starting producer --- " + url1);
            ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(url2, dest, true);
            try {
                log.info("--- Starting producer --- " + url2);
                ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(url3, dest, true);
                try {
                    log.info("--- Starting producer --- " + url3);
                    ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, dest);
                    try {
                        log.info("--- Starting Consumer --- " + url1);
                        ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(url2, dest);
                        try {
                            log.info("--- Starting Consumer --- " + url2);
                            ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(url3, dest);
                            try {
                                log.info("--- Starting Consumer --- " + url3);
                                ReplicatorTestBase.MessageProducer[] batchSources = {producer1, producer2};
                                ReplicatorTestBase.MessageConsumer[] consumers = {consumer1, consumer2, consumer3};
                                for (ReplicatorTestBase.MessageProducer source : batchSources) {
                                    source.produceBatch(10);
                                    for (ReplicatorTestBase.MessageConsumer sink : consumers) {
                                        sink.receive(10);
                                    }
                                }
                            } finally {
                                consumer3.close();
                            }
                        } finally {
                            consumer2.close();
                        }
                    } finally {
                        consumer1.close();
                    }
                } finally {
                    producer3.close();
                }
            } finally {
                producer2.close();
            }
        } finally {
            producer1.close();
        }
    }

    /**
     * Deletes the cursor named {@code "pulsar.repl." + cluster} of the topic's first replicator
     * directly on the managed ledger, then reflectively invokes
     * {@code PersistentTopic.removeReplicator(String)} for that same cluster.
     *
     * <p>NOTE(review): the {@code assertNull} is attached with {@code thenApply} and the resulting
     * future is never awaited, so an assertion failure (or an exceptionally completed future)
     * cannot fail this test - confirm whether that is intended.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if the topic lookup or a reflective call fails, or the wait is interrupted
     */
    @Test(timeOut=30000L)
    public void testDeleteReplicatorFailure() throws Exception {
        log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
        final String topicName = "persistent://pulsar/ns/repltopicbatch";
        final TopicName dest = TopicName.get(topicName);
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest);
        try {
            PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
            String replicatorClusterName = (String) topic.getReplicators().keys().get(0);
            ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();

            // Delete the cursor first; both callback outcomes release the latch.
            final CountDownLatch cursorDeleted = new CountDownLatch(1);
            ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new AsyncCallbacks.DeleteCursorCallback() {

                @Override
                public void deleteCursorComplete(Object ctx) {
                    cursorDeleted.countDown();
                }

                @Override
                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                    cursorDeleted.countDown();
                }
            }, null);
            cursorDeleted.await();

            Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
            removeReplicator.setAccessible(true);
            CompletableFuture<?> removal = (CompletableFuture<?>) removeReplicator.invoke(topic, replicatorClusterName);
            removal.thenApply(ignored -> {
                Assert.assertNull(topic.getPersistentReplicator(replicatorClusterName));
                return null;
            });
        } finally {
            producer1.close();
        }
    }

    /**
     * Closes {@code pulsar2} and {@code pulsar3}, disconnects the topic's first replicator with
     * {@code disconnect(false)}, and asserts (through reflection on
     * {@code AbstractReplicator.producer}) that the replicator no longer holds a producer.
     *
     * <p>NOTE(review): this test shuts down two of the shared brokers; presumably that is why it
     * is scheduled with {@code priority=5} - confirm against the other tests' priorities.
     *
     * <p>Decompiler note (CFR): a self-catching {@code try} was removed from this method, so
     * behaviour may differ from the original source.
     *
     * @throws Exception if the topic lookup, broker shutdown or reflective access fails
     */
    @Test(priority=5, timeOut=30000L)
    public void testReplicatorProducerClosing() throws Exception {
        // The original message named testDeleteReplicatorFailure (copy-paste slip).
        log.info("--- Starting ReplicatorTest::testReplicatorProducerClosing ---");
        String topicName = "persistent://pulsar/ns/repltopicbatch";
        TopicName dest = TopicName.get(topicName);
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, dest);
        try {
            PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
            String replicatorClusterName = (String) topic.getReplicators().keys().get(0);
            Replicator replicator = topic.getPersistentReplicator(replicatorClusterName);
            pulsar2.close();
            pulsar3.close();
            replicator.disconnect(false);
            Thread.sleep(100L);
            // The producer field is not exposed, so read it reflectively.
            Field field = AbstractReplicator.class.getDeclaredField("producer");
            field.setAccessible(true);
            ProducerImpl producer = (ProducerImpl) field.get(replicator);
            Assert.assertNull(producer);
        } finally {
            producer1.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises the "r2" replicator's backlog accounting while the namespace
     * backlog quota is changed, once for each of the retention policies
     * {@code producer_exception} and {@code producer_request_hold}.
     *
     * <p>For each policy the test sets a quota limit of {@code 0x100000}, produces
     * one message, lowers the limit to {@code 1}, and asserts on
     * {@code replicator.getStats().replicationBacklog} before and after a second
     * message is produced and two messages are received by the remote consumer.
     *
     * @throws Exception if an admin call, a produce/receive call or a sleep fails
     */
    @Test(timeOut=60000L, enabled=true, priority=-1)
    public void testResumptionAfterBacklogRelaxed() throws Exception {
        // Typed list: a raw List cannot be iterated as RetentionPolicy.
        List<BacklogQuota.RetentionPolicy> policies = Lists.newArrayList();
        policies.add(BacklogQuota.RetentionPolicy.producer_exception);
        policies.add(BacklogQuota.RetentionPolicy.producer_request_hold);
        for (BacklogQuota.RetentionPolicy policy : policies) {
            this.admin1.namespaces().setBacklogQuota("pulsar/ns1", new BacklogQuota(0x100000L, policy));
            Thread.sleep(200L);
            // A timestamp suffix gives every policy iteration its own topic.
            TopicName dest = TopicName.get(String.format("persistent://pulsar/ns1/%s-%d", policy, System.currentTimeMillis()));
            ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
            try {
                ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, dest);
                try {
                    PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
                    Replicator replicator = topic.getPersistentReplicator("r2");
                    producer1.produce(1);
                    Thread.sleep(500L);
                    // Shrink the quota limit to 1 and wait before checking the backlog.
                    this.admin1.namespaces().setBacklogQuota("pulsar/ns1", new BacklogQuota(1L, policy));
                    Thread.sleep(6000L);
                    Assert.assertEquals(replicator.getStats().replicationBacklog, 0L);
                    producer1.produce(1);
                    Thread.sleep(500L);
                    Assert.assertEquals(replicator.getStats().replicationBacklog, 1L);
                    consumer2.receive(1);
                    consumer2.receive(1);
                    // Poll up to `retry` times for the backlog to reach zero;
                    // no sleep after the final check.
                    int retry = 10;
                    for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0L; ++i) {
                        if (i != retry - 1) {
                            Thread.sleep(100L);
                        }
                    }
                    Assert.assertEquals(replicator.getStats().replicationBacklog, 0L);
                }
                finally {
                    // No control-flow statements here: a `continue` inside finally
                    // would discard a pending exception or assertion failure.
                    consumer2.close();
                }
            }
            finally {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the cursor of the "r2" replicator and checks how the replicator reacts.
     *
     * <p>Steps: read the private {@code cursor} field of {@link PersistentReplicator}
     * reflectively and close it, produce ten messages, confirm that reading from the
     * closed cursor throws {@code CursorAlreadyClosedException}, feed the same kind of
     * exception into {@code readEntriesFailed}, and finally assert that the private
     * {@code producer} field of {@link AbstractReplicator} is {@code null}.
     *
     * @throws Exception if setup, reflection or the sleep fails
     */
    @Test(timeOut=15000L)
    public void testCloseReplicatorStartProducer() throws Exception {
        final TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor");
        final ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            final ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url1, dest);
            try {
                final ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, dest);
                try {
                    final String topicPath = dest.toString();
                    final PersistentTopic persistentTopic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(topicPath).get();
                    final PersistentReplicator r2Replicator = (PersistentReplicator)persistentTopic.getPersistentReplicator("r2");

                    // Grab the replicator's private cursor and close it.
                    final Field cursorAccessor = PersistentReplicator.class.getDeclaredField("cursor");
                    cursorAccessor.setAccessible(true);
                    final ManagedCursor replicatorCursor = (ManagedCursor)cursorAccessor.get(r2Replicator);
                    replicatorCursor.close();

                    producer1.produce(10);

                    // Reading through the closed cursor must be rejected.
                    try {
                        replicatorCursor.readEntriesOrWait(10);
                        Assert.fail("It should have failed");
                    }
                    catch (Exception readFailure) {
                        Assert.assertEquals(readFailure.getClass(), ManagedLedgerException.CursorAlreadyClosedException.class);
                    }

                    // Report the same failure kind to the replicator callback directly.
                    final ManagedLedgerException closedCursorError = new ManagedLedgerException.CursorAlreadyClosedException("Cursor already closed exception");
                    r2Replicator.readEntriesFailed(closedCursorError, null);
                    Thread.sleep(100L);

                    // The replicator's private producer reference is expected to be null now.
                    final Field producerAccessor = AbstractReplicator.class.getDeclaredField("producer");
                    producerAccessor.setAccessible(true);
                    final ProducerImpl currentProducer = (ProducerImpl)producerAccessor.get(r2Replicator);
                    Assert.assertNull(currentProducer);
                }
                finally {
                    consumer2.close();
                }
            }
            finally {
                consumer1.close();
            }
        }
        finally {
            producer1.close();
        }
    }

    /**
     * Publishes one message through a client connected to {@code url1} and reads it
     * back with a {@link RawReader} created on a client connected to {@code url2},
     * then asserts that the message carries a checksum and that the checksum parsed
     * by {@code Commands.readChecksum} equals the one computed by
     * {@link Crc32cIntChecksum#computeChecksum}.
     *
     * <p>Both clients, the producer and the raw reader are released even when an
     * assertion fails.
     *
     * @throws Exception if client creation, publishing or reading fails
     */
    @Test(timeOut=30000L)
    public void verifyChecksumAfterReplication() throws Exception {
        final String topicName = "persistent://pulsar/ns/checksumAfterReplication";
        // Resources are closed in reverse declaration order when the block exits.
        try (PulsarClient c1 = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
             Producer<byte[]> p1 = c1.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
             PulsarClient c2 = PulsarClient.builder().serviceUrl(this.url2.toString()).build()) {
            RawReader reader2 = RawReader.create(c2, topicName, "sub").get();
            try {
                p1.send("Hello".getBytes());
                RawMessage msg = reader2.readNextAsync().get();
                ByteBuf b = msg.getHeadersAndPayload();
                Assert.assertTrue(Commands.hasChecksum(b));
                int parsedChecksum = Commands.readChecksum(b);
                int computedChecksum = Crc32cIntChecksum.computeChecksum(b);
                Assert.assertEquals(parsedChecksum, computedChecksum);
            }
            finally {
                // RawReader is closed through closeAsync(), so it is handled in an
                // explicit finally block rather than as a try resource.
                reader2.closeAsync().get();
            }
        }
    }

    /**
     * Checks the outcome of {@code BrokerService.getOrCreateTopic} for a persistent
     * and a non-persistent topic name in a namespace replicated to r1, r2 and r3.
     *
     * <p>When {@code isPartitionedTopic} is {@code true} both names are first
     * registered as partitioned topics with five partitions, and creation is expected
     * to fail with a cause of type {@link BrokerServiceException.NamingException}.
     * When it is {@code false}, creation is expected to succeed.
     *
     * @param isPartitionedTopic whether the topic names are registered as partitioned
     *                           topics before the creation attempt
     * @throws Exception if namespace/topic setup or client creation fails
     */
    @Test(dataProvider="partitionedTopic")
    public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", (Object)this.methodName);
        final String namespace = "pulsar/partitionedNs-" + isPartitionedTopic;
        final String persistentTopicName = "persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
        final String nonPersistentTopicName = "non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
        BrokerService brokerService = this.pulsar1.getBrokerService();
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
        if (isPartitionedTopic) {
            this.admin1.topics().createPartitionedTopic(persistentTopicName, 5);
            this.admin1.topics().createPartitionedTopic(nonPersistentTopicName, 5);
        }
        // The client and the producer on the dummy topic are closed when the block
        // exits, including on assertion failure.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
             Producer<byte[]> dummyProducer = client.newProducer().topic("persistent://" + namespace + "/dummyTopic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
            assertTopicCreationOutcome(brokerService, persistentTopicName, isPartitionedTopic);
            assertTopicCreationOutcome(brokerService, nonPersistentTopicName, isPartitionedTopic);
        }
    }

    /**
     * Attempts to create {@code topicName} and asserts that the attempt fails with a
     * {@link BrokerServiceException.NamingException} cause exactly when
     * {@code expectFailure} is {@code true}.
     */
    private static void assertTopicCreationOutcome(BrokerService brokerService, String topicName, boolean expectFailure) {
        try {
            brokerService.getOrCreateTopic(topicName).get();
        }
        catch (Exception e) {
            if (!expectFailure) {
                // Attach the exception so the test report shows why creation failed.
                Assert.fail("Topic creation should not fail without any partitioned topic", e);
            }
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.NamingException);
            return;
        }
        if (expectFailure) {
            Assert.fail("Topic creation fails with partitioned topic as replicator init fails");
        }
    }

    /**
     * Sends one message whose replication clusters are restricted to "r1" and checks
     * that a consumer on the {@code url1} client receives it while a consumer on the
     * {@code url2} client receives nothing within one second.
     *
     * <p>The topic is a four-partition topic in a namespace whose replication
     * clusters are r1, r2 and r3. Clients, producer and consumers are closed even
     * when an assertion fails.
     *
     * @throws Exception if namespace/topic setup or any client operation fails
     */
    @Test
    public void testReplicatedCluster() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
        final String namespace = "pulsar/global/repl";
        final String topicName = String.format("persistent://%s/topic1", namespace);
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
        this.admin1.topics().createPartitionedTopic(topicName, 4);
        // Resources are closed in reverse declaration order when the block exits.
        try (PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
             Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
             Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
             Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe()) {
            byte[] value = "test".getBytes();
            // Restrict this message to cluster "r1" only.
            TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value);
            msg.send();
            Assert.assertEquals(consumer1.receive().getValue(), value);
            // A null result means nothing arrived on the second client within the timeout.
            Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
            Assert.assertNull(msg2, "msg should have not been replicated to remote cluster");
        }
    }
}

