/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PulsarCommandSender;
import org.apache.pulsar.broker.service.PulsarCommandSenderImpl;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerCnx
extends PulsarHandler
implements TransportCnx {
    private final BrokerService service;
    private final SchemaRegistryService schemaService;
    private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
    private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
    private State state;
    private volatile boolean isActive = true;
    String authRole = null;
    AuthenticationDataSource authenticationData;
    AuthenticationProvider authenticationProvider;
    AuthenticationState authState;
    AuthenticationState originalAuthState;
    AuthenticationDataSource originalAuthData;
    private boolean pendingAuthChallengeResponse = false;
    private final int maxPendingSendRequests;
    private final int resumeReadsThreshold;
    private int pendingSendRequest = 0;
    private final String replicatorPrefix;
    private String clientVersion = null;
    private int nonPersistentPendingMessages = 0;
    private final int MaxNonPersistentPendingMessages;
    private String originalPrincipal = null;
    private Set<String> proxyRoles;
    private boolean authenticateOriginalAuthData;
    private final boolean schemaValidationEnforced;
    private String authMethod = "none";
    private final int maxMessageSize;
    private boolean preciseDispatcherFlowControl;
    private boolean preciseTopicPublishRateLimitingEnable;
    private boolean encryptionRequireOnProducer;
    private volatile boolean autoReadDisabledRateLimiting = false;
    private PulsarApi.FeatureFlags features;
    private volatile boolean autoReadDisabledPublishBufferLimiting = false;
    private static final AtomicLongFieldUpdater<ServerCnx> MSG_PUBLISH_BUFFER_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ServerCnx.class, "messagePublishBufferSize");
    private volatile long messagePublishBufferSize = 0L;
    private PulsarCommandSender commandSender;
    private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);

    public ServerCnx(PulsarService pulsar) {
        super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
        this.service = pulsar.getBrokerService();
        this.schemaService = pulsar.getSchemaRegistryService();
        this.state = State.Start;
        this.producers = new ConcurrentLongHashMap(8, 1);
        this.consumers = new ConcurrentLongHashMap(8, 1);
        this.replicatorPrefix = this.service.pulsar().getConfiguration().getReplicatorPrefix();
        this.MaxNonPersistentPendingMessages = this.service.pulsar().getConfiguration().getMaxConcurrentNonPersistentMessagePerConnection();
        this.proxyRoles = this.service.pulsar().getConfiguration().getProxyRoles();
        this.authenticateOriginalAuthData = this.service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
        this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
        this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
        this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishRequestsPerConnection();
        this.resumeReadsThreshold = this.maxPendingSendRequests / 2;
        this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
        this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
        this.encryptionRequireOnProducer = pulsar.getConfiguration().isEncryptionRequireOnProducer();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("New connection from {}", (Object)this.remoteAddress);
        this.ctx = ctx;
        this.commandSender = new PulsarCommandSenderImpl(this.getBrokerService().getInterceptor(), this);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        this.isActive = false;
        log.info("Closed connection from {}", (Object)this.remoteAddress);
        BrokerInterceptor brokerInterceptor = this.getBrokerService().getInterceptor();
        if (brokerInterceptor != null) {
            brokerInterceptor.onConnectionClosed(this);
        }
        this.producers.values().forEach(producerFuture -> {
            if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                Producer producer = producerFuture.getNow(null);
                producer.closeNow(true);
            }
        });
        this.consumers.values().forEach(consumerFuture -> {
            if (!consumerFuture.isDone() || consumerFuture.isCompletedExceptionally()) {
                return;
            }
            Consumer consumer = consumerFuture.getNow(null);
            try {
                consumer.close();
            }
            catch (BrokerServiceException e) {
                log.warn("Consumer {} was already closed: {}", (Object)consumer, (Object)e);
            }
        });
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Channel writability has changed to: {}", (Object)ctx.channel().isWritable());
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (this.state != State.Failed) {
            log.warn("[{}] Got exception {}", (Object)this.remoteAddress, ClientCnx.isKnownException((Throwable)cause) ? cause : ExceptionUtils.getStackTrace((Throwable)cause));
            this.state = State.Failed;
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Got exception: {}", (Object)this.remoteAddress, (Object)cause);
        }
        ctx.close();
    }

    private boolean invalidOriginalPrincipal(String originalPrincipal) {
        return this.service.isAuthenticationEnabled() && this.service.isAuthorizationEnabled() && this.proxyRoles.contains(this.authRole) && (StringUtils.isBlank((CharSequence)originalPrincipal) || this.proxyRoles.contains(originalPrincipal));
    }

    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation) {
        CompletableFuture isAuthorizedFuture;
        CompletableFuture<Object> isProxyAuthorizedFuture;
        if (this.service.isAuthorizationEnabled()) {
            isProxyAuthorizedFuture = this.originalPrincipal != null ? this.service.getAuthorizationService().allowTopicOperationAsync(topicName, operation, this.originalPrincipal, this.getAuthenticationData()) : CompletableFuture.completedFuture(true);
            isAuthorizedFuture = this.service.getAuthorizationService().allowTopicOperationAsync(topicName, operation, this.authRole, this.authenticationData);
        } else {
            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
            isAuthorizedFuture = CompletableFuture.completedFuture(true);
        }
        return isProxyAuthorizedFuture.thenCombine((CompletionStage)isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
            if (!isProxyAuthorized.booleanValue()) {
                log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}", new Object[]{this.originalPrincipal, operation, topicName});
            }
            if (!isAuthorized.booleanValue()) {
                log.warn("Role {} is not authorized to perform operation {} on topic {}", new Object[]{this.authRole, operation, topicName});
            }
            return isProxyAuthorized != false && isAuthorized != false;
        });
    }

    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) {
        if (this.service.isAuthorizationEnabled()) {
            if (this.authenticationData == null) {
                this.authenticationData = new AuthenticationDataCommand("", subscriptionName);
            } else {
                this.authenticationData.setSubscription(subscriptionName);
            }
            if (this.originalAuthData != null) {
                this.originalAuthData.setSubscription(subscriptionName);
            }
            return this.isTopicOperationAllowed(topicName, operation);
        }
        CompletableFuture<Boolean> isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
        CompletableFuture<Boolean> isAuthorizedFuture = CompletableFuture.completedFuture(true);
        return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
            if (!isProxyAuthorized.booleanValue()) {
                log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}, subscription {}", new Object[]{this.originalPrincipal, operation, topicName, subscriptionName});
            }
            if (!isAuthorized.booleanValue()) {
                log.warn("Role {} is not authorized to perform operation {} on topic {}, subscription {}", new Object[]{this.authRole, operation, topicName, subscriptionName});
            }
            return isProxyAuthorized != false && isAuthorized != false;
        });
    }

    protected void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        TopicName topicName;
        long requestId = lookup.getRequestId();
        boolean authoritative = lookup.getAuthoritative();
        String advertisedListenerName = lookup.getAdvertisedListenerName();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Lookup from {} for {}", new Object[]{lookup.getTopic(), this.remoteAddress, requestId});
        }
        if ((topicName = this.validateTopicName(lookup.getTopic(), requestId, (GeneratedMessageLite)lookup)) == null) {
            return;
        }
        Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (lookupSemaphore.tryAcquire()) {
            if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
                String msg = "Valid Proxy Client role should be provided for lookup ";
                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided for lookup ", this.authRole, this.originalPrincipal, topicName});
                this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)"Valid Proxy Client role should be provided for lookup ", (long)requestId));
                lookupSemaphore.release();
                return;
            }
            ((CompletableFuture)this.isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
                if (isAuthorized.booleanValue()) {
                    TopicLookupBase.lookupTopicAsync(this.getBrokerService().pulsar(), topicName, authoritative, this.getPrincipal(), this.getAuthenticationData(), requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
                        if (ex == null) {
                            this.ctx.writeAndFlush(lookupResponse);
                        } else {
                            log.warn("[{}] lookup failed with error {}, {}", new Object[]{this.remoteAddress, topicName, ex.getMessage(), ex});
                            this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)requestId));
                        }
                        lookupSemaphore.release();
                        return null;
                    });
                } else {
                    String msg = "Proxy Client is not authorized to Lookup";
                    log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Proxy Client is not authorized to Lookup", this.getPrincipal(), topicName});
                    this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)"Proxy Client is not authorized to Lookup", (long)requestId));
                    lookupSemaphore.release();
                }
                return null;
            })).exceptionally(ex -> {
                String msg = "Exception occurred while trying to authorize lookup";
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Exception occurred while trying to authorize lookup", this.getPrincipal(), topicName, ex});
                this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)"Exception occurred while trying to authorize lookup", (long)requestId));
                lookupSemaphore.release();
                return null;
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed lookup due to too many lookup-requests {}", (Object)this.remoteAddress, (Object)topicName);
            }
            this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.TooManyRequests, (String)"Failed due to too many pending lookup requests", (long)requestId));
        }
    }

    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        TopicName topicName;
        long requestId = partitionMetadata.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", new Object[]{partitionMetadata.getTopic(), this.remoteAddress, requestId});
        }
        if ((topicName = this.validateTopicName(partitionMetadata.getTopic(), requestId, (GeneratedMessageLite)partitionMetadata)) == null) {
            return;
        }
        Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (lookupSemaphore.tryAcquire()) {
            if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
                String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ", this.authRole, this.originalPrincipal, topicName});
                this.commandSender.sendPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ", requestId);
                lookupSemaphore.release();
                return;
            }
            ((CompletableFuture)this.isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
                if (isAuthorized.booleanValue()) {
                    PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync(this.getBrokerService().pulsar(), topicName).handle((metadata, ex) -> {
                        if (ex == null) {
                            int partitions = metadata.partitions;
                            this.commandSender.sendPartitionMetadataResponse(partitions, requestId);
                        } else if (ex instanceof PulsarClientException) {
                            log.warn("Failed to authorize {} at [{}] on topic {} : {}", new Object[]{this.getRole(), this.remoteAddress, topicName, ex.getMessage()});
                            this.commandSender.sendPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, ex.getMessage(), requestId);
                        } else {
                            log.warn("Failed to get Partitioned Metadata [{}] {}: {}", new Object[]{this.remoteAddress, topicName, ex.getMessage(), ex});
                            PulsarApi.ServerError error = ex instanceof RestException && ((RestException)((Object)((Object)((Object)ex)))).getResponse().getStatus() < 500 ? PulsarApi.ServerError.MetadataError : PulsarApi.ServerError.ServiceNotReady;
                            this.commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), requestId);
                        }
                        lookupSemaphore.release();
                        return null;
                    });
                } else {
                    String msg = "Proxy Client is not authorized to Get Partition Metadata";
                    log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Proxy Client is not authorized to Get Partition Metadata", this.getPrincipal(), topicName});
                    this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)"Proxy Client is not authorized to Get Partition Metadata", (long)requestId));
                    lookupSemaphore.release();
                }
                return null;
            })).exceptionally(ex -> {
                String msg = "Exception occurred while trying to authorize get Partition Metadata";
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Exception occurred while trying to authorize get Partition Metadata", this.getPrincipal(), topicName});
                this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)"Exception occurred while trying to authorize get Partition Metadata", (long)requestId));
                lookupSemaphore.release();
                return null;
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", (Object)this.remoteAddress, (Object)topicName);
            }
            this.commandSender.sendPartitionMetadataResponse(PulsarApi.ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId);
        }
    }

    protected void handleConsumerStats(PulsarApi.CommandConsumerStats commandConsumerStats) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandConsumerStats call from {}", (Object)this.remoteAddress);
        }
        long requestId = commandConsumerStats.getRequestId();
        long consumerId = commandConsumerStats.getConsumerId();
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(consumerId);
        Consumer consumer = consumerFuture.getNow(null);
        ByteBuf msg = null;
        if (consumer == null) {
            log.error("Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", new Object[]{this.remoteAddress, requestId, consumerId});
            msg = Commands.newConsumerStatsResponse((PulsarApi.ServerError)PulsarApi.ServerError.ConsumerNotFound, (String)("Consumer " + consumerId + " not found"), (long)requestId);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", (Object)requestId, (Object)consumer);
            }
            msg = Commands.newConsumerStatsResponse((PulsarApi.CommandConsumerStatsResponse.Builder)this.createConsumerStatsResponse(consumer, requestId));
        }
        this.ctx.writeAndFlush((Object)msg);
    }

    PulsarApi.CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consumer, long requestId) {
        PulsarApi.CommandConsumerStatsResponse.Builder commandConsumerStatsResponseBuilder = PulsarApi.CommandConsumerStatsResponse.newBuilder();
        ConsumerStats consumerStats = consumer.getStats();
        commandConsumerStatsResponseBuilder.setRequestId(requestId);
        commandConsumerStatsResponseBuilder.setMsgRateOut(consumerStats.msgRateOut);
        commandConsumerStatsResponseBuilder.setMsgThroughputOut(consumerStats.msgThroughputOut);
        commandConsumerStatsResponseBuilder.setMsgRateRedeliver(consumerStats.msgRateRedeliver);
        commandConsumerStatsResponseBuilder.setConsumerName(consumerStats.consumerName);
        commandConsumerStatsResponseBuilder.setAvailablePermits((long)consumerStats.availablePermits);
        commandConsumerStatsResponseBuilder.setUnackedMessages((long)consumerStats.unackedMessages);
        commandConsumerStatsResponseBuilder.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs);
        commandConsumerStatsResponseBuilder.setAddress(consumerStats.getAddress());
        commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.getConnectedSince());
        Subscription subscription = consumer.getSubscription();
        commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false));
        commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate());
        commandConsumerStatsResponseBuilder.setType(subscription.getTypeString());
        return commandConsumerStatsResponseBuilder;
    }

    private void completeConnect(int clientProtoVersion, String clientVersion) {
        this.ctx.writeAndFlush((Object)Commands.newConnected((int)clientProtoVersion, (int)this.maxMessageSize));
        this.state = State.Connected;
        this.remoteEndpointProtocolVersion = clientProtoVersion;
        if (StringUtils.isNotBlank((CharSequence)clientVersion) && !clientVersion.contains(" ")) {
            this.clientVersion = clientVersion.intern();
        }
    }

    private State doAuthentication(AuthData clientData, int clientProtocolVersion, String clientVersion) throws Exception {
        boolean useOriginalAuthState = this.originalAuthState != null;
        AuthenticationState authState = useOriginalAuthState ? this.originalAuthState : this.authState;
        String authRole = useOriginalAuthState ? this.originalPrincipal : this.authRole;
        AuthData brokerData = authState.authenticate(clientData);
        if (log.isDebugEnabled()) {
            log.debug("Authenticate using original auth state : {}, role = {}", (Object)useOriginalAuthState, (Object)authRole);
        }
        if (authState.isComplete()) {
            String newAuthRole = authState.getAuthRole();
            if (!useOriginalAuthState) {
                this.authRole = newAuthRole;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", new Object[]{this.remoteAddress, this.authMethod, this.authRole, this.originalPrincipal});
            }
            if (this.state != State.Connected) {
                this.completeConnect(clientProtocolVersion, clientVersion);
            } else if (!StringUtils.isEmpty((CharSequence)authRole)) {
                if (!authRole.equals(newAuthRole)) {
                    log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}", new Object[]{this.remoteAddress, authRole, newAuthRole});
                    this.ctx.close();
                } else {
                    log.info("[{}] Refreshed authentication credentials for role {}", (Object)this.remoteAddress, (Object)authRole);
                }
            }
            return State.Connected;
        }
        this.ctx.writeAndFlush((Object)Commands.newAuthChallenge((String)this.authMethod, (AuthData)brokerData, (int)clientProtocolVersion));
        if (log.isDebugEnabled()) {
            log.debug("[{}] Authentication in progress client by method {}.", (Object)this.remoteAddress, (Object)this.authMethod);
        }
        return State.Connecting;
    }

    public void refreshAuthenticationCredentials() {
        AuthenticationState authState;
        AuthenticationState authenticationState = authState = this.originalAuthState != null ? this.originalAuthState : this.authState;
        if (authState == null) {
            return;
        }
        if (this.getState() != State.Connected || !this.isActive) {
            return;
        }
        if (authState != null && !authState.isExpired()) {
            return;
        }
        if (this.originalPrincipal != null && this.originalAuthState == null) {
            log.info("[{}] Cannot revalidate user credential when using proxy and not forwarding the credentials. Closing connection", (Object)this.remoteAddress);
            return;
        }
        this.ctx.executor().execute((Runnable)SafeRun.safeRun(() -> {
            log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", new Object[]{this.remoteAddress, this.originalPrincipal, this.authRole});
            if (!this.supportsAuthenticationRefresh()) {
                log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", (Object)this.remoteAddress);
                this.ctx.close();
                return;
            }
            if (this.pendingAuthChallengeResponse) {
                log.warn("[{}] Closing connection after timeout on refreshing auth credentials", (Object)this.remoteAddress);
                this.ctx.close();
                return;
            }
            try {
                AuthData brokerData = authState.refreshAuthentication();
                this.ctx.writeAndFlush((Object)Commands.newAuthChallenge((String)this.authMethod, (AuthData)brokerData, (int)this.remoteEndpointProtocolVersion));
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", (Object)this.remoteAddress, (Object)this.authMethod);
                }
                this.pendingAuthChallengeResponse = true;
            }
            catch (AuthenticationException e) {
                log.warn("[{}] Failed to refresh authentication: {}", (Object)this.remoteAddress, (Object)e);
                this.ctx.close();
            }
        }));
    }

    protected void handleConnect(PulsarApi.CommandConnect connect) {
        Preconditions.checkArgument((this.state == State.Start ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("Received CONNECT from {}, auth enabled: {}: has original principal = {}, original principal = {}", new Object[]{this.remoteAddress, this.service.isAuthenticationEnabled(), connect.hasOriginalPrincipal(), connect.getOriginalPrincipal()});
        }
        String clientVersion = connect.getClientVersion();
        int clientProtocolVersion = connect.getProtocolVersion();
        this.features = connect.getFeatureFlags();
        if (!this.service.isAuthenticationEnabled()) {
            this.completeConnect(clientProtocolVersion, clientVersion);
            return;
        }
        try {
            AuthData clientData = AuthData.of((byte[])connect.getAuthData().toByteArray());
            this.authMethod = connect.hasAuthMethodName() ? connect.getAuthMethodName() : (connect.hasAuthMethod() ? connect.getAuthMethod().name().substring(10).toLowerCase() : "none");
            this.authenticationProvider = this.getBrokerService().getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                this.authRole = (String)this.getBrokerService().getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured"));
                this.completeConnect(clientProtocolVersion, clientVersion);
                return;
            }
            ChannelHandler sslHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sslSession = null;
            if (sslHandler != null) {
                sslSession = ((SslHandler)sslHandler).engine().getSession();
            }
            this.authState = this.authenticationProvider.newAuthState(clientData, this.remoteAddress, sslSession);
            this.authenticationData = this.authState.getAuthDataSource();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Authenticate role : {}", (Object)this.remoteAddress, this.authState != null ? this.authState.getAuthRole() : null);
            }
            this.state = this.doAuthentication(clientData, clientProtocolVersion, clientVersion);
            if (connect.hasOriginalPrincipal() && this.service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
                String originalAuthMethod = connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : "none";
                AuthenticationProvider originalAuthenticationProvider = this.getBrokerService().getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
                if (originalAuthenticationProvider == null) {
                    throw new AuthenticationException(String.format("Can't find AuthenticationProvider for original role using auth method [%s] is not available", originalAuthMethod));
                }
                this.originalAuthState = originalAuthenticationProvider.newAuthState(AuthData.of((byte[])connect.getOriginalAuthData().getBytes()), this.remoteAddress, sslSession);
                this.originalAuthData = this.originalAuthState.getAuthDataSource();
                this.originalPrincipal = this.originalAuthState.getAuthRole();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authenticate original role : {}", (Object)this.remoteAddress, (Object)this.originalPrincipal);
                }
            } else {
                String string = this.originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authenticate original role (forwarded from proxy): {}", (Object)this.remoteAddress, (Object)this.originalPrincipal);
                }
            }
        }
        catch (Exception e) {
            String msg = "Unable to authenticate";
            if (e instanceof AuthenticationException) {
                log.warn("[{}] {}: {}", new Object[]{this.remoteAddress, msg, e.getMessage()});
            } else {
                log.warn("[{}] {}", new Object[]{this.remoteAddress, msg, e});
            }
            this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.AuthenticationError, (String)msg));
            this.close();
        }
    }

    protected void handleAuthResponse(PulsarApi.CommandAuthResponse authResponse) {
        Preconditions.checkArgument((boolean)authResponse.hasResponse());
        Preconditions.checkArgument((authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName() ? 1 : 0) != 0);
        this.pendingAuthChallengeResponse = false;
        if (log.isDebugEnabled()) {
            log.debug("Received AuthResponse from {}, auth method: {}", (Object)this.remoteAddress, (Object)authResponse.getResponse().getAuthMethodName());
        }
        try {
            AuthData clientData = AuthData.of((byte[])authResponse.getResponse().getAuthData().toByteArray());
            this.doAuthentication(clientData, authResponse.getProtocolVersion(), authResponse.getClientVersion());
        }
        catch (AuthenticationException e) {
            log.warn("[{}] Authentication failed: {} ", (Object)this.remoteAddress, (Object)e.getMessage());
            this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.AuthenticationError, (String)e.getMessage()));
            this.close();
        }
        catch (Exception e) {
            String msg = "Unable to handleAuthResponse";
            log.warn("[{}] {} ", new Object[]{this.remoteAddress, msg, e});
            this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)msg));
            this.close();
        }
    }

    protected void handleSubscribe(PulsarApi.CommandSubscribe subscribe) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        long requestId = subscribe.getRequestId();
        long consumerId = subscribe.getConsumerId();
        TopicName topicName = this.validateTopicName(subscribe.getTopic(), requestId, (GeneratedMessageLite)subscribe);
        if (topicName == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}", new Object[]{this.remoteAddress, this.authRole, this.originalPrincipal});
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            String msg = "Valid Proxy Client role should be provided while subscribing ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided while subscribing ", this.authRole, this.originalPrincipal, topicName});
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.AuthorizationError, "Valid Proxy Client role should be provided while subscribing ");
            return;
        }
        String subscriptionName = subscribe.getSubscription();
        PulsarApi.CommandSubscribe.SubType subType = subscribe.getSubType();
        String consumerName = subscribe.getConsumerName();
        boolean isDurable = subscribe.getDurable();
        BatchMessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) : null;
        int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
        boolean readCompacted = subscribe.getReadCompacted();
        Map metadata = CommandUtils.metadataFromCommand((PulsarApi.CommandSubscribe)subscribe);
        PulsarApi.CommandSubscribe.InitialPosition initialPosition = subscribe.getInitialPosition();
        long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec() ? subscribe.getStartMessageRollbackDurationSec() : -1L;
        SchemaData schema = subscribe.hasSchema() ? this.getSchema(subscribe.getSchema()) : null;
        boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
        boolean forceTopicCreation = subscribe.getForceTopicCreation();
        PulsarApi.KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null;
        CompletableFuture<Boolean> isAuthorizedFuture = this.isTopicOperationAllowed(topicName, subscriptionName, TopicOperation.CONSUME);
        ((CompletableFuture)isAuthorizedFuture.thenApply(arg_0 -> this.lambda$handleSubscribe$16(topicName, subscriptionName, metadata, requestId, consumerId, forceTopicCreation, isDurable, schema, subType, priorityLevel, consumerName, (MessageIdImpl)startMessageId, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta, arg_0))).exceptionally(ex -> {
            String msg = String.format("[%s] %s with role %s", this.remoteAddress, ex.getMessage(), this.getPrincipal());
            if (ex.getCause() instanceof PulsarServerException) {
                log.info(msg);
            } else {
                log.warn(msg);
            }
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }

    private SchemaData getSchema(PulsarApi.Schema protocolSchema) {
        return SchemaData.builder().data(protocolSchema.getSchemaData().toByteArray()).isDeleted(false).timestamp(System.currentTimeMillis()).user(Strings.nullToEmpty((String)this.originalPrincipal)).type(Commands.getSchemaType((PulsarApi.Schema.Type)protocolSchema.getType())).props(protocolSchema.getPropertiesList().stream().collect(Collectors.toMap(PulsarApi.KeyValue::getKey, PulsarApi.KeyValue::getValue))).build();
    }

    protected void handleProducer(PulsarApi.CommandProducer cmdProducer) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        long producerId = cmdProducer.getProducerId();
        long requestId = cmdProducer.getRequestId();
        String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() : this.service.generateUniqueProducerName();
        long epoch = cmdProducer.getEpoch();
        boolean userProvidedProducerName = cmdProducer.getUserProvidedProducerName();
        boolean isEncrypted = cmdProducer.getEncrypted();
        Map metadata = CommandUtils.metadataFromCommand((PulsarApi.CommandProducer)cmdProducer);
        SchemaData schema = cmdProducer.hasSchema() ? this.getSchema(cmdProducer.getSchema()) : null;
        TopicName topicName = this.validateTopicName(cmdProducer.getTopic(), requestId, (GeneratedMessageLite)cmdProducer);
        if (topicName == null) {
            return;
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            String msg = "Valid Proxy Client role should be provided while creating producer ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided while creating producer ", this.authRole, this.originalPrincipal, topicName});
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.AuthorizationError, "Valid Proxy Client role should be provided while creating producer ");
            return;
        }
        CompletableFuture<Boolean> isAuthorizedFuture = this.isTopicOperationAllowed(topicName, TopicOperation.PRODUCE);
        ((CompletableFuture)isAuthorizedFuture.thenApply(isAuthorized -> {
            if (isAuthorized.booleanValue()) {
                CompletableFuture producerFuture;
                CompletableFuture existingProducerFuture;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Client is authorized to Produce with role {}", (Object)this.remoteAddress, (Object)this.getPrincipal());
                }
                if ((existingProducerFuture = (CompletableFuture)this.producers.putIfAbsent(producerId, producerFuture = new CompletableFuture())) != null) {
                    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
                        Producer producer = existingProducerFuture.getNow(null);
                        log.info("[{}] Producer with the same id is already created: producerId={}, producer={}", new Object[]{this.remoteAddress, producerId, producer});
                        this.commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(), producer.getSchemaVersion());
                        return null;
                    }
                    PulsarApi.ServerError error = null;
                    if (!existingProducerFuture.isDone()) {
                        error = PulsarApi.ServerError.ServiceNotReady;
                    } else {
                        error = this.getErrorCode(existingProducerFuture);
                        this.producers.remove(producerId, (Object)existingProducerFuture);
                    }
                    log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}", new Object[]{this.remoteAddress, topicName, producerId});
                    this.commandSender.sendErrorResponse(requestId, error, "Producer is already present on the connection");
                    return null;
                }
                log.info("[{}][{}] Creating producer. producerId={}", new Object[]{this.remoteAddress, topicName, producerId});
                ((CompletableFuture)this.service.getOrCreateTopic(topicName.toString()).thenAccept(topic -> {
                    if (topic.isBacklogQuotaExceeded(producerName)) {
                        IllegalStateException illegalStateException = new IllegalStateException("Cannot create producer on topic with backlog quota exceeded");
                        BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage());
                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.ProducerBlockedQuotaExceededException, illegalStateException.getMessage());
                        }
                        producerFuture.completeExceptionally(illegalStateException);
                        this.producers.remove(producerId, (Object)producerFuture);
                        return;
                    }
                    if ((topic.isEncryptionRequired() || this.encryptionRequireOnProducer) && !isEncrypted) {
                        String msg = String.format("Encryption is required in %s", topicName);
                        log.warn("[{}] {}", (Object)this.remoteAddress, (Object)msg);
                        this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.MetadataError, msg);
                        this.producers.remove(producerId, (Object)producerFuture);
                        return;
                    }
                    this.disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
                    CompletableFuture<SchemaVersion> schemaVersionFuture = this.tryAddSchema((Topic)topic, schema);
                    schemaVersionFuture.exceptionally(exception -> {
                        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getMessage());
                        this.producers.remove(producerId, (Object)producerFuture);
                        return null;
                    });
                    schemaVersionFuture.thenAccept(schemaVersion -> {
                        Producer producer = new Producer((Topic)topic, this, producerId, producerName, this.getPrincipal(), isEncrypted, metadata, (SchemaVersion)schemaVersion, epoch, userProvidedProducerName);
                        try {
                            topic.addProducer(producer);
                            if (this.isActive()) {
                                if (producerFuture.complete(producer)) {
                                    log.info("[{}] Created new producer: {}", (Object)this.remoteAddress, (Object)producer);
                                    this.commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion());
                                    return;
                                }
                                producer.closeNow(true);
                                log.info("[{}] Cleared producer created after timeout on client side {}", (Object)this.remoteAddress, (Object)producer);
                            } else {
                                producer.closeNow(true);
                                log.info("[{}] Cleared producer created after connection was closed: {}", (Object)this.remoteAddress, (Object)producer);
                                producerFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));
                            }
                        }
                        catch (Exception ise) {
                            log.error("[{}] Failed to add producer to topic {}: {}", new Object[]{this.remoteAddress, topicName, ise.getMessage()});
                            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(ise), ise.getMessage());
                            producerFuture.completeExceptionally(ise);
                        }
                        this.producers.remove(producerId, (Object)producerFuture);
                    });
                })).exceptionally(exception -> {
                    Throwable cause = exception.getCause();
                    if (cause instanceof NoSuchElementException) {
                        cause = new BrokerServiceException.TopicNotFoundException("Topic Not Found.");
                    }
                    if (!Exceptions.areExceptionsPresentInChain((Throwable)cause, (Class[])new Class[]{BrokerServiceException.ServiceUnitNotReadyException.class, ManagedLedgerException.class})) {
                        log.error("[{}] Failed to create topic {}, producerId={}", new Object[]{this.remoteAddress, topicName, producerId, exception});
                    }
                    if (producerFuture.completeExceptionally((Throwable)exception)) {
                        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
                    }
                    this.producers.remove(producerId, (Object)producerFuture);
                    return null;
                });
            } else {
                String msg = "Client is not authorized to Produce";
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.getPrincipal()});
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
            }
            return null;
        })).exceptionally(ex -> {
            String msg = String.format("[%s] %s with role %s", this.remoteAddress, ex.getMessage(), this.getPrincipal());
            log.warn(msg);
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }

    protected void handleSend(PulsarApi.CommandSend send, ByteBuf headersAndPayload) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture producerFuture = (CompletableFuture)this.producers.get(send.getProducerId());
        if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
            log.warn("[{}] Producer had already been closed: {}", (Object)this.remoteAddress, (Object)send.getProducerId());
            return;
        }
        Producer producer = producerFuture.getNow(null);
        if (log.isDebugEnabled()) {
            this.printSendCommandDebug(send, headersAndPayload);
        }
        if (producer.isNonPersistentTopic()) {
            if (this.nonPersistentPendingMessages > this.MaxNonPersistentPendingMessages) {
                long producerId = send.getProducerId();
                long sequenceId = send.getSequenceId();
                long highestSequenceId = send.getHighestSequenceId();
                this.service.getTopicOrderedExecutor().executeOrdered((Object)producer.getTopic().getName(), (SafeRunnable)SafeRun.safeRun(() -> this.commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1L, -1L)));
                producer.recordMessageDrop(send.getNumMessages());
                return;
            }
            ++this.nonPersistentPendingMessages;
        }
        this.startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
        if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) {
            TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
            producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.getIsChunk());
            return;
        }
        if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.getIsChunk());
        } else {
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages(), send.getIsChunk());
        }
    }

    private void printSendCommandDebug(PulsarApi.CommandSend send, ByteBuf headersAndPayload) {
        headersAndPayload.markReaderIndex();
        PulsarApi.MessageMetadata msgMetadata = Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
        headersAndPayload.resetReaderIndex();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}", new Object[]{this.remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(), headersAndPayload.readableBytes(), msgMetadata.getPartitionKey(), msgMetadata.getOrderingKey()});
        }
        msgMetadata.recycle();
    }

    protected void handleAck(PulsarApi.CommandAck ack) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(ack.getConsumerId());
        long requestId = ack.getRequestId();
        boolean hasRequestId = ack.hasRequestId();
        long consumerId = ack.getConsumerId();
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            ((CompletableFuture)((Consumer)consumerFuture.getNow(null)).messageAcked(ack).thenRun(() -> {
                if (hasRequestId) {
                    this.ctx.writeAndFlush((Object)Commands.newAckResponse((long)requestId, null, null, (long)consumerId));
                }
            })).exceptionally(e -> {
                if (hasRequestId) {
                    this.ctx.writeAndFlush((Object)Commands.newAckResponse((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(e), (String)e.getMessage(), (long)consumerId));
                }
                return null;
            });
        }
    }

    protected void handleFlow(PulsarApi.CommandFlow flow) {
        CompletableFuture consumerFuture;
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received flow from consumer {} permits: {}", new Object[]{this.remoteAddress, flow.getConsumerId(), flow.getMessagePermits()});
        }
        if ((consumerFuture = (CompletableFuture)this.consumers.get(flow.getConsumerId())) != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            Consumer consumer = consumerFuture.getNow(null);
            if (consumer != null) {
                consumer.flowPermits(flow.getMessagePermits());
            } else {
                log.info("[{}] Couldn't find consumer {}", (Object)this.remoteAddress, (Object)flow.getConsumerId());
            }
        }
    }

    protected void handleRedeliverUnacknowledged(PulsarApi.CommandRedeliverUnacknowledgedMessages redeliver) {
        CompletableFuture consumerFuture;
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Resend Command from consumer {} ", (Object)this.remoteAddress, (Object)redeliver.getConsumerId());
        }
        if ((consumerFuture = (CompletableFuture)this.consumers.get(redeliver.getConsumerId())) != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            Consumer consumer = consumerFuture.getNow(null);
            if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
                consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
            } else {
                consumer.redeliverUnacknowledgedMessages();
            }
        }
    }

    protected void handleUnsubscribe(PulsarApi.CommandUnsubscribe unsubscribe) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(unsubscribe.getConsumerId());
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            ((Consumer)consumerFuture.getNow(null)).doUnsubscribe(unsubscribe.getRequestId());
        } else {
            this.commandSender.sendErrorResponse(unsubscribe.getRequestId(), PulsarApi.ServerError.MetadataError, "Consumer not found");
        }
    }

    protected void handleSeek(PulsarApi.CommandSeek seek) {
        boolean consumerCreated;
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        long requestId = seek.getRequestId();
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(seek.getConsumerId());
        if (!seek.hasMessageId() && !seek.hasMessagePublishTime()) {
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.MetadataError, "Message id and message publish time were not present");
            return;
        }
        boolean bl = consumerCreated = consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally();
        if (consumerCreated && seek.hasMessageId()) {
            Consumer consumer = consumerFuture.getNow(null);
            Subscription subscription = consumer.getSubscription();
            PulsarApi.MessageIdData msgIdData = seek.getMessageId();
            long[] ackSet = null;
            if (msgIdData.getAckSetCount() > 0) {
                ackSet = SafeCollectionUtils.longListToArray((List)msgIdData.getAckSetList());
            }
            PositionImpl position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId(), ackSet);
            ((CompletableFuture)subscription.resetCursor((Position)position).thenRun(() -> this.lambda$handleSeek$27(subscription, (Position)position, requestId))).exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, ex.getMessage(), ex});
                this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.UnknownError, "Error when resetting subscription: " + ex.getCause().getMessage());
                return null;
            });
        } else if (consumerCreated && seek.hasMessagePublishTime()) {
            Consumer consumer = consumerFuture.getNow(null);
            Subscription subscription = consumer.getSubscription();
            long timestamp = seek.getMessagePublishTime();
            ((CompletableFuture)subscription.resetCursor(timestamp).thenRun(() -> {
                log.info("[{}] [{}][{}] Reset subscription to publish time {}", new Object[]{this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), timestamp});
                this.commandSender.sendSuccessResponse(requestId);
            })).exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, ex.getMessage(), ex});
                this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.UnknownError, "Reset subscription to publish time error: " + ex.getCause().getMessage());
                return null;
            });
        } else {
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.MetadataError, "Consumer not found");
        }
    }

    protected void handleCloseProducer(PulsarApi.CommandCloseProducer closeProducer) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        long producerId = closeProducer.getProducerId();
        long requestId = closeProducer.getRequestId();
        CompletableFuture producerFuture = (CompletableFuture)this.producers.get(producerId);
        if (producerFuture == null) {
            log.warn("[{}] Producer was not registered on the connection. producerId={}", (Object)this.remoteAddress, (Object)producerId);
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.UnknownError, "Producer was not registered on the connection");
            return;
        }
        if (!producerFuture.isDone() && producerFuture.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
            log.info("[{}] Closed producer before its creation was completed. producerId={}", (Object)this.remoteAddress, (Object)producerId);
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, (Object)producerFuture);
            return;
        }
        if (producerFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed producer that already failed to be created. producerId={}", (Object)this.remoteAddress, (Object)producerId);
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, (Object)producerFuture);
            return;
        }
        Producer producer = producerFuture.getNow(null);
        log.info("[{}][{}] Closing producer on cnx {}. producerId={}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress, producerId});
        producer.close(true).thenAccept(v -> {
            log.info("[{}][{}] Closed producer on cnx {}. producerId={}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress, producerId});
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, (Object)producerFuture);
        });
    }

    protected void handleCloseConsumer(PulsarApi.CommandCloseConsumer closeConsumer) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        log.info("[{}] Closing consumer: consumerId={}", (Object)this.remoteAddress, (Object)closeConsumer.getConsumerId());
        long requestId = closeConsumer.getRequestId();
        long consumerId = closeConsumer.getConsumerId();
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(consumerId);
        if (consumerFuture == null) {
            log.warn("[{}] Consumer was not registered on the connection: consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
            this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.MetadataError, "Consumer not found");
            return;
        }
        if (!consumerFuture.isDone() && consumerFuture.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
            log.info("[{}] Closed consumer before its creation was completed. consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        if (consumerFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed consumer that already failed to be created. consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        Consumer consumer = consumerFuture.getNow(null);
        try {
            consumer.close();
            this.consumers.remove(consumerId, (Object)consumerFuture);
            this.commandSender.sendSuccessResponse(requestId);
            log.info("[{}] Closed consumer, consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
        }
        catch (BrokerServiceException e) {
            log.warn("[{]] Error closing consumer {} : {}", new Object[]{this.remoteAddress, consumer, e});
            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
        }
    }

    protected void handleGetLastMessageId(PulsarApi.CommandGetLastMessageId getLastMessageId) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(getLastMessageId.getConsumerId());
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            Consumer consumer = consumerFuture.getNow(null);
            long requestId = getLastMessageId.getRequestId();
            Topic topic = consumer.getSubscription().getTopic();
            Position position = topic.getLastPosition();
            int partitionIndex = TopicName.getPartitionIndex((String)topic.getName());
            Position markDeletePosition = null;
            if (consumer.getSubscription() instanceof PersistentSubscription) {
                markDeletePosition = ((PersistentSubscription)consumer.getSubscription()).getCursor().getMarkDeletedPosition();
            }
            this.getLargestBatchIndexWhenPossible(topic, (PositionImpl)position, (PositionImpl)markDeletePosition, partitionIndex, requestId, consumer.getSubscription().getName());
        } else {
            this.ctx.writeAndFlush((Object)Commands.newError((long)getLastMessageId.getRequestId(), (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Consumer not found"));
        }
    }

    private void getLargestBatchIndexWhenPossible(Topic topic, PositionImpl position, PositionImpl markDeletePosition, int partitionIndex, long requestId, String subscriptionName) {
        PersistentTopic persistentTopic = (PersistentTopic)topic;
        ManagedLedgerImpl ml = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
        if (position.getEntryId() == -1L) {
            PulsarApi.MessageIdData messageId = PulsarApi.MessageIdData.newBuilder().setLedgerId(position.getLedgerId()).setEntryId(position.getEntryId()).setPartition(partitionIndex).build();
            PulsarApi.MessageIdData markDeleteMessageId = null;
            if (null != markDeletePosition) {
                markDeleteMessageId = PulsarApi.MessageIdData.newBuilder().setLedgerId(markDeletePosition.getLedgerId()).setEntryId(markDeletePosition.getEntryId()).build();
            }
            this.ctx.writeAndFlush((Object)Commands.newGetLastMessageIdResponse((long)requestId, (PulsarApi.MessageIdData)messageId, markDeleteMessageId));
            return;
        }
        final CompletableFuture entryFuture = new CompletableFuture();
        ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryComplete(Entry entry, Object ctx) {
                entryFuture.complete(entry);
            }

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                entryFuture.completeExceptionally((Throwable)exception);
            }
        }, null);
        CompletionStage batchSizeFuture = entryFuture.thenApply(entry -> {
            PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata((ByteBuf)entry.getDataBuffer());
            int batchSize = metadata.getNumMessagesInBatch();
            entry.release();
            return metadata.hasNumMessagesInBatch() ? batchSize : -1;
        });
        ((CompletableFuture)batchSizeFuture).whenComplete((batchSize, e) -> {
            if (e != null) {
                if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
                    this.ctx.writeAndFlush((Object)Commands.newGetLastMessageIdResponse((long)requestId, (PulsarApi.MessageIdData)PulsarApi.MessageIdData.newBuilder().setLedgerId(-1L).setEntryId(-1L).build(), (PulsarApi.MessageIdData)PulsarApi.MessageIdData.newBuilder().setLedgerId(markDeletePosition != null ? markDeletePosition.getLedgerId() : -1L).setEntryId(markDeletePosition != null ? markDeletePosition.getEntryId() : -1L).build()));
                } else {
                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)("Failed to get batch size for entry " + e.getMessage())));
                }
            } else {
                int largestBatchIndex;
                int n = largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", new Object[]{this.remoteAddress, topic.getName(), subscriptionName, position, partitionIndex});
                }
                PulsarApi.MessageIdData messageId = PulsarApi.MessageIdData.newBuilder().setLedgerId(position.getLedgerId()).setEntryId(position.getEntryId()).setPartition(partitionIndex).setBatchIndex(largestBatchIndex).build();
                PulsarApi.MessageIdData markDeleteMessageId = null;
                if (null != markDeletePosition) {
                    markDeleteMessageId = PulsarApi.MessageIdData.newBuilder().setLedgerId(markDeletePosition.getLedgerId()).setEntryId(markDeletePosition.getEntryId()).build();
                }
                this.ctx.writeAndFlush((Object)Commands.newGetLastMessageIdResponse((long)requestId, (PulsarApi.MessageIdData)messageId, markDeleteMessageId));
            }
        });
    }

    protected void handleGetTopicsOfNamespace(PulsarApi.CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        long requestId = commandGetTopicsOfNamespace.getRequestId();
        String namespace = commandGetTopicsOfNamespace.getNamespace();
        PulsarApi.CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
        NamespaceName namespaceName = NamespaceName.get((String)namespace);
        ((CompletableFuture)this.getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode).thenAccept(topics -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", new Object[]{this.remoteAddress, namespace, requestId, topics.size()});
            }
            this.commandSender.sendGetTopicsOfNamespaceResponse((List<String>)topics, requestId);
        })).exceptionally(ex -> {
            log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", new Object[]{this.remoteAddress, namespace, requestId});
            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(new BrokerServiceException.ServerMetadataException((Throwable)ex)), ex.getMessage());
            return null;
        });
    }

    protected void handleGetSchema(PulsarApi.CommandGetSchema commandGetSchema) {
        String schemaName;
        if (log.isDebugEnabled()) {
            log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}", new Object[]{this.remoteAddress, new String(commandGetSchema.getSchemaVersion().toByteArray()), commandGetSchema.getTopic(), commandGetSchema.getRequestId()});
        }
        long requestId = commandGetSchema.getRequestId();
        SchemaVersion schemaVersion = SchemaVersion.Latest;
        if (commandGetSchema.hasSchemaVersion()) {
            schemaVersion = this.schemaService.versionFromBytes(commandGetSchema.getSchemaVersion().toByteArray());
        }
        try {
            schemaName = TopicName.get((String)commandGetSchema.getTopic()).getSchemaName();
        }
        catch (Throwable t) {
            this.commandSender.sendGetSchemaErrorResponse(requestId, PulsarApi.ServerError.InvalidTopicName, t.getMessage());
            return;
        }
        ((CompletableFuture)this.schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
            if (schemaAndMetadata == null) {
                this.commandSender.sendGetSchemaErrorResponse(requestId, PulsarApi.ServerError.TopicNotFound, "Topic not found or no-schema");
            } else {
                this.commandSender.sendGetSchemaResponse(requestId, SchemaInfoUtil.newSchemaInfo((String)schemaName, (SchemaData)schemaAndMetadata.schema), schemaAndMetadata.version);
            }
        })).exceptionally(ex -> {
            this.commandSender.sendGetSchemaErrorResponse(requestId, PulsarApi.ServerError.UnknownError, ex.getMessage());
            return null;
        });
    }

    protected void handleGetOrCreateSchema(PulsarApi.CommandGetOrCreateSchema commandGetOrCreateSchema) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandGetOrCreateSchema call from {}", (Object)this.remoteAddress);
        }
        long requestId = commandGetOrCreateSchema.getRequestId();
        String topicName = commandGetOrCreateSchema.getTopic();
        SchemaData schemaData = this.getSchema(commandGetOrCreateSchema.getSchema());
        SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
        ((CompletableFuture)this.service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
            if (topicOpt.isPresent()) {
                Topic topic = (Topic)topicOpt.get();
                CompletableFuture<SchemaVersion> schemaVersionFuture = this.tryAddSchema(topic, schema);
                ((CompletableFuture)schemaVersionFuture.exceptionally(ex -> {
                    PulsarApi.ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
                    this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, ex.getMessage());
                    return null;
                })).thenAccept(schemaVersion -> this.commandSender.sendGetOrCreateSchemaResponse(requestId, (SchemaVersion)schemaVersion));
            } else {
                this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, PulsarApi.ServerError.TopicNotFound, "Topic not found");
            }
        })).exceptionally(ex -> {
            PulsarApi.ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
            this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, ex.getMessage());
            return null;
        });
    }

    protected void handleNewTxn(PulsarApi.CommandNewTxn command) {
        long requestId = command.getRequestId();
        TransactionCoordinatorID tcId = TransactionCoordinatorID.get((long)command.getTcId());
        if (log.isDebugEnabled()) {
            log.debug("Receive new txn request {} to transaction meta store {} from {}.", new Object[]{requestId, tcId, this.remoteAddress});
        }
        this.service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, command.getTxnTtlSeconds()).whenComplete((txnID, ex) -> {
            if (ex == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response {} for new txn request {}", (Object)tcId.getId(), (Object)requestId);
                }
                this.ctx.writeAndFlush((Object)Commands.newTxnResponse((long)requestId, (long)txnID.getLeastSigBits(), (long)txnID.getMostSigBits()));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Send response error for new txn request {}", (Object)requestId, ex);
                }
                this.ctx.writeAndFlush((Object)Commands.newTxnResponse((long)requestId, (long)tcId.getId(), (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(ex), (String)ex.getMessage()));
            }
        });
    }

    protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn command) {
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        long requestId = command.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("Receive add published partition to txn request {} from {} with txnId {}", new Object[]{requestId, this.remoteAddress, txnID});
        }
        this.service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID, command.getPartitionsList()).whenComplete((v, ex) -> {
            if (ex == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response success for add published partition to txn request {}", (Object)requestId);
                }
                this.ctx.writeAndFlush((Object)Commands.newAddPartitionToTxnResponse((long)requestId, (long)txnID.getLeastSigBits(), (long)txnID.getMostSigBits()));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Send response error for add published partition to txn request {}", (Object)requestId, ex);
                }
                this.ctx.writeAndFlush((Object)Commands.newAddPartitionToTxnResponse((long)requestId, (long)txnID.getMostSigBits(), (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(ex), (String)ex.getMessage()));
            }
        });
    }

    protected void handleEndTxn(PulsarApi.CommandEndTxn command) {
        long requestId = command.getRequestId();
        int txnAction = command.getTxnAction().getNumber();
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        ((CompletableFuture)this.service.pulsar().getTransactionMetadataStoreService().endTransaction(txnID, txnAction, command.getMessageIdList()).thenRun(() -> this.ctx.writeAndFlush((Object)Commands.newEndTxnResponse((long)requestId, (long)txnID.getLeastSigBits(), (long)txnID.getMostSigBits())))).exceptionally(throwable -> {
            log.error("Send response error for end txn request.", throwable);
            this.ctx.writeAndFlush((Object)Commands.newEndTxnResponse((long)requestId, (long)txnID.getMostSigBits(), (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(throwable), (String)throwable.getMessage()));
            return null;
        });
    }

    protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition command) {
        long requestId = command.getRequestId();
        String topic = command.getTopic();
        List messageIdDataList = command.getMessageIdList();
        int txnAction = command.getTxnAction().getNumber();
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        ((CompletableFuture)this.service.getTopics().get((Object)TopicName.get((String)topic).toString())).whenComplete((optionalTopic, t) -> {
            if (!optionalTopic.isPresent()) {
                this.ctx.writeAndFlush((Object)Commands.newEndTxnOnPartitionResponse((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.TopicNotFound, (String)("Topic " + topic + " is not found.")));
                return;
            }
            ((Topic)optionalTopic.get()).endTxn(txnID, txnAction, messageIdDataList).whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    log.error("Handle endTxnOnPartition {} failed.", (Object)topic, throwable);
                    this.ctx.writeAndFlush((Object)Commands.newEndTxnOnPartitionResponse((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)throwable.getMessage()));
                    return;
                }
                this.ctx.writeAndFlush((Object)Commands.newEndTxnOnPartitionResponse((long)requestId, (long)txnID.getLeastSigBits(), (long)txnID.getMostSigBits()));
            });
        });
    }

    protected void handleEndTxnOnSubscription(PulsarApi.CommandEndTxnOnSubscription command) {
        long requestId = command.getRequestId();
        long txnidMostBits = command.getTxnidMostBits();
        long txnidLeastBits = command.getTxnidLeastBits();
        String topic = command.getSubscription().getTopic();
        String subName = command.getSubscription().getSubscription();
        int txnAction = command.getTxnAction().getNumber();
        ((CompletableFuture)this.service.getTopics().get((Object)TopicName.get((String)topic).toString())).thenAccept(optionalTopic -> {
            if (!optionalTopic.isPresent()) {
                log.error("The topic {} is not exist in broker.", (Object)topic);
                this.ctx.writeAndFlush((Object)Commands.newEndTxnOnSubscriptionResponse((long)requestId, (long)txnidLeastBits, (long)txnidMostBits, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)("The topic " + topic + " is not exist in broker.")));
                return;
            }
            Subscription subscription = ((Topic)optionalTopic.get()).getSubscription(subName);
            if (subscription == null) {
                log.error("Topic {} subscription {} is not exist.", (Object)((Topic)optionalTopic.get()).getName(), (Object)subName);
                this.ctx.writeAndFlush((Object)Commands.newEndTxnOnSubscriptionResponse((long)requestId, (long)txnidLeastBits, (long)txnidMostBits, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)("Topic " + ((Topic)optionalTopic.get()).getName() + " subscription " + subName + " is not exist.")));
                return;
            }
            CompletableFuture<Void> completableFuture = subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction);
            completableFuture.whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    log.error("Handle end txn on subscription failed for request {}", (Object)requestId);
                    this.ctx.writeAndFlush((Object)Commands.newEndTxnOnSubscriptionResponse((long)requestId, (long)txnidLeastBits, (long)txnidMostBits, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)"Handle end txn on subscription failed."));
                    return;
                }
                this.ctx.writeAndFlush((Object)Commands.newEndTxnOnSubscriptionResponse((long)requestId, (long)txnidLeastBits, (long)txnidMostBits));
            });
        });
    }

    private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
        if (schema != null) {
            return topic.addSchema(schema);
        }
        return topic.hasSchema().thenCompose(hasSchema -> {
            log.info("[{}] {} configured with schema {}", new Object[]{this.remoteAddress, topic.getName(), hasSchema});
            CompletableFuture<SchemaVersion> result = new CompletableFuture<SchemaVersion>();
            if (hasSchema.booleanValue() && (this.schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
                result.completeExceptionally(new IncompatibleSchemaException("Producers cannot connect or send message without a schema to topics with a schema"));
            } else {
                result.complete(SchemaVersion.Empty);
            }
            return result;
        });
    }

    protected void handleAddSubscriptionToTxn(PulsarApi.CommandAddSubscriptionToTxn command) {
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        long requestId = command.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("Receive add published partition to txn request {} from {} with txnId {}", new Object[]{requestId, this.remoteAddress, txnID});
        }
        this.service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, MLTransactionMetadataStore.subscriptionToTxnSubscription((List)command.getSubscriptionList())).whenComplete((v, ex) -> {
            if (ex == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response success for add published partition to txn request {}", (Object)requestId);
                }
                this.ctx.writeAndFlush((Object)Commands.newAddSubscriptionToTxnResponse((long)requestId, (long)txnID.getLeastSigBits(), (long)txnID.getMostSigBits()));
                log.info("handle add partition to txn finish.");
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Send response error for add published partition to txn request {}", (Object)requestId, ex);
                }
                this.ctx.writeAndFlush((Object)Commands.newAddSubscriptionToTxnResponse((long)requestId, (long)txnID.getMostSigBits(), (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(ex), (String)ex.getMessage()));
            }
        });
    }

    protected boolean isHandshakeCompleted() {
        return this.state == State.Connected;
    }

    public ChannelHandlerContext ctx() {
        return this.ctx;
    }

    protected void interceptCommand(PulsarApi.BaseCommand command) throws InterceptException {
        if (this.getBrokerService().getInterceptor() != null) {
            this.getBrokerService().getInterceptor().onPulsarCommand(command, this);
        }
    }

    @Override
    public void closeProducer(Producer producer) {
        this.safelyRemoveProducer(producer);
        if (this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v5.getNumber()) {
            this.ctx.writeAndFlush((Object)Commands.newCloseProducer((long)producer.getProducerId(), (long)-1L));
        } else {
            this.close();
        }
    }

    @Override
    public void closeConsumer(Consumer consumer) {
        this.safelyRemoveConsumer(consumer);
        if (this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v5.getNumber()) {
            this.ctx.writeAndFlush((Object)Commands.newCloseConsumer((long)consumer.consumerId(), (long)-1L));
        } else {
            this.close();
        }
    }

    protected void close() {
        this.ctx.close();
    }

    @Override
    public SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    @Override
    public void removedConsumer(Consumer consumer) {
        this.safelyRemoveConsumer(consumer);
    }

    @Override
    public void removedProducer(Producer producer) {
        this.safelyRemoveProducer(producer);
    }

    private void safelyRemoveProducer(Producer producer) {
        CompletableFuture future;
        long producerId = producer.getProducerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: producerId={}, producer={}", new Object[]{this.remoteAddress, producerId, producer});
        }
        if ((future = (CompletableFuture)this.producers.get(producerId)) != null) {
            future.whenComplete((producer2, exception) -> {
                if (exception != null || producer2 == producer) {
                    this.producers.remove(producerId, (Object)future);
                }
            });
        }
    }

    private void safelyRemoveConsumer(Consumer consumer) {
        CompletableFuture future;
        long consumerId = consumer.consumerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: consumerId={}, consumer={}", new Object[]{this.remoteAddress, consumerId, consumer});
        }
        if ((future = (CompletableFuture)this.consumers.get(consumerId)) != null) {
            future.whenComplete((consumer2, exception) -> {
                if (exception != null || consumer2 == consumer) {
                    this.consumers.remove(consumerId, (Object)future);
                }
            });
        }
    }

    @Override
    public boolean isActive() {
        return this.isActive;
    }

    @Override
    public boolean isWritable() {
        return this.ctx.channel().isWritable();
    }

    public void startSendOperation(Producer producer, int msgSize, int numMessages) {
        MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
        boolean isPublishRateExceeded = false;
        if (this.preciseTopicPublishRateLimitingEnable) {
            boolean isPreciseTopicPublishRateExceeded = producer.getTopic().isTopicPublishRateExceeded(numMessages, msgSize);
            if (isPreciseTopicPublishRateExceeded) {
                producer.getTopic().disableCnxAutoRead();
                return;
            }
            isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded();
        } else {
            isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
        }
        if (++this.pendingSendRequest == this.maxPendingSendRequests || isPublishRateExceeded) {
            this.ctx.channel().config().setAutoRead(false);
            this.autoReadDisabledRateLimiting = isPublishRateExceeded;
        }
        if (this.getBrokerService().isReachMessagePublishBufferThreshold()) {
            this.ctx.channel().config().setAutoRead(false);
            this.autoReadDisabledPublishBufferLimiting = true;
        }
    }

    @Override
    public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
        MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
        if (--this.pendingSendRequest == this.resumeReadsThreshold) {
            this.ctx.channel().config().setAutoRead(true);
            this.ctx.read();
        }
        if (isNonPersistentTopic) {
            --this.nonPersistentPendingMessages;
        }
    }

    @Override
    public void enableCnxAutoRead() {
        if (!(this.ctx == null || this.ctx.channel().config().isAutoRead() || this.autoReadDisabledRateLimiting || this.autoReadDisabledPublishBufferLimiting)) {
            this.ctx.channel().config().setAutoRead(true);
            this.ctx.read();
        }
    }

    @Override
    public void disableCnxAutoRead() {
        if (this.ctx != null && this.ctx.channel().config().isAutoRead()) {
            this.ctx.channel().config().setAutoRead(false);
        }
    }

    @Override
    public void cancelPublishRateLimiting() {
        if (this.autoReadDisabledRateLimiting) {
            this.autoReadDisabledRateLimiting = false;
        }
    }

    @Override
    public void cancelPublishBufferLimiting() {
        if (this.autoReadDisabledPublishBufferLimiting) {
            this.autoReadDisabledPublishBufferLimiting = false;
        }
    }

    private <T> PulsarApi.ServerError getErrorCode(CompletableFuture<T> future) {
        PulsarApi.ServerError error;
        block2: {
            error = PulsarApi.ServerError.UnknownError;
            try {
                future.getNow(null);
            }
            catch (Exception e) {
                if (!(e.getCause() instanceof BrokerServiceException)) break block2;
                error = BrokerServiceException.getClientErrorCode(e.getCause());
            }
        }
        return error;
    }

    private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
        if (producerName != null && producerName.startsWith(this.replicatorPrefix)) {
            try {
                if (((Boolean)this.ctx.channel().config().getOption(ChannelOption.TCP_NODELAY)).booleanValue()) {
                    this.ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, (Object)false);
                }
            }
            catch (Throwable t) {
                log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", new Object[]{topic, producerName, this.ctx.channel()});
            }
        }
    }

    private TopicName validateTopicName(String topic, long requestId, GeneratedMessageLite requestCommand) {
        try {
            return TopicName.get((String)topic);
        }
        catch (Throwable t) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to parse topic name '{}'", new Object[]{this.remoteAddress, topic, t});
            }
            if (requestCommand instanceof PulsarApi.CommandLookupTopic) {
                this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.InvalidTopicName, (String)("Invalid topic name: " + t.getMessage()), (long)requestId));
            } else if (requestCommand instanceof PulsarApi.CommandPartitionedTopicMetadata) {
                this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.InvalidTopicName, (String)("Invalid topic name: " + t.getMessage()), (long)requestId));
            } else {
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.InvalidTopicName, (String)("Invalid topic name: " + t.getMessage())));
            }
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ByteBufPair newMessageAndIntercept(long consumerId, PulsarApi.MessageIdData messageId, int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, String topic) {
        PulsarApi.BaseCommand command = Commands.newMessageCommand((long)consumerId, (PulsarApi.MessageIdData)messageId, (int)redeliveryCount, (long[])ackSet);
        ByteBufPair res = Commands.serializeCommandMessageWithSize((PulsarApi.BaseCommand)command, (ByteBuf)metadataAndPayload);
        try {
            this.getBrokerService().getInterceptor().onPulsarCommand(command, this);
        }
        catch (Exception e) {
            log.error("Exception occur when intercept messages.", (Throwable)e);
        }
        finally {
            command.getMessage().recycle();
            command.recycle();
        }
        return res;
    }

    public State getState() {
        return this.state;
    }

    public SocketAddress getRemoteAddress() {
        return this.remoteAddress;
    }

    @Override
    public BrokerService getBrokerService() {
        return this.service;
    }

    public String getRole() {
        return this.authRole;
    }

    @Override
    public Promise<Void> newPromise() {
        return this.ctx.newPromise();
    }

    @Override
    public HAProxyMessage getHAProxyMessage() {
        return this.proxyMessage;
    }

    @Override
    public boolean hasHAProxyMessage() {
        return this.proxyMessage != null;
    }

    boolean hasConsumer(long consumerId) {
        return this.consumers.containsKey(consumerId);
    }

    @Override
    public boolean isBatchMessageCompatibleVersion() {
        return this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v4.getNumber();
    }

    boolean supportsAuthenticationRefresh() {
        return this.features != null && this.features.getSupportsAuthRefresh();
    }

    @Override
    public String getClientVersion() {
        return this.clientVersion;
    }

    @Override
    public long getMessagePublishBufferSize() {
        return this.messagePublishBufferSize;
    }

    @VisibleForTesting
    void setMessagePublishBufferSize(long bufferSize) {
        this.messagePublishBufferSize = bufferSize;
    }

    @VisibleForTesting
    void setAutoReadDisabledRateLimiting(boolean isLimiting) {
        this.autoReadDisabledRateLimiting = isLimiting;
    }

    @Override
    public boolean isPreciseDispatcherFlowControl() {
        return this.preciseDispatcherFlowControl;
    }

    public AuthenticationState getAuthState() {
        return this.authState;
    }

    @Override
    public AuthenticationDataSource getAuthenticationData() {
        return this.originalAuthData != null ? this.originalAuthData : this.authenticationData;
    }

    public String getPrincipal() {
        return this.originalPrincipal != null ? this.originalPrincipal : this.authRole;
    }

    public AuthenticationProvider getAuthenticationProvider() {
        return this.authenticationProvider;
    }

    @Override
    public String getAuthRole() {
        return this.authRole;
    }

    public String getAuthMethod() {
        return this.authMethod;
    }

    public ConcurrentLongHashMap<CompletableFuture<Consumer>> getConsumers() {
        return this.consumers;
    }

    public ConcurrentLongHashMap<CompletableFuture<Producer>> getProducers() {
        return this.producers;
    }

    @Override
    public PulsarCommandSender getCommandSender() {
        return this.commandSender;
    }

    @Override
    public void execute(Runnable runnable) {
        this.ctx.channel().eventLoop().execute(runnable);
    }

    private /* synthetic */ void lambda$handleSeek$27(Subscription subscription, Position position, long requestId) {
        log.info("[{}] [{}][{}] Reset subscription to message id {}", new Object[]{this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), position});
        this.commandSender.sendSuccessResponse(requestId);
    }

    private /* synthetic */ Object lambda$handleSubscribe$16(TopicName topicName, String subscriptionName, Map metadata, long requestId, long consumerId, boolean forceTopicCreation, boolean isDurable, SchemaData schema, PulsarApi.CommandSubscribe.SubType subType, int priorityLevel, String consumerName, MessageIdImpl startMessageId, boolean readCompacted, PulsarApi.CommandSubscribe.InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean isReplicated, PulsarApi.KeySharedMeta keySharedMeta, Boolean isAuthorized) {
        if (isAuthorized.booleanValue()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client is authorized to subscribe with role {}", (Object)this.remoteAddress, (Object)this.getPrincipal());
            }
            log.info("[{}] Subscribing on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
            try {
                Metadata.validateMetadata((Map)metadata);
            }
            catch (IllegalArgumentException iae) {
                String msg = iae.getMessage();
                this.commandSender.sendErrorResponse(requestId, PulsarApi.ServerError.MetadataError, msg);
                return null;
            }
            CompletableFuture consumerFuture = new CompletableFuture();
            CompletableFuture existingConsumerFuture = (CompletableFuture)this.consumers.putIfAbsent(consumerId, consumerFuture);
            if (existingConsumerFuture != null) {
                if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                    Consumer consumer2 = existingConsumerFuture.getNow(null);
                    log.info("[{}] Consumer with the same id is already created: consumerId={}, consumer={}", new Object[]{this.remoteAddress, consumerId, consumer2});
                    this.commandSender.sendSuccessResponse(requestId);
                    return null;
                }
                log.warn("[{}][{}][{}] Consumer with id is already present on the connection, consumerId={}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId});
                PulsarApi.ServerError error = null;
                if (!existingConsumerFuture.isDone()) {
                    error = PulsarApi.ServerError.ServiceNotReady;
                } else {
                    error = this.getErrorCode(existingConsumerFuture);
                    this.consumers.remove(consumerId, (Object)existingConsumerFuture);
                }
                this.commandSender.sendErrorResponse(requestId, error, "Consumer is already present on the connection");
                return null;
            }
            boolean createTopicIfDoesNotExist = forceTopicCreation && this.service.isAllowAutoTopicCreation(topicName.toString());
            ((CompletableFuture)((CompletableFuture)this.service.getTopic(topicName.toString(), createTopicIfDoesNotExist).thenCompose(optTopic -> {
                boolean rejectSubscriptionIfDoesNotExist;
                if (!optTopic.isPresent()) {
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.TopicNotFoundException("Topic does not exist"));
                }
                Topic topic = (Topic)optTopic.get();
                boolean bl = rejectSubscriptionIfDoesNotExist = isDurable && !this.service.isAllowAutoSubscriptionCreation(topicName.toString()) && !topic.getSubscriptions().containsKey((Object)subscriptionName);
                if (rejectSubscriptionIfDoesNotExist) {
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.SubscriptionNotFoundException("Subscription does not exist"));
                }
                if (schema != null) {
                    return topic.addSchemaIfIdleOrCheckCompatible(schema).thenCompose(v -> topic.subscribe(this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, (MessageId)startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta));
                }
                return topic.subscribe(this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, (MessageId)startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta);
            })).thenAccept(consumer -> {
                if (consumerFuture.complete(consumer)) {
                    log.info("[{}] Created subscription on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                    this.commandSender.sendSuccessResponse(requestId);
                } else {
                    try {
                        consumer.close();
                        log.info("[{}] Cleared consumer created after timeout on client side {}", (Object)this.remoteAddress, consumer);
                    }
                    catch (BrokerServiceException e) {
                        log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", new Object[]{this.remoteAddress, consumer, e.getMessage()});
                    }
                    this.consumers.remove(consumerId, (Object)consumerFuture);
                }
            })).exceptionally(exception -> {
                if (exception.getCause() instanceof BrokerServiceException.ConsumerBusyException) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}", new Object[]{this.remoteAddress, topicName, subscriptionName, exception.getCause().getMessage()});
                    }
                } else if (exception.getCause() instanceof BrokerServiceException) {
                    log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId, exception.getCause().getMessage()});
                } else {
                    log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId, exception.getCause().getMessage(), exception});
                }
                if (consumerFuture.completeExceptionally((Throwable)exception)) {
                    this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage());
                }
                this.consumers.remove(consumerId, (Object)consumerFuture);
                return null;
            });
        } else {
            String msg = "Client is not authorized to subscribe";
            log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.getPrincipal()});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
        }
        return null;
    }

    static enum State {
        Start,
        Connected,
        Failed,
        Connecting;

    }
}

