/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class PersistentDispatcherFailoverConsumerTest {
    // Spy wrapped around a real BrokerService; rebuilt in setup() before every test.
    private BrokerService brokerService;
    // Mock factory whose asyncOpen calls are stubbed in setupMLAsyncCallbackMocks().
    private ManagedLedgerFactory mlFactoryMock;
    // Connection spy stubbed to report protocol version v12.
    private ServerCnx serverCnx;
    // Connection spy stubbed to report protocol version v11.
    private ServerCnx serverCnxWithOldVersion;
    // Mock ledger passed to every PersistentTopic created by the tests.
    private ManagedLedger ledgerMock;
    // Mock cursor (named "mockCursor") passed to the subscriptions and dispatchers under test.
    private ManagedCursor cursorMock;
    // Mock configuration cache returned by the spied PulsarService.
    private ConfigurationCacheService configCacheService;
    // Mock channel context returned by ctx() on both connection spies.
    private ChannelHandlerContext channelCtx;
    // Active-consumer-change commands decoded from buffers passed to channelCtx.writeAndFlush(..).
    private LinkedBlockingQueue<PulsarApi.CommandActiveConsumerChange> consumerChanges;
    // NOTE(review): the tests visible here spell this topic name out as a literal instead of reading
    // the field (probably constant inlining by the decompiler) - confirm before changing the value.
    final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic";
    // Not referenced by any code visible in this part of the file.
    final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic";
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherFailoverConsumerTest.class);

    /**
     * Builds a fresh mocked broker environment before every test method.
     *
     * <p>A spied {@link PulsarService} is wired to mock ZooKeeper/BookKeeper clients, mock caches and a
     * mock namespace service. A mock channel context decodes every buffer handed to
     * {@code writeAndFlush} and stores any active-consumer-change command in {@code consumerChanges}.
     * Two {@link ServerCnx} spies share that context: one reports protocol version v12, the other v11.
     *
     * @throws Exception if any of the collaborators cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        ServiceConfiguration config = Mockito.spy(new ServiceConfiguration());
        PulsarService pulsarSpy = Mockito.spy(new PulsarService(config));
        Mockito.doReturn(config).when(pulsarSpy).getConfiguration();

        // Managed-ledger factory is a plain mock; its behaviour is stubbed at the end of this method.
        this.mlFactoryMock = Mockito.mock(ManagedLedgerFactory.class);
        Mockito.doReturn(this.mlFactoryMock).when(pulsarSpy).getManagedLedgerFactory();

        // Mock ZooKeeper and BookKeeper clients supplied by MockedPulsarServiceBaseTest.
        MockZooKeeper zk = MockedPulsarServiceBaseTest.createMockZooKeeper();
        Mockito.doReturn(zk).when(pulsarSpy).getZkClient();
        Mockito.doReturn(MockedPulsarServiceBaseTest.createMockBookKeeper((ZooKeeper) zk, pulsarSpy.getOrderedExecutor().chooseThread(0L)))
                .when(pulsarSpy).getBookKeeperClient();

        ZooKeeperCache localZkCache = Mockito.mock(ZooKeeperCache.class);
        Mockito.doReturn(30).when(localZkCache).getZkOperationTimeoutSeconds();
        Mockito.doReturn(localZkCache).when(pulsarSpy).getLocalZkCache();

        // Both policy caches answer every lookup with an already-completed, empty result.
        this.configCacheService = Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache policies = Mockito.mock(ZooKeeperDataCache.class);
        LocalZooKeeperCacheService localCacheService = Mockito.mock(LocalZooKeeperCacheService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(policies).getAsync((String) Mockito.any());
        Mockito.doReturn(policies).when(localCacheService).policiesCache();
        Mockito.doReturn(policies).when(this.configCacheService).policiesCache();
        Mockito.doReturn(this.configCacheService).when(pulsarSpy).getConfigurationCache();
        Mockito.doReturn(localCacheService).when(pulsarSpy).getLocalZkCacheService();

        this.brokerService = Mockito.spy(new BrokerService(pulsarSpy));
        Mockito.doReturn(this.brokerService).when(pulsarSpy).getBrokerService();

        this.consumerChanges = new LinkedBlockingQueue<>();
        this.channelCtx = Mockito.mock(ChannelHandlerContext.class);
        Mockito.doAnswer(invocation -> {
            ByteBuf frame = (ByteBuf) invocation.getArgument(0);
            // Skip the first four bytes of the written buffer; what follows starts with the command size.
            ByteBuf commandSlice = frame.retainedSlice(4, frame.writerIndex() - 4);
            try {
                int commandLength = (int) commandSlice.readUnsignedInt();
                int savedWriterIndex = commandSlice.writerIndex();
                // Temporarily cap the slice so that only the command bytes are parsed.
                commandSlice.writerIndex(commandSlice.readerIndex() + commandLength);
                ByteBufCodedInputStream input = ByteBufCodedInputStream.get(commandSlice);
                PulsarApi.BaseCommand.Builder builder = PulsarApi.BaseCommand.newBuilder();
                PulsarApi.BaseCommand command = builder.mergeFrom(input, null).build();
                builder.recycle();
                commandSlice.writerIndex(savedWriterIndex);
                input.recycle();
                if (command.hasActiveConsumerChange()) {
                    this.consumerChanges.put(command.getActiveConsumerChange());
                }
                command.recycle();
            } finally {
                commandSlice.release();
            }
            return null;
        }).when(this.channelCtx).writeAndFlush(Mockito.any(), (ChannelPromise) Mockito.any());

        this.serverCnx = this.spyServerCnx(pulsarSpy, PulsarApi.ProtocolVersion.v12.getNumber());
        this.serverCnxWithOldVersion = this.spyServerCnx(pulsarSpy, PulsarApi.ProtocolVersion.v11.getNumber());

        // Namespace service claims ownership of every bundle and reports every topic as active.
        NamespaceService namespaceService = Mockito.mock(NamespaceService.class);
        Mockito.doReturn(namespaceService).when(pulsarSpy).getNamespaceService();
        Mockito.doReturn(true).when(namespaceService).isServiceUnitOwned((ServiceUnitId) Mockito.any(NamespaceBundle.class));
        Mockito.doReturn(true).when(namespaceService).isServiceUnitActive((TopicName) Mockito.any(TopicName.class));

        this.setupMLAsyncCallbackMocks();
    }

    /**
     * Creates a {@link ServerCnx} spy that is active, writable, bound to localhost:1234, reports the
     * given protocol version and exposes the shared mock channel context.
     */
    private ServerCnx spyServerCnx(PulsarService pulsar, int protocolVersion) throws Exception {
        ServerCnx cnx = Mockito.spy(new ServerCnx(pulsar));
        Mockito.doReturn(true).when(cnx).isActive();
        Mockito.doReturn(true).when(cnx).isWritable();
        Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(cnx).clientAddress();
        Mockito.when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(protocolVersion);
        Mockito.when(cnx.ctx()).thenReturn(this.channelCtx);
        return cnx;
    }

    /**
     * Recreates the ledger and cursor mocks and stubs the asynchronous managed-ledger calls so that
     * each one invokes its callback immediately, from inside the stubbed call.
     *
     * <ul>
     *   <li>{@code asyncOpen} with a name matching {@code .*success.*} completes with the ledger mock;
     *       a name matching {@code .*fail.*} fails with a {@link ManagedLedgerException}.</li>
     *   <li>{@code asyncAddEntry} completes at position (1, 1).</li>
     *   <li>{@code asyncOpenCursor} with a name matching {@code .*success.*} completes with the cursor mock.</li>
     *   <li>{@code asyncDelete}, and {@code asyncDeleteCursor} with a name matching
     *       {@code .*success.*}, complete successfully.</li>
     * </ul>
     */
    void setupMLAsyncCallbackMocks() {
        this.ledgerMock = Mockito.mock(ManagedLedger.class);
        this.cursorMock = Mockito.mock(ManagedCursor.class);
        Mockito.doReturn(new ArrayList<>()).when(this.ledgerMock).getCursors();
        Mockito.doReturn("mockCursor").when(this.cursorMock).getName();

        // Opening a ledger: success path.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.OpenLedgerCallback callback = (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
            callback.openLedgerComplete(this.ledgerMock, null);
            return null;
        }).when(this.mlFactoryMock).asyncOpen(Mockito.matches(".*success.*"), Mockito.any(ManagedLedgerConfig.class), Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        // Opening a ledger: failure path.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.OpenLedgerCallback callback = (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
            callback.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(this.mlFactoryMock).asyncOpen(Mockito.matches(".*fail.*"), Mockito.any(ManagedLedgerConfig.class), Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        // Adding an entry always succeeds at a fixed position.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.AddEntryCallback callback = (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
            callback.addComplete((Position) new PositionImpl(1L, 1L), null);
            return null;
        }).when(this.ledgerMock).asyncAddEntry(Mockito.any(byte[].class), Mockito.any(AsyncCallbacks.AddEntryCallback.class), Mockito.any());

        // Opening a cursor hands back the shared cursor mock.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.OpenCursorCallback callback = (AsyncCallbacks.OpenCursorCallback) invocation.getArguments()[2];
            callback.openCursorComplete(this.cursorMock, null);
            return null;
        }).when(this.ledgerMock).asyncOpenCursor(Mockito.matches(".*success.*"), Mockito.any(PulsarApi.CommandSubscribe.InitialPosition.class), Mockito.any(AsyncCallbacks.OpenCursorCallback.class), Mockito.any());

        // Deleting the ledger succeeds.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.DeleteLedgerCallback callback = (AsyncCallbacks.DeleteLedgerCallback) invocation.getArguments()[0];
            callback.deleteLedgerComplete(null);
            return null;
        }).when(this.ledgerMock).asyncDelete(Mockito.any(AsyncCallbacks.DeleteLedgerCallback.class), Mockito.any());

        // Deleting a cursor succeeds.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.DeleteCursorCallback callback = (AsyncCallbacks.DeleteCursorCallback) invocation.getArguments()[1];
            callback.deleteCursorComplete(null);
            return null;
        }).when(this.ledgerMock).asyncDeleteCursor(Mockito.matches(".*success.*"), Mockito.any(AsyncCallbacks.DeleteCursorCallback.class), Mockito.any());
    }

    /**
     * Asserts that a captured active-consumer-change command carries the expected consumer id and
     * active flag, then recycles the command.
     *
     * @param change     the captured command; it is recycled only if both assertions pass
     * @param consumerId the consumer id the command is expected to address
     * @param isActive   the active flag the command is expected to carry
     */
    private void verifyActiveConsumerChange(PulsarApi.CommandActiveConsumerChange change, long consumerId, boolean isActive) {
        long reportedId = change.getConsumerId();
        AssertJUnit.assertEquals(consumerId, reportedId);
        boolean reportedActive = change.getIsActive();
        AssertJUnit.assertEquals(isActive, reportedActive);
        change.recycle();
    }

    /**
     * Mixes a consumer on the v11 connection with a consumer on the v12 connection in one failover
     * dispatcher and checks which of them receive active-consumer-change commands: adding the v11
     * consumer queues nothing, adding the v12 consumer queues exactly one "not active" command for it.
     */
    @Test
    public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentSubscription subscription = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        PersistentDispatcherSingleActiveConsumer failoverDispatcher = new PersistentDispatcherSingleActiveConsumer(this.cursorMock, PulsarApi.CommandSubscribe.SubType.Failover, 0, topic, (Subscription) subscription);

        // Nothing is connected to a brand-new dispatcher.
        Assert.assertFalse(failoverDispatcher.isConsumerConnected());

        // Consumer on the old-protocol connection: no change command is captured.
        Consumer oldClientConsumer = new Consumer((Subscription) subscription, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        failoverDispatcher.addConsumer(oldClientConsumer);
        CopyOnWriteArrayList connected = failoverDispatcher.getConsumers();
        String firstName = ((Consumer) connected.get(0)).consumerName();
        AssertJUnit.assertSame(firstName, oldClientConsumer.consumerName());
        AssertJUnit.assertEquals(1, connected.size());
        Assert.assertNull(this.consumerChanges.poll());
        Mockito.verify(this.channelCtx, Mockito.times(0)).write(Mockito.any());

        // Consumer on the new-protocol connection: it is told it is not the active consumer.
        Consumer newClientConsumer = new Consumer((Subscription) subscription, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        failoverDispatcher.addConsumer(newClientConsumer);
        connected = failoverDispatcher.getConsumers();
        firstName = ((Consumer) connected.get(0)).consumerName();
        AssertJUnit.assertSame(firstName, oldClientConsumer.consumerName());
        AssertJUnit.assertEquals(2, connected.size());
        PulsarApi.CommandActiveConsumerChange captured = this.consumerChanges.take();
        this.verifyActiveConsumerChange(captured, 2L, false);
        Mockito.verify(this.channelCtx, Mockito.times(1)).writeAndFlush(Mockito.any(), (ChannelPromise) Mockito.any());
    }

    /**
     * Walks a failover dispatcher (partition index 4) through a sequence of consumer additions and
     * removals, checking after each step the connected-consumer count, the active consumer, the
     * active-consumer-change commands captured from the channel, and the notifications delivered to
     * each consumer spy.
     */
    @Test
    public void testAddRemoveConsumer() throws Exception {
        log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddRemoveConsumer ---");
        PersistentTopic topic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        int partitionIndex = 4;
        PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(this.cursorMock, PulsarApi.CommandSubscribe.SubType.Failover, partitionIndex, topic, (Subscription) sub);

        // 1. A new dispatcher has no connected consumer.
        Assert.assertFalse(pdfc.isConsumerConnected());

        // 2. The first consumer is added and is told that it is the active one.
        Consumer consumer1 = Mockito.spy(new Consumer((Subscription) sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest));
        pdfc.addConsumer(consumer1);
        CopyOnWriteArrayList consumers = pdfc.getConsumers();
        AssertJUnit.assertSame(((Consumer) consumers.get(0)).consumerName(), consumer1.consumerName());
        AssertJUnit.assertEquals(1, consumers.size());
        PulsarApi.CommandActiveConsumerChange change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 1L, true);
        Mockito.verify(consumer1, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer1));

        // 3. The same consumer instance is added a second time; the dispatcher now lists two entries.
        pdfc.addConsumer(consumer1);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertSame(((Consumer) consumers.get(0)).consumerName(), consumer1.consumerName());
        AssertJUnit.assertEquals(2, consumers.size());
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 1L, true);
        Mockito.verify(consumer1, Mockito.times(2)).notifyActiveConsumerChange(Mockito.same(consumer1));

        // 4. A second consumer joins without displacing the active consumer.
        Consumer consumer2 = Mockito.spy(new Consumer((Subscription) sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest));
        pdfc.addConsumer(consumer2);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
        AssertJUnit.assertEquals(3, consumers.size());
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 2L, false);
        Mockito.verify(consumer1, Mockito.times(2)).notifyActiveConsumerChange(Mockito.same(consumer1));
        Mockito.verify(consumer2, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer1));

        // 5. "Cons0" joins and becomes the active consumer; every connected entry is notified.
        Consumer consumer0 = Mockito.spy(new Consumer((Subscription) sub, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 0L, 0, "Cons0", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest));
        pdfc.addConsumer(consumer0);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName());
        AssertJUnit.assertEquals(4, consumers.size());
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 0L, true);
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 1L, false);
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 1L, false);
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 2L, false);
        Mockito.verify(consumer0, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer0));
        Mockito.verify(consumer1, Mockito.times(2)).notifyActiveConsumerChange(Mockito.same(consumer1));
        Mockito.verify(consumer1, Mockito.times(2)).notifyActiveConsumerChange(Mockito.same(consumer0));
        Mockito.verify(consumer2, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer1));
        Mockito.verify(consumer2, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer0));

        // 6. Removing consumer2 leaves three entries; no change command has been captured at this point.
        pdfc.removeConsumer(consumer2);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
        AssertJUnit.assertEquals(3, consumers.size());
        Assert.assertNull(this.consumerChanges.poll());

        // 7. Unsubscribing is refused while several consumers are still connected.
        Assert.assertFalse(pdfc.canUnsubscribe(consumer0));

        // 8. Removing consumer0 leaves the two consumer1 entries, both told that consumer 1 is active.
        pdfc.removeConsumer(consumer0);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
        AssertJUnit.assertEquals(2, consumers.size());
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 1L, true);
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 1L, true);

        // 9. Removing consumer0 again must be rejected. Assert.fail throws an AssertionError, which
        //    the catch below does not intercept, so a missing exception fails the test explicitly.
        try {
            pdfc.removeConsumer(consumer0);
            Assert.fail("Removing a consumer that is no longer connected should have thrown");
        } catch (Exception e) {
            // AssertJUnit takes (expected, actual).
            AssertJUnit.assertEquals("Consumer was not connected", e.getMessage());
        }

        // 10. Removing one consumer1 entry leaves a single consumer, which may then unsubscribe.
        pdfc.removeConsumer(consumer1);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
        AssertJUnit.assertEquals(1, consumers.size());
        Assert.assertNull(this.consumerChanges.poll());
        AssertJUnit.assertTrue(pdfc.canUnsubscribe(consumer1));
    }

    /**
     * Exercises a failover dispatcher created with partition index -1: the first consumer added stays
     * active while two more consumers join, and once it is removed the second consumer takes over.
     *
     * <p>NOTE(review): the fifth {@code Consumer} constructor argument (1, 1, 0 below) is presumably
     * the priority level - confirm against the {@code Consumer} constructor.
     */
    @Test
    public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
        log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddRemoveConsumerNonPartitionedTopic ---");
        PersistentTopic topic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        int partitionIndex = -1;
        PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(this.cursorMock, PulsarApi.CommandSubscribe.SubType.Failover, partitionIndex, topic, (Subscription) sub);

        // 1. A new dispatcher has no connected consumer.
        Assert.assertFalse(pdfc.isConsumerConnected());

        // 2. The first consumer becomes the active one.
        Consumer consumer1 = Mockito.spy(new Consumer((Subscription) sub, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 1L, 1, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest));
        pdfc.addConsumer(consumer1);
        CopyOnWriteArrayList consumers = pdfc.getConsumers();
        AssertJUnit.assertEquals(1, consumers.size());
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());

        // 3. A second consumer joins and is told it is not active; consumer1 stays active.
        Consumer consumer2 = Mockito.spy(new Consumer((Subscription) sub, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 2L, 1, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest));
        pdfc.addConsumer(consumer2);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertEquals(2, consumers.size());
        PulsarApi.CommandActiveConsumerChange change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 2L, false);
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
        Mockito.verify(consumer2, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer1));

        // 4. A third consumer joins; consumer1 still stays active.
        Consumer consumer3 = Mockito.spy(new Consumer((Subscription) sub, PulsarApi.CommandSubscribe.SubType.Failover, topic.getName(), 3L, 0, "Cons3", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest));
        pdfc.addConsumer(consumer3);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertEquals(3, consumers.size());
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 3L, false);
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
        Mockito.verify(consumer3, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer1));

        // 5. Removing the active consumer promotes consumer2; both remaining consumers are notified.
        pdfc.removeConsumer(consumer1);
        consumers = pdfc.getConsumers();
        AssertJUnit.assertEquals(2, consumers.size());
        change = this.consumerChanges.take();
        this.verifyActiveConsumerChange(change, 2L, true);
        AssertJUnit.assertSame(pdfc.getActiveConsumer().consumerName(), consumer2.consumerName());
        Mockito.verify(consumer2, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer2));
        Mockito.verify(consumer3, Mockito.times(1)).notifyActiveConsumerChange(Mockito.same(consumer2));
    }

    /**
     * Registers nine consumers with a multi-consumer dispatcher and checks the exact order in which
     * {@code getNextConsumer} hands them out, including after a tenth consumer joins mid-sequence.
     *
     * <p>NOTE(review): {@code createConsumer} and {@code getNextConsumer} are helpers of this class
     * that are not visible here; the {@code createConsumer} arguments are presumably
     * (priority level, permits, blocked, consumer id) - confirm against the helper.
     */
    @Test
    public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, this.cursorMock, null);

        Consumer consumer1 = this.createConsumer(0, 2, false, 1);
        Consumer consumer2 = this.createConsumer(0, 2, false, 2);
        Consumer consumer3 = this.createConsumer(0, 2, false, 3);
        Consumer consumer4 = this.createConsumer(1, 2, false, 4);
        Consumer consumer5 = this.createConsumer(1, 1, false, 5);
        Consumer consumer6 = this.createConsumer(1, 2, false, 6);
        Consumer consumer7 = this.createConsumer(2, 1, false, 7);
        Consumer consumer8 = this.createConsumer(2, 1, false, 8);
        Consumer consumer9 = this.createConsumer(2, 1, false, 9);

        Consumer[] registrationOrder = {
            consumer1, consumer2, consumer3, consumer4, consumer5, consumer6, consumer7, consumer8, consumer9
        };
        for (Consumer toRegister : registrationOrder) {
            dispatcher.addConsumer(toRegister);
        }

        // Expected hand-out order before the late consumer joins.
        Consumer[] expectedBeforeLateJoin = {
            consumer1, consumer2, consumer3,
            consumer1, consumer2, consumer3,
            consumer4, consumer5, consumer6,
            consumer4, consumer6,
            consumer7, consumer8
        };
        for (Consumer expected : expectedBeforeLateJoin) {
            Assert.assertEquals((Object) this.getNextConsumer(dispatcher), (Object) expected);
        }

        // A tenth consumer joins mid-sequence and is served twice before consumer9.
        Consumer consumer10 = this.createConsumer(0, 2, false, 10);
        dispatcher.addConsumer(consumer10);
        Consumer[] expectedAfterLateJoin = {consumer10, consumer10, consumer9};
        for (Consumer expected : expectedAfterLateJoin) {
            Assert.assertEquals((Object) this.getNextConsumer(dispatcher), (Object) expected);
        }
    }

    /**
     * Registers six consumers, two of them created with the third {@code createConsumer} argument set
     * to {@code true}, and checks that {@code getNextConsumer} cycles twice through the other four and
     * then returns {@code null}.
     *
     * <p>NOTE(review): {@code createConsumer} and {@code getNextConsumer} are helpers of this class
     * that are not visible here; the {@code true} flag presumably marks the consumer as blocked -
     * confirm against the helper.
     */
    @Test
    public void testFewBlockedConsumerSamePriority() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, this.cursorMock, null);

        Consumer consumer1 = this.createConsumer(0, 2, false, 1);
        Consumer consumer2 = this.createConsumer(0, 2, false, 2);
        Consumer consumer3 = this.createConsumer(0, 2, false, 3);
        Consumer consumer4 = this.createConsumer(0, 2, false, 4);
        Consumer consumer5 = this.createConsumer(0, 1, true, 5);
        Consumer consumer6 = this.createConsumer(0, 2, true, 6);

        Consumer[] registrationOrder = {consumer1, consumer2, consumer3, consumer4, consumer5, consumer6};
        for (Consumer toRegister : registrationOrder) {
            dispatcher.addConsumer(toRegister);
        }

        // Two full rounds over the four consumers created with the flag set to false.
        Consumer[] expectedOrder = {
            consumer1, consumer2, consumer3, consumer4,
            consumer1, consumer2, consumer3, consumer4
        };
        for (Consumer expected : expectedOrder) {
            Assert.assertEquals((Object) this.getNextConsumer(dispatcher), (Object) expected);
        }

        // After those eight hand-outs no consumer is returned any more.
        Assert.assertNull((Object) this.getNextConsumer(dispatcher));
    }

    /**
     * Registers twelve consumers spread over priority levels 0, 1 and 2 (some flagged as blocked),
     * then asserts the exact sequence of consumers the dispatcher returns. Two more priority-0
     * consumers are added midway and the remaining sequence is asserted until the dispatcher
     * returns {@code null}.
     *
     * <p>Each call to the local {@code getNextConsumer} helper also decrements the returned
     * consumer's permit counter, which is why consumers eventually stop being returned.
     */
    @Test
    public void testFewBlockedConsumerDifferentPriority() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, this.cursorMock, null);

        // Arguments are (priority, permit, blocked, id); slot i holds the consumer with id i + 1.
        Consumer[] pool = {
            this.createConsumer(0, 2, false, 1),
            this.createConsumer(0, 2, false, 2),
            this.createConsumer(0, 2, false, 3),
            this.createConsumer(0, 2, false, 4),
            this.createConsumer(0, 1, true, 5),
            this.createConsumer(0, 2, true, 6),
            this.createConsumer(1, 2, false, 7),
            this.createConsumer(1, 10, true, 8),
            this.createConsumer(1, 2, false, 9),
            this.createConsumer(2, 2, false, 10),
            this.createConsumer(2, 10, true, 11),
            this.createConsumer(2, 2, false, 12),
        };
        for (Consumer member : pool) {
            dispatcher.addConsumer(member);
        }

        // Expected dispatch order, by consumer id, before the late joiners are added.
        int[] firstRoundIds = {1, 2, 3, 4, 1, 2, 3, 4, 7, 9, 7, 9, 10, 12};
        for (int expectedId : firstRoundIds) {
            Object picked = this.getNextConsumer(dispatcher);
            Object expected = pool[expectedId - 1];
            Assert.assertEquals(picked, expected);
        }

        // Late joiners at priority 0: one unblocked (id 13) and one blocked (id 14).
        Consumer lateUnblocked = this.createConsumer(0, 2, false, 13);
        Consumer lateBlocked = this.createConsumer(0, 2, true, 14);
        dispatcher.addConsumer(lateUnblocked);
        dispatcher.addConsumer(lateBlocked);

        // Consumer 13 is expected twice, then consumers 10 and 12 once more each.
        Consumer[] secondRound = {lateUnblocked, lateUnblocked, pool[9], pool[11]};
        for (Consumer anticipated : secondRound) {
            Object picked = this.getNextConsumer(dispatcher);
            Object expected = anticipated;
            Assert.assertEquals(picked, expected);
        }

        Object afterExhaustion = this.getNextConsumer(dispatcher);
        Assert.assertNull(afterExhaustion);
    }

    /**
     * Registers seven consumers where every priority-0 consumer is flagged as blocked, then
     * asserts that the dispatcher returns only the unblocked consumers of priority 1 and 2, in
     * the exact order 4, 5, 4, 6, followed by {@code null}.
     *
     * <p>Each call to the local {@code getNextConsumer} helper also decrements the returned
     * consumer's permit counter.
     */
    @Test
    public void testFewBlockedConsumerDifferentPriority2() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, this.cursorMock, null);

        // Arguments are (priority, permit, blocked, id); slot i holds the consumer with id i + 1.
        Consumer[] registered = {
            this.createConsumer(0, 2, true, 1),
            this.createConsumer(0, 2, true, 2),
            this.createConsumer(0, 2, true, 3),
            this.createConsumer(1, 2, false, 4),
            this.createConsumer(1, 1, false, 5),
            this.createConsumer(2, 1, false, 6),
            this.createConsumer(2, 2, true, 7),
        };
        for (int slot = 0; slot < registered.length; slot++) {
            dispatcher.addConsumer(registered[slot]);
        }

        // Expected dispatch order expressed as consumer ids.
        int[] expectedIds = {4, 5, 4, 6};
        for (int step = 0; step < expectedIds.length; step++) {
            Object actual = this.getNextConsumer(dispatcher);
            Object expected = registered[expectedIds[step] - 1];
            Assert.assertEquals(actual, expected);
        }

        Object exhausted = this.getNextConsumer(dispatcher);
        Assert.assertNull(exhausted);
    }

    /**
     * Asks the dispatcher for its next consumer and, when one is returned, decrements that
     * consumer's counter through the {@code MESSAGE_PERMITS_UPDATER} field of {@link Consumer},
     * which is read reflectively.
     *
     * @param dispatcher the dispatcher to query
     * @return the consumer chosen by the dispatcher, or {@code null} when it returns none
     * @throws Exception if the reflective lookup or read of {@code MESSAGE_PERMITS_UPDATER} fails
     */
    private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception {
        Consumer consumer = dispatcher.getNextConsumer();
        if (consumer == null) {
            return null;
        }
        Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
        field.setAccessible(true);
        // Field.get returns Object, so the cast to the parameterized updater cannot be checked.
        // NOTE(review): presumably the field is declared as AtomicIntegerFieldUpdater<Consumer>;
        // confirm against Consumer.
        @SuppressWarnings("unchecked")
        AtomicIntegerFieldUpdater<Consumer> messagePermits = (AtomicIntegerFieldUpdater<Consumer>) field.get(consumer);
        messagePermits.decrementAndGet(consumer);
        return consumer;
    }

    /**
     * Builds a {@code Shared}-subtype {@link Consumer} for these tests, grants it permits through
     * {@code flowPermits}, and writes its {@code blockedConsumerOnUnackedMsgs} field reflectively.
     *
     * @param priority priority level passed to the consumer constructor
     * @param permit number of permits handed to {@code flowPermits}
     * @param blocked value written into the {@code blockedConsumerOnUnackedMsgs} field
     * @param id used both as the numeric consumer id and, as a string, as the consumer name
     * @return the configured consumer
     * @throws Exception if construction or the reflective field access fails
     */
    private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
        long consumerId = id;
        String consumerName = String.valueOf(id);
        Consumer created = new Consumer(null, PulsarApi.CommandSubscribe.SubType.Shared, "test-topic", consumerId, priority, consumerName, 5000, this.serverCnx, "appId", Collections.emptyMap(), false, PulsarApi.CommandSubscribe.InitialPosition.Latest);
        try {
            created.flowPermits(permit);
        }
        catch (Exception ignored) {
            // Deliberately swallowed so consumer setup always proceeds.
            // NOTE(review): presumably flowPermits fails because the first constructor argument
            // is null - confirm against Consumer.
        }
        // Write the blocked flag directly; setAccessible is needed to reach the field from here.
        Field blockedFlag = Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs");
        blockedFlag.setAccessible(true);
        blockedFlag.set(created, blocked);
        return created;
    }
}

