/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerCnx
extends PulsarHandler {
    // Broker service this connection belongs to; taken from the PulsarService in the constructor.
    private final BrokerService service;
    // Schema registry taken from the PulsarService in the constructor.
    private final SchemaRegistryService schemaService;
    // Producer futures registered on this connection, keyed by the producer id sent by the client.
    private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
    // Consumer futures registered on this connection, keyed by the consumer id sent by the client.
    private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
    // Connection life-cycle: Start on construction, Connecting while an auth challenge is
    // outstanding, Connected once the handshake completes, Failed after exceptionCaught.
    private State state;
    // Flipped to false in channelInactive.
    private volatile boolean isActive = true;
    // Role of the directly connected peer; filled in once authentication completes
    // (or with the anonymous role when no provider matches the auth method).
    String authRole = null;
    // Passed to the authorization checks. NOTE(review): never assigned in the visible part of the file.
    AuthenticationDataSource authenticationData;
    // Provider selected in handleConnect from the client's auth method name.
    AuthenticationProvider authenticationProvider;
    // Per-connection authentication state created by the provider in handleConnect.
    AuthenticationState authState;
    // NOTE(review): presumably the back-pressure thresholds for in-flight sends; usage is outside this view.
    private static final int MaxPendingSendRequests = 1000;
    private static final int ResumeReadsThreshold = 500;
    private int pendingSendRequest = 0;
    // Replicator prefix read from the broker configuration in the constructor.
    private final String replicatorPrefix;
    // Client version reported by the peer; only stored when it is non-blank and contains no space.
    private String clientVersion = null;
    // NOTE(review): presumably counts in-flight non-persistent messages; usage is outside this view.
    private int nonPersistentPendingMessages = 0;
    // Configured value of maxConcurrentNonPersistentMessagePerConnection.
    private final int MaxNonPersistentPendingMessages;
    // Principal of the end client when the peer is a proxy; set in handleConnect.
    private String originalPrincipal = null;
    // Roles that identify proxies, read from the broker configuration.
    private Set<String> proxyRoles;
    // When true, the original principal is derived by authenticating the forwarded auth data.
    private boolean authenticateOriginalAuthData;
    // Broker configuration flag isSchemaValidationEnforced.
    private final boolean schemaValidationEnforced;
    // Authentication method name negotiated in handleConnect; "none" until then.
    private String authMethod = "none";
    // Maximum message size advertised to the client in the Connected response.
    private final int maxMessageSize;
    private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);

    /**
     * Builds the handler for one client connection.
     *
     * <p>The keep-alive interval is handed to the parent handler; every other setting is read
     * once from the broker configuration and cached for the lifetime of the connection.
     *
     * @param pulsar the broker instance this connection is served by
     */
    public ServerCnx(PulsarService pulsar) {
        super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);

        // Broker collaborators.
        service = pulsar.getBrokerService();
        schemaService = pulsar.getSchemaRegistryService();

        // Per-connection state.
        state = State.Start;
        producers = new ConcurrentLongHashMap<>(8, 1);
        consumers = new ConcurrentLongHashMap<>(8, 1);

        // Settings read through the broker service's view of the configuration.
        replicatorPrefix = service.pulsar().getConfiguration().getReplicatorPrefix();
        MaxNonPersistentPendingMessages =
                service.pulsar().getConfiguration().getMaxConcurrentNonPersistentMessagePerConnection();
        proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
        authenticateOriginalAuthData = service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();

        // Settings read directly from the PulsarService configuration.
        schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
        maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
    }

    /**
     * Called when the channel becomes active: lets the parent handler run first, logs the
     * new peer and then remembers the channel context for later writes.
     *
     * @param ctx context of the newly active channel
     * @throws Exception if the parent handler fails
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("New connection from {}", remoteAddress);
        // Stored last, exactly as before, so the log line above runs before the context is replaced.
        this.ctx = ctx;
    }

    /**
     * Called when the channel is closed. Marks the connection inactive and closes every
     * producer and consumer whose creation future completed successfully; futures that are
     * still pending or that failed are left untouched.
     *
     * @param ctx context of the channel that went inactive
     * @throws Exception if the parent handler fails
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        isActive = false;
        log.info("Closed connection from {}", remoteAddress);

        producers.values().forEach(pendingProducer -> {
            boolean created = pendingProducer.isDone() && !pendingProducer.isCompletedExceptionally();
            if (!created) {
                return;
            }
            pendingProducer.getNow(null).closeNow();
        });

        consumers.values().forEach(pendingConsumer -> {
            if (pendingConsumer.isDone() && !pendingConsumer.isCompletedExceptionally()) {
                Consumer consumer = pendingConsumer.getNow(null);
                try {
                    consumer.close();
                } catch (BrokerServiceException e) {
                    // Closing is best effort here: the connection is already gone.
                    log.warn("Consumer {} was already closed: {}", consumer, e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Logs, at debug level only, the new writability of the channel. No other action is taken.
     *
     * @param ctx context of the channel whose writability changed
     * @throws Exception declared by the handler contract; nothing here throws it explicitly
     */
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (!log.isDebugEnabled()) {
            return;
        }
        boolean writable = ctx.channel().isWritable();
        log.debug("Channel writability has changed to: {}", writable);
    }

    /**
     * Handles an exception raised on the channel. The first exception is logged at warn level
     * (with a stack trace only when it is not one of the known client exceptions) and moves the
     * connection to the Failed state; later exceptions are logged at debug level. The channel
     * is closed in every case.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     * @throws Exception declared by the handler contract
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        boolean alreadyFailed = state == State.Failed;
        if (alreadyFailed) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Got exception: {}", remoteAddress, cause.getMessage(), cause);
            }
        } else {
            String exceptionName = cause.getClass().getSimpleName();
            String exceptionMessage = cause.getMessage();
            // Known exceptions are logged without their stack trace.
            Throwable stackTrace = ClientCnx.isKnownException(cause) ? null : cause;
            log.warn("[{}] Got exception {} : {}", remoteAddress, exceptionName, exceptionMessage, stackTrace);
            state = State.Failed;
        }
        ctx.close();
    }

    /**
     * Tells whether a proxied request carries an unusable original principal.
     *
     * <p>The check only applies when both authentication and authorization are enabled and the
     * connected role is one of the proxy roles. In that case the original principal is invalid
     * when it is blank or is itself a proxy role.
     *
     * @param originalPrincipal principal forwarded by the proxy, may be {@code null}
     * @return {@code true} when the request must be rejected
     */
    private boolean invalidOriginalPrincipal(String originalPrincipal) {
        if (!service.isAuthenticationEnabled() || !service.isAuthorizationEnabled()) {
            return false;
        }
        if (!proxyRoles.contains(authRole)) {
            // Direct client, not a proxy: nothing to validate.
            return false;
        }
        return StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal);
    }

    /**
     * Handles a topic lookup request.
     *
     * <p>Flow: validate the topic name, take a permit from the broker-wide lookup semaphore
     * (replying {@code TooManyRequests} when none is available), reject proxied requests without
     * a valid original principal, check that the connected role may look up the topic when the
     * request is proxied, and finally run the asynchronous lookup on behalf of the original
     * principal (or the connected role when there is none).
     *
     * <p>The permit is released in {@code finally} blocks inside the asynchronous callbacks.
     * Exceptions thrown there end up in a future nobody observes, so without the
     * {@code finally} a failure while building or writing the response would leak the permit.
     *
     * @param lookup the lookup command received from the client
     */
    protected void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        final long requestId = lookup.getRequestId();
        final boolean authoritative = lookup.getAuthoritative();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Lookup from {} for {}", new Object[]{lookup.getTopic(), this.remoteAddress, requestId});
        }
        final TopicName topicName = this.validateTopicName(lookup.getTopic(), requestId, (GeneratedMessageLite)lookup);
        if (topicName == null) {
            return;
        }
        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed lookup due to too many lookup-requests {}", this.remoteAddress, topicName);
            }
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId));
            return;
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided for lookup ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName);
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return;
        }
        // Only a proxied request needs the extra check that the proxy role itself may look up the topic.
        final CompletableFuture<Boolean> isProxyAuthorizedFuture;
        if (this.service.isAuthorizationEnabled() && this.originalPrincipal != null) {
            isProxyAuthorizedFuture = this.service.getAuthorizationService().canLookupAsync(topicName, this.authRole, this.authenticationData);
        } else {
            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
        }
        final String finalOriginalPrincipal = this.originalPrincipal;
        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
            if (isProxyAuthorized) {
                TopicLookupBase.lookupTopicAsync(this.getBrokerService().pulsar(), topicName, authoritative, finalOriginalPrincipal != null ? finalOriginalPrincipal : this.authRole, this.authenticationData, requestId).handle((lookupResponse, ex) -> {
                    try {
                        if (ex == null) {
                            this.ctx.writeAndFlush(lookupResponse);
                        } else {
                            log.warn("[{}] lookup failed with error {}, {}", this.remoteAddress, topicName, ex.getMessage(), ex);
                            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.ServiceNotReady, ex.getMessage(), requestId));
                        }
                    } finally {
                        // Always give the permit back, even if replying failed.
                        lookupSemaphore.release();
                    }
                    return null;
                });
            } else {
                final String msg = "Proxy Client is not authorized to Lookup";
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.authRole, topicName);
                // No finally here: a failure propagates to exceptionally() below, which releases the permit.
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.AuthorizationError, msg, requestId));
                lookupSemaphore.release();
            }
            return null;
        }).exceptionally(ex -> {
            try {
                final String msg = "Exception occured while trying to authorize lookup";
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.authRole, topicName, ex);
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.AuthorizationError, msg, requestId));
            } finally {
                lookupSemaphore.release();
            }
            return null;
        });
    }

    /**
     * Handles a partitioned-topic metadata request.
     *
     * <p>Flow: validate the topic name, take a permit from the broker-wide lookup semaphore
     * (replying {@code TooManyRequests} when none is available), reject proxied requests without
     * a valid original principal, check that the connected role may look up the topic when the
     * request is proxied, and finally fetch the partition metadata asynchronously.
     *
     * <p>Metadata failures are mapped as follows: {@code PulsarClientException} becomes
     * {@code AuthorizationError}; a {@code RestException} with a status below 500 becomes
     * {@code MetadataError}; anything else becomes {@code ServiceNotReady}.
     *
     * <p>The permit is released in {@code finally} blocks inside the asynchronous callbacks.
     * Exceptions thrown there end up in a future nobody observes, so without the
     * {@code finally} a failure while building or writing the response would leak the permit.
     *
     * @param partitionMetadata the metadata command received from the client
     */
    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        final long requestId = partitionMetadata.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", new Object[]{partitionMetadata.getTopic(), this.remoteAddress, requestId});
        }
        final TopicName topicName = this.validateTopicName(partitionMetadata.getTopic(), requestId, (GeneratedMessageLite)partitionMetadata);
        if (topicName == null) {
            return;
        }
        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", this.remoteAddress, topicName);
            }
            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId));
            return;
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName);
            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return;
        }
        // Only a proxied request needs the extra check that the proxy role itself may look up the topic.
        final CompletableFuture<Boolean> isProxyAuthorizedFuture;
        if (this.service.isAuthorizationEnabled() && this.originalPrincipal != null) {
            isProxyAuthorizedFuture = this.service.getAuthorizationService().canLookupAsync(topicName, this.authRole, this.authenticationData);
        } else {
            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
        }
        final String finalOriginalPrincipal = this.originalPrincipal;
        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
            if (isProxyAuthorized) {
                PersistentTopicsBase.getPartitionedTopicMetadata(this.getBrokerService().pulsar(), this.authRole, finalOriginalPrincipal, this.authenticationData, topicName).handle((metadata, ex) -> {
                    try {
                        if (ex == null) {
                            int partitions = metadata.partitions;
                            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
                        } else if (ex instanceof PulsarClientException) {
                            log.warn("Failed to authorize {} at [{}] on topic {} : {}", this.getRole(), this.remoteAddress, topicName, ex.getMessage());
                            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, ex.getMessage(), requestId));
                        } else {
                            log.warn("Failed to get Partitioned Metadata [{}] {}: {}", this.remoteAddress, topicName, ex.getMessage(), ex);
                            // Client-side REST failures (status < 500) are metadata errors; the rest means "try again later".
                            PulsarApi.ServerError error = ex instanceof RestException && ((RestException) ex).getResponse().getStatus() < 500 ? PulsarApi.ServerError.MetadataError : PulsarApi.ServerError.ServiceNotReady;
                            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error, ex.getMessage(), requestId));
                        }
                    } finally {
                        // Always give the permit back, even if replying failed.
                        lookupSemaphore.release();
                    }
                    return null;
                });
            } else {
                final String msg = "Proxy Client is not authorized to Get Partition Metadata";
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.authRole, topicName);
                // No finally here: a failure propagates to exceptionally() below, which releases the permit.
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, msg, requestId));
                lookupSemaphore.release();
            }
            return null;
        }).exceptionally(ex -> {
            try {
                final String msg = "Exception occured while trying to authorize get Partition Metadata";
                // The exception is passed as the last argument so its stack trace is logged.
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.authRole, topicName, ex);
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, msg, requestId));
            } finally {
                lookupSemaphore.release();
            }
            return null;
        });
    }

    /**
     * Replies to a consumer-stats request with the statistics of the addressed consumer, or
     * with a {@code ConsumerNotFound} error when no usable consumer exists for the id.
     *
     * <p>A consumer counts as not found when the id was never registered on this connection,
     * when its creation is still pending, or when its creation failed. The unregistered and
     * failed cases are checked explicitly, because calling {@code getNow} on a missing or
     * exceptionally completed future would throw instead of producing the error reply.
     *
     * @param commandConsumerStats the stats command received from the client
     */
    protected void handleConsumerStats(PulsarApi.CommandConsumerStats commandConsumerStats) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandConsumerStats call from {}", this.remoteAddress);
        }
        final long requestId = commandConsumerStats.getRequestId();
        final long consumerId = commandConsumerStats.getConsumerId();
        final CompletableFuture<Consumer> consumerFuture = this.consumers.get(consumerId);
        Consumer consumer = null;
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            consumer = consumerFuture.getNow(null);
        }
        final ByteBuf msg;
        if (consumer == null) {
            log.error("Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", this.remoteAddress, requestId, consumerId);
            msg = Commands.newConsumerStatsResponse(PulsarApi.ServerError.ConsumerNotFound, "Consumer " + consumerId + " not found", requestId);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", requestId, consumer);
            }
            msg = Commands.newConsumerStatsResponse(this.createConsumerStatsResponse(consumer, requestId));
        }
        this.ctx.writeAndFlush(msg);
    }

    /**
     * Fills a consumer-stats response builder from the consumer's own statistics and from the
     * subscription it is attached to.
     *
     * @param consumer  consumer whose statistics are reported
     * @param requestId id of the request being answered, echoed back in the response
     * @return the populated builder (not yet built)
     */
    PulsarApi.CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consumer, long requestId) {
        PulsarApi.CommandConsumerStatsResponse.Builder response = PulsarApi.CommandConsumerStatsResponse.newBuilder();
        ConsumerStats stats = consumer.getStats();
        response.setRequestId(requestId);

        // Consumer-level figures.
        response.setMsgRateOut(stats.msgRateOut);
        response.setMsgThroughputOut(stats.msgThroughputOut);
        response.setMsgRateRedeliver(stats.msgRateRedeliver);
        response.setConsumerName(stats.consumerName);
        response.setAvailablePermits((long)stats.availablePermits);
        response.setUnackedMessages((long)stats.unackedMessages);
        response.setBlockedConsumerOnUnackedMsgs(stats.blockedConsumerOnUnackedMsgs);
        response.setAddress(stats.getAddress());
        response.setConnectedSince(stats.getConnectedSince());

        // Subscription-level figures.
        Subscription sub = consumer.getSubscription();
        response.setMsgBacklog(sub.getNumberOfEntriesInBacklog());
        response.setMsgRateExpired(sub.getExpiredMessageRate());
        response.setType(sub.getTypeString());
        return response;
    }

    /**
     * Resolves the principal of the end client behind a proxy.
     *
     * <p>When the broker is configured to authenticate the original auth data, the principal
     * claimed by the proxy is ignored: it is derived by authenticating the forwarded auth data,
     * and is {@code null} when no auth data was forwarded. Otherwise the claimed principal is
     * returned unchanged.
     *
     * @param originalAuthData   auth data forwarded by the proxy, may be {@code null}
     * @param originalAuthMethod auth method forwarded by the proxy, may be {@code null}
     * @param originalPrincipal  principal claimed by the proxy, may be {@code null}
     * @param sslSession         TLS session of the connection, may be {@code null}
     * @return the resolved original principal, possibly {@code null}
     * @throws AuthenticationException if the forwarded auth data is rejected
     */
    private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, SSLSession sslSession) throws AuthenticationException {
        if (!this.authenticateOriginalAuthData) {
            return originalPrincipal;
        }
        if (originalAuthData == null) {
            return null;
        }
        return this.getBrokerService().getAuthenticationService().authenticate(
                (AuthenticationDataSource)new AuthenticationDataCommand(originalAuthData, this.remoteAddress, sslSession),
                originalAuthMethod);
    }

    /**
     * Finishes the connection handshake: sends the Connected response (advertising the maximum
     * message size), switches to the Connected state and records the peer's protocol version.
     * The client version string is kept only when it is non-blank and contains no space.
     *
     * @param clientProtoVersion protocol version announced by the client
     * @param clientVersion      client version string, may be {@code null} or blank
     */
    private void completeConnect(int clientProtoVersion, String clientVersion) {
        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize));
        state = State.Connected;
        remoteEndpointProtocolVersion = clientProtoVersion;

        if (StringUtils.isBlank(clientVersion) || clientVersion.contains(" ")) {
            return;
        }
        this.clientVersion = clientVersion.intern();
    }

    /**
     * Runs one round of the authentication exchange.
     *
     * <p>If the auth state is complete after processing the client data, the role is recorded
     * and the connection handshake is finished. Otherwise an auth challenge carrying the
     * broker's data is sent and the connection moves to the Connecting state.
     *
     * @param clientData            auth data sent by the client in this round
     * @param clientProtocolVersion protocol version announced by the client
     * @param clientVersion         client version string
     * @throws Exception if the auth state rejects the client data
     */
    private void doAuthentication(AuthData clientData, int clientProtocolVersion, String clientVersion) throws Exception {
        AuthData brokerData = authState.authenticate(clientData);

        if (!authState.isComplete()) {
            // Another round trip is needed: challenge the client and wait for its AuthResponse.
            ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod);
            }
            state = State.Connecting;
            return;
        }

        authRole = authState.getAuthRole();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
        }
        completeConnect(clientProtocolVersion, clientVersion);
    }

    /**
     * Handles the initial CONNECT command; only legal in the Start state.
     *
     * <p>With authentication disabled the handshake completes immediately. Otherwise the auth
     * method is taken from the command (method name first, then the legacy enum, else "none").
     * When no provider matches, the anonymous role is used if one is configured. When a provider
     * matches, the original principal is resolved and the first authentication round is run.
     * Any failure is answered with an {@code AuthenticationError} and the connection is closed.
     *
     * @param connect the CONNECT command received from the client
     */
    protected void handleConnect(PulsarApi.CommandConnect connect) {
        Preconditions.checkArgument(state == State.Start);
        if (log.isDebugEnabled()) {
            log.debug("Received CONNECT from {}, auth enabled: {}", remoteAddress, service.isAuthenticationEnabled());
        }
        String version = connect.getClientVersion();
        int protoVersion = connect.getProtocolVersion();
        if (!service.isAuthenticationEnabled()) {
            completeConnect(protoVersion, version);
            return;
        }
        try {
            AuthData clientData = AuthData.of(connect.getAuthData().toByteArray());

            if (connect.hasAuthMethodName()) {
                authMethod = connect.getAuthMethodName();
            } else if (connect.hasAuthMethod()) {
                // Legacy enum: the first 10 characters of the constant name are dropped, the rest lower-cased.
                authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
            } else {
                authMethod = "none";
            }

            authenticationProvider = getBrokerService().getAuthenticationService().getAuthenticationProvider(authMethod);
            if (authenticationProvider == null) {
                // No provider for this method: fall back to the anonymous role or fail.
                authRole = (String)getBrokerService().getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured"));
                completeConnect(protoVersion, version);
                return;
            }

            ChannelHandler tlsHandler = ctx.channel().pipeline().get("tls");
            SSLSession sslSession = tlsHandler == null ? null : ((SslHandler)tlsHandler).engine().getSession();

            String forwardedAuthData = connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null;
            String forwardedAuthMethod = connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null;
            String forwardedPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
            this.originalPrincipal = getOriginalPrincipal(forwardedAuthData, forwardedAuthMethod, forwardedPrincipal, sslSession);

            authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
            doAuthentication(clientData, protoVersion, version);
        } catch (Exception e) {
            String msg = "Unable to authenticate";
            log.warn("[{}] {} ", remoteAddress, msg, e);
            ctx.writeAndFlush(Commands.newError(-1L, PulsarApi.ServerError.AuthenticationError, msg));
            close();
        }
    }

    /**
     * Handles the client's answer to an auth challenge; only legal in the Connecting state.
     * The response must carry both auth data and an auth method name. The data is fed into
     * another authentication round; any failure is answered with an
     * {@code AuthenticationError} and the connection is closed.
     *
     * @param authResponse the AuthResponse command received from the client
     */
    protected void handleAuthResponse(PulsarApi.CommandAuthResponse authResponse) {
        Preconditions.checkArgument(state == State.Connecting);
        Preconditions.checkArgument(authResponse.hasResponse());
        Preconditions.checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());

        if (log.isDebugEnabled()) {
            log.debug("Received AuthResponse from {}, auth method: {}", remoteAddress, authResponse.getResponse().getAuthMethodName());
        }

        try {
            byte[] rawAuthData = authResponse.getResponse().getAuthData().toByteArray();
            doAuthentication(AuthData.of(rawAuthData), authResponse.getProtocolVersion(), authResponse.getClientVersion());
        } catch (Exception e) {
            String msg = "Unable to handleAuthResponse";
            log.warn("[{}] {} ", remoteAddress, msg, e);
            ctx.writeAndFlush(Commands.newError(-1L, PulsarApi.ServerError.AuthenticationError, msg));
            close();
        }
    }

    /**
     * Handles a SUBSCRIBE command; only legal in the Connected state.
     *
     * <p>Validates the topic name, rejects proxied requests without a valid original principal,
     * extracts the subscription parameters from the command and, when the request is proxied
     * and authorization is enabled, first checks that the connected role may consume. The rest
     * of the flow lives in a compiler-generated lambda ({@code lambda$handleSubscribe$15}) that
     * the decompiler did not inline.
     * NOTE(review): that lambda's body is outside this view, so nothing is claimed about it here.
     *
     * @param subscribe the SUBSCRIBE command received from the client
     */
    protected void handleSubscribe(PulsarApi.CommandSubscribe subscribe) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        long requestId = subscribe.getRequestId();
        long consumerId = subscribe.getConsumerId();
        TopicName topicName = this.validateTopicName(subscribe.getTopic(), requestId, (GeneratedMessageLite)subscribe);
        if (topicName == null) {
            return;
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            // Decompiler artifact: msg is unused, the literal is repeated in the calls below.
            String msg = "Valid Proxy Client role should be provided while subscribing ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided while subscribing ", this.authRole, this.originalPrincipal, topicName});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)"Valid Proxy Client role should be provided while subscribing "));
            return;
        }
        String subscriptionName = subscribe.getSubscription();
        PulsarApi.CommandSubscribe.SubType subType = subscribe.getSubType();
        String consumerName = subscribe.getConsumerName();
        boolean isDurable = subscribe.getDurable();
        // Optional start position; null when the command carries no start message id.
        BatchMessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) : null;
        // Same value as subscriptionName above; both are passed on to the lambda.
        String subscription = subscribe.getSubscription();
        // Priority level defaults to 0 when the command does not set one.
        int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
        boolean readCompacted = subscribe.getReadCompacted();
        Map metadata = CommandUtils.metadataFromCommand((PulsarApi.CommandSubscribe)subscribe);
        PulsarApi.CommandSubscribe.InitialPosition initialPosition = subscribe.getInitialPosition();
        SchemaData schema = subscribe.hasSchema() ? this.getSchema(subscribe.getSchema()) : null;
        boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
        // Only a proxied request needs the extra check that the proxy role itself may consume.
        CompletableFuture isProxyAuthorizedFuture = this.service.isAuthorizationEnabled() && this.originalPrincipal != null ? this.service.getAuthorizationService().canConsumeAsync(topicName, this.authRole, this.authenticationData, subscribe.getSubscription()) : CompletableFuture.completedFuture(true);
        ((CompletableFuture)isProxyAuthorizedFuture.thenApply(arg_0 -> this.lambda$handleSubscribe$15(topicName, subscription, subscriptionName, metadata, requestId, consumerId, schema, subType, priorityLevel, consumerName, isDurable, (MessageIdImpl)startMessageId, readCompacted, initialPosition, isReplicated, arg_0))).exceptionally(ex -> {
            // Any failure in the chain is reported to the client as an AuthorizationError.
            String msg = String.format("[%s] %s with role %s", this.remoteAddress, ex.getMessage(), this.authRole);
            // Failures caused by a PulsarServerException are logged at info level, everything else at warn.
            if (ex.getCause() instanceof PulsarServerException) {
                log.info(msg);
            } else {
                log.warn(msg);
            }
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)ex.getMessage()));
            return null;
        });
    }

    /**
     * Converts a wire-level schema into the broker's {@code SchemaData}.
     *
     * <p>The result is marked as not deleted, is timestamped with the current time and names
     * the original principal (empty string when there is none) as its user. The key/value
     * property list of the wire schema becomes the property map.
     *
     * @param protocolSchema schema as received in a command
     * @return the equivalent schema data
     */
    private SchemaData getSchema(PulsarApi.Schema protocolSchema) {
        String schemaUser = Strings.nullToEmpty(this.originalPrincipal);
        return SchemaData.builder()
                .data(protocolSchema.getSchemaData().toByteArray())
                .isDeleted(false)
                .timestamp(System.currentTimeMillis())
                .user(schemaUser)
                .type(Commands.getSchemaType(protocolSchema.getType()))
                .props(protocolSchema.getPropertiesList().stream()
                        .collect(Collectors.toMap(PulsarApi.KeyValue::getKey, PulsarApi.KeyValue::getValue)))
                .build();
    }

    /**
     * Handles a producer-creation command received on this connection.
     *
     * <p>Processing happens in stages:
     * <ol>
     *   <li>the topic name is validated and the original principal is checked;</li>
     *   <li>when authorization is enabled, the proxy role (only if an original principal is present) and then
     *       the effective client role are checked through {@code canProduceAsync};</li>
     *   <li>a future for the producer is registered under {@code producerId}; a request that reuses an id
     *       is answered from the future that is already registered;</li>
     *   <li>the topic is obtained through {@code getOrCreateTopic}, then backlog quota, encryption and
     *       schema are checked;</li>
     *   <li>the producer is added to the topic and the registered future is completed.</li>
     * </ol>
     * On most failures an error command is written back to the client and the registration is removed.
     *
     * @param cmdProducer the producer command sent by the client
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleProducer(PulsarApi.CommandProducer cmdProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        long producerId = cmdProducer.getProducerId();
        long requestId = cmdProducer.getRequestId();
        // Fall back to a broker-generated name when the client did not supply one.
        String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() : this.service.generateUniqueProducerName();
        boolean isEncrypted = cmdProducer.getEncrypted();
        Map metadata = CommandUtils.metadataFromCommand((PulsarApi.CommandProducer)cmdProducer);
        SchemaData schema = cmdProducer.hasSchema() ? this.getSchema(cmdProducer.getSchema()) : null;
        // validateTopicName() has already answered the client when it returns null.
        TopicName topicName = this.validateTopicName(cmdProducer.getTopic(), requestId, (GeneratedMessageLite)cmdProducer);
        if (topicName == null) {
            return;
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            String msg = "Valid Proxy Client role should be provided while creating producer ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
            return;
        }
        // Stage 1: the proxy's own role is only checked when an original principal was forwarded.
        CompletableFuture isProxyAuthorizedFuture = this.service.isAuthorizationEnabled() && this.originalPrincipal != null ? this.service.getAuthorizationService().canProduceAsync(topicName, this.authRole, this.authenticationData) : CompletableFuture.completedFuture(true);
        ((CompletableFuture)isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
            if (isProxyAuthorized.booleanValue()) {
                // Stage 2: check the effective role (original principal if present, otherwise the auth role).
                CompletableFuture authorizationFuture = this.service.isAuthorizationEnabled() ? this.service.getAuthorizationService().canProduceAsync(topicName, this.originalPrincipal != null ? this.originalPrincipal : this.authRole, this.authenticationData) : CompletableFuture.completedFuture(true);
                ((CompletableFuture)authorizationFuture.thenApply(isAuthorized -> {
                    if (isAuthorized.booleanValue()) {
                        CompletableFuture producerFuture;
                        CompletableFuture existingProducerFuture;
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Client is authorized to Produce with role {}", (Object)this.remoteAddress, (Object)this.authRole);
                        }
                        // Stage 3: register a placeholder future; a non-null result means the id is already in use.
                        if ((existingProducerFuture = (CompletableFuture)this.producers.putIfAbsent(producerId, producerFuture = new CompletableFuture())) != null) {
                            if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
                                // The producer already exists: answer with the existing producer's details.
                                Producer producer = existingProducerFuture.getNow(null);
                                log.info("[{}] Producer with the same id {} is already created: {}", new Object[]{this.remoteAddress, producerId, producer});
                                this.ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)requestId, (String)producer.getProducerName(), (SchemaVersion)producer.getSchemaVersion()));
                                return null;
                            }
                            PulsarApi.ServerError error = null;
                            if (!existingProducerFuture.isDone()) {
                                // An earlier creation request for the same id is still in flight.
                                error = PulsarApi.ServerError.ServiceNotReady;
                            } else {
                                error = this.getErrorCode(existingProducerFuture);
                                // The registered future failed; evict it. The freshly created producerFuture
                                // was never stored (putIfAbsent returned the existing entry), so the value
                                // passed to the conditional remove has to be the existing future.
                                this.producers.remove(producerId, (Object)existingProducerFuture);
                            }
                            log.warn("[{}][{}] Producer with id {} is already present on the connection", new Object[]{this.remoteAddress, topicName, producerId});
                            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)error, (String)"Producer is already present on the connection"));
                            return null;
                        }
                        log.info("[{}][{}] Creating producer. producerId={}", new Object[]{this.remoteAddress, topicName, producerId});
                        // Stage 4: obtain the topic and run the per-topic checks.
                        ((CompletableFuture)this.service.getOrCreateTopic(topicName.toString()).thenAccept(topic -> {
                            if (topic.isBacklogQuotaExceeded(producerName)) {
                                IllegalStateException illegalStateException = new IllegalStateException("Cannot create producer on topic with backlog quota exceeded");
                                BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
                                // The error code sent to the client depends on the configured retention policy.
                                if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.ProducerBlockedQuotaExceededError, (String)illegalStateException.getMessage()));
                                } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.ProducerBlockedQuotaExceededException, (String)illegalStateException.getMessage()));
                                }
                                producerFuture.completeExceptionally(illegalStateException);
                                this.producers.remove(producerId, (Object)producerFuture);
                                return;
                            }
                            if (topic.isEncryptionRequired() && !isEncrypted) {
                                String msg = String.format("Encryption is required in %s", topicName);
                                log.warn("[{}] {}", (Object)this.remoteAddress, (Object)msg);
                                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)msg));
                                this.producers.remove(producerId, (Object)producerFuture);
                                return;
                            }
                            this.disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
                            // With a schema: register it on the topic. Without one: reject only when the topic
                            // already has a schema and validation is enforced (on the connection or the topic).
                            CompletionStage<Object> schemaVersionFuture = schema != null ? topic.addSchema(schema) : topic.hasSchema().thenCompose(hasSchema -> {
                                log.info("[{}]-{} {} configured with schema {}", new Object[]{this.remoteAddress, producerId, topicName, hasSchema});
                                CompletableFuture<SchemaVersion> result = new CompletableFuture<SchemaVersion>();
                                if (hasSchema.booleanValue() && (this.schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
                                    result.completeExceptionally(new IncompatibleSchemaException("Producers cannot connect without a schema to topics with a schema"));
                                } else {
                                    result.complete(SchemaVersion.Empty);
                                }
                                return result;
                            });
                            ((CompletableFuture)schemaVersionFuture).exceptionally(exception -> {
                                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(exception), (String)exception.getMessage()));
                                this.producers.remove(producerId, (Object)producerFuture);
                                return null;
                            });
                            // Stage 5: create the producer, attach it to the topic and complete the registration.
                            ((CompletableFuture)schemaVersionFuture).thenAccept(schemaVersion -> {
                                Producer producer = new Producer((Topic)topic, this, producerId, producerName, this.authRole, isEncrypted, metadata, (SchemaVersion)schemaVersion);
                                try {
                                    topic.addProducer(producer);
                                    if (this.isActive()) {
                                        if (producerFuture.complete(producer)) {
                                            log.info("[{}] Created new producer: {}", (Object)this.remoteAddress, (Object)producer);
                                            this.ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)requestId, (String)producerName, (long)producer.getLastSequenceId(), (SchemaVersion)producer.getSchemaVersion()));
                                            return;
                                        }
                                        // complete() returned false: the future was already completed elsewhere
                                        // (e.g. by handleCloseProducer), so the new producer is discarded.
                                        producer.closeNow();
                                        log.info("[{}] Cleared producer created after timeout on client side {}", (Object)this.remoteAddress, (Object)producer);
                                    } else {
                                        producer.closeNow();
                                        log.info("[{}] Cleared producer created after connection was closed: {}", (Object)this.remoteAddress, (Object)producer);
                                        producerFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));
                                    }
                                }
                                catch (BrokerServiceException ise) {
                                    log.error("[{}] Failed to add producer to topic {}: {}", new Object[]{this.remoteAddress, topicName, ise.getMessage()});
                                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(ise), (String)ise.getMessage()));
                                    producerFuture.completeExceptionally(ise);
                                }
                                // Reached on every path except the successful early return above.
                                this.producers.remove(producerId, (Object)producerFuture);
                            });
                        })).exceptionally(exception -> {
                            Throwable cause = exception.getCause();
                            // ServiceUnitNotReadyException is not logged as an error here.
                            if (!(cause instanceof BrokerServiceException.ServiceUnitNotReadyException)) {
                                log.error("[{}] Failed to create topic {}", new Object[]{this.remoteAddress, topicName, exception});
                            }
                            // Only answer the client if this call is the one that failed the future.
                            if (producerFuture.completeExceptionally((Throwable)exception)) {
                                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(cause), (String)cause.getMessage()));
                            }
                            this.producers.remove(producerId, (Object)producerFuture);
                            return null;
                        });
                    } else {
                        String msg = "Client is not authorized to Produce";
                        log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.authRole});
                        this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
                    }
                    return null;
                })).exceptionally(e -> {
                    String msg = String.format("[%s] %s with role %s", this.remoteAddress, e.getMessage(), this.authRole);
                    log.warn(msg);
                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)e.getMessage()));
                    return null;
                });
            } else {
                String msg = "Proxy Client is not authorized to Produce";
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, msg, this.authRole, topicName});
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
            }
            return null;
        })).exceptionally(ex -> {
            String msg = String.format("[%s] %s with role %s", this.remoteAddress, ex.getMessage(), this.authRole);
            log.warn(msg);
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)ex.getMessage()));
            return null;
        });
    }

    /**
     * Handles a send command by handing the payload to the producer registered under the command's producer id.
     *
     * <p>The command is dropped with a warning when no successfully created producer is registered. For
     * producers on non-persistent topics the message is dropped (and a receipt with {@code -1} ids is sent)
     * once the number of pending non-persistent messages is above the configured maximum.
     *
     * @param send              the send command
     * @param headersAndPayload buffer holding the message metadata and payload
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleSend(PulsarApi.CommandSend send, ByteBuf headersAndPayload) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Producer> registered = (CompletableFuture)this.producers.get(send.getProducerId());
        boolean producerReady = registered != null && registered.isDone() && !registered.isCompletedExceptionally();
        if (!producerReady) {
            log.warn("[{}] Producer had already been closed: {}", (Object)this.remoteAddress, (Object)send.getProducerId());
            return;
        }

        Producer target = registered.getNow(null);
        if (log.isDebugEnabled()) {
            this.printSendCommandDebug(send, headersAndPayload);
        }

        if (target.isNonPersistentTopic()) {
            boolean overLimit = this.nonPersistentPendingMessages > this.MaxNonPersistentPendingMessages;
            if (overLimit) {
                // Too many messages in flight: acknowledge with placeholder ids and drop the message.
                long droppedProducerId = send.getProducerId();
                long droppedSequenceId = send.getSequenceId();
                this.service.getTopicOrderedExecutor().executeOrdered((Object)target.getTopic().getName(), (SafeRunnable)SafeRun.safeRun(() -> this.ctx.writeAndFlush((Object)Commands.newSendReceipt((long)droppedProducerId, (long)droppedSequenceId, (long)-1L, (long)-1L), this.ctx.voidPromise())));
                target.recordMessageDrop(send.getNumMessages());
                return;
            }
            ++this.nonPersistentPendingMessages;
        }

        this.startSendOperation();
        target.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
    }

    /**
     * Logs the details of a send command at debug level.
     *
     * <p>The message metadata is parsed from the buffer between a mark/reset of the reader index, so the
     * reader index is restored before the method continues. The parsed metadata object is recycled at the end.
     *
     * @param send              the send command being logged
     * @param headersAndPayload buffer holding the message metadata and payload
     */
    private void printSendCommandDebug(PulsarApi.CommandSend send, ByteBuf headersAndPayload) {
        headersAndPayload.markReaderIndex();
        PulsarApi.MessageMetadata parsed = Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
        headersAndPayload.resetReaderIndex();

        if (log.isDebugEnabled()) {
            Object[] details = new Object[]{
                this.remoteAddress,
                send.getProducerId(),
                send.getSequenceId(),
                parsed.getProducerName(),
                parsed.getSequenceId(),
                headersAndPayload.readableBytes(),
                parsed.getPartitionKey(),
                parsed.getOrderingKey()
            };
            log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}", details);
        }

        parsed.recycle();
    }

    /**
     * Forwards an acknowledgement to the consumer registered under the command's consumer id.
     * The command is ignored when no successfully created consumer is registered.
     *
     * @param ack the acknowledgement command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleAck(PulsarApi.CommandAck ack) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> registered = (CompletableFuture)this.consumers.get(ack.getConsumerId());
        if (registered == null || !registered.isDone() || registered.isCompletedExceptionally()) {
            return;
        }
        registered.getNow(null).messageAcked(ack);
    }

    /**
     * Grants the permits carried by a flow command to the consumer registered under the command's consumer id.
     * The command is ignored when no successfully created consumer is registered.
     *
     * @param flow the flow command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleFlow(PulsarApi.CommandFlow flow) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received flow from consumer {} permits: {}", new Object[]{this.remoteAddress, flow.getConsumerId(), flow.getMessagePermits()});
        }

        CompletableFuture<Consumer> registered = (CompletableFuture)this.consumers.get(flow.getConsumerId());
        if (registered == null || !registered.isDone() || registered.isCompletedExceptionally()) {
            return;
        }

        Consumer target = registered.getNow(null);
        if (target == null) {
            log.info("[{}] Couldn't find consumer {}", (Object)this.remoteAddress, (Object)flow.getConsumerId());
            return;
        }
        target.flowPermits(flow.getMessagePermits());
    }

    /**
     * Asks the consumer registered under the command's consumer id to redeliver unacknowledged messages.
     *
     * <p>When the command lists message ids and the consumer's subscription type is an individual-ack mode,
     * only the listed messages are redelivered; otherwise all unacknowledged messages are redelivered.
     * The command is ignored when no successfully created consumer is registered.
     *
     * @param redeliver the redelivery command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleRedeliverUnacknowledged(PulsarApi.CommandRedeliverUnacknowledgedMessages redeliver) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Resend Command from consumer {} ", (Object)this.remoteAddress, (Object)redeliver.getConsumerId());
        }

        CompletableFuture<Consumer> registered = (CompletableFuture)this.consumers.get(redeliver.getConsumerId());
        if (registered == null || !registered.isDone() || registered.isCompletedExceptionally()) {
            return;
        }

        Consumer target = registered.getNow(null);
        boolean selective = redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(target.subType());
        if (selective) {
            target.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
            return;
        }
        target.redeliverUnacknowledgedMessages();
    }

    /**
     * Delegates an unsubscribe command to the consumer registered under the command's consumer id,
     * or answers with a {@code MetadataError} when no successfully created consumer is registered.
     *
     * @param unsubscribe the unsubscribe command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleUnsubscribe(PulsarApi.CommandUnsubscribe unsubscribe) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> registered = (CompletableFuture)this.consumers.get(unsubscribe.getConsumerId());
        boolean consumerReady = registered != null && registered.isDone() && !registered.isCompletedExceptionally();
        if (!consumerReady) {
            this.ctx.writeAndFlush((Object)Commands.newError((long)unsubscribe.getRequestId(), (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Consumer not found"));
            return;
        }
        registered.getNow(null).doUnsubscribe(unsubscribe.getRequestId());
    }

    /**
     * Resets the cursor of a consumer's subscription, either to a message id or to a publish timestamp.
     *
     * <p>A {@code MetadataError} is returned when the command carries neither a message id nor a publish
     * time, or when no successfully created consumer is registered under the consumer id. When both are
     * present the message id takes precedence. A failed reset is reported as {@code UnknownError}.
     *
     * @param seek the seek command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleSeek(PulsarApi.CommandSeek seek) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = seek.getRequestId();
        CompletableFuture<Consumer> registered = (CompletableFuture)this.consumers.get(seek.getConsumerId());

        if (!(seek.hasMessageId() || seek.hasMessagePublishTime())) {
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Message id and message publish time were not present"));
            return;
        }

        if (registered == null || !registered.isDone() || registered.isCompletedExceptionally()) {
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Consumer not found"));
            return;
        }

        Subscription subscription = registered.getNow(null).getSubscription();

        if (seek.hasMessageId()) {
            // Seek by message id: build the target position from ledger id and entry id.
            PulsarApi.MessageIdData targetId = seek.getMessageId();
            PositionImpl position = new PositionImpl(targetId.getLedgerId(), targetId.getEntryId());
            ((CompletableFuture)subscription.resetCursor((Position)position).thenRun(() -> this.lambda$handleSeek$27(subscription, (Position)position, requestId))).exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, ex.getMessage(), ex});
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)("Error when resetting subscription: " + ex.getCause().getMessage())));
                return null;
            });
            return;
        }

        // Only a publish time was supplied (the "neither present" case returned above).
        long timestamp = seek.getMessagePublishTime();
        ((CompletableFuture)subscription.resetCursor(timestamp).thenRun(() -> {
            log.info("[{}] [{}][{}] Reset subscription to publish time {}", new Object[]{this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), timestamp});
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
        })).exceptionally(ex -> {
            log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, ex.getMessage(), ex});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)("Reset subscription to publish time error: " + ex.getCause().getMessage())));
            return null;
        });
    }

    /**
     * Closes the producer registered under the command's producer id.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>no registration for the id: {@code UnknownError} is returned;</li>
     *   <li>creation still pending: the pending future is failed and success is returned;</li>
     *   <li>creation already failed: success is returned;</li>
     *   <li>otherwise the producer is closed and, once closing completes, success is returned and the
     *       registration is removed.</li>
     * </ul>
     *
     * @param closeProducer the close-producer command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleCloseProducer(PulsarApi.CommandCloseProducer closeProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        long producerId = closeProducer.getProducerId();
        long requestId = closeProducer.getRequestId();

        CompletableFuture<Producer> registered = (CompletableFuture)this.producers.get(producerId);
        if (registered == null) {
            log.warn("[{}] Producer {} was not registered on the connection", (Object)this.remoteAddress, (Object)producerId);
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)"Producer was not registered on the connection"));
            return;
        }

        if (!registered.isDone()) {
            // Try to fail the pending creation; this can lose a race with the creation completing.
            boolean abortedCreation = registered.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"));
            if (abortedCreation) {
                log.info("[{}] Closed producer {} before its creation was completed", (Object)this.remoteAddress, (Object)producerId);
                this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
                return;
            }
        }

        if (registered.isCompletedExceptionally()) {
            log.info("[{}] Closed producer {} that already failed to be created", (Object)this.remoteAddress, (Object)producerId);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            return;
        }

        Producer target = registered.getNow(null);
        log.info("[{}][{}] Closing producer on cnx {}", new Object[]{target.getTopic(), target.getProducerName(), this.remoteAddress});
        target.close().thenAccept(ignored -> {
            log.info("[{}][{}] Closed producer on cnx {}", new Object[]{target.getTopic(), target.getProducerName(), this.remoteAddress});
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            this.producers.remove(producerId, (Object)registered);
        });
    }

    /**
     * Closes the consumer registered under the command's consumer id.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>no registration for the id: {@code MetadataError} is returned;</li>
     *   <li>creation still pending: the pending future is failed and success is returned;</li>
     *   <li>creation already failed: success is returned;</li>
     *   <li>otherwise the consumer is closed, its registration removed and success returned; a
     *       {@code BrokerServiceException} from closing is reported to the client as an error.</li>
     * </ul>
     *
     * @param closeConsumer the close-consumer command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleCloseConsumer(PulsarApi.CommandCloseConsumer closeConsumer) {
        Preconditions.checkArgument(this.state == State.Connected);
        log.info("[{}] Closing consumer: {}", (Object)this.remoteAddress, (Object)closeConsumer.getConsumerId());
        long requestId = closeConsumer.getRequestId();
        long consumerId = closeConsumer.getConsumerId();
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(consumerId);
        if (consumerFuture == null) {
            // Arguments follow the "[remoteAddress] ... : id" layout used by the other log lines.
            log.warn("[{}] Consumer was not registered on the connection: {}", (Object)this.remoteAddress, (Object)consumerId);
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Consumer not found"));
            return;
        }
        // Try to fail a creation that is still pending; this can lose a race with the creation completing.
        if (!consumerFuture.isDone() && consumerFuture.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
            log.info("[{}] Closed consumer {} before its creation was completed", (Object)this.remoteAddress, (Object)consumerId);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            return;
        }
        if (consumerFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed consumer {} that already failed to be created", (Object)this.remoteAddress, (Object)consumerId);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            return;
        }
        Consumer consumer = (Consumer)consumerFuture.getNow(null);
        try {
            consumer.close();
            this.consumers.remove(consumerId, (Object)consumerFuture);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            log.info("[{}] Closed consumer {}", (Object)this.remoteAddress, (Object)consumer);
        }
        catch (BrokerServiceException e) {
            // Two placeholders for address and consumer; the trailing exception is logged with its stack trace.
            log.warn("[{}] Error closing consumer {}", new Object[]{this.remoteAddress, consumer, e});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(e), (String)e.getMessage()));
        }
    }

    /**
     * Answers with the last message id of the topic behind the consumer's subscription, or with a
     * {@code MetadataError} when no successfully created consumer is registered under the consumer id.
     *
     * @param getLastMessageId the get-last-message-id command
     * @throws IllegalArgumentException if the connection is not in the {@code Connected} state
     */
    protected void handleGetLastMessageId(PulsarApi.CommandGetLastMessageId getLastMessageId) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> registered = (CompletableFuture)this.consumers.get(getLastMessageId.getConsumerId());
        boolean consumerReady = registered != null && registered.isDone() && !registered.isCompletedExceptionally();
        if (!consumerReady) {
            this.ctx.writeAndFlush((Object)Commands.newError((long)getLastMessageId.getRequestId(), (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Consumer not found"));
            return;
        }

        Consumer target = registered.getNow(null);
        long requestId = getLastMessageId.getRequestId();
        Topic topic = target.getSubscription().getTopic();
        Position lastPosition = topic.getLastMessageId();
        int partitionIndex = TopicName.getPartitionIndex((String)topic.getName());
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", new Object[]{this.remoteAddress, topic.getName(), target.getSubscription().getName(), lastPosition, partitionIndex});
        }

        // The position is read through PositionImpl to obtain ledger id and entry id.
        PositionImpl concretePosition = (PositionImpl)lastPosition;
        PulsarApi.MessageIdData messageId = PulsarApi.MessageIdData.newBuilder()
                .setLedgerId(concretePosition.getLedgerId())
                .setEntryId(concretePosition.getEntryId())
                .setPartition(partitionIndex)
                .build();
        this.ctx.writeAndFlush((Object)Commands.newGetLastMessageIdResponse((long)requestId, (PulsarApi.MessageIdData)messageId));
    }

    /**
     * Answers with the list of topics of the requested namespace for the requested mode.
     *
     * <p>Any exception raised while resolving the namespace or listing its topics is logged together with
     * its stack trace and reported to the client with the error code derived from a
     * {@code ServerMetadataException} wrapping it.
     *
     * @param commandGetTopicsOfNamespace the get-topics-of-namespace command
     */
    protected void handleGetTopicsOfNamespace(PulsarApi.CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        long requestId = commandGetTopicsOfNamespace.getRequestId();
        String namespace = commandGetTopicsOfNamespace.getNamespace();
        PulsarApi.CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
        try {
            NamespaceName namespaceName = NamespaceName.get((String)namespace);
            List<String> topics = this.getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", new Object[]{this.remoteAddress, namespace, requestId, topics.size()});
            }
            this.ctx.writeAndFlush((Object)Commands.newGetTopicsOfNamespaceResponse(topics, (long)requestId));
        }
        catch (Exception e) {
            // The exception is passed as the trailing argument so the logger records the cause,
            // not just the fact that the request failed.
            log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", new Object[]{this.remoteAddress, namespace, requestId, e});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(new BrokerServiceException.ServerMetadataException(e)), (String)e.getMessage()));
        }
    }

    /**
     * Looks up a schema for the topic named in the command and answers with it.
     *
     * <p>The latest version is requested unless the command carries an explicit schema version. An
     * unparsable topic name yields {@code InvalidTopicName}, a {@code null} lookup result yields
     * {@code TopicNotFound}, and a failed lookup yields {@code UnknownError}.
     *
     * @param commandGetSchema the get-schema command
     */
    protected void handleGetSchema(PulsarApi.CommandGetSchema commandGetSchema) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandGetSchema call from {}", (Object)this.remoteAddress);
        }
        long requestId = commandGetSchema.getRequestId();

        SchemaVersion requestedVersion = SchemaVersion.Latest;
        if (commandGetSchema.hasSchemaVersion()) {
            byte[] versionBytes = commandGetSchema.getSchemaVersion().toByteArray();
            requestedVersion = this.schemaService.versionFromBytes(versionBytes);
        }

        String schemaName;
        try {
            schemaName = TopicName.get((String)commandGetSchema.getTopic()).getSchemaName();
        }
        catch (Throwable t) {
            this.ctx.writeAndFlush((Object)Commands.newGetSchemaResponseError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.InvalidTopicName, (String)t.getMessage()));
            return;
        }

        ((CompletableFuture)this.schemaService.getSchema(schemaName, requestedVersion).thenAccept(schemaAndMetadata -> {
            if (schemaAndMetadata != null) {
                this.ctx.writeAndFlush((Object)Commands.newGetSchemaResponse((long)requestId, (SchemaInfo)SchemaInfoUtil.newSchemaInfo((String)schemaName, (SchemaData)schemaAndMetadata.schema), (SchemaVersion)schemaAndMetadata.version));
                return;
            }
            this.ctx.writeAndFlush((Object)Commands.newGetSchemaResponseError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.TopicNotFound, (String)"Topic not found or no-schema"));
        })).exceptionally(ex -> {
            this.ctx.writeAndFlush((Object)Commands.newGetSchemaResponseError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)ex.getMessage()));
            return null;
        });
    }

    /**
     * Tells whether this connection has reached the {@code Connected} state.
     *
     * @return {@code true} if the current state is {@code State.Connected}
     */
    protected boolean isHandshakeCompleted() {
        return State.Connected == state;
    }

    /**
     * @return the channel handler context held by this connection
     */
    ChannelHandlerContext ctx() {
        return ctx;
    }

    /**
     * Removes the given producer from this connection and notifies the client.
     *
     * <p>Clients speaking protocol version v5 or newer receive a close-producer command; for older
     * clients the whole connection is closed instead.
     *
     * @param producer the producer to remove
     */
    public void closeProducer(Producer producer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: {}", (Object)this.remoteAddress, (Object)producer);
        }
        long producerId = producer.getProducerId();
        this.producers.remove(producerId);

        boolean legacyClient = this.remoteEndpointProtocolVersion < PulsarApi.ProtocolVersion.v5.getNumber();
        if (legacyClient) {
            this.close();
            return;
        }
        this.ctx.writeAndFlush((Object)Commands.newCloseProducer((long)producerId, (long)-1L));
    }

    /**
     * Removes the given consumer from this connection and notifies the client.
     *
     * <p>Clients speaking protocol version v5 or newer receive a close-consumer command; for older
     * clients the whole connection is closed instead.
     *
     * @param consumer the consumer to remove
     */
    public void closeConsumer(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: {}", (Object)this.remoteAddress, (Object)consumer);
        }
        long consumerId = consumer.consumerId();
        this.consumers.remove(consumerId);

        boolean legacyClient = this.remoteEndpointProtocolVersion < PulsarApi.ProtocolVersion.v5.getNumber();
        if (legacyClient) {
            this.close();
            return;
        }
        this.ctx.writeAndFlush((Object)Commands.newCloseConsumer((long)consumerId, (long)-1L));
    }

    /**
     * Closes this connection by closing its channel handler context.
     */
    protected void close() {
        ctx.close();
    }

    /**
     * @return the remote address recorded for this connection
     */
    public SocketAddress clientAddress() {
        return remoteAddress;
    }

    /**
     * Drops the registration of the given consumer from this connection without notifying the client.
     *
     * @param consumer the consumer whose registration is removed
     */
    public void removedConsumer(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: {}", (Object)this.remoteAddress, (Object)consumer);
        }
        long consumerId = consumer.consumerId();
        this.consumers.remove(consumerId);
    }

    /**
     * Drops the registration of the given producer from this connection without notifying the client.
     *
     * @param producer the producer whose registration is removed
     */
    public void removedProducer(Producer producer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: {}", (Object)this.remoteAddress, (Object)producer);
        }
        long producerId = producer.getProducerId();
        this.producers.remove(producerId);
    }

    /**
     * @return the current value of this connection's {@code isActive} flag
     */
    public boolean isActive() {
        return isActive;
    }

    /**
     * @return whether the underlying channel currently reports itself as writable
     */
    public boolean isWritable() {
        return ctx.channel().isWritable();
    }

    /**
     * Records the start of a send operation. When the number of pending send requests reaches exactly
     * 1000, automatic reading from the channel is switched off.
     */
    public void startSendOperation() {
        int pending = ++this.pendingSendRequest;
        if (pending == 1000) {
            this.ctx.channel().config().setAutoRead(false);
        }
    }

    /**
     * Records the completion of a send operation. When the number of pending send requests drops to
     * exactly 500, automatic reading from the channel is switched back on.
     *
     * @param isNonPersistentTopic whether the completed message was for a non-persistent topic; if so the
     *                             counter of pending non-persistent messages is decremented as well
     */
    public void completedSendOperation(boolean isNonPersistentTopic) {
        int pending = --this.pendingSendRequest;
        if (pending == 500) {
            this.ctx.channel().config().setAutoRead(true);
        }
        if (!isNonPersistentTopic) {
            return;
        }
        --this.nonPersistentPendingMessages;
    }

    /**
     * Derives the error code to report for a future.
     *
     * <p>The future is probed with {@code getNow(null)}. If that throws and the thrown exception's cause
     * is a {@code BrokerServiceException}, the client error code of that cause is returned. In every
     * other case (no exception, or a different cause) {@code UnknownError} is returned.
     *
     * @param future the future to inspect
     * @param <T>    the future's value type
     * @return the error code to send to the client
     */
    private <T> PulsarApi.ServerError getErrorCode(CompletableFuture<T> future) {
        try {
            future.getNow(null);
        }
        catch (Exception e) {
            Throwable cause = e.getCause();
            if (cause instanceof BrokerServiceException) {
                return BrokerServiceException.getClientErrorCode(cause);
            }
        }
        return PulsarApi.ServerError.UnknownError;
    }

    /**
     * Switches {@code TCP_NODELAY} off on this connection's channel when the producer name starts with
     * the replicator prefix and the option is currently enabled.
     *
     * <p>This is best effort: any failure while reading or setting the option is logged (including the
     * cause) and otherwise ignored.
     *
     * @param topic        topic name, used only in the log message
     * @param producerName name of the producer being created; may be {@code null}
     */
    private final void disableTcpNoDelayIfNeeded(String topic, String producerName) {
        if (producerName != null && producerName.startsWith(this.replicatorPrefix)) {
            try {
                if (((Boolean)this.ctx.channel().config().getOption(ChannelOption.TCP_NODELAY)).booleanValue()) {
                    this.ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, (Object)false);
                }
            }
            catch (Throwable t) {
                // Pass the throwable as the trailing argument so the reason for the failure is not lost.
                log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", new Object[]{topic, producerName, this.ctx.channel(), t});
            }
        }
    }

    /**
     * Parses a topic name, replying to the client with an
     * {@code InvalidTopicName} error when parsing fails.
     *
     * <p>The error reply is shaped by the request type: a lookup error
     * response for {@code CommandLookupTopic}, a partition-metadata response
     * for {@code CommandPartitionedTopicMetadata}, and a generic error for
     * anything else.
     *
     * @param topic          raw topic string from the request
     * @param requestId      request id echoed in the error reply
     * @param requestCommand the command being processed; selects the reply type
     * @return the parsed topic name, or {@code null} if parsing failed and an
     *         error reply was written
     */
    private TopicName validateTopicName(String topic, long requestId, GeneratedMessageLite requestCommand) {
        try {
            return TopicName.get(topic);
        } catch (Throwable parseFailure) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, parseFailure);
            }

            final String reason = "Invalid topic name: " + parseFailure.getMessage();
            final Object reply;
            if (requestCommand instanceof PulsarApi.CommandLookupTopic) {
                reply = Commands.newLookupErrorResponse(PulsarApi.ServerError.InvalidTopicName, reason, requestId);
            } else if (requestCommand instanceof PulsarApi.CommandPartitionedTopicMetadata) {
                reply = Commands.newPartitionMetadataResponse(PulsarApi.ServerError.InvalidTopicName, reason, requestId);
            } else {
                reply = Commands.newError(requestId, PulsarApi.ServerError.InvalidTopicName, reason);
            }
            ctx.writeAndFlush(reply);
            return null;
        }
    }

    /**
     * @return the value currently held in this connection's {@code state} field
     */
    public State getState() {
        return state;
    }

    /**
     * @return the remote socket address recorded for this connection
     */
    public SocketAddress getRemoteAddress() {
        return remoteAddress;
    }

    /**
     * @return the {@link BrokerService} instance this connection holds
     */
    public BrokerService getBrokerService() {
        return service;
    }

    /**
     * @return the value of this connection's {@code authRole} field
     */
    public String getRole() {
        return authRole;
    }

    /**
     * @param consumerId id to look up
     * @return {@code true} if this connection's consumer map has an entry for
     *         the given id
     */
    boolean hasConsumer(long consumerId) {
        return consumers.containsKey(consumerId);
    }

    /**
     * @return {@code true} if the remote endpoint's protocol version is at
     *         least the number of {@code ProtocolVersion.v4}
     */
    public boolean isBatchMessageCompatibleVersion() {
        final int requiredVersion = PulsarApi.ProtocolVersion.v4.getNumber();
        return remoteEndpointProtocolVersion >= requiredVersion;
    }

    /**
     * @return the client version string stored on this connection
     */
    public String getClientVersion() {
        return clientVersion;
    }

    /**
     * Compiler-generated lambda body belonging to {@code handleSeek}: logs the
     * position the subscription was reset to and writes a success reply for
     * the request.
     *
     * @param subscription subscription whose topic name and own name are logged
     * @param position     position included in the log message
     * @param requestId    request id carried by the success reply
     */
    private /* synthetic */ void lambda$handleSeek$27(Subscription subscription, Position position, long requestId) {
        log.info("[{}] [{}][{}] Reset subscription to message id {}",
                remoteAddress, subscription.getTopic().getName(), subscription.getName(), position);
        ctx.writeAndFlush(Commands.newSuccess(requestId));
    }

    /**
     * Compiler-generated lambda body belonging to {@code handleSubscribe}. It
     * receives the outcome of the proxy authorization check and, if that
     * passed, performs the rest of the subscribe flow:
     * <ol>
     *   <li>checks consume authorization (skipped when authorization is disabled);</li>
     *   <li>validates the consumer metadata;</li>
     *   <li>registers a pending future for {@code consumerId} on this connection,
     *       answering immediately if an entry for that id already exists;</li>
     *   <li>gets or creates the topic, optionally checks the schema, subscribes,
     *       and replies with success or an error.</li>
     * </ol>
     * Every failure path writes an error reply for {@code requestId} to the client.
     *
     * @param topicName         topic being subscribed to
     * @param subscription      subscription name passed to the authorization check
     * @param subscriptionName  subscription name used for logging and for {@code topic.subscribe}
     * @param metadata          consumer metadata; validated before use
     * @param requestId         request id echoed in every reply
     * @param consumerId        id the consumer is registered under on this connection
     * @param schema            optional schema; when non-null it is checked/added before subscribing
     * @param isProxyAuthorized result of the preceding proxy authorization step
     * @return always {@code null}
     */
    private /* synthetic */ Object lambda$handleSubscribe$15(TopicName topicName, String subscription, String subscriptionName, Map metadata, long requestId, long consumerId, SchemaData schema, PulsarApi.CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdImpl startMessageId, boolean readCompacted, PulsarApi.CommandSubscribe.InitialPosition initialPosition, boolean isReplicated, Boolean isProxyAuthorized) {
        if (isProxyAuthorized.booleanValue()) {
            // Authorization is checked against the original principal when one is present, otherwise the auth role.
            CompletableFuture authorizationFuture = this.service.isAuthorizationEnabled() ? this.service.getAuthorizationService().canConsumeAsync(topicName, this.originalPrincipal != null ? this.originalPrincipal : this.authRole, this.authenticationData, subscription) : CompletableFuture.completedFuture(true);
            ((CompletableFuture)authorizationFuture.thenApply(isAuthorized -> {
                if (isAuthorized.booleanValue()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Client is authorized to subscribe with role {}", (Object)this.remoteAddress, (Object)this.authRole);
                    }
                    log.info("[{}] Subscribing on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                    try {
                        Metadata.validateMetadata((Map)metadata);
                    }
                    catch (IllegalArgumentException iae) {
                        String msg = iae.getMessage();
                        this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)msg));
                        return null;
                    }
                    // Reserve the consumer id on this connection; a non-null result means an entry already exists.
                    CompletableFuture consumerFuture = new CompletableFuture();
                    CompletableFuture existingConsumerFuture = (CompletableFuture)this.consumers.putIfAbsent(consumerId, consumerFuture);
                    if (existingConsumerFuture != null) {
                        if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                            // The earlier subscribe for this id already succeeded: answer with success.
                            Consumer consumer2 = existingConsumerFuture.getNow(null);
                            log.info("[{}] Consumer with the same id {} is already created: {}", new Object[]{this.remoteAddress, consumerId, consumer2});
                            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
                            return null;
                        }
                        log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId});
                        PulsarApi.ServerError error = null;
                        if (!existingConsumerFuture.isDone()) {
                            // The earlier subscribe is still in progress.
                            error = PulsarApi.ServerError.ServiceNotReady;
                        } else {
                            error = this.getErrorCode(existingConsumerFuture);
                            // Drop the failed entry. It must be the existing future: the new
                            // consumerFuture was never inserted, so removing that one is a no-op
                            // and would leave the failed future registered under this id.
                            this.consumers.remove(consumerId, existingConsumerFuture);
                        }
                        this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)error, (String)"Consumer is already present on the connection"));
                        return null;
                    }
                    ((CompletableFuture)((CompletableFuture)this.service.getOrCreateTopic(topicName.toString()).thenCompose(topic -> {
                        if (schema != null) {
                            // With a schema supplied, subscribe only after the compatibility check passes.
                            return topic.addSchemaIfIdleOrCheckCompatible(schema).thenCompose(isCompatible -> {
                                if (isCompatible.booleanValue()) {
                                    return topic.subscribe(this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, (MessageId)startMessageId, metadata, readCompacted, initialPosition, isReplicated);
                                }
                                return FutureUtil.failedFuture((Throwable)new IncompatibleSchemaException("Trying to subscribe with incompatible schema"));
                            });
                        }
                        return topic.subscribe(this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, (MessageId)startMessageId, metadata, readCompacted, initialPosition, isReplicated);
                    })).thenAccept(consumer -> {
                        if (consumerFuture.complete(consumer)) {
                            log.info("[{}] Created subscription on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId), this.ctx.voidPromise());
                        } else {
                            // The pending future was already completed by someone else, so this
                            // late consumer is closed and its registration removed.
                            try {
                                consumer.close();
                                log.info("[{}] Cleared consumer created after timeout on client side {}", (Object)this.remoteAddress, consumer);
                            }
                            catch (BrokerServiceException e) {
                                log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", new Object[]{this.remoteAddress, consumer, e.getMessage()});
                            }
                            this.consumers.remove(consumerId, (Object)consumerFuture);
                        }
                    })).exceptionally(exception -> {
                        if (exception.getCause() instanceof BrokerServiceException.ConsumerBusyException) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}", new Object[]{this.remoteAddress, topicName, subscriptionName, exception.getCause().getMessage()});
                            }
                        } else {
                            log.warn("[{}][{}][{}] Failed to create consumer: {}", new Object[]{this.remoteAddress, topicName, subscriptionName, exception.getCause().getMessage(), exception});
                        }
                        // Reply with the error only if this call is the one that completes the pending future.
                        if (consumerFuture.completeExceptionally((Throwable)exception)) {
                            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(exception), (String)exception.getCause().getMessage()));
                        }
                        this.consumers.remove(consumerId, (Object)consumerFuture);
                        return null;
                    });
                } else {
                    String msg = "Client is not authorized to subscribe";
                    log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.authRole});
                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
                }
                return null;
            })).exceptionally(e -> {
                // Failures of the authorization future (or of the stage above) are reported as authorization errors.
                String msg = String.format("[%s] %s with role %s", this.remoteAddress, e.getMessage(), this.authRole);
                log.warn(msg);
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)e.getMessage()));
                return null;
            });
        } else {
            String msg = "Proxy Client is not authorized to subscribe";
            log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, msg, this.authRole, topicName});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
        }
        return null;
    }

    static enum State {
        Start,
        Connected,
        Failed,
        Connecting;

    }
}

