/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test
public class ServerCnxTest {
    // Embedded Netty channel the tests write inbound commands to; closed in teardown().
    protected EmbeddedChannel channel;
    // Spied broker configuration, re-created in setup() before every test method.
    private ServiceConfiguration svcConfig;
    // Connection handler whose state the tests assert on; it is not assigned in setup()
    // (presumably resetChannel() creates it — confirm against that helper).
    private ServerCnx serverCnx;
    // Spied BrokerService built around the spied PulsarService in setup().
    protected BrokerService brokerService;
    // Mocked factory returned from pulsar.getManagedLedgerFactory().
    private ManagedLedgerFactory mlFactoryMock;
    // Created in setup(); its use is outside the part of the file visible here.
    private ClientChannelHelper clientChannelHelper;
    // Spied PulsarService whose collaborators are stubbed in setup().
    private PulsarService pulsar;
    // Mocked configuration cache; setup() makes policiesCache() return a mocked ZooKeeperDataCache.
    private ConfigurationCacheService configCacheService;
    // Mocked NamespaceService; setup() stubs it to report service units as owned and active.
    protected NamespaceService namespaceService;
    // Number of the last entry in PulsarApi.ProtocolVersion.values(); the tests treat it as the current version.
    private final int currentProtocolVersion = PulsarApi.ProtocolVersion.values()[PulsarApi.ProtocolVersion.values().length - 1].getNumber();
    // Topic and subscription names used as fixtures by the tests.
    protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    private final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
    private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic";
    private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
    private final String successSubName = "successSub";
    private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
    // Same topic path as successTopicName but under cluster "usw", whereas setup() stubs the broker's cluster name as "use".
    private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic";
    // Shared Mockito mocks; presumably wired up by setupMLAsyncCallbackMocks() — confirm there.
    private ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
    private ManagedCursor cursorMock = (ManagedCursor)Mockito.mock(ManagedCursor.class);

    /**
     * Builds a fresh broker fixture before every test method.
     * <p>
     * A spied {@link ServiceConfiguration} (keep-alive interval of one second, backlog quota check
     * disabled, cluster name {@code "use"}) backs a spied {@link PulsarService}. The service's
     * ZooKeeper caches, ZooKeeper client, BookKeeper client, managed-ledger factory and schema
     * registry are replaced with mocks or test doubles, a spied {@link BrokerService} is created on
     * top of it, and a mocked {@link NamespaceService} reports every service unit as owned and active.
     *
     * @throws Exception if any part of the fixture cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        // Configuration and the PulsarService spy that every other collaborator hangs off.
        this.svcConfig = Mockito.spy(new ServiceConfiguration());
        this.pulsar = Mockito.spy(new PulsarService(this.svcConfig));
        Mockito.doReturn(new DefaultSchemaRegistryService()).when(this.pulsar).getSchemaRegistryService();
        this.svcConfig.setKeepAliveIntervalSeconds(this.inSec(1, TimeUnit.SECONDS));
        this.svcConfig.setBacklogQuotaCheckEnabled(false);
        Mockito.doReturn(this.svcConfig).when(this.pulsar).getConfiguration();
        Mockito.doReturn("use").when(this.svcConfig).getClusterName();

        // Managed-ledger factory.
        this.mlFactoryMock = Mockito.mock(ManagedLedgerFactory.class);
        Mockito.doReturn(this.mlFactoryMock).when(this.pulsar).getManagedLedgerFactory();

        // Local ZooKeeper cache, ZooKeeper client and BookKeeper client.
        ZooKeeperCache localZkCache = Mockito.mock(ZooKeeperCache.class);
        Mockito.doReturn(30).when(localZkCache).getZkOperationTimeoutSeconds();
        Mockito.doReturn(localZkCache).when(this.pulsar).getLocalZkCache();
        MockZooKeeper zooKeeper = MockedPulsarServiceBaseTest.createMockZooKeeper();
        Mockito.doReturn(zooKeeper).when(this.pulsar).getZkClient();
        Mockito.doReturn(MockedPulsarServiceBaseTest.createMockBookKeeper((ZooKeeper) zooKeeper, this.pulsar.getOrderedExecutor().chooseThread(0L))).when(this.pulsar).getBookKeeperClient();

        // Policies caches: both synchronous and asynchronous lookups answer "no policies".
        this.configCacheService = Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache policiesCache = Mockito.mock(ZooKeeperDataCache.class);
        Mockito.doReturn(Optional.empty()).when(policiesCache).get((String) Mockito.any());
        Mockito.doReturn(policiesCache).when(this.configCacheService).policiesCache();
        Mockito.doReturn(this.configCacheService).when(this.pulsar).getConfigurationCache();
        LocalZooKeeperCacheService localCacheService = Mockito.mock(LocalZooKeeperCacheService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(policiesCache).getAsync((String) Mockito.any());
        Mockito.doReturn(policiesCache).when(localCacheService).policiesCache();
        Mockito.doReturn(this.configCacheService).when(this.pulsar).getConfigurationCache();
        Mockito.doReturn(localCacheService).when(this.pulsar).getLocalZkCacheService();

        // Broker service spy and namespace service mock.
        this.brokerService = Mockito.spy(new BrokerService(this.pulsar));
        Mockito.doReturn(this.brokerService).when(this.pulsar).getBrokerService();
        this.namespaceService = Mockito.mock(NamespaceService.class);
        Mockito.doReturn(this.namespaceService).when(this.pulsar).getNamespaceService();
        Mockito.doReturn(true).when(this.namespaceService).isServiceUnitOwned((ServiceUnitId) Mockito.any());
        Mockito.doReturn(true).when(this.namespaceService).isServiceUnitActive((TopicName) Mockito.any());

        this.setupMLAsyncCallbackMocks();
        this.clientChannelHelper = new ClientChannelHelper();
    }

    /**
     * Converts a duration to whole seconds.
     *
     * @param time the duration, expressed in {@code unit}
     * @param unit the unit {@code time} is expressed in
     * @return the duration in seconds, narrowed to an {@code int}
     */
    private int inSec(int time, TimeUnit unit) {
        long seconds = unit.toSeconds(time);
        return (int) seconds;
    }

    /**
     * Releases the per-test fixture after every test method.
     * <p>
     * Every resource is null-checked so that a test which failed before the connection or channel
     * was created does not add a {@link NullPointerException} on top of its real failure. The
     * broker-level resources are closed from {@code finally} blocks so that a failure while closing
     * the connection does not leave the {@link PulsarService} or {@link BrokerService} open for the
     * following tests. The close order of the original code is kept: connection, channel, pulsar,
     * broker service.
     *
     * @throws Exception if closing any of the resources fails
     */
    @AfterMethod
    public void teardown() throws Exception {
        try {
            if (this.serverCnx != null) {
                this.serverCnx.close();
            }
            if (this.channel != null) {
                this.channel.close();
            }
        } finally {
            try {
                if (this.pulsar != null) {
                    this.pulsar.close();
                }
            } finally {
                if (this.brokerService != null) {
                    this.brokerService.close();
                }
            }
        }
    }

    /**
     * Sends a CONNECT command built from the auth-method name {@code "none"} and checks that the
     * connection moves from {@code Start} to {@code Connected} and replies with
     * {@code CommandConnected}.
     */
    @Test(timeOut=30000L)
    public void testConnectCommand() throws Exception {
        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connectCmd = Commands.newConnect("none", "", null);
        this.channel.writeInbound(connectCmd);

        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Connected);
        Object reply = this.getResponse();
        Assert.assertTrue(reply instanceof PulsarApi.CommandConnected);
        this.channel.finish();
    }

    /**
     * Serializes a CONNECT command that carries the auth method as an enum value.
     *
     * @param authMethod the authentication method to put in the command
     * @param authData the authentication data, encoded as UTF-8 bytes
     * @param protocolVersion the protocol version to advertise
     * @return the size-prefixed serialized command
     */
    private static ByteBuf newConnect(PulsarApi.AuthMethod authMethod, String authData, int protocolVersion) {
        PulsarApi.CommandConnect.Builder builder = PulsarApi.CommandConnect.newBuilder()
                .setClientVersion("Pulsar Client")
                .setAuthMethod(authMethod)
                .setAuthData(ByteString.copyFromUtf8(authData))
                .setProtocolVersion(protocolVersion);
        PulsarApi.CommandConnect command = builder.build();

        PulsarApi.BaseCommand.Builder envelope = PulsarApi.BaseCommand.newBuilder()
                .setType(PulsarApi.BaseCommand.Type.CONNECT)
                .setConnect(command);
        ByteBuf serialized = Commands.serializeWithSize(envelope);

        // Recycle the message and its builder once the bytes have been produced.
        command.recycle();
        builder.recycle();
        return serialized;
    }

    /**
     * Same handshake as {@code testConnectCommand}, but the CONNECT command is built by the local
     * {@code newConnect} helper with {@code AuthMethodNone} and the current protocol version.
     */
    @Test(timeOut=30000L)
    public void testConnectCommandWithEnum() throws Exception {
        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        int protocolVersion = Commands.getCurrentProtocolVersion();
        ByteBuf connectCmd = newConnect(PulsarApi.AuthMethod.AuthMethodNone, "", protocolVersion);
        this.channel.writeInbound(connectCmd);

        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Connected);
        Object reply = this.getResponse();
        Assert.assertTrue(reply instanceof PulsarApi.CommandConnected);
        this.channel.finish();
    }

    /**
     * Connects with the default CONNECT command and checks that the {@code CommandConnected} reply
     * carries {@code currentProtocolVersion}.
     */
    @Test(timeOut=30000L)
    public void testConnectCommandWithProtocolVersion() throws Exception {
        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connectCmd = Commands.newConnect("none", "", null);
        this.channel.writeInbound(connectCmd);
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Connected);

        PulsarApi.CommandConnected connected = (PulsarApi.CommandConnected) this.getResponse();
        Assert.assertEquals(connected.getProtocolVersion(), this.currentProtocolVersion);
        this.channel.finish();
    }

    /**
     * Connects with the current protocol version, then lets three one-second rounds elapse while
     * running the channel's pending tasks, and expects the channel to be inactive afterwards.
     * The fixture configures a keep-alive interval of one second.
     */
    @Test(timeOut=30000L)
    public void testKeepAlive() throws Exception {
        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connectCmd = Commands.newConnect("none", "", null);
        this.channel.writeInbound(connectCmd);
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Connected);

        PulsarApi.CommandConnected connected = (PulsarApi.CommandConnected) this.getResponse();
        Assert.assertEquals(connected.getProtocolVersion(), this.currentProtocolVersion);

        // The client never sends anything else during these rounds.
        int roundsLeft = 3;
        while (roundsLeft-- > 0) {
            this.channel.runPendingTasks();
            Thread.sleep(1000L);
        }

        Assert.assertFalse(this.channel.isActive());
        this.channel.finish();
    }

    /**
     * Connects while advertising protocol version {@code v0}, waits through the same three
     * one-second rounds as {@code testKeepAlive}, and expects the channel to still be active.
     */
    @Test(timeOut=30000L)
    public void testKeepAliveNotEnforcedWithOlderClients() throws Exception {
        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connectCmd = Commands.newConnect("none", "", PulsarApi.ProtocolVersion.v0.getNumber(), null, null, null, null, null);
        this.channel.writeInbound(connectCmd);
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Connected);

        PulsarApi.CommandConnected connected = (PulsarApi.CommandConnected) this.getResponse();
        Assert.assertEquals(connected.getProtocolVersion(), PulsarApi.ProtocolVersion.v0.getNumber());

        int roundsLeft = 3;
        while (roundsLeft-- > 0) {
            this.channel.runPendingTasks();
            Thread.sleep(1000L);
        }

        Assert.assertTrue(this.channel.isActive());
        this.channel.finish();
    }

    /**
     * Never sends a CONNECT command: after three one-second rounds of running pending tasks the
     * channel, still in state {@code Start} at the outset, is expected to be inactive.
     */
    @Test(timeOut=30000L)
    public void testKeepAliveBeforeHandshake() throws Exception {
        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        int roundsLeft = 3;
        while (roundsLeft-- > 0) {
            this.channel.runPendingTasks();
            Thread.sleep(1000L);
        }

        Assert.assertFalse(this.channel.isActive());
        this.channel.finish();
    }

    /**
     * Connects with authentication enabled and an authentication state that reports itself complete
     * with role {@code "appid1"}; the connection is expected to reach {@code Connected} and reply
     * with {@code CommandConnected}.
     * <p>
     * The authentication service, provider and state are Mockito mocks stubbed on the spied broker
     * service before the channel is created.
     */
    @Test(timeOut=30000L)
    public void testConnectCommandWithAuthenticationPositive() throws Exception {
        AuthenticationService authenticationService = Mockito.mock(AuthenticationService.class);
        AuthenticationProvider authenticationProvider = Mockito.mock(AuthenticationProvider.class);
        AuthenticationState authenticationState = Mockito.mock(AuthenticationState.class);
        AuthData authData = AuthData.of(null);

        // Route every provider lookup and auth-state creation to the mocks above.
        Mockito.doReturn(authenticationService).when(this.brokerService).getAuthenticationService();
        Mockito.doReturn(authenticationProvider).when(authenticationService).getAuthenticationProvider(Mockito.anyString());
        Mockito.doReturn(authenticationState).when(authenticationProvider).newAuthState((AuthData) Mockito.anyObject(), (SocketAddress) Mockito.anyObject(), (SSLSession) Mockito.anyObject());
        Mockito.doReturn(authData).when(authenticationState).authenticate(authData);
        Mockito.doReturn(true).when(authenticationState).isComplete();
        Mockito.doReturn("appid1").when(authenticationState).getAuthRole();
        Mockito.doReturn(true).when(this.brokerService).isAuthenticationEnabled();

        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connectCmd = Commands.newConnect("none", "", null);
        this.channel.writeInbound(connectCmd);

        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Connected);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandConnected);
        this.channel.finish();
    }

    /**
     * Connects with authentication enabled against a mocked authentication service that has no
     * anonymous user role; the connection is expected to stay in {@code Start} and reply with
     * {@code CommandError}.
     */
    @Test(timeOut=30000L)
    public void testConnectCommandWithAuthenticationNegative() throws Exception {
        AuthenticationService authService = Mockito.mock(AuthenticationService.class);
        Mockito.doReturn(authService).when(this.brokerService).getAuthenticationService();
        Mockito.doReturn(Optional.empty()).when(authService).getAnonymousUserRole();
        Mockito.doReturn(true).when(this.brokerService).isAuthenticationEnabled();

        this.resetChannel();
        Assert.assertTrue(this.channel.isActive());
        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connectCmd = Commands.newConnect("none", "", null);
        this.channel.writeInbound(connectCmd);

        Assert.assertEquals(this.serverCnx.getState(), ServerCnx.State.Start);
        Object reply = this.getResponse();
        Assert.assertTrue(reply instanceof PulsarApi.CommandError);
        this.channel.finish();
    }

    /**
     * Creates a producer on the success topic (expects {@code CommandProducerSuccess} and one
     * registered producer), then on the fail topic (expects {@code CommandError} and no topic
     * reference). After the channel is finished the first topic must have no producers left.
     */
    @Test(timeOut=30000L)
    public void testProducerCommand() throws Exception {
        final String goodTopic = "persistent://prop/use/ns-abc/successTopic";
        final String badTopic = "persistent://prop/use/ns-abc/failTopic";

        this.resetChannel();
        this.setChannelConnected();

        // Producer on a topic that can be created.
        ByteBuf producerCmd = Commands.newProducer(goodTopic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandProducerSuccess);

        PersistentTopic topicRef = (PersistentTopic) this.brokerService.getTopicReference(goodTopic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1L);

        // Producer on a topic whose creation fails.
        producerCmd = Commands.newProducer(badTopic, 2L, 2L, "prod-name-2", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandError);
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(badTopic).isPresent());

        this.channel.finish();
        Assert.assertEquals(topicRef.getProducers().size(), 0L);
    }

    /**
     * Sends the same producer-creation command twice while topic creation is still pending and
     * expects the second command to be rejected with {@code ServiceNotReady}.
     * <p>
     * {@code getOrCreateTopic} is stubbed to return a future that is never completed, so the first
     * producer request stays in flight when the duplicate arrives.
     */
    @Test(timeOut=5000L)
    public void testDuplicateConcurrentProducerCommand() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";

        this.resetChannel();
        this.setChannelConnected();

        // Never completed on purpose; parameterized instead of the raw type used before.
        CompletableFuture<?> pendingTopic = new CompletableFuture<>();
        Mockito.doReturn(pendingTopic).when(this.brokerService).getOrCreateTopic(Mockito.any(String.class));

        // First request: stays pending behind the incomplete future.
        ByteBuf producerCmd = Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);

        // Identical second request with the same producer id.
        producerCmd = Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);

        Object response = this.getResponse();
        Assert.assertTrue(response instanceof PulsarApi.CommandError);
        PulsarApi.CommandError error = (PulsarApi.CommandError) response;
        Assert.assertEquals(error.getError(), PulsarApi.ServerError.ServiceNotReady);
    }

    /**
     * Stubs the namespace service to report the topic's service unit as inactive and expects a
     * producer request to be answered with {@code CommandError}/{@code ServiceNotReady}, without a
     * topic reference being created.
     */
    @Test(timeOut=30000L)
    public void testProducerOnNotOwnedTopic() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/success-not-owned-topic";

        this.resetChannel();
        this.setChannelConnected();
        Mockito.doReturn(false).when(this.namespaceService).isServiceUnitActive(Mockito.any(TopicName.class));

        ByteBuf producerCmd = Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);

        Object reply = this.getResponse();
        Assert.assertEquals(reply.getClass(), PulsarApi.CommandError.class);
        PulsarApi.CommandError error = (PulsarApi.CommandError) reply;
        Assert.assertEquals(error.getError(), PulsarApi.ServerError.ServiceNotReady);
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topic).isPresent());
        this.channel.finish();
    }

    /**
     * With a mocked authorization service whose {@code canProduceAsync} completes with
     * {@code true}, a producer request is expected to succeed and register one producer, which is
     * gone again once the channel is finished.
     */
    @Test(timeOut=30000L)
    public void testProducerCommandWithAuthorizationPositive() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";

        AuthorizationService authzService = Mockito.mock(AuthorizationService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(authzService).canProduceAsync((TopicName) Mockito.any(), (String) Mockito.any(), (AuthenticationDataSource) Mockito.any());
        Mockito.doReturn(authzService).when(this.brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(this.brokerService).isAuthenticationEnabled();

        this.resetChannel();
        this.setChannelConnected();

        ByteBuf producerCmd = Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertEquals(this.getResponse().getClass(), PulsarApi.CommandProducerSuccess.class);

        PersistentTopic topicRef = (PersistentTopic) this.brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1L);

        this.channel.finish();
        Assert.assertEquals(topicRef.getProducers().size(), 0L);
    }

    /**
     * With authorization enabled, a non-super-user role and a policies cache that finds nothing for
     * paths containing {@code "nonexistent"}, both a producer and a subscribe request on the
     * non-existent topic are expected to be answered with {@code CommandError}.
     * <p>
     * The authorization service's private {@code provider} field is replaced by reflection with a
     * spied {@link PulsarAuthorizationProvider} so that {@code isSuperUser} can be stubbed.
     */
    @Test(timeOut=30000L)
    public void testNonExistentTopic() throws Exception {
        final String topic = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";

        // Test-local configuration cache (separate from the one created in setup()).
        ZooKeeperDataCache policiesCache = Mockito.mock(ZooKeeperDataCache.class);
        ConfigurationCacheService localConfigCache = Mockito.mock(ConfigurationCacheService.class);
        Mockito.doReturn(localConfigCache).when(this.pulsar).getConfigurationCache();
        Mockito.doReturn(policiesCache).when(localConfigCache).policiesCache();
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(policiesCache).getAsync(Mockito.matches(".*nonexistent.*"));

        AuthorizationService authzService = Mockito.spy(new AuthorizationService(this.svcConfig, localConfigCache));
        Mockito.doReturn(authzService).when(this.brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(this.brokerService).isAuthorizationEnabled();
        this.svcConfig.setAuthorizationEnabled(true);

        // Swap in a spied provider so the super-user check can be forced to false.
        Field providerField = AuthorizationService.class.getDeclaredField("provider");
        providerField.setAccessible(true);
        PulsarAuthorizationProvider authzProvider = Mockito.spy(new PulsarAuthorizationProvider(this.svcConfig, localConfigCache));
        providerField.set(authzService, authzProvider);
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(authzProvider).isSuperUser(Mockito.anyString(), (ServiceConfiguration) Mockito.any());

        // Producer request.
        this.resetChannel();
        this.setChannelConnected();
        ByteBuf producerCmd = Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandError);
        this.channel.finish();

        // Subscribe request on a fresh channel.
        this.resetChannel();
        this.setChannelConnected();
        ByteBuf subscribeCmd = Commands.newSubscribe(topic, "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribeCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandError);
    }

    /**
     * With authorization enabled, a non-super-user role and {@code checkPermission} stubbed to
     * {@code true}, a producer on the topic in cluster {@code "use"} is expected to succeed while a
     * producer on the same topic path in cluster {@code "usw"} is expected to get
     * {@code CommandError}. The fixture stubs the broker's own cluster name as {@code "use"}.
     */
    @Test(timeOut=30000L)
    public void testClusterAccess() throws Exception {
        final String localClusterTopic = "persistent://prop/use/ns-abc/successTopic";
        final String otherClusterTopic = "persistent://prop/usw/ns-abc/successTopic";

        this.svcConfig.setAuthorizationEnabled(true);
        AuthorizationService authzService = Mockito.spy(new AuthorizationService(this.svcConfig, this.configCacheService));

        // Replace the service's private provider with a spy whose checks can be stubbed.
        Field providerField = AuthorizationService.class.getDeclaredField("provider");
        providerField.setAccessible(true);
        PulsarAuthorizationProvider authzProvider = Mockito.spy(new PulsarAuthorizationProvider(this.svcConfig, this.configCacheService));
        providerField.set(authzService, authzProvider);

        Mockito.doReturn(authzService).when(this.brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(this.brokerService).isAuthorizationEnabled();
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(authzProvider).isSuperUser(Mockito.anyString(), (ServiceConfiguration) Mockito.any());
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(authzProvider).checkPermission(Mockito.any(TopicName.class), Mockito.anyString(), Mockito.any(AuthAction.class));

        // Topic in the local cluster.
        this.resetChannel();
        this.setChannelConnected();
        ByteBuf producerCmd = Commands.newProducer(localClusterTopic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandProducerSuccess);

        // Topic in a different cluster.
        this.resetChannel();
        this.setChannelConnected();
        producerCmd = Commands.newProducer(otherClusterTopic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandError);
    }

    /**
     * With authorization enabled and {@code isSuperUser} stubbed to {@code true}, both a producer
     * and an exclusive subscription on the non-existent topic are expected to succeed: the producer
     * gets {@code CommandProducerSuccess}, and the subscription appears on the topic with a
     * connected consumer before {@code CommandSuccess} is read.
     */
    @Test(timeOut=30000L)
    public void testNonExistentTopicSuperUserAccess() throws Exception {
        final String topic = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
        final String subscription = "successSub";

        AuthorizationService authzService = Mockito.spy(new AuthorizationService(this.svcConfig, this.configCacheService));
        Mockito.doReturn(authzService).when(this.brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(this.brokerService).isAuthorizationEnabled();

        // Replace the service's private provider with a spy that treats every role as super user.
        Field providerField = AuthorizationService.class.getDeclaredField("provider");
        providerField.setAccessible(true);
        PulsarAuthorizationProvider authzProvider = Mockito.spy(new PulsarAuthorizationProvider(this.svcConfig, this.configCacheService));
        providerField.set(authzService, authzProvider);
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(authzProvider).isSuperUser(Mockito.anyString(), (ServiceConfiguration) Mockito.any());

        // Producer request.
        this.resetChannel();
        this.setChannelConnected();
        ByteBuf producerCmd = Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandProducerSuccess);

        PersistentTopic topicRef = (PersistentTopic) this.brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1L);
        this.channel.finish();

        // Subscribe request on a fresh channel.
        this.resetChannel();
        this.setChannelConnected();
        ByteBuf subscribeCmd = Commands.newSubscribe(topic, subscription, 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribeCmd);

        topicRef = (PersistentTopic) this.brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertTrue(topicRef.getSubscriptions().containsKey(subscription));
        Assert.assertTrue(topicRef.getSubscription(subscription).getDispatcher().isConsumerConnected());
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandSuccess);
    }

    /**
     * With authentication and authorization enabled and a mocked authorization service whose
     * {@code canProduceAsync} completes with {@code false}, a producer request (sent without a
     * producer name) is expected to be answered with {@code CommandError}.
     * <p>
     * Carries the same 30-second timeout as the other tests in this class; without the annotation
     * the method ran only through the class-level {@code @Test} and had no timeout.
     */
    @Test(timeOut=30000L)
    public void testProducerCommandWithAuthorizationNegative() throws Exception {
        AuthorizationService authorizationService = Mockito.mock(AuthorizationService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canProduceAsync((TopicName) Mockito.any(), (String) Mockito.any(), (AuthenticationDataSource) Mockito.any());
        Mockito.doReturn(authorizationService).when(this.brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(this.brokerService).isAuthenticationEnabled();
        Mockito.doReturn(true).when(this.brokerService).isAuthorizationEnabled();
        Mockito.doReturn("prod1").when(this.brokerService).generateUniqueProducerName();

        this.resetChannel();
        this.setChannelConnected();

        ByteBuf clientCommand = Commands.newProducer("persistent://prop/use/ns-abc/successTopic", 1L, 1L, null, Collections.emptyMap());
        this.channel.writeInbound(clientCommand);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandError);
        this.channel.finish();
    }

    /**
     * Creates a producer, then sends one message through it (sequence id 0, no checksum, a
     * 1024-byte-capacity payload buffer) and expects a {@code CommandSendReceipt}.
     */
    @Test(timeOut=30000L)
    public void testSendCommand() throws Exception {
        this.resetChannel();
        this.setChannelConnected();

        ByteBuf producerCmd = Commands.newProducer("persistent://prop/use/ns-abc/successTopic", 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCmd);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandProducerSuccess);

        PulsarApi.MessageMetadata metadata = PulsarApi.MessageMetadata.newBuilder()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName("prod-name")
                .setSequenceId(0L)
                .build();
        ByteBuf payload = Unpooled.buffer(1024);

        // The channel is given a copy of the coalesced frame; the frame itself is released here.
        ByteBuf sendFrame = ByteBufPair.coalesce(Commands.newSend(1L, 0L, 1, Commands.ChecksumType.None, metadata, payload));
        this.channel.writeInbound(Unpooled.copiedBuffer(sendFrame));
        sendFrame.release();

        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandSendReceipt);
        this.channel.finish();
    }

    /**
     * Issues two PRODUCER commands with different numeric ids but the same producer name on
     * one topic. The first must yield {@code CommandProducerSuccess}, the second
     * {@code CommandError}.
     */
    @Test(timeOut=30000L)
    public void testUseSameProducerName() throws Exception {
        resetChannel();
        setChannelConnected();

        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String sharedName = "my-producer";

        // First registration with ids (1, 1).
        ByteBuf firstCreate = Commands.newProducer(topic, 1L, 1L, sharedName, Collections.emptyMap());
        channel.writeInbound(firstCreate);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandProducerSuccess);

        // Second registration with ids (2, 2) re-using the same name.
        ByteBuf secondCreate = Commands.newProducer(topic, 2L, 2L, sharedName, Collections.emptyMap());
        channel.writeInbound(secondCreate);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandError);

        channel.finish();
    }

    /**
     * Sends the same PRODUCER command twice (same first id and name, request ids 1 and 2) and
     * expects a {@code CommandProducerSuccess} for each, echoing the matching request id.
     * Afterwards no further outbound message may be queued and the channel must be active.
     */
    @Test(timeOut=30000L)
    public void testRecreateSameProducer() throws Exception {
        resetChannel();
        setChannelConnected();

        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String name = "my-producer";

        // Initial creation, request id 1.
        ByteBuf initialCreate = Commands.newProducer(topic, 1L, 1L, name, Collections.emptyMap());
        channel.writeInbound(initialCreate);
        Object firstReply = getResponse();
        Assert.assertEquals(firstReply.getClass(), PulsarApi.CommandProducerSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandProducerSuccess)firstReply).getRequestId(), 1L);

        // Identical creation again, request id 2.
        ByteBuf repeatedCreate = Commands.newProducer(topic, 1L, 2L, name, Collections.emptyMap());
        channel.writeInbound(repeatedCreate);
        Object secondReply = getResponse();
        Assert.assertEquals(secondReply.getClass(), PulsarApi.CommandProducerSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandProducerSuccess)secondReply).getRequestId(), 2L);

        // Nothing else may be pending on the wire.
        Assert.assertTrue(channel.outboundMessages().isEmpty());
        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * Sends the same SUBSCRIBE command twice (request ids 1 and 2) and expects a
     * {@code CommandSuccess} carrying the matching request id each time.
     */
    @Test(timeOut=30000L)
    public void testSubscribeMultipleTimes() throws Exception {
        resetChannel();
        setChannelConnected();

        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String subscription = "successSub";
        final PulsarApi.CommandSubscribe.SubType exclusive = PulsarApi.CommandSubscribe.SubType.Exclusive;

        // First subscribe, request id 1.
        ByteBuf firstSubscribe = Commands.newSubscribe(topic, subscription, 1L, 1L, exclusive, 0, "test");
        channel.writeInbound(firstSubscribe);
        Object firstReply = getResponse();
        Assert.assertEquals(firstReply.getClass(), PulsarApi.CommandSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandSuccess)firstReply).getRequestId(), 1L);

        // Same subscribe again, request id 2.
        ByteBuf secondSubscribe = Commands.newSubscribe(topic, subscription, 1L, 2L, exclusive, 0, "test");
        channel.writeInbound(secondSubscribe);
        Object secondReply = getResponse();
        Assert.assertEquals(secondReply.getClass(), PulsarApi.CommandSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandSuccess)secondReply).getRequestId(), 2L);

        channel.finish();
    }

    /**
     * Sends two identical SUBSCRIBE commands while topic lookup is stubbed to return a future
     * that this test never completes, and expects the reply to be a {@code CommandError}
     * with {@code ServerError.ServiceNotReady}.
     *
     * <p>Uses the same 30 s timeout as the sibling tests and finishes the embedded channel
     * at the end, like every other test in this class.
     */
    @Test(timeOut=30000L)
    public void testDuplicateConcurrentSubscribeCommand() throws Exception {
        this.resetChannel();
        this.setChannelConnected();

        // Never completed: keeps the first subscribe pending for the duration of the test.
        CompletableFuture<Object> delayFuture = new CompletableFuture<>();
        ((BrokerService)Mockito.doReturn(delayFuture).when((Object)this.brokerService)).getOrCreateTopic((String)Mockito.any(String.class));

        // First subscribe.
        ByteBuf clientCommand = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(clientCommand);

        // Identical subscribe issued while the first is still outstanding.
        clientCommand = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(clientCommand);

        Object response = this.getResponse();
        Assert.assertTrue(response instanceof PulsarApi.CommandError);
        PulsarApi.CommandError error = (PulsarApi.CommandError)response;
        Assert.assertEquals((Object)error.getError(), (Object)PulsarApi.ServerError.ServiceNotReady);

        // Release the embedded channel, consistent with the other tests.
        this.channel.finish();
    }

    /**
     * Replays a create-producer / close-producer / create-producer sequence while the
     * managed-ledger open for the topic is held back, then lets the open complete.
     *
     * <p>Expected replies, in order: {@code CommandSuccess} for the close (request id 2) and
     * {@code CommandError} for the second create (request id 3). No further outbound message
     * may be queued afterwards and the channel must still be active.
     */
    @Test(timeOut=30000L)
    public void testCreateProducerTimeout() throws Exception {
        this.resetChannel();
        this.setChannelConnected();

        // Must be CompletableFuture<Runnable>: with a raw future, complete(...) takes Object,
        // which is not a valid target type for the lambda below.
        CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
        ((ManagedLedgerFactory)Mockito.doAnswer(invocationOnMock -> {
            // Capture the "open succeeded" callback instead of invoking it right away.
            openTopicFuture.complete(() -> ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(this.ledgerMock, null));
            return null;
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*success.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        String topic = "persistent://prop/use/ns-abc/successTopic";
        String producerName = "my-producer";

        // 1. create producer (request id 1)
        ByteBuf createProducer1 = Commands.newProducer(topic, 1L, 1L, producerName, Collections.emptyMap());
        this.channel.writeInbound(createProducer1);
        // 2. close producer (request id 2)
        ByteBuf closeProducer = Commands.newCloseProducer(1L, 2L);
        this.channel.writeInbound(closeProducer);
        // 3. create producer again (request id 3)
        ByteBuf createProducer2 = Commands.newProducer(topic, 1L, 3L, producerName, Collections.emptyMap());
        this.channel.writeInbound(createProducer2);

        // Now let the captured topic-open callback run.
        openTopicFuture.get().run();

        Object response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandSuccess)response).getRequestId(), 2L);

        response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
        Assert.assertEquals((long)((PulsarApi.CommandError)response).getRequestId(), 3L);

        // Request id 1 must not have produced any reply.
        Assert.assertTrue(this.channel.outboundMessages().isEmpty());
        Assert.assertTrue(this.channel.isActive());
        this.channel.finish();
    }

    /**
     * Disabled test. Blocks the managed-ledger open on a latch, sends one create, one close
     * and three more creates for the same producer, and checks the reply order:
     * {@code CommandSuccess} for the close (request id 2) before the latch is released, then
     * {@code CommandError} for request ids 3, 4 and 5.
     */
    @Test(timeOut=30000L, enabled=false)
    public void testCreateProducerMultipleTimeouts() throws Exception {
        resetChannel();
        setChannelConnected();

        // The stubbed asyncOpen waits on this latch before reporting the ledger as opened.
        final CountDownLatch openGate = new CountDownLatch(1);
        ((ManagedLedgerFactory)Mockito.doAnswer(invocation -> {
            openGate.await();
            ((AsyncCallbacks.OpenLedgerCallback)invocation.getArguments()[2]).openLedgerComplete(ledgerMock, null);
            return null;
        }).when((Object)mlFactoryMock)).asyncOpen(Mockito.matches(".*success.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String name = "my-producer";

        // Request 1: create; request 2: close; requests 3-5: create again.
        ByteBuf create1 = Commands.newProducer(topic, 1L, 1L, name, Collections.emptyMap());
        channel.writeInbound(create1);
        ByteBuf close1 = Commands.newCloseProducer(1L, 2L);
        channel.writeInbound(close1);
        ByteBuf create2 = Commands.newProducer(topic, 1L, 3L, name, Collections.emptyMap());
        channel.writeInbound(create2);
        ByteBuf create3 = Commands.newProducer(topic, 1L, 4L, name, Collections.emptyMap());
        channel.writeInbound(create3);
        ByteBuf create4 = Commands.newProducer(topic, 1L, 5L, name, Collections.emptyMap());
        channel.writeInbound(create4);

        // The close is acknowledged before the latch is released.
        Object reply = getResponse();
        Assert.assertEquals(reply.getClass(), PulsarApi.CommandSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandSuccess)reply).getRequestId(), 2L);

        openGate.countDown();

        // The three later creates are each answered with an error, in request order.
        reply = getResponse();
        Assert.assertEquals(reply.getClass(), PulsarApi.CommandError.class);
        Assert.assertEquals((long)((PulsarApi.CommandError)reply).getRequestId(), 3L);
        reply = getResponse();
        Assert.assertEquals(reply.getClass(), PulsarApi.CommandError.class);
        Assert.assertEquals((long)((PulsarApi.CommandError)reply).getRequestId(), 4L);
        reply = getResponse();
        Assert.assertEquals(reply.getClass(), PulsarApi.CommandError.class);
        Assert.assertEquals((long)((PulsarApi.CommandError)reply).getRequestId(), 5L);

        // Short pause before checking that nothing else was written.
        Thread.sleep(100L);
        Assert.assertTrue(channel.outboundMessages().isEmpty());
        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * Creates a producer on the "fail" topic whose managed-ledger open is held back, closes
     * it, and tries to re-create the same producer id on the "success" topic.
     *
     * <p>Expected replies after the held-back open is released: {@code CommandSuccess} for
     * the close (request id 2), {@code CommandError} for the second create (request id 3).
     * After a 500 ms pause a third create (request id 4) must yield
     * {@code CommandProducerSuccess}. Finally the outbound queue must be empty and the
     * channel active.
     */
    @Test(timeOut=30000L, invocationCount=1, skipFailedInvocations=true)
    public void testCreateProducerBookieTimeout() throws Exception {
        this.resetChannel();
        this.setChannelConnected();

        // Must be CompletableFuture<Runnable>: with a raw future, complete(...) takes Object,
        // which is not a valid target type for the lambda below.
        CompletableFuture<Runnable> openFailedTopic = new CompletableFuture<>();
        ((ManagedLedgerFactory)Mockito.doAnswer(invocationOnMock -> {
            // Capture the open-complete callback for topics matching ".*fail.*".
            openFailedTopic.complete(() -> ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(this.ledgerMock, null));
            return null;
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*fail.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        String producerName = "my-producer";

        // 1. create on the fail topic (request id 1)
        ByteBuf createProducer1 = Commands.newProducer("persistent://prop/use/ns-abc/failTopic", 1L, 1L, producerName, Collections.emptyMap());
        this.channel.writeInbound(createProducer1);
        // 2. close (request id 2)
        ByteBuf closeProducer = Commands.newCloseProducer(1L, 2L);
        this.channel.writeInbound(closeProducer);
        // 3. re-create on the success topic (request id 3)
        ByteBuf createProducer2 = Commands.newProducer("persistent://prop/use/ns-abc/successTopic", 1L, 3L, producerName, Collections.emptyMap());
        this.channel.writeInbound(createProducer2);

        // Release the held-back open of the fail topic.
        openFailedTopic.get().run();

        Object response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandSuccess)response).getRequestId(), 2L);

        response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
        Assert.assertEquals((long)((PulsarApi.CommandError)response).getRequestId(), 3L);

        // Pause before retrying the create.
        Thread.sleep(500L);
        ByteBuf createProducer3 = Commands.newProducer("persistent://prop/use/ns-abc/successTopic", 1L, 4L, producerName, Collections.emptyMap());
        this.channel.writeInbound(createProducer3);

        response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandProducerSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandProducerSuccess)response).getRequestId(), 4L);

        // Give stray replies time to show up, then verify there are none.
        Thread.sleep(500L);
        Assert.assertTrue(this.channel.outboundMessages().isEmpty());
        Assert.assertTrue(this.channel.isActive());
        this.channel.finish();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends four SUBSCRIBE commands for the same consumer id (request ids 1, 3, 4, 5) while
     * the managed-ledger open is held back, then releases the open.
     *
     * <p>Expected replies, in order: {@code CommandError} for request ids 3, 4 and 5, then
     * {@code CommandSuccess} for request id 1.
     */
    @Test(timeOut=30000L)
    public void testSubscribeTimeout() throws Exception {
        this.resetChannel();
        this.setChannelConnected();

        // Must be CompletableFuture<Runnable>: with a raw future, complete(...) takes Object,
        // which is not a valid target type for the lambda below.
        CompletableFuture<Runnable> openTopicTask = new CompletableFuture<>();
        ((ManagedLedgerFactory)Mockito.doAnswer(invocationOnMock -> {
            // Capture the open-complete callback instead of invoking it right away.
            openTopicTask.complete(() -> ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(this.ledgerMock, null));
            return null;
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*success.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        String topic = "persistent://prop/use/ns-abc/successTopic";
        String subscription = "successSub";

        ByteBuf subscribe1 = Commands.newSubscribe(topic, subscription, 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribe1);
        ByteBuf subscribe2 = Commands.newSubscribe(topic, subscription, 1L, 3L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribe2);
        ByteBuf subscribe3 = Commands.newSubscribe(topic, subscription, 1L, 4L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribe3);
        ByteBuf subscribe4 = Commands.newSubscribe(topic, subscription, 1L, 5L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribe4);

        // Release the held-back topic open.
        openTopicTask.get().run();

        synchronized (this) {
            // The three later subscribes are answered with errors first.
            Object response = this.getResponse();
            Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
            Assert.assertEquals((long)((PulsarApi.CommandError)response).getRequestId(), 3L);
            response = this.getResponse();
            Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
            Assert.assertEquals((long)((PulsarApi.CommandError)response).getRequestId(), 4L);
            response = this.getResponse();
            Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
            Assert.assertEquals((long)((PulsarApi.CommandError)response).getRequestId(), 5L);

            // One reply must still be queued: the success for the first subscribe.
            Assert.assertFalse(this.channel.outboundMessages().isEmpty());
            Assert.assertTrue(this.channel.isActive());
            response = this.getResponse();
            Assert.assertEquals(response.getClass(), PulsarApi.CommandSuccess.class);
            Assert.assertEquals((long)((PulsarApi.CommandSuccess)response).getRequestId(), 1L);
        }
        this.channel.finish();
    }

    /**
     * Subscribes on the "fail" topic (whose held-back open ends in
     * {@code openLedgerFailed}), closes the consumer, and re-subscribes with the same
     * consumer id on the "success" topic.
     *
     * <p>Expected replies after the failing open is released: {@code CommandSuccess} for the
     * close (request id 2) and {@code CommandError} for the second subscribe (request id 3).
     * Once {@code serverCnx} no longer reports consumer 1, a third subscribe (request id 4)
     * must succeed after the success-topic open is released.
     */
    @Test(timeOut=30000L)
    public void testSubscribeBookieTimeout() throws Exception {
        this.resetChannel();
        this.setChannelConnected();

        // Both futures must be CompletableFuture<Runnable>: with a raw future, complete(...)
        // takes Object, which is not a valid target type for the lambdas below.
        CompletableFuture<Runnable> openTopicSuccess = new CompletableFuture<>();
        ((ManagedLedgerFactory)Mockito.doAnswer(invocationOnMock -> {
            // Topics matching ".*success.*": capture a callback reporting a successful open.
            openTopicSuccess.complete(() -> ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(this.ledgerMock, null));
            return null;
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*success.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>();
        ((ManagedLedgerFactory)Mockito.doAnswer(invocationOnMock -> {
            // Topics matching ".*fail.*": capture a callback reporting a failed open.
            openTopicFail.complete(() -> ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null));
            return null;
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*fail.*"), (ManagedLedgerConfig)Mockito.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)Mockito.any(AsyncCallbacks.OpenLedgerCallback.class), Mockito.any());

        // 1. subscribe on the fail topic (request id 1)
        ByteBuf subscribe1 = Commands.newSubscribe("persistent://prop/use/ns-abc/failTopic", "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribe1);
        // 2. close the consumer (request id 2)
        ByteBuf closeConsumer = Commands.newCloseConsumer(1L, 2L);
        this.channel.writeInbound(closeConsumer);
        // 3. subscribe on the success topic (request id 3)
        ByteBuf subscribe2 = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 3L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribe2);

        // Deliver the failure for the fail topic.
        openTopicFail.get().run();

        Object response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandSuccess)response).getRequestId(), 2L);

        response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
        Assert.assertEquals((long)((PulsarApi.CommandError)response).getRequestId(), 3L);

        // Wait until the connection has dropped consumer 1 before retrying.
        while (this.serverCnx.hasConsumer(1L)) {
            Thread.sleep(10L);
        }

        // 4. subscribe again on the success topic (request id 4)
        ByteBuf subscribe3 = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 4L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        this.channel.writeInbound(subscribe3);

        // Deliver the successful open for the success topic.
        openTopicSuccess.get().run();

        response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandSuccess.class);
        Assert.assertEquals((long)((PulsarApi.CommandSuccess)response).getRequestId(), 4L);

        // Give stray replies time to show up, then verify there are none.
        Thread.sleep(100L);
        Assert.assertTrue(this.channel.outboundMessages().isEmpty());
        Assert.assertTrue(this.channel.isActive());
        this.channel.finish();
    }

    /**
     * Exercises SUBSCRIBE with authentication and authorization reported as disabled:
     * <ol>
     *   <li>"successSub" on the success topic: expects {@code CommandSuccess}, and the topic
     *       must then hold that subscription with a connected consumer;</li>
     *   <li>"failSub" on the success topic: expects {@code CommandError};</li>
     *   <li>"successSub" on the fail topic: expects {@code CommandError}.</li>
     * </ol>
     * The channel must stay open throughout.
     */
    @Test(timeOut=30000L)
    public void testSubscribeCommand() throws Exception {
        final String goodTopic = "persistent://prop/use/ns-abc/successTopic";
        final String badTopic = "persistent://prop/use/ns-abc/failTopic";
        final String goodSub = "successSub";
        final String failSubName = "failSub";
        final PulsarApi.CommandSubscribe.SubType exclusive = PulsarApi.CommandSubscribe.SubType.Exclusive;

        resetChannel();
        setChannelConnected();
        ((BrokerService)Mockito.doReturn((Object)false).when((Object)brokerService)).isAuthenticationEnabled();
        ((BrokerService)Mockito.doReturn((Object)false).when((Object)brokerService)).isAuthorizationEnabled();

        // Case 1: subscribe succeeds.
        ByteBuf request = Commands.newSubscribe(goodTopic, goodSub, 1L, 1L, exclusive, 0, "test");
        channel.writeInbound(request);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandSuccess);

        PersistentTopic topicRef = (PersistentTopic)brokerService.getTopicReference(goodTopic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertTrue(topicRef.getSubscriptions().containsKey((Object)goodSub));
        Assert.assertTrue(topicRef.getSubscription(goodSub).getDispatcher().isConsumerConnected());

        // Case 2: the "failSub" subscription name is rejected.
        request = Commands.newSubscribe(goodTopic, failSubName, 2L, 2L, exclusive, 0, "test");
        channel.writeInbound(request);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandError);

        // Case 3: the fail topic is rejected.
        request = Commands.newSubscribe(badTopic, goodSub, 3L, 3L, exclusive, 0, "test");
        channel.writeInbound(request);
        Assert.assertEquals(getResponse().getClass(), PulsarApi.CommandError.class);

        // The errors above must not have closed the connection.
        Assert.assertTrue(channel.isOpen());
        channel.finish();
    }

    /**
     * With the connection version set to protocol v3, subscribes once successfully, marks
     * the topic as having batch messages published, and expects a second subscribe to be
     * rejected with {@code ServerError.UnsupportedVersionError}. The channel must stay open.
     */
    @Test(timeOut=30000L)
    public void testUnsupportedBatchMsgSubscribeCommand() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String failSubName = "failSub";
        final PulsarApi.CommandSubscribe.SubType exclusive = PulsarApi.CommandSubscribe.SubType.Exclusive;

        resetChannel();
        setChannelConnected();
        setConnectionVersion(PulsarApi.ProtocolVersion.v3.getNumber());
        ((BrokerService)Mockito.doReturn((Object)false).when((Object)brokerService)).isAuthenticationEnabled();
        ((BrokerService)Mockito.doReturn((Object)false).when((Object)brokerService)).isAuthorizationEnabled();

        // First subscribe goes through.
        ByteBuf request = Commands.newSubscribe(topic, "successSub", 1L, 1L, exclusive, 0, "test");
        channel.writeInbound(request);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandSuccess);

        // Flag the topic as containing batch messages.
        PersistentTopic topicRef = (PersistentTopic)brokerService.getTopicReference(topic).get();
        topicRef.markBatchMessagePublished();

        // Second subscribe must now be refused as an unsupported version.
        request = Commands.newSubscribe(topic, failSubName, 2L, 2L, exclusive, 0, "test");
        channel.writeInbound(request);
        Object reply = getResponse();
        Assert.assertTrue(reply instanceof PulsarApi.CommandError);
        Assert.assertEquals((Object)PulsarApi.ServerError.UnsupportedVersionError, (Object)((PulsarApi.CommandError)reply).getError());

        Assert.assertTrue(channel.isOpen());
        channel.finish();
    }

    /**
     * Sends a SUBSCRIBE command while the mocked authorization service answers {@code true}
     * to {@code canConsumeAsync} and expects {@code CommandSuccess}.
     */
    @Test(timeOut=30000L)
    public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
        // Authorization mock that allows every consume request.
        AuthorizationService allowAll = (AuthorizationService)Mockito.mock(AuthorizationService.class);
        ((AuthorizationService)Mockito.doReturn(CompletableFuture.completedFuture(true)).when((Object)allowAll)).canConsumeAsync((TopicName)Mockito.any(), (String)Mockito.any(), (AuthenticationDataSource)Mockito.any(), (String)Mockito.any());

        // Broker returns that mock and reports authn/authz as enabled.
        ((BrokerService)Mockito.doReturn((Object)allowAll).when((Object)brokerService)).getAuthorizationService();
        ((BrokerService)Mockito.doReturn((Object)true).when((Object)brokerService)).isAuthenticationEnabled();
        ((BrokerService)Mockito.doReturn((Object)true).when((Object)brokerService)).isAuthorizationEnabled();

        resetChannel();
        setChannelConnected();

        ByteBuf subscribeRequest = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        channel.writeInbound(subscribeRequest);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandSuccess);
        channel.finish();
    }

    /**
     * Sends a SUBSCRIBE command while the mocked authorization service answers {@code false}
     * to {@code canConsumeAsync} and expects {@code CommandError}.
     */
    @Test(timeOut=30000L)
    public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
        // Authorization mock that denies every consume request.
        AuthorizationService denyAll = (AuthorizationService)Mockito.mock(AuthorizationService.class);
        ((AuthorizationService)Mockito.doReturn(CompletableFuture.completedFuture(false)).when((Object)denyAll)).canConsumeAsync((TopicName)Mockito.any(), (String)Mockito.any(), (AuthenticationDataSource)Mockito.any(), (String)Mockito.any());

        // Broker returns that mock and reports authn/authz as enabled.
        ((BrokerService)Mockito.doReturn((Object)denyAll).when((Object)brokerService)).getAuthorizationService();
        ((BrokerService)Mockito.doReturn((Object)true).when((Object)brokerService)).isAuthenticationEnabled();
        ((BrokerService)Mockito.doReturn((Object)true).when((Object)brokerService)).isAuthorizationEnabled();

        resetChannel();
        setChannelConnected();

        ByteBuf subscribeRequest = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        channel.writeInbound(subscribeRequest);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandError);
        channel.finish();
    }

    /**
     * Subscribes, then sends an individual ACK for position (0, 0) and verifies that the
     * ACK leaves nothing queued in the channel's outbound messages.
     */
    @Test(timeOut=30000L)
    public void testAckCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        // Establish the consumer first.
        ByteBuf subscribeRequest = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        channel.writeInbound(subscribeRequest);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandSuccess);

        // Acknowledge ledger 0 / entry 0 individually.
        PositionImpl ackedPosition = new PositionImpl(0L, 0L);
        ByteBuf ackRequest = Commands.newAck(1L, ackedPosition.getLedgerId(), ackedPosition.getEntryId(), PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap());
        channel.writeInbound(ackRequest);

        // No outbound message may be pending after the ack.
        Assert.assertNull(channel.outboundMessages().peek());
        channel.finish();
    }

    /**
     * Subscribes, then sends a FLOW command with arguments (1, 1) and verifies that it
     * leaves nothing queued in the channel's outbound messages.
     */
    @Test(timeOut=30000L)
    public void testFlowCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        // Establish the consumer first.
        ByteBuf subscribeRequest = Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "test");
        channel.writeInbound(subscribeRequest);
        Assert.assertTrue(getResponse() instanceof PulsarApi.CommandSuccess);

        ByteBuf flowRequest = Commands.newFlow(1L, 1);
        channel.writeInbound(flowRequest);

        // No outbound message may be pending after the flow command.
        Assert.assertNull(channel.outboundMessages().peek());
        channel.finish();
    }

    /**
     * Stubs the policies cache so the topic's namespace has {@code encryption_required}
     * set, then creates a producer with the boolean argument set to {@code true}. Expects
     * {@code CommandProducerSuccess} and exactly one producer registered on the topic.
     */
    @Test(timeOut=30000L)
    public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception {
        resetChannel();
        setChannelConnected();

        final String topic = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";

        // Policies object with encryption required and an empty dispatch-rate map.
        ZooKeeperDataCache policiesCache = (ZooKeeperDataCache)Mockito.mock(ZooKeeperDataCache.class);
        Policies policies = (Policies)Mockito.mock(Policies.class);
        policies.encryption_required = true;
        policies.topicDispatchRate = Maps.newHashMap();

        // Serve those policies for both the sync and async lookups of the namespace path.
        ((ZooKeeperDataCache)Mockito.doReturn(Optional.of(policies)).when((Object)policiesCache)).get(AdminResource.path(new String[]{"policies", TopicName.get(topic).getNamespace()}));
        ((ZooKeeperDataCache)Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when((Object)policiesCache)).getAsync(AdminResource.path(new String[]{"policies", TopicName.get(topic).getNamespace()}));
        ((ConfigurationCacheService)Mockito.doReturn((Object)policiesCache).when((Object)configCacheService)).policiesCache();

        ByteBuf producerRequest = Commands.newProducer(topic, 1L, 1L, "encrypted-producer", true, null);
        channel.writeInbound(producerRequest);

        Object reply = getResponse();
        Assert.assertEquals(reply.getClass(), PulsarApi.CommandProducerSuccess.class);

        // The producer must be attached to the topic.
        PersistentTopic topicRef = (PersistentTopic)brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals((long)topicRef.getProducers().size(), 1L);
        channel.finish();
    }

    /**
     * Stubs the policies cache so the topic's namespace has {@code encryption_required}
     * set, then creates a producer with the boolean argument set to {@code false}. Expects
     * {@code CommandError} with {@code ServerError.MetadataError} and no producer registered
     * on the topic.
     */
    @Test(timeOut=30000L)
    public void testProducerFailureOnEncryptionRequiredTopic() throws Exception {
        resetChannel();
        setChannelConnected();

        final String topic = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";

        // Policies object with encryption required and an empty dispatch-rate map.
        ZooKeeperDataCache policiesCache = (ZooKeeperDataCache)Mockito.mock(ZooKeeperDataCache.class);
        Policies policies = (Policies)Mockito.mock(Policies.class);
        policies.encryption_required = true;
        policies.topicDispatchRate = Maps.newHashMap();

        // Serve those policies for both the sync and async lookups of the namespace path.
        ((ZooKeeperDataCache)Mockito.doReturn(Optional.of(policies)).when((Object)policiesCache)).get(AdminResource.path(new String[]{"policies", TopicName.get(topic).getNamespace()}));
        ((ZooKeeperDataCache)Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when((Object)policiesCache)).getAsync(AdminResource.path(new String[]{"policies", TopicName.get(topic).getNamespace()}));
        ((ConfigurationCacheService)Mockito.doReturn((Object)policiesCache).when((Object)configCacheService)).policiesCache();

        ByteBuf producerRequest = Commands.newProducer(topic, 2L, 2L, "unencrypted-producer", false, null);
        channel.writeInbound(producerRequest);

        Object reply = getResponse();
        Assert.assertEquals(reply.getClass(), PulsarApi.CommandError.class);
        PulsarApi.CommandError failure = (PulsarApi.CommandError)reply;
        Assert.assertEquals((Object)failure.getError(), (Object)PulsarApi.ServerError.MetadataError);

        // The topic exists but must have no producer attached.
        PersistentTopic topicRef = (PersistentTopic)brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals((long)topicRef.getProducers().size(), 0L);
        channel.finish();
    }

    /**
     * Verifies that a message carrying encryption keys is accepted on a topic whose namespace
     * policies require encryption.
     *
     * <p>After mocking the policies cache with {@code encryption_required = true}, the test
     * registers a producer, then sends one message whose metadata contains an encryption key
     * entry. It expects a {@code CommandProducerSuccess} followed by a {@code CommandSendReceipt}.
     */
    @Test(timeOut=30000L)
    public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
        final String producerName = "prod-name";
        this.resetChannel();
        this.setChannelConnected();

        // Serve encryption-requiring namespace policies from a mocked policies cache.
        ZooKeeperDataCache zkDataCache = Mockito.mock(ZooKeeperDataCache.class);
        Policies policies = Mockito.mock(Policies.class);
        policies.encryption_required = true;
        policies.topicDispatchRate = Maps.newHashMap();
        String policiesPath = AdminResource.path(new String[]{"policies", TopicName.get(topicName).getNamespace()});
        Mockito.doReturn(Optional.of(policies)).when(zkDataCache).get(policiesPath);
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(policiesPath);
        ((ConfigurationCacheService) Mockito.doReturn(zkDataCache).when(this.configCacheService)).policiesCache();

        // Step 1: register the producer.
        ByteBuf producerCommand = Commands.newProducer(topicName, 1L, 1L, producerName, true, null);
        this.channel.writeInbound(producerCommand);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandProducerSuccess);

        // Step 2: publish a message whose metadata includes an encryption key.
        PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName(producerName)
                .setSequenceId(0L)
                .addEncryptionKeys(PulsarApi.EncryptionKeys.newBuilder()
                        .setKey("testKey")
                        .setValue(ByteString.copyFrom("testVal".getBytes())))
                .build();
        ByteBuf payload = Unpooled.buffer(1024);
        ByteBuf sendCommand = ByteBufPair.coalesce(
                Commands.newSend(1L, 0L, 1, Commands.ChecksumType.None, messageMetadata, payload));
        // The channel is handed a copy, so the coalesced buffer is released here by the test.
        this.channel.writeInbound(Unpooled.copiedBuffer(sendCommand));
        sendCommand.release();

        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandSendReceipt);
        this.channel.finish();
    }

    /**
     * Verifies that a message without encryption keys is rejected on a topic whose namespace
     * policies require encryption.
     *
     * <p>After mocking the policies cache with {@code encryption_required = true}, the test
     * registers a producer, then sends one message whose metadata has no encryption key entry.
     * It expects a {@code CommandProducerSuccess} followed by a {@code CommandSendError}.
     */
    @Test(timeOut=30000L)
    public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
        final String producerName = "prod-name";
        this.resetChannel();
        this.setChannelConnected();

        // Serve encryption-requiring namespace policies from a mocked policies cache.
        ZooKeeperDataCache zkDataCache = Mockito.mock(ZooKeeperDataCache.class);
        Policies policies = Mockito.mock(Policies.class);
        policies.encryption_required = true;
        policies.topicDispatchRate = Maps.newHashMap();
        String policiesPath = AdminResource.path(new String[]{"policies", TopicName.get(topicName).getNamespace()});
        Mockito.doReturn(Optional.of(policies)).when(zkDataCache).get(policiesPath);
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(policiesPath);
        ((ConfigurationCacheService) Mockito.doReturn(zkDataCache).when(this.configCacheService)).policiesCache();

        // Step 1: register the producer.
        ByteBuf producerCommand = Commands.newProducer(topicName, 1L, 1L, producerName, true, null);
        this.channel.writeInbound(producerCommand);
        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandProducerSuccess);

        // Step 2: publish a message whose metadata carries no encryption keys.
        PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName(producerName)
                .setSequenceId(0L)
                .build();
        ByteBuf payload = Unpooled.buffer(1024);
        ByteBuf sendCommand = ByteBufPair.coalesce(
                Commands.newSend(1L, 0L, 1, Commands.ChecksumType.None, messageMetadata, payload));
        // The channel is handed a copy, so the coalesced buffer is released here by the test.
        this.channel.writeInbound(Unpooled.copiedBuffer(sendCommand));
        sendCommand.release();

        Assert.assertTrue(this.getResponse() instanceof PulsarApi.CommandSendError);
        this.channel.finish();
    }

    /**
     * Replaces the test's {@code ServerCnx} and embedded channel with fresh instances.
     *
     * <p>If a channel already exists and is still active, the current connection is closed and
     * the channel close is awaited before the new pair is created. The new connection gets an
     * empty {@code authRole}, and the new channel's pipeline is a 4-byte length-field frame
     * decoder followed by the connection handler.
     *
     * @throws Exception if waiting for the previous channel to close fails
     */
    protected void resetChannel() throws Exception {
        final int maxMessageSize = 5 * 1024 * 1024;

        boolean previousChannelActive = this.channel != null && this.channel.isActive();
        if (previousChannelActive) {
            this.serverCnx.close();
            this.channel.close().get();
        }

        this.serverCnx = new ServerCnx(this.pulsar);
        this.serverCnx.authRole = "";

        // Frame layout: 4-byte length prefix at offset 0, stripped before reaching the handler.
        ChannelHandler frameDecoder = new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4);
        this.channel = new EmbeddedChannel(new ChannelHandler[]{frameDecoder, this.serverCnx});
    }

    /**
     * Sets the current connection's {@code state} field to {@code ServerCnx.State.Connected}
     * through reflection.
     *
     * @throws Exception if the {@code state} field cannot be found or written
     */
    protected void setChannelConnected() throws Exception {
        Field stateField = ServerCnx.class.getDeclaredField("state");
        stateField.setAccessible(true);
        stateField.set(this.serverCnx, ServerCnx.State.Connected);
    }

    /**
     * Overwrites the {@code remoteEndpointProtocolVersion} field, declared on
     * {@code PulsarHandler}, of the current connection through reflection.
     *
     * @param version the protocol version value to store on the connection
     * @throws Exception if the field cannot be found or written
     */
    private void setConnectionVersion(int version) throws Exception {
        Field protocolVersionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
        protocolVersionField.setAccessible(true);
        protocolVersionField.set(this.serverCnx, version);
    }

    /**
     * Waits for the next outbound message on the embedded channel and returns it decoded.
     *
     * <p>The outbound queue is polled every 10 ms, for at most 1000 polls. The first message
     * found is removed from the queue and passed through
     * {@code clientChannelHelper.getCommand}.
     *
     * @return the result of {@code clientChannelHelper.getCommand} for the first outbound message
     * @throws IOException if no outbound message shows up before the polls are exhausted
     * @throws Exception if the polling sleep is interrupted or decoding fails
     */
    protected Object getResponse() throws Exception {
        final long pollIntervalMs = 10L;
        final long maxPolls = TimeUnit.SECONDS.toMillis(10L) / pollIntervalMs;

        for (long attempt = 0L; attempt < maxPolls; attempt++) {
            if (this.channel.outboundMessages().isEmpty()) {
                // Nothing written yet; back off briefly before looking again.
                Thread.sleep(pollIntervalMs);
                continue;
            }
            Object outbound = this.channel.outboundMessages().remove();
            return this.clientChannelHelper.getCommand(outbound);
        }
        throw new IOException("Failed to get response from socket within 10s");
    }

    /**
     * Installs the Mockito stubs that let the mocked managed-ledger layer answer asynchronous
     * calls by invoking the supplied callbacks.
     *
     * <p>Stubs are selected by regex on the name argument: names matching {@code .*success.*}
     * complete their callback, names matching {@code .*fail.*} fail it with a
     * {@code ManagedLedgerException}. The ledger-open and cursor-open stubs sleep 300 ms before
     * answering. The failing ledger-open stub reports the failure from a newly started thread.
     * Add-entry, cursor-delete and cursor-close answer without a delay.
     */
    private void setupMLAsyncCallbackMocks() {
        Mockito.doReturn(new ArrayList()).when(this.ledgerMock).getCursors();

        // Opening a ledger: success path hands back the ledger mock.
        Mockito.doAnswer(invocation -> {
            Thread.sleep(300L);
            AsyncCallbacks.OpenLedgerCallback callback = (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
            callback.openLedgerComplete(this.ledgerMock, null);
            return null;
        }).when(this.mlFactoryMock).asyncOpen(
                Mockito.matches(".*success.*"),
                Mockito.any(ManagedLedgerConfig.class),
                Mockito.any(AsyncCallbacks.OpenLedgerCallback.class),
                Mockito.any());

        // Opening a ledger: failure path reports the error from a separate thread.
        Mockito.doAnswer(invocation -> {
            Thread.sleep(300L);
            new Thread(() -> ((AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2])
                    .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null)).start();
            return null;
        }).when(this.mlFactoryMock).asyncOpen(
                Mockito.matches(".*fail.*"),
                Mockito.any(ManagedLedgerConfig.class),
                Mockito.any(AsyncCallbacks.OpenLedgerCallback.class),
                Mockito.any());

        // Adding an entry: completes at position (-1, -1), echoing the caller's context argument.
        Mockito.doAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            ((AsyncCallbacks.AddEntryCallback) args[1]).addComplete((Position) new PositionImpl(-1L, -1L), args[2]);
            return null;
        }).when(this.ledgerMock).asyncAddEntry(
                Mockito.any(ByteBuf.class),
                Mockito.any(AsyncCallbacks.AddEntryCallback.class),
                Mockito.any());

        // Opening a cursor, success path (overload without the properties map).
        Mockito.doAnswer(invocation -> {
            Thread.sleep(300L);
            AsyncCallbacks.OpenCursorCallback callback = (AsyncCallbacks.OpenCursorCallback) invocation.getArguments()[2];
            callback.openCursorComplete(this.cursorMock, null);
            return null;
        }).when(this.ledgerMock).asyncOpenCursor(
                Mockito.matches(".*success.*"),
                Mockito.any(PulsarApi.CommandSubscribe.InitialPosition.class),
                Mockito.any(AsyncCallbacks.OpenCursorCallback.class),
                Mockito.any());

        // Opening a cursor, success path (overload with the properties map; callback is arg 3).
        Mockito.doAnswer(invocation -> {
            Thread.sleep(300L);
            AsyncCallbacks.OpenCursorCallback callback = (AsyncCallbacks.OpenCursorCallback) invocation.getArguments()[3];
            callback.openCursorComplete(this.cursorMock, null);
            return null;
        }).when(this.ledgerMock).asyncOpenCursor(
                Mockito.matches(".*success.*"),
                Mockito.any(PulsarApi.CommandSubscribe.InitialPosition.class),
                Mockito.any(Map.class),
                Mockito.any(AsyncCallbacks.OpenCursorCallback.class),
                Mockito.any());

        // Opening a cursor, failure path (overload without the properties map).
        Mockito.doAnswer(invocation -> {
            Thread.sleep(300L);
            AsyncCallbacks.OpenCursorCallback callback = (AsyncCallbacks.OpenCursorCallback) invocation.getArguments()[2];
            callback.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(this.ledgerMock).asyncOpenCursor(
                Mockito.matches(".*fail.*"),
                Mockito.any(PulsarApi.CommandSubscribe.InitialPosition.class),
                Mockito.any(AsyncCallbacks.OpenCursorCallback.class),
                Mockito.any());

        // Opening a cursor, failure path (overload with the properties map; callback is arg 3).
        Mockito.doAnswer(invocation -> {
            Thread.sleep(300L);
            AsyncCallbacks.OpenCursorCallback callback = (AsyncCallbacks.OpenCursorCallback) invocation.getArguments()[3];
            callback.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(this.ledgerMock).asyncOpenCursor(
                Mockito.matches(".*fail.*"),
                Mockito.any(PulsarApi.CommandSubscribe.InitialPosition.class),
                Mockito.any(Map.class),
                Mockito.any(AsyncCallbacks.OpenCursorCallback.class),
                Mockito.any());

        // Deleting a cursor: success and failure paths, both answered without delay.
        Mockito.doAnswer(invocation -> {
            ((AsyncCallbacks.DeleteCursorCallback) invocation.getArguments()[1]).deleteCursorComplete(null);
            return null;
        }).when(this.ledgerMock).asyncDeleteCursor(
                Mockito.matches(".*success.*"),
                Mockito.any(AsyncCallbacks.DeleteCursorCallback.class),
                Mockito.any());

        Mockito.doAnswer(invocation -> {
            ((AsyncCallbacks.DeleteCursorCallback) invocation.getArguments()[1])
                    .deleteCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(this.ledgerMock).asyncDeleteCursor(
                Mockito.matches(".*fail.*"),
                Mockito.any(AsyncCallbacks.DeleteCursorCallback.class),
                Mockito.any());

        // Closing the cursor always completes; the cursor reports a fixed name.
        Mockito.doAnswer(invocation -> {
            ((AsyncCallbacks.CloseCallback) invocation.getArguments()[0]).closeComplete(null);
            return null;
        }).when(this.cursorMock).asyncClose(
                Mockito.any(AsyncCallbacks.CloseCallback.class),
                Mockito.any());

        Mockito.doReturn("successSub").when(this.cursorMock).getName();
    }

    /**
     * Verifies that a lookup request for a malformed topic name is answered with a
     * {@code CommandLookupTopicResponse} whose error is {@code InvalidTopicName}.
     */
    @Test(timeOut=30000L)
    public void testInvalidTopicOnLookup() throws Exception {
        // One reset/connect pair is enough; a second, back-to-back pair only rebuilt the
        // connection that had just been created.
        this.resetChannel();
        this.setChannelConnected();

        String invalidTopicName = "xx/ass/aa/aaa";
        this.channel.writeInbound(Commands.newLookup(invalidTopicName, true, 1L));

        Object response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandLookupTopicResponse.class);
        PulsarApi.CommandLookupTopicResponse lookupResponse = (PulsarApi.CommandLookupTopicResponse) response;
        Assert.assertEquals(lookupResponse.getError(), PulsarApi.ServerError.InvalidTopicName);
        this.channel.finish();
    }

    /**
     * Verifies that a producer request for a malformed topic name is answered with a
     * {@code CommandError} whose error is {@code InvalidTopicName}.
     */
    @Test(timeOut=30000L)
    public void testInvalidTopicOnProducer() throws Exception {
        // One reset/connect pair is enough; a second, back-to-back pair only rebuilt the
        // connection that had just been created.
        this.resetChannel();
        this.setChannelConnected();

        String invalidTopicName = "xx/ass/aa/aaa";
        ByteBuf producerCommand = Commands.newProducer(invalidTopicName, 1L, 1L, "prod-name", Collections.emptyMap());
        this.channel.writeInbound(producerCommand);

        Object response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
        PulsarApi.CommandError errorResponse = (PulsarApi.CommandError) response;
        Assert.assertEquals(errorResponse.getError(), PulsarApi.ServerError.InvalidTopicName);
        this.channel.finish();
    }

    /**
     * Verifies that a subscribe request for a malformed topic name is answered with a
     * {@code CommandError} whose error is {@code InvalidTopicName}.
     */
    @Test(timeOut=30000L)
    public void testInvalidTopicOnSubscribe() throws Exception {
        // One reset/connect pair is enough; a second, back-to-back pair only rebuilt the
        // connection that had just been created.
        this.resetChannel();
        this.setChannelConnected();

        String invalidTopicName = "xx/ass/aa/aaa";
        this.channel.writeInbound(Commands.newSubscribe(invalidTopicName, "test-subscription", 1L, 1L,
                PulsarApi.CommandSubscribe.SubType.Exclusive, 0, "consumerName"));

        Object response = this.getResponse();
        Assert.assertEquals(response.getClass(), PulsarApi.CommandError.class);
        PulsarApi.CommandError errorResponse = (PulsarApi.CommandError) response;
        Assert.assertEquals(errorResponse.getError(), PulsarApi.ServerError.InvalidTopicName);
        this.channel.finish();
    }
}

