/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.PersistentTopics;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.lookup.DestinationLookup;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerCnx
extends PulsarHandler {
    private final BrokerService service;
    private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
    private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
    private State state;
    private volatile boolean isActive = true;
    private String authRole = null;
    private static final int MaxPendingSendRequests = 1000;
    private static final int ResumeReadsThreshold = 500;
    private int pendingSendRequest = 0;
    private final String replicatorPrefix;
    private String clientVersion = null;
    private int nonPersistentPendingMessages = 0;
    private final int MaxNonPersistentPendingMessages;
    private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);

    public ServerCnx(BrokerService service) {
        super(service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
        this.service = service;
        this.state = State.Start;
        this.producers = new ConcurrentLongHashMap(8, 1);
        this.consumers = new ConcurrentLongHashMap(8, 1);
        this.replicatorPrefix = service.pulsar().getConfiguration().getReplicatorPrefix();
        this.MaxNonPersistentPendingMessages = service.pulsar().getConfiguration().getMaxConcurrentNonPersistentMessagePerConnection();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("New connection from {}", (Object)this.remoteAddress);
        this.ctx = ctx;
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        this.isActive = false;
        log.info("Closed connection from {}", (Object)this.remoteAddress);
        this.producers.values().forEach(producerFuture -> {
            if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                Producer producer = producerFuture.getNow(null);
                producer.closeNow();
            }
        });
        this.consumers.values().forEach(consumerFuture -> {
            if (!consumerFuture.isDone() || consumerFuture.isCompletedExceptionally()) {
                return;
            }
            Consumer consumer = consumerFuture.getNow(null);
            try {
                consumer.close();
            }
            catch (BrokerServiceException e) {
                log.warn("Consumer {} was already closed: {}", new Object[]{consumer, e.getMessage(), e});
            }
        });
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Channel writability has changed to: {}", (Object)ctx.channel().isWritable());
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("[{}] Got exception: {}", new Object[]{this.remoteAddress, cause.getMessage(), cause});
        ctx.close();
    }

    protected void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        Semaphore lookupSemaphore;
        long requestId = lookup.getRequestId();
        String topic = lookup.getTopic();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Lookup from {} for {}", new Object[]{topic, this.remoteAddress, requestId});
        }
        if ((lookupSemaphore = this.service.getLookupRequestSemaphore()).tryAcquire()) {
            DestinationLookup.lookupDestinationAsync(this.getBrokerService().pulsar(), DestinationName.get((String)topic), lookup.getAuthoritative(), this.getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
                if (ex == null) {
                    this.ctx.writeAndFlush(lookupResponse);
                } else {
                    log.warn("[{}] lookup failed with error {}, {}", new Object[]{this.remoteAddress, topic, ex.getMessage(), ex});
                    this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)requestId));
                }
                lookupSemaphore.release();
                return null;
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed lookup due to too many lookup-requets {}", (Object)this.remoteAddress, (Object)topic);
            }
            this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.TooManyRequests, (String)"Failed due to too many pending lookup requests", (long)requestId));
        }
    }

    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        Semaphore lookupSemaphore;
        long requestId = partitionMetadata.getRequestId();
        String topic = partitionMetadata.getTopic();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", new Object[]{topic, this.remoteAddress, requestId});
        }
        if ((lookupSemaphore = this.service.getLookupRequestSemaphore()).tryAcquire()) {
            PersistentTopics.getPartitionedTopicMetadata(this.getBrokerService().pulsar(), this.getRole(), DestinationName.get((String)topic)).handle((metadata, ex) -> {
                if (ex == null) {
                    int partitions = metadata.partitions;
                    this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((int)partitions, (long)requestId));
                } else if (ex instanceof PulsarClientException) {
                    log.warn("Failed to authorize {} at [{}] on topic {} : {}", new Object[]{this.getRole(), this.remoteAddress, topic, ex.getMessage()});
                    this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)ex.getMessage(), (long)requestId));
                } else {
                    log.warn("Failed to get Partitioned Metadata [{}] {}: {}", new Object[]{this.remoteAddress, topic, ex.getMessage(), ex});
                    this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)requestId));
                }
                lookupSemaphore.release();
                return null;
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requets {}", (Object)this.remoteAddress, (Object)topic);
            }
            this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.TooManyRequests, (String)"Failed due to too many pending lookup requests", (long)requestId));
        }
    }

    protected void handleConsumerStats(PulsarApi.CommandConsumerStats commandConsumerStats) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandConsumerStats call from {}", (Object)this.remoteAddress);
        }
        long requestId = commandConsumerStats.getRequestId();
        long consumerId = commandConsumerStats.getConsumerId();
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(consumerId);
        Consumer consumer = consumerFuture.getNow(null);
        ByteBuf msg = null;
        if (consumer == null) {
            log.error("Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", new Object[]{this.remoteAddress, requestId, consumerId});
            msg = Commands.newConsumerStatsResponse((PulsarApi.ServerError)PulsarApi.ServerError.ConsumerNotFound, (String)("Consumer " + consumerId + " not found"), (long)requestId);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", (Object)requestId, (Object)consumer);
            }
            msg = Commands.newConsumerStatsResponse((PulsarApi.CommandConsumerStatsResponse.Builder)this.createConsumerStatsResponse(consumer, requestId));
        }
        this.ctx.writeAndFlush((Object)msg);
    }

    PulsarApi.CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consumer, long requestId) {
        PulsarApi.CommandConsumerStatsResponse.Builder commandConsumerStatsResponseBuilder = PulsarApi.CommandConsumerStatsResponse.newBuilder();
        ConsumerStats consumerStats = consumer.getStats();
        commandConsumerStatsResponseBuilder.setRequestId(requestId);
        commandConsumerStatsResponseBuilder.setMsgRateOut(consumerStats.msgRateOut);
        commandConsumerStatsResponseBuilder.setMsgThroughputOut(consumerStats.msgThroughputOut);
        commandConsumerStatsResponseBuilder.setMsgRateRedeliver(consumerStats.msgRateRedeliver);
        commandConsumerStatsResponseBuilder.setConsumerName(consumerStats.consumerName);
        commandConsumerStatsResponseBuilder.setAvailablePermits((long)consumerStats.availablePermits);
        commandConsumerStatsResponseBuilder.setUnackedMessages((long)consumerStats.unackedMessages);
        commandConsumerStatsResponseBuilder.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs);
        commandConsumerStatsResponseBuilder.setAddress(consumerStats.address);
        commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.connectedSince);
        Subscription subscription = consumer.getSubscription();
        commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog());
        commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate());
        commandConsumerStatsResponseBuilder.setType(subscription.getTypeString());
        return commandConsumerStatsResponseBuilder;
    }

    protected void handleConnect(PulsarApi.CommandConnect connect) {
        String version;
        Preconditions.checkArgument((this.state == State.Start ? 1 : 0) != 0);
        if (this.service.isAuthenticationEnabled()) {
            try {
                String authMethod = "none";
                if (connect.hasAuthMethodName()) {
                    authMethod = connect.getAuthMethodName();
                } else if (connect.hasAuthMethod()) {
                    authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
                }
                String authData = connect.getAuthData().toStringUtf8();
                ChannelHandler sslHandler = this.ctx.channel().pipeline().get("tls");
                SSLSession sslSession = null;
                if (sslHandler != null) {
                    sslSession = ((SslHandler)sslHandler).engine().getSession();
                }
                this.authRole = this.getBrokerService().getAuthenticationService().authenticate((AuthenticationDataSource)new AuthenticationDataCommand(authData, this.remoteAddress, sslSession), authMethod);
                log.info("[{}] Client successfully authenticated with {} role {}", new Object[]{this.remoteAddress, authMethod, this.authRole});
            }
            catch (AuthenticationException e) {
                String msg = "Unable to authenticate";
                log.warn("[{}] {}: {}", new Object[]{this.remoteAddress, msg, e.getMessage()});
                this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.AuthenticationError, (String)msg));
                this.close();
                return;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Received CONNECT from {}", (Object)this.remoteAddress);
        }
        this.ctx.writeAndFlush((Object)Commands.newConnected((int)connect.getProtocolVersion()));
        this.state = State.Connected;
        this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
        String string = version = connect.hasClientVersion() ? connect.getClientVersion() : null;
        if (StringUtils.isNotBlank((CharSequence)version) && !version.contains(" ")) {
            this.clientVersion = version;
        }
    }

    protected void handleSubscribe(PulsarApi.CommandSubscribe subscribe) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture authorizationFuture = this.service.isAuthorizationEnabled() ? this.service.getAuthorizationManager().canConsumeAsync(DestinationName.get((String)subscribe.getTopic()), this.authRole) : CompletableFuture.completedFuture(true);
        String topicName = subscribe.getTopic();
        String subscriptionName = subscribe.getSubscription();
        long requestId = subscribe.getRequestId();
        long consumerId = subscribe.getConsumerId();
        PulsarApi.CommandSubscribe.SubType subType = subscribe.getSubType();
        String consumerName = subscribe.getConsumerName();
        boolean isDurable = subscribe.getDurable();
        MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new MessageIdImpl(subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition()) : null;
        int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
        authorizationFuture.thenApply(isAuthorized -> {
            if (isAuthorized.booleanValue()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Client is authorized to subscribe with role {}", (Object)this.remoteAddress, (Object)this.authRole);
                }
                log.info("[{}] Subscribing on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                CompletableFuture consumerFuture = new CompletableFuture();
                CompletableFuture existingConsumerFuture = (CompletableFuture)this.consumers.putIfAbsent(consumerId, consumerFuture);
                if (existingConsumerFuture != null) {
                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                        Consumer consumer2 = existingConsumerFuture.getNow(null);
                        log.info("[{}] Consumer with the same id is already created: {}", (Object)this.remoteAddress, (Object)consumer2);
                        this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
                        return null;
                    }
                    log.warn("[{}][{}][{}] Consumer is already present on the connection", new Object[]{this.remoteAddress, topicName, subscriptionName});
                    PulsarApi.ServerError error = !existingConsumerFuture.isDone() ? PulsarApi.ServerError.ServiceNotReady : this.getErrorCode(existingConsumerFuture);
                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)error, (String)"Consumer is already present on the connection"));
                    return null;
                }
                ((CompletableFuture)((CompletableFuture)this.service.getTopic(topicName).thenCompose(topic -> topic.subscribe(this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, (MessageId)startMessageId))).thenAccept(consumer -> {
                    if (consumerFuture.complete(consumer)) {
                        log.info("[{}] Created subscription on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                        this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId), this.ctx.voidPromise());
                    } else {
                        try {
                            consumer.close();
                            log.info("[{}] Cleared consumer created after timeout on client side {}", (Object)this.remoteAddress, consumer);
                        }
                        catch (BrokerServiceException e) {
                            log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", new Object[]{this.remoteAddress, consumer, e.getMessage()});
                        }
                        this.consumers.remove(consumerId, (Object)consumerFuture);
                    }
                })).exceptionally(exception -> {
                    log.warn("[{}][{}][{}] Failed to create consumer: {}", new Object[]{this.remoteAddress, topicName, subscriptionName, exception.getCause().getMessage(), exception});
                    if (consumerFuture.completeExceptionally((Throwable)exception)) {
                        this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(exception.getCause()), (String)exception.getCause().getMessage()));
                    }
                    this.consumers.remove(consumerId, (Object)consumerFuture);
                    return null;
                });
            } else {
                String msg = "Client is not authorized to subscribe";
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.authRole});
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
            }
            return null;
        });
    }

    protected void handleProducer(PulsarApi.CommandProducer cmdProducer) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture authorizationFuture = this.service.isAuthorizationEnabled() ? this.service.getAuthorizationManager().canProduceAsync(DestinationName.get((String)cmdProducer.getTopic().toString()), this.authRole) : CompletableFuture.completedFuture(true);
        String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() : this.service.generateUniqueProducerName();
        String topicName = cmdProducer.getTopic();
        long producerId = cmdProducer.getProducerId();
        long requestId = cmdProducer.getRequestId();
        authorizationFuture.thenApply(isAuthorized -> {
            if (isAuthorized.booleanValue()) {
                CompletableFuture producerFuture;
                CompletableFuture existingProducerFuture;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Client is authorized to Produce with role {}", (Object)this.remoteAddress, (Object)this.authRole);
                }
                if ((existingProducerFuture = (CompletableFuture)this.producers.putIfAbsent(producerId, producerFuture = new CompletableFuture())) != null) {
                    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
                        Producer producer = existingProducerFuture.getNow(null);
                        log.info("[{}] Producer with the same id is already created: {}", (Object)this.remoteAddress, (Object)producer);
                        this.ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)requestId, (String)producer.getProducerName()));
                        return null;
                    }
                    PulsarApi.ServerError error = !existingProducerFuture.isDone() ? PulsarApi.ServerError.ServiceNotReady : this.getErrorCode(existingProducerFuture);
                    log.warn("[{}][{}] Producer is already present on the connection", (Object)this.remoteAddress, (Object)topicName);
                    this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)error, (String)"Producer is already present on the connection"));
                    return null;
                }
                log.info("[{}][{}] Creating producer. producerId={}", new Object[]{this.remoteAddress, topicName, producerId});
                ((CompletableFuture)this.service.getTopic(topicName).thenAccept(topic -> {
                    if (topic.isBacklogQuotaExceeded(producerName)) {
                        IllegalStateException illegalStateException = new IllegalStateException("Cannot create producer on topic with backlog quota exceeded");
                        BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.ProducerBlockedQuotaExceededError, (String)illegalStateException.getMessage()));
                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.ProducerBlockedQuotaExceededException, (String)illegalStateException.getMessage()));
                        }
                        producerFuture.completeExceptionally(illegalStateException);
                        this.producers.remove(producerId, (Object)producerFuture);
                        return;
                    }
                    this.disableTcpNoDelayIfNeeded(topicName, producerName);
                    Producer producer = new Producer((Topic)topic, this, producerId, producerName, this.authRole);
                    try {
                        topic.addProducer(producer);
                        if (this.isActive()) {
                            if (producerFuture.complete(producer)) {
                                log.info("[{}] Created new producer: {}", (Object)this.remoteAddress, (Object)producer);
                                this.ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)requestId, (String)producerName));
                                return;
                            }
                            producer.closeNow();
                            log.info("[{}] Cleared producer created after timeout on client side {}", (Object)this.remoteAddress, (Object)producer);
                        } else {
                            producer.closeNow();
                            log.info("[{}] Cleared producer created after connection was closed: {}", (Object)this.remoteAddress, (Object)producer);
                            producerFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));
                        }
                    }
                    catch (BrokerServiceException ise) {
                        log.error("[{}] Failed to add producer to topic {}: {}", new Object[]{this.remoteAddress, topicName, ise.getMessage()});
                        this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(ise), (String)ise.getMessage()));
                        producerFuture.completeExceptionally(ise);
                    }
                    this.producers.remove(producerId, (Object)producerFuture);
                })).exceptionally(exception -> {
                    Throwable cause = exception.getCause();
                    if (!(cause instanceof BrokerServiceException.ServiceUnitNotReadyException)) {
                        log.error("[{}] Failed to create topic {}", new Object[]{this.remoteAddress, topicName, exception});
                    }
                    if (producerFuture.completeExceptionally((Throwable)exception)) {
                        this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(cause), (String)cause.getMessage()));
                    }
                    this.producers.remove(producerId, (Object)producerFuture);
                    return null;
                });
            } else {
                String msg = "Client is not authorized to Produce";
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.authRole});
                this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.AuthorizationError, (String)msg));
            }
            return null;
        });
    }

    protected void handleSend(PulsarApi.CommandSend send, ByteBuf headersAndPayload) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture producerFuture = (CompletableFuture)this.producers.get(send.getProducerId());
        if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
            log.warn("[{}] Producer had already been closed: {}", (Object)this.remoteAddress, (Object)send.getProducerId());
            return;
        }
        Producer producer = producerFuture.getNow(null);
        if (log.isDebugEnabled()) {
            this.printSendCommandDebug(send, headersAndPayload);
        }
        if (producer.isNonPersistentTopic() && this.nonPersistentPendingMessages++ > this.MaxNonPersistentPendingMessages) {
            long producerId = send.getProducerId();
            long sequenceId = send.getSequenceId();
            this.service.getTopicOrderedExecutor().submitOrdered((Object)producer.getTopic(), SafeRun.safeRun(() -> this.ctx.writeAndFlush((Object)Commands.newSendReceipt((long)producerId, (long)sequenceId, (long)-1L, (long)-1L), this.ctx.voidPromise())));
            producer.recordMessageDrop(send.getNumMessages());
            return;
        }
        this.startSendOperation();
        producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
    }

    private void printSendCommandDebug(PulsarApi.CommandSend send, ByteBuf headersAndPayload) {
        headersAndPayload.markReaderIndex();
        PulsarApi.MessageMetadata msgMetadata = Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
        headersAndPayload.resetReaderIndex();
        log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}", new Object[]{this.remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(), headersAndPayload.readableBytes()});
        msgMetadata.recycle();
    }

    protected void handleAck(PulsarApi.CommandAck ack) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(ack.getConsumerId());
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            ((Consumer)consumerFuture.getNow(null)).messageAcked(ack);
        }
    }

    protected void handleFlow(PulsarApi.CommandFlow flow) {
        CompletableFuture consumerFuture;
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received flow from consumer {} permits: {}", new Object[]{this.remoteAddress, flow.getConsumerId(), flow.getMessagePermits()});
        }
        if ((consumerFuture = (CompletableFuture)this.consumers.get(flow.getConsumerId())) != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            ((Consumer)consumerFuture.getNow(null)).flowPermits(flow.getMessagePermits());
        }
    }

    protected void handleRedeliverUnacknowledged(PulsarApi.CommandRedeliverUnacknowledgedMessages redeliver) {
        CompletableFuture consumerFuture;
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Resend Command from consumer {} ", (Object)this.remoteAddress, (Object)redeliver.getConsumerId());
        }
        if ((consumerFuture = (CompletableFuture)this.consumers.get(redeliver.getConsumerId())) != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            Consumer consumer = consumerFuture.getNow(null);
            if (redeliver.getMessageIdsCount() > 0 && consumer.subType() == PulsarApi.CommandSubscribe.SubType.Shared) {
                consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
            } else {
                consumer.redeliverUnacknowledgedMessages();
            }
        }
    }

    protected void handleUnsubscribe(PulsarApi.CommandUnsubscribe unsubscribe) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(unsubscribe.getConsumerId());
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            ((Consumer)consumerFuture.getNow(null)).doUnsubscribe(unsubscribe.getRequestId());
        } else {
            this.ctx.writeAndFlush((Object)Commands.newError((long)unsubscribe.getRequestId(), (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Consumer not found"));
        }
    }

    protected void handleCloseProducer(PulsarApi.CommandCloseProducer closeProducer) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        long producerId = closeProducer.getProducerId();
        long requestId = closeProducer.getRequestId();
        CompletableFuture producerFuture = (CompletableFuture)this.producers.get(producerId);
        if (producerFuture == null) {
            log.warn("[{}] Producer {} was not registered on the connection", (Object)this.remoteAddress, (Object)producerId);
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.UnknownError, (String)"Producer was not registered on the connection"));
            return;
        }
        if (!producerFuture.isDone() && producerFuture.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
            log.info("[{}] Closed producer {} before its creation was completed", (Object)this.remoteAddress, (Object)producerId);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            return;
        }
        if (producerFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed producer {} that already failed to be created", (Object)this.remoteAddress, (Object)producerId);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            return;
        }
        Producer producer = producerFuture.getNow(null);
        log.info("[{}][{}] Closing producer on cnx {}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress});
        producer.close().thenAccept(v -> {
            log.info("[{}][{}] Closed producer on cnx {}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress});
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            this.producers.remove(producerId, (Object)producerFuture);
        });
    }

    protected void handleCloseConsumer(PulsarApi.CommandCloseConsumer closeConsumer) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        log.info("[{}] Closing consumer: {}", (Object)this.remoteAddress, (Object)closeConsumer.getConsumerId());
        long requestId = closeConsumer.getRequestId();
        long consumerId = closeConsumer.getConsumerId();
        CompletableFuture consumerFuture = (CompletableFuture)this.consumers.get(consumerId);
        if (consumerFuture == null) {
            log.warn("[{}] Consumer was not registered on the connection: {}", (Object)consumerId, (Object)this.remoteAddress);
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)"Consumer not found"));
            return;
        }
        if (!consumerFuture.isDone() && consumerFuture.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
            log.info("[{}] Closed consumer {} before its creation was completed", (Object)this.remoteAddress, (Object)consumerId);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            return;
        }
        if (consumerFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed consumer {} that already failed to be created", (Object)this.remoteAddress, (Object)consumerId);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            return;
        }
        Consumer consumer = consumerFuture.getNow(null);
        try {
            consumer.close();
            this.consumers.remove(consumerId, (Object)consumerFuture);
            this.ctx.writeAndFlush((Object)Commands.newSuccess((long)requestId));
            log.info("[{}] Closed consumer {}", (Object)this.remoteAddress, (Object)consumer);
        }
        catch (BrokerServiceException e) {
            log.warn("[{]] Error closing consumer: ", new Object[]{this.remoteAddress, consumer, e});
            this.ctx.writeAndFlush((Object)Commands.newError((long)requestId, (PulsarApi.ServerError)BrokerServiceException.getClientErrorCode(e), (String)e.getMessage()));
        }
    }

    protected boolean isHandshakeCompleted() {
        return this.state == State.Connected;
    }

    ChannelHandlerContext ctx() {
        return this.ctx;
    }

    public void closeProducer(Producer producer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: {}", (Object)this.remoteAddress, (Object)producer);
        }
        long producerId = producer.getProducerId();
        this.producers.remove(producerId);
        if (this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v5.getNumber()) {
            this.ctx.writeAndFlush((Object)Commands.newCloseProducer((long)producerId, (long)-1L));
        } else {
            this.close();
        }
    }

    public void closeConsumer(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: {}", (Object)this.remoteAddress, (Object)consumer);
        }
        long consumerId = consumer.consumerId();
        this.consumers.remove(consumerId);
        if (this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v5.getNumber()) {
            this.ctx.writeAndFlush((Object)Commands.newCloseConsumer((long)consumerId, (long)-1L));
        } else {
            this.close();
        }
    }

    protected void close() {
        this.ctx.close();
    }

    public SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    public void removedConsumer(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: {}", (Object)this.remoteAddress, (Object)consumer);
        }
        this.consumers.remove(consumer.consumerId());
    }

    public void removedProducer(Producer producer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: {}", (Object)this.remoteAddress, (Object)producer);
        }
        this.producers.remove(producer.getProducerId());
    }

    public boolean isActive() {
        return this.isActive;
    }

    public boolean isWritable() {
        return this.ctx.channel().isWritable();
    }

    public void startSendOperation() {
        if (++this.pendingSendRequest == 1000) {
            this.ctx.channel().config().setAutoRead(false);
        }
    }

    public void completedSendOperation(boolean isNonPersistentTopic) {
        if (--this.pendingSendRequest == 500) {
            this.ctx.channel().config().setAutoRead(true);
        }
        if (isNonPersistentTopic) {
            --this.nonPersistentPendingMessages;
        }
    }

    private <T> PulsarApi.ServerError getErrorCode(CompletableFuture<T> future) {
        PulsarApi.ServerError error;
        block2: {
            error = PulsarApi.ServerError.UnknownError;
            try {
                future.getNow(null);
            }
            catch (Exception e) {
                if (!(e.getCause() instanceof BrokerServiceException)) break block2;
                error = BrokerServiceException.getClientErrorCode((BrokerServiceException)e.getCause());
            }
        }
        return error;
    }

    private final void disableTcpNoDelayIfNeeded(String topic, String producerName) {
        if (producerName != null && producerName.startsWith(this.replicatorPrefix)) {
            try {
                if (((Boolean)this.ctx.channel().config().getOption(ChannelOption.TCP_NODELAY)).booleanValue()) {
                    this.ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, (Object)false);
                }
            }
            catch (Throwable throwable) {
                log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", new Object[]{topic, producerName, this.ctx.channel()});
            }
        }
    }

    public State getState() {
        return this.state;
    }

    public BrokerService getBrokerService() {
        return this.service;
    }

    public String getRole() {
        return this.authRole;
    }

    boolean hasConsumer(long consumerId) {
        return this.consumers.containsKey(consumerId);
    }

    public boolean isBatchMessageCompatibleVersion() {
        return this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v4.getNumber();
    }

    public String getClientVersion() {
        return this.clientVersion;
    }

    static enum State {
        Start,
        Connected;

    }
}

