/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.HttpClient;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Channel handler for a single client connection to a Pulsar broker, either direct or
 * through a proxy (see {@link #setTargetBroker(InetSocketAddress)}).
 *
 * <p>The handler sends the Connect command once the channel becomes active, tracks the
 * handshake through {@link State}, keeps the futures of in-flight requests keyed by request
 * id, and dispatches broker commands to the producers and consumers registered on it.
 */
public class ClientCnx extends PulsarHandler {
    private final Authentication authentication;
    /** Handshake progress of this connection; see {@link State}. */
    private State state;
    /** Futures of in-flight non-lookup requests, keyed by request id. */
    private final ConcurrentLongHashMap<CompletableFuture<Pair<String, Long>>> pendingRequests = new ConcurrentLongHashMap<>(16, 1);
    /** Futures of in-flight lookup / partition-metadata requests, keyed by request id. */
    private final ConcurrentLongHashMap<CompletableFuture<BinaryProtoLookupService.LookupDataResult>> pendingLookupRequests = new ConcurrentLongHashMap<>(16, 1);
    /** Producers registered on this connection, keyed by producer id. */
    private final ConcurrentLongHashMap<ProducerImpl> producers = new ConcurrentLongHashMap<>(16, 1);
    /** Consumers registered on this connection, keyed by consumer id. */
    private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);
    /** Completed when the broker answers the Connect command; failed if the channel closes first. */
    private final CompletableFuture<Void> connectionFuture = new CompletableFuture<>();
    /** Bounds the number of lookup requests that may be pending at the same time. */
    private final Semaphore pendingLookupRequestSemaphore;
    private final EventLoopGroup eventLoopGroup;
    // The updater resolves the field reflectively by name: keep the string and the field below in sync.
    private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ClientCnx.class, "numberOfRejectRequests");
    private volatile int numberOfRejectRequests = 0;
    private final int maxNumberOfRejectedRequestPerConnection;
    /** Delay, in seconds, after the first rejected request before the rejection counter is reset. */
    private final int rejectedRequestResetTimeSec = 60;
    /** "host:port" of the broker to reach through a proxy, or {@code null} for a direct connection. */
    private String proxyToTargetBrokerAddress = null;
    private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);

    /**
     * Creates a connection handler configured from the given client.
     *
     * @param pulsarClient client supplying the authentication, the event loop group, the
     *                     concurrent lookup limit and the rejected-request limit
     */
    public ClientCnx(PulsarClientImpl pulsarClient) {
        // NOTE(review): presumably the keep-alive interval expected by PulsarHandler -- confirm.
        super(30, TimeUnit.SECONDS);
        this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(), true);
        this.authentication = pulsarClient.getConfiguration().getAuthentication();
        this.eventLoopGroup = pulsarClient.eventLoopGroup();
        this.maxNumberOfRejectedRequestPerConnection = pulsarClient.getConfiguration().getMaxNumberOfRejectedRequestPerConnection();
        this.state = State.None;
    }

    /**
     * Sends the Connect command as soon as the channel is active. The state moves to
     * {@link State#SentConnectFrame} once the write succeeds; the channel is closed if it fails.
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        if (this.proxyToTargetBrokerAddress == null) {
            log.info("{} Connected to broker", ctx.channel());
        } else {
            log.info("{} Connected through proxy to target broker at {}", ctx.channel(), this.proxyToTargetBrokerAddress);
        }
        String authData = "";
        if (this.authentication.getAuthData().hasDataFromCommand()) {
            authData = this.authentication.getAuthData().getCommandData();
        }
        ctx.writeAndFlush(Commands.newConnect(this.authentication.getAuthMethodName(), authData, HttpClient.getPulsarClientVersion(), this.proxyToTargetBrokerAddress)).addListener(future -> {
            if (future.isSuccess()) {
                if (log.isDebugEnabled()) {
                    log.debug("Complete: {}", future.isSuccess());
                }
                this.state = State.SentConnectFrame;
            } else {
                log.warn("Error during handshake", future.cause());
                ctx.close();
            }
        });
    }

    /**
     * Fails the connection future (if still pending) and every pending request, then notifies
     * all registered producers and consumers that the connection is gone.
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("{} Disconnected", ctx.channel());
        if (!this.connectionFuture.isDone()) {
            this.connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
        }
        PulsarClientException e = new PulsarClientException("Disconnected from server at " + ctx.channel().remoteAddress());
        // Fail out everything that is still waiting for a broker response.
        this.pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
        this.pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
        // Notify the attached producers/consumers of the closed connection.
        this.producers.forEach((id, producer) -> producer.connectionClosed(this));
        this.consumers.forEach((id, consumer) -> consumer.connectionClosed(this));
    }

    /** Logs the failure and closes the channel. */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("{} Exception caught: {}", ctx.channel(), cause.getMessage(), cause);
        ctx.close();
    }

    /**
     * Handles the broker's answer to the Connect command and marks the connection ready.
     *
     * @throws IllegalArgumentException if the Connect command has not been sent yet
     */
    protected void handleConnected(PulsarApi.CommandConnected connected) {
        Preconditions.checkArgument(this.state == State.SentConnectFrame);
        log.info("{} Connection is ready", this.ctx.channel());
        // Publish the negotiated protocol version and the ready state BEFORE completing the
        // future: callbacks attached to it run synchronously inside complete() and must not
        // observe a stale protocol version or a not-yet-ready connection.
        this.remoteEndpointProtocolVersion = connected.getProtocolVersion();
        this.state = State.Ready;
        this.connectionFuture.complete(null);
    }

    /**
     * Forwards a send receipt to the producer it belongs to. A receipt for a producer that is
     * no longer registered on this connection is logged and ignored.
     *
     * @throws IllegalArgumentException if the handshake has not completed
     */
    protected void handleSendReceipt(PulsarApi.CommandSendReceipt sendReceipt) {
        Preconditions.checkArgument(this.state == State.Ready);
        long producerId = sendReceipt.getProducerId();
        long sequenceId = sendReceipt.getSequenceId();
        // -1/-1 is reported to the producer when the receipt carries no message id.
        long ledgerId = -1L;
        long entryId = -1L;
        if (sendReceipt.hasMessageId()) {
            ledgerId = sendReceipt.getMessageId().getLedgerId();
            entryId = sendReceipt.getMessageId().getEntryId();
        }
        if (ledgerId == -1L && entryId == -1L) {
            log.warn("[{}] Message has been dropped for non-persistent topic producer-id {}", this.ctx.channel(), producerId);
        }
        if (log.isDebugEnabled()) {
            log.debug("{} Got receipt for producer: {} -- msg: {} -- id: {}:{}", this.ctx.channel(), producerId, sequenceId, ledgerId, entryId);
        }
        ProducerImpl producer = this.producers.get(producerId);
        if (producer == null) {
            // The producer may have been removed while the receipt was in flight; failing
            // with a NullPointerException here would propagate out of the handler.
            log.warn("{} Producer with id {} not found while handling send receipt for msg {}", this.ctx.channel(), producerId, sequenceId);
            return;
        }
        producer.ackReceived(this, sequenceId, ledgerId, entryId);
    }

    /**
     * Hands an incoming message to the consumer it is addressed to; messages for consumers
     * that are not registered on this connection are silently skipped.
     *
     * @throws IllegalArgumentException if the handshake has not completed
     */
    protected void handleMessage(PulsarApi.CommandMessage cmdMessage, ByteBuf headersAndPayload) {
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received a message from the server: {}", this.ctx.channel(), cmdMessage);
        }
        ConsumerImpl consumer = this.consumers.get(cmdMessage.getConsumerId());
        if (consumer != null) {
            consumer.messageReceived(cmdMessage.getMessageId(), headersAndPayload, this);
        }
    }

    /**
     * Completes the pending request matching the response with a {@code null} result.
     *
     * @throws IllegalArgumentException if the handshake has not completed
     */
    protected void handleSuccess(PulsarApi.CommandSuccess success) {
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received success response from server: {}", this.ctx.channel(), success.getRequestId());
        }
        long requestId = success.getRequestId();
        CompletableFuture<Pair<String, Long>> requestFuture = this.pendingRequests.remove(requestId);
        if (requestFuture != null) {
            requestFuture.complete(null);
        } else {
            log.warn("{} Received unknown request id from server: {}", this.ctx.channel(), success.getRequestId());
        }
    }

    /**
     * Completes the pending request matching the response with the producer name and the last
     * sequence id reported by the broker.
     *
     * @throws IllegalArgumentException if the handshake has not completed
     */
    protected void handleProducerSuccess(PulsarApi.CommandProducerSuccess success) {
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received producer success response from server: {} - producer-name: {}", this.ctx.channel(), success.getRequestId(), success.getProducerName());
        }
        long requestId = success.getRequestId();
        CompletableFuture<Pair<String, Long>> requestFuture = this.pendingRequests.remove(requestId);
        if (requestFuture != null) {
            requestFuture.complete(new ImmutablePair<>(success.getProducerName(), success.getLastSequenceId()));
        } else {
            log.warn("{} Received unknown request id from server: {}", this.ctx.channel(), success.getRequestId());
        }
    }

    /**
     * Completes the pending lookup request matching the response. A missing or {@code Failed}
     * response fails the future, after {@link #checkServerError} has had a chance to close the
     * connection for the reported error.
     */
    protected void handleLookupResponse(PulsarApi.CommandLookupTopicResponse lookupResult) {
        log.info("Received Broker lookup response: {}", lookupResult.getResponse());
        long requestId = lookupResult.getRequestId();
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> requestFuture = this.getAndRemovePendingLookupRequest(requestId);
        if (requestFuture == null) {
            log.warn("{} Received unknown request id from server: {}", this.ctx.channel(), lookupResult.getRequestId());
            return;
        }
        boolean failed = !lookupResult.hasResponse()
                || lookupResult.getResponse() == PulsarApi.CommandLookupTopicResponse.LookupType.Failed;
        if (!failed) {
            requestFuture.complete(new BinaryProtoLookupService.LookupDataResult(lookupResult));
        } else if (lookupResult.hasError()) {
            this.checkServerError(lookupResult.getError(), lookupResult.getMessage());
            requestFuture.completeExceptionally(this.getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
        } else {
            requestFuture.completeExceptionally(new PulsarClientException.LookupException("Empty lookup response"));
        }
    }

    /**
     * Completes the pending partitioned-topic metadata request matching the response, using
     * the same failure rules as {@link #handleLookupResponse}.
     */
    protected void handlePartitionResponse(PulsarApi.CommandPartitionedTopicMetadataResponse lookupResult) {
        log.info("Received Broker Partition response: {}", lookupResult.getPartitions());
        long requestId = lookupResult.getRequestId();
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> requestFuture = this.getAndRemovePendingLookupRequest(requestId);
        if (requestFuture == null) {
            log.warn("{} Received unknown request id from server: {}", this.ctx.channel(), lookupResult.getRequestId());
            return;
        }
        boolean failed = !lookupResult.hasResponse()
                || lookupResult.getResponse() == PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed;
        if (!failed) {
            requestFuture.complete(new BinaryProtoLookupService.LookupDataResult(lookupResult.getPartitions()));
        } else if (lookupResult.hasError()) {
            this.checkServerError(lookupResult.getError(), lookupResult.getMessage());
            requestFuture.completeExceptionally(this.getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
        } else {
            requestFuture.completeExceptionally(new PulsarClientException.LookupException("Empty lookup response"));
        }
    }

    /** Marks the addressed consumer, if registered on this connection, as terminated. */
    protected void handleReachedEndOfTopic(PulsarApi.CommandReachedEndOfTopic commandReachedEndOfTopic) {
        long consumerId = commandReachedEndOfTopic.getConsumerId();
        log.info("[{}] Broker notification reached the end of topic: {}", this.remoteAddress, consumerId);
        ConsumerImpl consumer = this.consumers.get(consumerId);
        if (consumer != null) {
            consumer.setTerminated();
        }
    }

    /**
     * Registers a lookup future if a permit is available.
     *
     * @return {@code true} if the request was registered, {@code false} if the limit of
     *         concurrent lookup requests has been reached
     */
    private boolean addPendingLookupRequests(long requestId, CompletableFuture<BinaryProtoLookupService.LookupDataResult> future) {
        if (!this.pendingLookupRequestSemaphore.tryAcquire()) {
            return false;
        }
        this.pendingLookupRequests.put(requestId, future);
        return true;
    }

    /**
     * Removes a pending lookup future and gives its permit back.
     *
     * @return the removed future, or {@code null} if the request id is unknown
     */
    private CompletableFuture<BinaryProtoLookupService.LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> result = this.pendingLookupRequests.remove(requestId);
        if (result != null) {
            // Release only for requests that were actually registered, so permits stay balanced.
            this.pendingLookupRequestSemaphore.release();
        }
        return result;
    }

    /**
     * Reacts to a send error reported by the broker: checksum and topic-terminated errors are
     * delegated to the affected producer, any other error closes the connection. An error for a
     * producer that is no longer registered on this connection is logged and ignored.
     */
    protected void handleSendError(PulsarApi.CommandSendError sendError) {
        log.warn("{} Received send error from server: {} : {}", this.ctx.channel(), sendError.getError(), sendError.getMessage());
        long producerId = sendError.getProducerId();
        long sequenceId = sendError.getSequenceId();
        switch (sendError.getError()) {
            case ChecksumError: {
                ProducerImpl producer = this.producers.get(producerId);
                if (producer != null) {
                    producer.recoverChecksumError(this, sequenceId);
                } else {
                    log.warn("{} Producer with id {} not found while handling send error", this.ctx.channel(), producerId);
                }
                break;
            }
            case TopicTerminatedError: {
                ProducerImpl producer = this.producers.get(producerId);
                if (producer != null) {
                    producer.terminated(this);
                } else {
                    log.warn("{} Producer with id {} not found while handling send error", this.ctx.channel(), producerId);
                }
                break;
            }
            default: {
                this.ctx.close();
            }
        }
    }

    /**
     * Fails the pending request matching the error with the corresponding client exception.
     *
     * @throws IllegalArgumentException if the handshake has not completed
     */
    protected void handleError(PulsarApi.CommandError error) {
        Preconditions.checkArgument(this.state == State.Ready);
        log.warn("{} Received error from server: {}", this.ctx.channel(), error.getMessage());
        long requestId = error.getRequestId();
        if (error.getError() == PulsarApi.ServerError.ProducerBlockedQuotaExceededError) {
            log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic", this.ctx.channel());
        }
        CompletableFuture<Pair<String, Long>> requestFuture = this.pendingRequests.remove(requestId);
        if (requestFuture != null) {
            requestFuture.completeExceptionally(this.getPulsarClientException(error.getError(), error.getMessage()));
        } else {
            log.warn("{} Received unknown request id from server: {}", this.ctx.channel(), error.getRequestId());
        }
    }

    /** Notifies the addressed producer that the broker closed it, as if the connection had dropped. */
    protected void handleCloseProducer(PulsarApi.CommandCloseProducer closeProducer) {
        log.info("[{}] Broker notification of Closed producer: {}", this.remoteAddress, closeProducer.getProducerId());
        long producerId = closeProducer.getProducerId();
        ProducerImpl producer = this.producers.get(producerId);
        if (producer != null) {
            producer.connectionClosed(this);
        } else {
            log.warn("Producer with id {} not found while closing producer ", producerId);
        }
    }

    /** Notifies the addressed consumer that the broker closed it, as if the connection had dropped. */
    protected void handleCloseConsumer(PulsarApi.CommandCloseConsumer closeConsumer) {
        log.info("[{}] Broker notification of Closed consumer: {}", this.remoteAddress, closeConsumer.getConsumerId());
        long consumerId = closeConsumer.getConsumerId();
        ConsumerImpl consumer = this.consumers.get(consumerId);
        if (consumer != null) {
            consumer.connectionClosed(this);
        } else {
            log.warn("Consumer with id {} not found while closing consumer ", consumerId);
        }
    }

    /** @return {@code true} once the broker has answered the Connect command */
    protected boolean isHandshakeCompleted() {
        return this.state == State.Ready;
    }

    /**
     * Sends a lookup request to the broker.
     *
     * @param request   the serialized lookup command
     * @param requestId id under which the response will be matched
     * @return a future completed with the lookup result; it fails immediately with
     *         {@link PulsarClientException.TooManyRequestsException} when too many lookups are
     *         already pending, or with the write failure if the request cannot be sent
     */
    public CompletableFuture<BinaryProtoLookupService.LookupDataResult> newLookup(ByteBuf request, long requestId) {
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> future = new CompletableFuture<>();
        if (!this.addPendingLookupRequests(requestId, future)) {
            if (log.isDebugEnabled()) {
                log.debug("{} Failed to add lookup-request into pending queue", requestId);
            }
            future.completeExceptionally(new PulsarClientException.TooManyRequestsException("Failed due to too many pending lookup requests"));
            return future;
        }
        this.ctx.writeAndFlush(request).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send request {} to broker: {}", this.ctx.channel(), requestId, writeFuture.cause().getMessage());
                // Unregister the request so its lookup permit is returned.
                this.getAndRemovePendingLookupRequest(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        });
        return future;
    }

    /** @return a new promise created from this connection's channel context */
    Promise<Void> newPromise() {
        return this.ctx.newPromise();
    }

    /** @return the channel handler context of this connection */
    ChannelHandlerContext ctx() {
        return this.ctx;
    }

    /** @return the channel of this connection */
    Channel channel() {
        return this.ctx.channel();
    }

    /** @return the remote address recorded for this connection */
    SocketAddress serverAddrees() {
        return this.remoteAddress;
    }

    /** @return the future tracking the outcome of the connection handshake */
    CompletableFuture<Void> connectionFuture() {
        return this.connectionFuture;
    }

    /**
     * Sends a command and registers a future for its response.
     *
     * @param cmd       the serialized command
     * @param requestId id under which the response will be matched
     * @return a future completed by the matching success/error response, or failed with the
     *         write failure if the command cannot be sent
     */
    CompletableFuture<Pair<String, Long>> sendRequestWithId(ByteBuf cmd, long requestId) {
        CompletableFuture<Pair<String, Long>> future = new CompletableFuture<>();
        // Register before writing so a response can never arrive for an unknown request id.
        this.pendingRequests.put(requestId, future);
        this.ctx.writeAndFlush(cmd).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send request to broker: {}", this.ctx.channel(), writeFuture.cause().getMessage());
                this.pendingRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        });
        return future;
    }

    /**
     * Closes the connection when the broker reports {@code ServiceNotReady}, or when the number
     * of {@code TooManyRequests} rejections reaches the configured maximum before the counter
     * is reset. The reset is scheduled when the first rejection is counted.
     */
    private void checkServerError(PulsarApi.ServerError error, String errMsg) {
        if (error == PulsarApi.ServerError.ServiceNotReady) {
            log.error("{} Close connection becaues received internal-server error {}", this.ctx.channel(), errMsg);
            this.ctx.close();
        } else if (error == PulsarApi.ServerError.TooManyRequests) {
            int rejectedRequests = NUMBER_OF_REJECTED_REQUESTS_UPDATER.getAndIncrement(this);
            if (rejectedRequests == 0) {
                // First rejection of a new window: schedule the counter reset.
                this.eventLoopGroup.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(this, 0), this.rejectedRequestResetTimeSec, TimeUnit.SECONDS);
            } else if (rejectedRequests >= this.maxNumberOfRejectedRequestPerConnection) {
                log.error("{} Close connection becaues received {} rejected request in {} seconds ", this.ctx.channel(), NUMBER_OF_REJECTED_REQUESTS_UPDATER.get(this), this.rejectedRequestResetTimeSec);
                this.ctx.close();
            }
        }
    }

    /** Registers a consumer so that broker commands carrying its id are dispatched to it. */
    void registerConsumer(long consumerId, ConsumerImpl consumer) {
        this.consumers.put(consumerId, consumer);
    }

    /** Registers a producer so that broker commands carrying its id are dispatched to it. */
    void registerProducer(long producerId, ProducerImpl producer) {
        this.producers.put(producerId, producer);
    }

    /** Unregisters the producer with the given id, if present. */
    void removeProducer(long producerId) {
        this.producers.remove(producerId);
    }

    /** Unregisters the consumer with the given id, if present. */
    void removeConsumer(long consumerId) {
        this.consumers.remove(consumerId);
    }

    /**
     * Records the broker to reach through a proxy, formatted as "host:port". The value is
     * passed in the Connect command built by {@link #channelActive}.
     */
    void setTargetBroker(InetSocketAddress targetBrokerAddress) {
        this.proxyToTargetBrokerAddress = String.format("%s:%d", targetBrokerAddress.getHostString(), targetBrokerAddress.getPort());
    }

    /**
     * Maps a server error code to the matching client exception type.
     *
     * @return the specific exception for the error, or a plain {@link PulsarClientException}
     *         for error codes without a dedicated type
     */
    private PulsarClientException getPulsarClientException(PulsarApi.ServerError error, String errorMsg) {
        switch (error) {
            case AuthenticationError: {
                return new PulsarClientException.AuthenticationException(errorMsg);
            }
            case AuthorizationError: {
                return new PulsarClientException.AuthorizationException(errorMsg);
            }
            case ConsumerBusy: {
                return new PulsarClientException.ConsumerBusyException(errorMsg);
            }
            case MetadataError: {
                return new PulsarClientException.BrokerMetadataException(errorMsg);
            }
            case PersistenceError: {
                return new PulsarClientException.BrokerPersistenceException(errorMsg);
            }
            case ServiceNotReady: {
                return new PulsarClientException.LookupException(errorMsg);
            }
            case TooManyRequests: {
                return new PulsarClientException.TooManyRequestsException(errorMsg);
            }
            case ProducerBlockedQuotaExceededError: {
                return new PulsarClientException.ProducerBlockedQuotaExceededError(errorMsg);
            }
            case ProducerBlockedQuotaExceededException: {
                return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
            }
            case TopicTerminatedError: {
                return new PulsarClientException.TopicTerminatedException(errorMsg);
            }
            default: {
                return new PulsarClientException(errorMsg);
            }
        }
    }

    /** Handshake progress of a connection. */
    static enum State {
        /** Initial state; the Connect command has not been written successfully yet. */
        None,
        /** The Connect command was written; waiting for the broker's answer. */
        SentConnectFrame,
        /** The broker answered the Connect command; the connection is usable. */
        Ready;

    }
}

