/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class PersistentTopicTest {
    private PulsarService pulsar;
    private BrokerService brokerService;
    private ManagedLedgerFactory mlFactoryMock;
    private ServerCnx serverCnx;
    private ManagedLedger ledgerMock;
    private ManagedCursor cursorMock;
    private ConfigurationCacheService configCacheService;
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    final String successPartitionTopicName = "persistent://prop/use/ns-abc/successTopic-partition-0";
    final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
    final String successSubName = "successSub";
    final String successSubName2 = "successSub2";
    final String successSubName3 = "successSub3";
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);

    @BeforeMethod
    public void setup() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        this.pulsar = (PulsarService)Mockito.spy((Object)new PulsarService(svcConfig));
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        ((PulsarService)Mockito.doReturn((Object)Mockito.mock(Compactor.class)).when((Object)this.pulsar)).getCompactor();
        this.mlFactoryMock = (ManagedLedgerFactory)Mockito.mock(ManagedLedgerFactory.class);
        ((PulsarService)Mockito.doReturn((Object)this.mlFactoryMock).when((Object)this.pulsar)).getManagedLedgerFactory();
        MockZooKeeper mockZk = MockedPulsarServiceBaseTest.createMockZooKeeper();
        ((PulsarService)Mockito.doReturn((Object)mockZk).when((Object)this.pulsar)).getZkClient();
        ((PulsarService)Mockito.doReturn((Object)((Object)MockedPulsarServiceBaseTest.createMockBookKeeper((ZooKeeper)mockZk, this.pulsar.getOrderedExecutor().chooseThread(0L)))).when((Object)this.pulsar)).getBookKeeperClient();
        ZooKeeperCache cache = (ZooKeeperCache)Mockito.mock(ZooKeeperCache.class);
        ((ZooKeeperCache)Mockito.doReturn((Object)30).when((Object)cache)).getZkOperationTimeoutSeconds();
        ((PulsarService)Mockito.doReturn((Object)cache).when((Object)this.pulsar)).getLocalZkCache();
        this.configCacheService = (ConfigurationCacheService)Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache zkDataCache = (ZooKeeperDataCache)Mockito.mock(ZooKeeperDataCache.class);
        ((ConfigurationCacheService)Mockito.doReturn((Object)zkDataCache).when((Object)this.configCacheService)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)this.configCacheService).when((Object)this.pulsar)).getConfigurationCache();
        ((ZooKeeperDataCache)Mockito.doReturn(Optional.empty()).when((Object)zkDataCache)).get((String)Mockito.any());
        LocalZooKeeperCacheService zkCache = (LocalZooKeeperCacheService)Mockito.mock(LocalZooKeeperCacheService.class);
        ((ZooKeeperDataCache)Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when((Object)zkDataCache)).getAsync((String)Mockito.any());
        ((LocalZooKeeperCacheService)Mockito.doReturn((Object)zkDataCache).when((Object)zkCache)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)this.configCacheService).when((Object)this.pulsar)).getConfigurationCache();
        ((PulsarService)Mockito.doReturn((Object)zkCache).when((Object)this.pulsar)).getLocalZkCacheService();
        this.brokerService = (BrokerService)Mockito.spy((Object)new BrokerService(this.pulsar));
        ((PulsarService)Mockito.doReturn((Object)this.brokerService).when((Object)this.pulsar)).getBrokerService();
        this.serverCnx = (ServerCnx)Mockito.spy((Object)new ServerCnx(this.pulsar));
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)this.serverCnx)).isActive();
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)this.serverCnx)).isWritable();
        ((ServerCnx)Mockito.doReturn((Object)new InetSocketAddress("localhost", 1234)).when((Object)this.serverCnx)).clientAddress();
        NamespaceService nsSvc = (NamespaceService)Mockito.mock(NamespaceService.class);
        ((PulsarService)Mockito.doReturn((Object)nsSvc).when((Object)this.pulsar)).getNamespaceService();
        ((NamespaceService)Mockito.doReturn((Object)true).when((Object)nsSvc)).isServiceUnitOwned((ServiceUnitId)Mockito.any());
        ((NamespaceService)Mockito.doReturn((Object)true).when((Object)nsSvc)).isServiceUnitActive((TopicName)Mockito.any());
        this.setupMLAsyncCallbackMocks();
    }

    @AfterMethod
    public void teardown() throws Exception {
        this.brokerService.getTopics().clear();
        this.brokerService.close();
        try {
            this.pulsar.close();
        }
        catch (Exception e) {
            log.warn("Failed to close pulsar service", (Throwable)e);
            throw e;
        }
    }

    @Test
    public void testCreateTopic() throws Exception {
        final ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        String topicName = "persistent://prop/use/ns-abc/topic1";
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.anyString(), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());
        CompletionStage future = ((CompletableFuture)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/topic1").thenAccept(topic -> Assert.assertTrue((boolean)topic.toString().contains("persistent://prop/use/ns-abc/topic1")))).exceptionally(t -> {
            Assert.fail((String)"should not fail");
            return null;
        });
        try {
            ((CompletableFuture)future).get(1L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            Assert.fail((String)"Should not fail or time out");
        }
    }

    @Test
    public void testCreateTopicMLFailure() throws Exception {
        String jinxedTopicName = "persistent://prop/use/ns-abc/topic3";
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                new Thread(() -> ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null)).start();
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.anyString(), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());
        CompletableFuture future = this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/topic3");
        try {
            future.get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"should have failed");
        }
        catch (TimeoutException e) {
            Assert.fail((String)"Should not time out");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testPublishMessage() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        ByteBuf payload = Unpooled.wrappedBuffer((byte[])"content".getBytes());
        CountDownLatch latch = new CountDownLatch(1);
        topic.publishMessage(payload, (exception, ledgerId, entryId) -> latch.countDown());
        Assert.assertTrue((boolean)latch.await(1L, TimeUnit.SECONDS));
    }

    @Test
    public void testDispatcherMultiConsumerReadFailed() throws Exception {
        PersistentTopic topic = (PersistentTopic)Mockito.spy((Object)new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService));
        ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        Mockito.when((Object)cursor.getName()).thenReturn((Object)"cursor");
        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, null);
        dispatcher.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
        ((PersistentTopic)Mockito.verify((Object)topic, (VerificationMode)Mockito.atLeast((int)1))).getBrokerService();
    }

    @Test
    public void testDispatcherSingleConsumerReadFailed() throws Exception {
        PersistentTopic topic = (PersistentTopic)Mockito.spy((Object)new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService));
        ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        Mockito.when((Object)cursor.getName()).thenReturn((Object)"cursor");
        PersistentDispatcherSingleActiveConsumer dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, PulsarApi.CommandSubscribe.SubType.Exclusive, 1, topic, null);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        dispatcher.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.InvalidCursorPositionException("failed"), (Object)consumer);
        ((PersistentTopic)Mockito.verify((Object)topic, (VerificationMode)Mockito.atLeast((int)1))).getBrokerService();
    }

    @Test
    public void testPublishMessageMLFailure() throws Exception {
        String successTopicName = "persistent://prop/use/ns-abc/successTopic";
        ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", ledgerMock, this.brokerService);
        PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
        messageMetadata.setPublishTime(System.currentTimeMillis());
        messageMetadata.setProducerName("prod-name");
        messageMetadata.setSequenceId(1L);
        ByteBuf payload = Unpooled.wrappedBuffer((byte[])"content".getBytes());
        CountDownLatch latch = new CountDownLatch(1);
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback)invocationOnMock.getArguments()[1]).addFailed(new ManagedLedgerException("Managed ledger failure"), invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when((Object)ledgerMock)).asyncAddEntry((ByteBuf)Mockito.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)Mockito.any(AsyncCallbacks.AddEntryCallback.class), Mockito.any());
        topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
            if (exception == null) {
                Assert.fail((String)"publish should have failed");
            } else {
                latch.countDown();
            }
        });
        Assert.assertTrue((boolean)latch.await(1L, TimeUnit.SECONDS));
    }

    @Test
    public void testAddRemoveProducer() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        String role = "appid1";
        Producer producer = new Producer((Topic)topic, this.serverCnx, 1L, "prod-name", role, false, null, SchemaVersion.Latest);
        topic.addProducer(producer);
        Assert.assertEquals((long)topic.getProducers().size(), (long)1L);
        try {
            topic.addProducer(producer);
            Assert.fail((String)"Should have failed with naming exception because producer 'null' is already connected to the topic");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.NamingException));
        }
        Assert.assertEquals((long)topic.getProducers().size(), (long)1L);
        PersistentTopic failTopic = new PersistentTopic("persistent://prop/use/ns-abc/failTopic", this.ledgerMock, this.brokerService);
        Producer failProducer = new Producer((Topic)failTopic, this.serverCnx, 2L, "prod-name", role, false, null, SchemaVersion.Latest);
        try {
            topic.addProducer(failProducer);
            Assert.fail((String)"should have failed");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        topic.removeProducer(producer);
        Assert.assertEquals((long)topic.getProducers().size(), (long)0L);
        topic.removeProducer(producer);
    }

    public void testMaxProducers() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        String role = "appid1";
        Producer producer = new Producer((Topic)topic, this.serverCnx, 1L, "prod-name1", role, false, null, SchemaVersion.Latest);
        topic.addProducer(producer);
        Assert.assertEquals((long)topic.getProducers().size(), (long)1L);
        Producer producer2 = new Producer((Topic)topic, this.serverCnx, 2L, "prod-name2", role, false, null, SchemaVersion.Latest);
        topic.addProducer(producer2);
        Assert.assertEquals((long)topic.getProducers().size(), (long)2L);
        try {
            Producer producer3 = new Producer((Topic)topic, this.serverCnx, 3L, "prod-name3", role, false, null, SchemaVersion.Latest);
            topic.addProducer(producer3);
            Assert.fail((String)"should have failed");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ProducerBusyException));
        }
    }

    @Test
    public void testMaxProducersForBroker() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxProducersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        this.testMaxProducers();
    }

    @Test
    public void testMaxProducersForNamespace() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_producers_per_topic = 2;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        this.testMaxProducers();
    }

    @Test
    public void testSubscribeFail() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f1 = topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        try {
            f1.get();
            Assert.fail((String)"should fail with exception");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.NamingException));
        }
    }

    @Test
    public void testSubscribeUnsubscribe() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f1 = topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f1.get();
        CompletableFuture f2 = topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        try {
            f2.get();
            Assert.fail((String)"should fail with exception");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
        CompletableFuture f3 = topic.unsubscribe("successSub");
        f3.get();
        Assert.assertNull((Object)topic.getSubscription("successSub"));
    }

    @Test
    public void testChangeSubscriptionType() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "change-sub-type", this.cursorMock, false);
        Consumer consumer = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer);
        consumer.close();
        PulsarApi.CommandSubscribe.SubType previousSubType = PulsarApi.CommandSubscribe.SubType.Exclusive;
        for (PulsarApi.CommandSubscribe.SubType subType : Lists.newArrayList((Object[])new PulsarApi.CommandSubscribe.SubType[]{PulsarApi.CommandSubscribe.SubType.Shared, PulsarApi.CommandSubscribe.SubType.Failover, PulsarApi.CommandSubscribe.SubType.Key_Shared, PulsarApi.CommandSubscribe.SubType.Exclusive})) {
            Dispatcher previousDispatcher = sub.getDispatcher();
            consumer = new Consumer((Subscription)sub, subType, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
            sub.addConsumer(consumer);
            Assert.assertTrue((boolean)sub.getDispatcher().isConsumerConnected());
            Assert.assertFalse((boolean)sub.getDispatcher().isClosed());
            Assert.assertEquals((Object)sub.getDispatcher().getType(), (Object)subType);
            Assert.assertFalse((boolean)previousDispatcher.isConsumerConnected());
            Assert.assertTrue((boolean)previousDispatcher.isClosed());
            Assert.assertEquals((Object)previousDispatcher.getType(), (Object)previousSubType);
            consumer.close();
            previousSubType = subType;
        }
    }

    @Test
    public void testAddRemoveConsumer() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        Consumer consumer = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer);
        Assert.assertTrue((boolean)sub.getDispatcher().isConsumerConnected());
        try {
            sub.addConsumer(consumer);
            Assert.fail((String)"Should fail with ConsumerBusyException");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ConsumerBusyException));
        }
        sub.removeConsumer(consumer);
        Assert.assertFalse((boolean)sub.getDispatcher().isConsumerConnected());
        try {
            sub.removeConsumer(consumer);
            Assert.fail((String)"Should fail with ServerMetadataException");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ServerMetadataException));
        }
    }

    @Test
    public void testAddRemoveConsumerDurableCursor() throws Exception {
        ((ManagedCursor)Mockito.doReturn((Object)false).when((Object)this.cursorMock)).isDurable();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", this.cursorMock, false);
        Consumer consumer = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer);
        Assert.assertFalse((boolean)sub.getDispatcher().isClosed());
        sub.removeConsumer(consumer);
        for (int i = 0; i < 100 && !sub.getDispatcher().isClosed(); ++i) {
            Thread.sleep(100L);
        }
        Assert.assertTrue((boolean)sub.getDispatcher().isClosed());
    }

    public void testMaxConsumersShared() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", this.cursorMock, false);
        ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap(16, 1);
        subscriptions.put((Object)"sub-1", (Object)sub);
        subscriptions.put((Object)"sub-2", (Object)sub2);
        Field field = topic.getClass().getDeclaredField("subscriptions");
        field.setAccessible(true);
        field.set(topic, subscriptions);
        Consumer consumer = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Shared, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)1);
        Consumer consumer2 = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Shared, topic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer2);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)2);
        try {
            Consumer consumer3 = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Shared, topic.getName(), 3L, 0, "Cons3", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
            sub.addConsumer(consumer3);
            Assert.fail((String)"should have failed");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ConsumerBusyException));
        }
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)2);
        Consumer consumer4 = new Consumer((Subscription)sub2, PulsarApi.CommandSubscribe.SubType.Shared, topic.getName(), 4L, 0, "Cons4", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub2.addConsumer(consumer4);
        Assert.assertEquals((int)sub2.getConsumers().size(), (int)1);
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)3);
        try {
            Consumer consumer5 = new Consumer((Subscription)sub2, PulsarApi.CommandSubscribe.SubType.Shared, topic.getName(), 5L, 0, "Cons5", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
            sub2.addConsumer(consumer5);
            Assert.fail((String)"should have failed");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ConsumerBusyException));
        }
    }

    @Test
    public void testMaxConsumersSharedForBroker() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxConsumersPerSubscription();
        ((ServiceConfiguration)Mockito.doReturn((Object)3).when((Object)svcConfig)).getMaxConsumersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        this.testMaxConsumersShared();
    }

    @Test
    public void testMaxConsumersSharedForNamespace() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_consumers_per_subscription = 2;
        policies.max_consumers_per_topic = 3;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn((Object)policies);
        this.testMaxConsumersShared();
    }

    public void testMaxConsumersFailover() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", this.cursorMock, false);
        ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap(16, 1);
        subscriptions.put((Object)"sub-1", (Object)sub);
        subscriptions.put((Object)"sub-2", (Object)sub2);
        Field field = topic.getClass().getDeclaredField("subscriptions");
        field.setAccessible(true);
        field.set(topic, subscriptions);
        Consumer consumer = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)1);
        Consumer consumer2 = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer2);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)2);
        try {
            Consumer consumer3 = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 3L, 0, "Cons3", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
            sub.addConsumer(consumer3);
            Assert.fail((String)"should have failed");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ConsumerBusyException));
        }
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)2);
        Consumer consumer4 = new Consumer((Subscription)sub2, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 4L, 0, "Cons4", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub2.addConsumer(consumer4);
        Assert.assertEquals((int)sub2.getConsumers().size(), (int)1);
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)3);
        try {
            Consumer consumer5 = new Consumer((Subscription)sub2, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 5L, 0, "Cons5", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
            sub2.addConsumer(consumer5);
            Assert.fail((String)"should have failed");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ConsumerBusyException));
        }
    }

    @Test
    public void testMaxConsumersFailoverForBroker() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxConsumersPerSubscription();
        ((ServiceConfiguration)Mockito.doReturn((Object)3).when((Object)svcConfig)).getMaxConsumersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        this.testMaxConsumersFailover();
    }

    @Test
    public void testMaxConsumersFailoverForNamespace() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_consumers_per_subscription = 2;
        policies.max_consumers_per_topic = 3;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn((Object)policies);
        this.testMaxConsumersFailover();
    }

    @Test
    public void testUbsubscribeRaceConditions() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        Consumer consumer1 = new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        sub.addConsumer(consumer1);
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback)invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                Thread.sleep(1000L);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDeleteCursor(Mockito.matches((String)".*success.*"), (AsyncCallbacks.DeleteCursorCallback)Mockito.any(AsyncCallbacks.DeleteCursorCallback.class), Mockito.any());
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.submit(() -> {
            sub.doUnsubscribe(consumer1);
            return null;
        }).get();
        try {
            Thread.sleep(10L);
            new Consumer((Subscription)sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.SubscriptionFencedException));
        }
    }

    @Test
    public void testDeleteTopic() throws Exception {
        PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        String role = "appid1";
        topic.delete().get();
        Assert.assertFalse((boolean)this.brokerService.getTopicReference("persistent://prop/use/ns-abc/successTopic").isPresent());
        topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        Producer producer = new Producer((Topic)topic, this.serverCnx, 1L, "prod-name", role, false, null, SchemaVersion.Latest);
        topic.addProducer(producer);
        Assert.assertTrue((boolean)topic.delete().isCompletedExceptionally());
        topic.removeProducer(producer);
        PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f1 = topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f1.get();
        Assert.assertTrue((boolean)topic.delete().isCompletedExceptionally());
        topic.unsubscribe("successSub");
    }

    @Test
    public void testDeleteAndUnsubscribeTopic() throws Exception {
        final PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f1 = topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f1.get();
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        Thread deleter = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    Assert.assertFalse((boolean)topic.delete().isCompletedExceptionally());
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        Thread unsubscriber = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    topic.unsubscribe("successSub");
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        deleter.start();
        unsubscriber.start();
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
    }

    public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
        final PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f1 = topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f1.get();
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        Thread deleter = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    Thread.sleep(5L, 0);
                    log.info("deleter outcome is {}", topic.delete().get());
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        Thread unsubscriber = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    ConcurrentOpenHashMap subscriptions = topic.getSubscriptions();
                    PersistentSubscription ps = (PersistentSubscription)subscriptions.get((Object)"successSub");
                    log.info("unsubscriber outcome is {}", ps.doUnsubscribe((Consumer)ps.getConsumers().get(0)).get());
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        deleter.start();
        unsubscriber.start();
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
    }

    @Test
    public void testDeleteTopicRaceConditions() throws Exception {
        PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(1000L);
                ((AsyncCallbacks.DeleteLedgerCallback)invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback)Mockito.any(AsyncCallbacks.DeleteLedgerCallback.class), Mockito.any());
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.submit(() -> {
            topic.delete();
            return null;
        }).get();
        try {
            String role = "appid1";
            Thread.sleep(10L);
            Producer producer = new Producer((Topic)topic, this.serverCnx, 1L, "prod-name", role, false, null, SchemaVersion.Latest);
            topic.addProducer(producer);
            Assert.fail((String)"Should have failed");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.TopicFencedException));
        }
        PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f = topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        try {
            f.get();
            Assert.fail((String)"should have failed");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.TopicFencedException));
        }
    }

    void setupMLAsyncCallbackMocks() {
        this.ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        this.cursorMock = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        final CompletableFuture closeFuture = new CompletableFuture();
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)this.ledgerMock)).getCursors();
        ((ManagedCursor)Mockito.doReturn((Object)"mockCursor").when((Object)this.cursorMock)).getName();
        ((ManagedCursor)Mockito.doReturn((Object)true).when((Object)this.cursorMock)).isDurable();
        ((ManagedCursor)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return closeFuture.complete(null);
            }
        }).when((Object)this.cursorMock)).asyncClose(new AsyncCallbacks.CloseCallback(){

            public void closeComplete(Object ctx) {
                log.info("[{}] Successfully closed cursor ledger", (Object)"mockCursor");
                closeFuture.complete(null);
            }

            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                log.error("Error closing cursor for subscription", (Throwable)exception);
                closeFuture.completeExceptionally((Throwable)new BrokerServiceException.PersistenceException((Throwable)exception));
            }
        }, null);
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(PersistentTopicTest.this.ledgerMock, null);
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*success.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*fail.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback)invocationOnMock.getArguments()[1]).addComplete((Position)new PositionImpl(1L, 1L), invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncAddEntry((ByteBuf)Mockito.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)Mockito.any(AsyncCallbacks.AddEntryCallback.class), Mockito.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback)invocationOnMock.getArguments()[2]).openCursorComplete(PersistentTopicTest.this.cursorMock, null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncOpenCursor(Mockito.matches((String)".*success.*"), (PulsarApi.CommandSubscribe.InitialPosition)Mockito.any(PulsarApi.CommandSubscribe.InitialPosition.class), (AsyncCallbacks.OpenCursorCallback)Mockito.any(AsyncCallbacks.OpenCursorCallback.class), Mockito.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback)invocationOnMock.getArguments()[3]).openCursorComplete(PersistentTopicTest.this.cursorMock, null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncOpenCursor(Mockito.matches((String)".*success.*"), (PulsarApi.CommandSubscribe.InitialPosition)Mockito.any(PulsarApi.CommandSubscribe.InitialPosition.class), (Map)Mockito.any(Map.class), (AsyncCallbacks.OpenCursorCallback)Mockito.any(AsyncCallbacks.OpenCursorCallback.class), Mockito.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteLedgerCallback)invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback)Mockito.any(AsyncCallbacks.DeleteLedgerCallback.class), Mockito.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback)invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDeleteCursor(Mockito.matches((String)".*success.*"), (AsyncCallbacks.DeleteCursorCallback)Mockito.any(AsyncCallbacks.DeleteCursorCallback.class), Mockito.any());
        ((ManagedCursor)Mockito.doAnswer(invokactionOnMock -> {
            ((AsyncCallbacks.MarkDeleteCallback)invokactionOnMock.getArguments()[2]).markDeleteComplete(invokactionOnMock.getArguments()[3]);
            return null;
        }).when((Object)this.cursorMock)).asyncMarkDelete((Position)Mockito.any(), (Map)Mockito.any(), (AsyncCallbacks.MarkDeleteCallback)Mockito.any(AsyncCallbacks.MarkDeleteCallback.class), Mockito.any());
    }

    @Test
    public void testFailoverSubscription() throws Exception {
        PersistentTopic topic1 = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PulsarApi.CommandSubscribe cmd1 = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Failover).build();
        CompletableFuture f1 = topic1.subscribe(this.serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(), cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(), cmd1.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f1.get();
        PersistentTopic topic2 = new PersistentTopic("persistent://prop/use/ns-abc/successTopic-partition-0", this.ledgerMock, this.brokerService);
        PulsarApi.CommandSubscribe cmd2 = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Failover).build();
        CompletableFuture f2 = topic2.subscribe(this.serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(), cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(), cmd2.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f2.get();
        PulsarApi.CommandSubscribe cmd3 = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(2L).setConsumerName("C2").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Failover).build();
        CompletableFuture f3 = topic2.subscribe(this.serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(), cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(), cmd3.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f3.get();
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)1L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C1");
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerId(), (long)2L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerName(), (String)"C2");
        PulsarApi.CommandSubscribe cmd4 = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(3L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Failover).build();
        CompletableFuture f4 = topic2.subscribe(this.serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(), cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(), cmd4.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f4.get();
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)1L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C1");
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerId(), (long)3L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerName(), (String)"C1");
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(2)).consumerId(), (long)2L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(2)).consumerName(), (String)"C2");
        PulsarApi.CommandSubscribe cmd5 = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(2L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f5 = topic2.subscribe(this.serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(), cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(), cmd5.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        try {
            f5.get();
            Assert.fail((String)"should fail with exception");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.SubscriptionBusyException));
        }
        PulsarApi.CommandSubscribe cmd6 = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(4L).setConsumerName("C3").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub2").setRequestId(1L).setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
        CompletableFuture f6 = topic2.subscribe(this.serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(), cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(), cmd6.getReadCompacted(), PulsarApi.CommandSubscribe.InitialPosition.Latest, false);
        f6.get();
        CompletableFuture f7 = topic2.unsubscribe("successSub2");
        f7.get();
        Assert.assertNull((Object)topic2.getSubscription("successSub2"));
        PersistentSubscription sub = topic2.getSubscription("successSub");
        Consumer cons = (Consumer)sub.getDispatcher().getConsumers().get(0);
        sub.removeConsumer(cons);
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)3L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C1");
        cons = (Consumer)sub.getDispatcher().getConsumers().get(0);
        sub.removeConsumer(cons);
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)2L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C2");
        CompletableFuture f8 = topic2.unsubscribe("successSub");
        f8.get();
        Assert.assertNull((Object)topic2.getSubscription("successSub"));
    }

    @Test
    public void testAtomicReplicationRemoval() throws Exception {
        String globalTopicName = "persistent://prop/global/ns-abc/successTopic";
        String localCluster = "local";
        String remoteCluster = "remote";
        ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doNothing().when((Object)ledgerMock)).asyncDeleteCursor((String)Mockito.any(), (AsyncCallbacks.DeleteCursorCallback)Mockito.any(), Mockito.any());
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        PersistentTopic topic = new PersistentTopic("persistent://prop/global/ns-abc/successTopic", ledgerMock, this.brokerService);
        String remoteReplicatorName = topic.getReplicatorPrefix() + "." + remoteCluster;
        ConcurrentOpenHashMap replicatorMap = topic.getReplicators();
        URL brokerUrl = new URL("http://" + this.pulsar.getAdvertisedAddress() + ":" + this.pulsar.getConfiguration().getBrokerServicePort().get());
        PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
        ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursorImpl.class);
        ((ManagedCursor)Mockito.doReturn((Object)remoteCluster).when((Object)cursor)).getName();
        this.brokerService.getReplicationClients().put((Object)remoteCluster, (Object)client);
        PersistentReplicator replicator = (PersistentReplicator)Mockito.spy((Object)new PersistentReplicator(topic, cursor, localCluster, remoteCluster, this.brokerService));
        replicatorMap.put((Object)remoteReplicatorName, (Object)replicator);
        Method removeMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
        removeMethod.setAccessible(true);
        removeMethod.invoke((Object)topic, remoteReplicatorName);
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/global/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(new Policies()));
        topic.startReplProducers();
        ((PersistentReplicator)Mockito.verify((Object)replicator, (VerificationMode)Mockito.times((int)0))).startProducer();
        ArgumentCaptor captor = ArgumentCaptor.forClass(AsyncCallbacks.DeleteCursorCallback.class);
        ((ManagedLedger)Mockito.verify((Object)ledgerMock)).asyncDeleteCursor((String)Mockito.any(), (AsyncCallbacks.DeleteCursorCallback)captor.capture(), Mockito.any());
        AsyncCallbacks.DeleteCursorCallback callback = (AsyncCallbacks.DeleteCursorCallback)captor.getValue();
        callback.deleteCursorComplete(null);
    }

    @Test
    public void testClosingReplicationProducerTwice() throws Exception {
        String globalTopicName = "persistent://prop/global/ns/testClosingReplicationProducerTwice";
        String localCluster = "local";
        String remoteCluster = "remote";
        ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doNothing().when((Object)ledgerMock)).asyncDeleteCursor((String)Mockito.any(), (AsyncCallbacks.DeleteCursorCallback)Mockito.any(), Mockito.any());
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        PersistentTopic topic = new PersistentTopic("persistent://prop/global/ns/testClosingReplicationProducerTwice", ledgerMock, this.brokerService);
        URL brokerUrl = new URL("http://" + this.pulsar.getAdvertisedAddress() + ":" + this.pulsar.getConfiguration().getBrokerServicePort().get());
        PulsarClient client = (PulsarClient)Mockito.spy((Object)PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
        PulsarClientImpl clientImpl = (PulsarClientImpl)client;
        ((PulsarClientImpl)Mockito.doReturn(new CompletableFuture()).when((Object)clientImpl)).createProducerAsync((ProducerConfigurationData)Mockito.any(ProducerConfigurationData.class), (Schema)Mockito.any(Schema.class));
        ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursorImpl.class);
        ((ManagedCursor)Mockito.doReturn((Object)remoteCluster).when((Object)cursor)).getName();
        this.brokerService.getReplicationClients().put((Object)remoteCluster, (Object)client);
        PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, this.brokerService);
        ((PulsarClientImpl)Mockito.verify((Object)clientImpl)).createProducerAsync((ProducerConfigurationData)Mockito.any(ProducerConfigurationData.class), (Schema)Mockito.any(), (ProducerInterceptors)Mockito.eq(null));
        replicator.disconnect(false);
        replicator.disconnect(false);
        replicator.startProducer();
        ((PulsarClientImpl)Mockito.verify((Object)clientImpl, (VerificationMode)Mockito.times((int)2))).createProducerAsync((ProducerConfigurationData)Mockito.any(), (Schema)Mockito.any(), (ProducerInterceptors)Mockito.any());
    }

    @Test
    public void testCompactorSubscription() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CompactedTopic compactedTopic = (CompactedTopic)Mockito.mock(CompactedTopic.class);
        CompactorSubscription sub = new CompactorSubscription(topic, compactedTopic, "__compaction", this.cursorMock);
        PositionImpl position = new PositionImpl(1L, 1L);
        long ledgerId = 202112766L;
        sub.acknowledgeMessage(Collections.singletonList(position), PulsarApi.CommandAck.AckType.Cumulative, (Map)ImmutableMap.of((Object)"CompactedTopicLedger", (Object)ledgerId));
        ((CompactedTopic)Mockito.verify((Object)compactedTopic, (VerificationMode)Mockito.times((int)1))).newCompactedLedger((Position)position, ledgerId);
    }

    @Test
    public void testCompactorSubscriptionUpdatedOnInit() throws Exception {
        long ledgerId = 202112766L;
        ImmutableMap properties = ImmutableMap.of((Object)"CompactedTopicLedger", (Object)ledgerId);
        PositionImpl position = new PositionImpl(1L, 1L);
        ((ManagedCursor)Mockito.doAnswer(arg_0 -> PersistentTopicTest.lambda$testCompactorSubscriptionUpdatedOnInit$7((Map)properties, arg_0)).when((Object)this.cursorMock)).getProperties();
        ((ManagedCursor)Mockito.doAnswer(invokactionOnMock -> position).when((Object)this.cursorMock)).getMarkDeletedPosition();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CompactedTopic compactedTopic = (CompactedTopic)Mockito.mock(CompactedTopic.class);
        new CompactorSubscription(topic, compactedTopic, "__compaction", this.cursorMock);
        ((CompactedTopic)Mockito.verify((Object)compactedTopic, (VerificationMode)Mockito.times((int)1))).newCompactedLedger((Position)position, ledgerId);
    }

    @Test
    public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Exception {
        CompletableFuture compactPromise = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor)Mockito.doReturn(compactPromise).when((Object)compactor)).compact(Mockito.anyString());
        Policies policies = new Policies();
        policies.compaction_threshold = 1L;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)0))).compact(Mockito.anyString());
        ((ManagedLedger)Mockito.doReturn((Object)10L).when((Object)this.ledgerMock)).getEstimatedBacklogSize();
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
    }

    @Test
    public void testCompactionTriggeredAfterThresholdSecondInvocation() throws Exception {
        CompletableFuture compactPromise = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor)Mockito.doReturn(compactPromise).when((Object)compactor)).compact(Mockito.anyString());
        ManagedCursor subCursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        ((ManagedLedger)Mockito.doReturn((Object)Lists.newArrayList((Object[])new ManagedCursor[]{subCursor})).when((Object)this.ledgerMock)).getCursors();
        ((ManagedCursor)Mockito.doReturn((Object)"__compaction").when((Object)subCursor)).getName();
        Policies policies = new Policies();
        policies.compaction_threshold = 1L;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)0))).compact(Mockito.anyString());
        ((ManagedCursor)Mockito.doReturn((Object)10L).when((Object)subCursor)).getEstimatedSizeSinceMarkDeletePosition();
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
    }

    @Test
    public void testCompactionDisabledWithZeroThreshold() throws Exception {
        CompletableFuture compactPromise = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor)Mockito.doReturn(compactPromise).when((Object)compactor)).compact(Mockito.anyString());
        Policies policies = new Policies();
        policies.compaction_threshold = 0L;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        ((ManagedLedger)Mockito.doReturn((Object)1000L).when((Object)this.ledgerMock)).getEstimatedBacklogSize();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)0))).compact(Mockito.anyString());
    }

    private static /* synthetic */ Object lambda$testCompactorSubscriptionUpdatedOnInit$7(Map properties, InvocationOnMock invokactionOnMock) throws Throwable {
        return properties;
    }
}

