/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.proxy.server.BrokerProxyValidator;
import org.apache.pulsar.proxy.server.DirectProxyHandler;
import org.apache.pulsar.proxy.server.LookupProxyHandler;
import org.apache.pulsar.proxy.server.ProxyClientCnx;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.apache.pulsar.proxy.server.TargetAddressDeniedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyConnection
extends PulsarHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
    // Pool of proxy-to-broker connections; created in completeConnect() and closed in channelInactive().
    private ConnectionPool connectionPool;
    // Backs newRequestId(); seeded with a random value in [0, 0x3FFFFFFFFFFFFFFF).
    private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0L, 0x3FFFFFFFFFFFFFFFL));
    // Owning proxy service: source of configuration, worker group, authentication service and metrics.
    private final ProxyService service;
    // Provides the address resolver handed to the ConnectionPool in completeConnect().
    private final DnsAddressResolverGroup dnsAddressResolverGroup;
    // Taken from authState.getAuthDataSource() in handleConnect(); stays null when no auth state is created.
    AuthenticationDataSource authenticationData;
    // Current lifecycle state; see the State enum.
    private State state;
    // Set when the connection enters State.ProxyLookupRequests (completeConnect()).
    private LookupProxyHandler lookupProxyHandler = null;
    // Set once the broker connection is established (handleBrokerConnected()); cleared on close.
    private DirectProxyHandler directProxyHandler = null;
    // Obtained from the service in the constructor; validates the requested target broker address.
    private final BrokerProxyValidator brokerProxyValidator;
    // Role from the completed auth state, or the anonymous role when no provider matches the auth method.
    String clientAuthRole;
    // Client credentials and method; only populated when forwardAuthorizationCredentials is enabled.
    AuthData clientAuthData;
    String clientAuthMethod;
    // Auth method named by the client's CONNECT command; "none" until handleConnect() overwrites it.
    private String authMethod = "none";
    // Provider looked up for authMethod in handleConnect(); may be null.
    AuthenticationProvider authenticationProvider;
    // Authentication state created by authenticationProvider in handleConnect().
    AuthenticationState authState;
    // Client configuration for proxy-to-broker connections, built by createClientConfiguration().
    private ClientConfigurationData clientConf;
    // Whether the CONNECT command carried a proxyToBrokerUrl.
    private boolean hasProxyToBrokerUrl;
    // min(client protocol version, proxy's current protocol version); see getProtocolVersionToAdvertise().
    private int protocolVersionToAdvertise;
    // Target broker requested by the client, or the literal string "null" when none was supplied.
    private String proxyToBrokerUrl;
    // Last HAProxyMessage seen by channelRead(), if any.
    private HAProxyMessage haProxyMessage;
    // Credentials used when the CONNECT command carries no auth data.
    private static final byte[] EMPTY_CREDENTIALS = new byte[0];

    /**
     * Returns the pool of proxy-to-broker connections.
     *
     * @return the pool, or {@code null} if it has not been created yet or was already closed
     */
    ConnectionPool getConnectionPool() {
        return connectionPool;
    }

    /**
     * Creates a handler for one inbound client connection, starting in {@link State#Init}.
     *
     * @param proxyService the owning proxy service
     * @param dnsAddressResolverGroup resolver group used later when the broker connection pool is created
     */
    public ProxyConnection(ProxyService proxyService, DnsAddressResolverGroup dnsAddressResolverGroup) {
        super(30, TimeUnit.SECONDS);
        this.state = State.Init;
        this.dnsAddressResolverGroup = dnsAddressResolverGroup;
        this.service = proxyService;
        this.brokerProxyValidator = proxyService.getBrokerProxyValidator();
    }

    /**
     * Counts the new connection and rejects it when the configured limit of concurrent
     * inbound connections is exceeded.
     */
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        ProxyService.activeConnections.inc();
        double activeNow = ProxyService.activeConnections.get();
        int maxInbound = this.service.getConfiguration().getMaxConcurrentInboundConnections();
        if (activeNow > (double) maxInbound) {
            // Over the limit: mark as closing, drop the channel and record the rejection.
            this.state = State.Closing;
            ctx.close();
            ProxyService.rejectedConnections.inc();
        }
    }

    /**
     * Decrements the active-connection gauge that {@code channelRegistered} incremented.
     */
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        ProxyService.activeConnections.dec();
    }

    /**
     * Records the newly opened connection in the metrics and in the service's set of client connections.
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ProxyService.newConnections.inc();
        service.getClientCnxs().add(this);
        LOG.info("[{}] New connection opened", remoteAddress);
    }

    /**
     * Tears down everything owned by this connection once the client channel goes inactive:
     * the direct broker handler, the registration in the service and the broker connection pool.
     * The connection always ends in {@link State#Closed}.
     */
    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);

        if (directProxyHandler != null) {
            directProxyHandler.close();
            directProxyHandler = null;
        }

        service.getClientCnxs().remove(this);
        LOG.info("[{}] Connection closed", remoteAddress);

        if (connectionPool != null) {
            try {
                connectionPool.close();
                connectionPool = null;
            } catch (Exception e) {
                // Best effort: a failing pool close must not prevent the state transition below.
                LOG.error("Failed to close connection pool {}", e.getMessage(), e);
            }
        }

        state = State.Closed;
    }

    /**
     * Logs the failure, moves the connection to {@link State#Closing} unless it is already closed,
     * and closes the client channel. If the channel is no longer open, the broker handler is closed instead.
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        // The trailing argument is the throwable to log; it is omitted for known exceptions.
        LOG.warn("[{}] Got exception {} : Message: {} State: {}",
                this.remoteAddress,
                cause.getClass().getSimpleName(),
                cause.getMessage(),
                this.state,
                ClientCnx.isKnownException(cause) ? null : cause);

        if (this.state != State.Closed) {
            this.state = State.Closing;
        }

        if (ctx.channel().isOpen()) {
            ctx.close();
            return;
        }
        if (this.directProxyHandler != null) {
            this.directProxyHandler.close();
            this.directProxyHandler = null;
        }
    }

    /**
     * Applies back-pressure: the broker-side channel only auto-reads while the client channel is writable.
     */
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        boolean hasOutbound = this.directProxyHandler != null && this.directProxyHandler.outboundChannel != null;
        if (hasOutbound) {
            boolean clientWritable = ctx.channel().isWritable();
            this.directProxyHandler.outboundChannel.config().setAutoRead(clientWritable);
        }
        super.channelWritabilityChanged(ctx);
    }

    /**
     * Routes an inbound message according to the current connection state.
     *
     * <ul>
     *   <li>An {@link HAProxyMessage} is stored on the connection and not processed further.</li>
     *   <li>In {@code Init}, {@code Connecting} and {@code ProxyLookupRequests} the message is
     *       delegated to the superclass.</li>
     *   <li>In {@code ProxyConnectionToBroker} the message is forwarded as-is to the broker channel
     *       and the operation/byte metrics are updated.</li>
     *   <li>In every other situation the message is dropped. Dropped messages are released here,
     *       because this handler is their last consumer and reference-counted buffers would
     *       otherwise leak.</li>
     * </ul>
     *
     * @param ctx the channel handler context of the client channel
     * @param msg the inbound message
     * @throws Exception if the superclass fails while processing the message
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HAProxyMessage) {
            // Kept for later retrieval through getHAProxyMessage(), so it is intentionally not released.
            this.haProxyMessage = (HAProxyMessage)msg;
            return;
        }
        switch (this.state) {
            case Init:
            case Connecting:
            case ProxyLookupRequests: {
                super.channelRead(ctx, msg);
                break;
            }
            case ProxyConnectionToBroker: {
                if (this.directProxyHandler != null) {
                    ProxyService.opsCounter.inc();
                    if (msg instanceof ByteBuf) {
                        int bytes = ((ByteBuf)msg).readableBytes();
                        this.directProxyHandler.getInboundChannelRequestsRate().recordEvent((long)bytes);
                        ProxyService.bytesCounter.inc((double)bytes);
                    }
                    // Ownership of msg passes to the outbound channel with the write.
                    this.directProxyHandler.outboundChannel.writeAndFlush(msg).addListener((GenericFutureListener)ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                    break;
                }
                LOG.warn("Received message of type {} while connection to broker is missing in state {}. Dropping the input message (readable bytes={}).", new Object[]{msg.getClass(), this.state, msg instanceof ByteBuf ? ((ByteBuf)msg).readableBytes() : -1});
                ReferenceCountUtil.release(msg);
                break;
            }
            case ProxyConnectingToBroker: {
                LOG.warn("Received message of type {} while connecting to broker. Dropping the input message (readable bytes={}).", msg.getClass(), (Object)(msg instanceof ByteBuf ? ((ByteBuf)msg).readableBytes() : -1));
                ReferenceCountUtil.release(msg);
                break;
            }
            default: {
                // Closing / Closed: nothing will consume the message any more.
                ReferenceCountUtil.release(msg);
                break;
            }
        }
    }

    /**
     * Finishes the client handshake after authentication (or immediately when authentication is disabled).
     *
     * <p>Creates the proxy-to-broker connection pool, then either switches to lookup-proxy mode
     * (no target broker requested) or validates the requested target broker and starts connecting to it.
     *
     * @param clientData the client's credentials; remembered only when credential forwarding is enabled
     * @throws PulsarClientException declared for the connection pool construction
     */
    private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
        Supplier<ClientCnx> brokerCnxFactory;
        if (!this.service.getConfiguration().isAuthenticationEnabled()) {
            brokerCnxFactory = () -> new ClientCnx(this.clientConf, this.service.getWorkerGroup(), this.protocolVersionToAdvertise);
        } else {
            if (this.service.getConfiguration().isForwardAuthorizationCredentials()) {
                this.clientAuthData = clientData;
                this.clientAuthMethod = this.authMethod;
            }
            brokerCnxFactory = () -> new ProxyClientCnx(this.clientConf, this.service.getWorkerGroup(),
                    this.clientAuthRole, this.clientAuthData, this.clientAuthMethod,
                    this.protocolVersionToAdvertise,
                    this.service.getConfiguration().isForwardAuthorizationCredentials(), this);
        }

        if (this.connectionPool != null) {
            LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
                    this.remoteAddress, this.state, this.clientAuthRole);
        } else {
            this.connectionPool = new ConnectionPool(this.clientConf, this.service.getWorkerGroup(), brokerCnxFactory,
                    Optional.of(this.dnsAddressResolverGroup.getResolver((EventExecutor) this.service.getWorkerGroup().next())));
        }

        LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
                this.remoteAddress, this.authMethod, this.clientAuthRole, this.hasProxyToBrokerUrl);

        if (!this.hasProxyToBrokerUrl) {
            // No target broker requested: this connection only serves lookup-type requests.
            this.state = State.ProxyLookupRequests;
            this.lookupProxyHandler = new LookupProxyHandler(this.service, this);
            this.ctx.writeAndFlush(Commands.newConnected(this.protocolVersionToAdvertise))
                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
            return;
        }

        if (this.service.getConfiguration().isCheckActiveBrokers() && !this.isBrokerActive(this.proxyToBrokerUrl)) {
            this.state = State.Closing;
            LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
                    this.remoteAddress, this.proxyToBrokerUrl, this.authMethod, this.clientAuthRole);
            this.ctx().writeAndFlush(Commands.newError(-1L, ServerError.ServiceNotReady, "Target broker isn't available."))
                    .addListener(ChannelFutureListener.CLOSE);
            return;
        }

        this.state = State.ProxyConnectingToBroker;
        this.brokerProxyValidator.resolveAndCheckTargetAddress(this.proxyToBrokerUrl)
                .thenAcceptAsync(this::connectToBroker, (Executor) this.ctx.executor())
                .exceptionally(throwable -> {
                    boolean deniedDirectly = throwable instanceof TargetAddressDeniedException;
                    if (deniedDirectly || throwable.getCause() instanceof TargetAddressDeniedException) {
                        // The denial may arrive wrapped by the future machinery; unwrap it for the message.
                        TargetAddressDeniedException denied =
                                (TargetAddressDeniedException) (deniedDirectly ? throwable : throwable.getCause());
                        LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
                                this.remoteAddress, this.proxyToBrokerUrl, denied.getMessage(), this.authMethod, this.clientAuthRole);
                    } else {
                        LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
                                this.remoteAddress, this.proxyToBrokerUrl, this.authMethod, this.clientAuthRole, throwable);
                    }
                    this.ctx().writeAndFlush(Commands.newError(-1L, ServerError.ServiceNotReady, "Target broker cannot be validated."))
                            .addListener(ChannelFutureListener.CLOSE);
                    return null;
                });
    }

    /**
     * Event-loop half of {@link #brokerConnected}: attaches the broker handler and answers the client
     * with CONNECTED, or closes both sides if the connection is no longer waiting for a broker.
     *
     * @param directProxyHandler the handler that established the broker connection
     * @param connected the broker's CONNECTED command
     */
    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
        Preconditions.checkState(this.ctx.executor().inEventLoop(), (Object) "This method should be called in the event loop");

        boolean awaitingBroker = this.state == State.ProxyConnectingToBroker
                && this.ctx.channel().isOpen()
                && this.directProxyHandler == null;
        if (!awaitingBroker) {
            String channelStatus = this.ctx.channel().isOpen() ? "open" : "already closed";
            String stateDescription = (this.state != State.ProxyConnectingToBroker ? "invalid state " : "state ") + this.state;
            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. Closing connection to broker '{}'.",
                    this.remoteAddress, channelStatus, stateDescription, this.proxyToBrokerUrl);
            directProxyHandler.close();
            this.ctx.close();
            return;
        }

        this.directProxyHandler = directProxyHandler;
        this.state = State.ProxyConnectionToBroker;
        // -1 is passed on when the broker did not advertise a max message size.
        int maxMessageSize = -1;
        if (connected.hasMaxMessageSize()) {
            maxMessageSize = connected.getMaxMessageSize();
        }
        this.ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
                .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    /**
     * Starts a direct connection to the validated broker address. Must run on the channel's event loop.
     *
     * @param brokerAddress the resolved and validated address of the target broker
     */
    private void connectToBroker(InetSocketAddress brokerAddress) {
        Preconditions.checkState(this.ctx.executor().inEventLoop(), (Object) "This method should be called in the event loop");
        new DirectProxyHandler(this.service, this)
                .connect(this.proxyToBrokerUrl, brokerAddress, this.protocolVersionToAdvertise);
    }

    /**
     * Called once the broker connection is established; hands processing over to the client
     * channel's event loop.
     *
     * @param directProxyHandler the handler owning the broker connection
     * @param connected the broker's CONNECTED command; copied before the asynchronous hand-off
     */
    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
        try {
            // Copy first: the task below runs later and must not depend on the caller's instance.
            CommandConnected connectedCopy = new CommandConnected().copyFrom(connected);
            this.ctx.executor().submit(() -> this.handleBrokerConnected(directProxyHandler, connectedCopy));
        } catch (RejectedExecutionException rejected) {
            LOG.error("Event loop was already closed. Closing broker connection.", rejected);
            directProxyHandler.close();
        }
    }

    /**
     * Runs one authentication round with the given client data.
     *
     * <p>If the round does not complete authentication, an auth challenge is sent to the client and
     * the connection moves to {@link State#Connecting}. Once authentication is complete, the role is
     * recorded and, for an initial handshake, the connection setup is completed.
     *
     * @param clientData credentials or challenge response supplied by the client
     * @throws Exception if the authentication state rejects the data or completing the connection fails
     */
    private void doAuthentication(AuthData clientData) throws Exception {
        AuthData challenge = this.authState.authenticate(clientData);

        if (!this.authState.isComplete()) {
            this.ctx.writeAndFlush(Commands.newAuthChallenge(this.authMethod, challenge, this.protocolVersionToAdvertise))
                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Authentication in progress client by method {}.", this.remoteAddress, this.authMethod);
            }
            this.state = State.Connecting;
            return;
        }

        this.clientAuthRole = this.authState.getAuthRole();
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Client successfully authenticated with {} role {}",
                    this.remoteAddress, this.authMethod, this.clientAuthRole);
        }
        // Skip completion when a pool already exists and we are not mid-handshake (e.g. an auth refresh).
        boolean handshakePending = this.connectionPool == null || this.state == State.Connecting;
        if (handshakePending) {
            this.completeConnect(clientData);
        }
    }

    /**
     * Handles the client's CONNECT command: records protocol and target-broker information, rejects
     * clients older than protocol v10, and then either completes the connection directly
     * (authentication disabled or no provider for the auth method) or starts authentication.
     *
     * <p>Any failure in the authentication path is answered with an {@code AuthenticationError}
     * and the channel is closed.
     *
     * @param connect the CONNECT command; only valid while the connection is in {@link State#Init}
     */
    protected void handleConnect(CommandConnect connect) {
        Preconditions.checkArgument(this.state == State.Init);
        this.setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
        this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
        this.protocolVersionToAdvertise = ProxyConnection.getProtocolVersionToAdvertise(connect);
        if (connect.hasProxyToBrokerUrl()) {
            this.proxyToBrokerUrl = connect.getProxyToBrokerUrl();
        } else {
            // The literal string "null" (not a null reference) marks the absence of a target broker.
            this.proxyToBrokerUrl = "null";
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received CONNECT from {} proxyToBroker={}", this.remoteAddress, this.proxyToBrokerUrl);
            LOG.debug("[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
                    this.remoteAddress, this.protocolVersionToAdvertise,
                    this.getRemoteEndpointProtocolVersion(), Commands.getCurrentProtocolVersion());
        }

        if (this.getRemoteEndpointProtocolVersion() < ProtocolVersion.v10.getValue()) {
            LOG.warn("[{}] Client doesn't support connecting through proxy", this.remoteAddress);
            this.state = State.Closing;
            this.ctx.close();
            return;
        }

        try {
            this.clientConf = this.createClientConfiguration();
            if (!this.service.getConfiguration().isAuthenticationEnabled()) {
                this.completeConnect(null);
                return;
            }

            byte[] credentials = connect.hasAuthData() ? connect.getAuthData() : EMPTY_CREDENTIALS;
            AuthData clientData = AuthData.of(credentials);

            if (connect.hasAuthMethodName()) {
                this.authMethod = connect.getAuthMethodName();
            } else if (connect.hasAuthMethod()) {
                // Legacy enum form: drop the first 10 characters of the constant name
                // (presumably the "AuthMethod" prefix) and lower-case the rest.
                this.authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
            } else {
                this.authMethod = "none";
            }

            this.authenticationProvider = this.service.getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                // No provider for this method: fall back to the anonymous role, if one is configured.
                this.clientAuthRole = (String) this.service.getAuthenticationService().getAnonymousUserRole()
                        .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured"));
                this.completeConnect(clientData);
                return;
            }

            ChannelHandler tlsHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sslSession = tlsHandler == null ? null : ((SslHandler) tlsHandler).engine().getSession();

            this.authState = this.authenticationProvider.newAuthState(clientData, this.remoteAddress, sslSession);
            this.authenticationData = this.authState.getAuthDataSource();
            this.doAuthentication(clientData);
        } catch (Exception e) {
            LOG.warn("[{}] Unable to authenticate: ", this.remoteAddress, e);
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.AuthenticationError, "Failed to authenticate"))
                    .addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Handles an AUTH_RESPONSE from the client: runs the next authentication round and, when
     * credential forwarding is enabled and the connection is in lookup-proxy mode, forwards the
     * refreshed credentials to every pooled broker connection.
     *
     * <p>The forwarded command buffer is only built once a broker connection is actually available.
     * Building it up front would leave the buffer neither written nor released whenever the
     * connection future completes exceptionally.
     *
     * <p>Any failure is answered with an {@code AuthenticationError} and the channel is closed.
     *
     * @param authResponse the client's response; must carry a response with auth data and auth method name
     * @throws IllegalArgumentException if the response is missing or incomplete
     */
    protected void handleAuthResponse(CommandAuthResponse authResponse) {
        Preconditions.checkArgument(authResponse.hasResponse());
        Preconditions.checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received AuthResponse from {}, auth method: {}", (Object)this.remoteAddress, (Object)authResponse.getResponse().getAuthMethodName());
        }
        try {
            AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
            this.doAuthentication(clientData);
            if (this.service.getConfiguration().isForwardAuthorizationCredentials() && this.connectionPool != null && this.state == State.ProxyLookupRequests) {
                // Read everything needed from the command and this connection now; the callbacks
                // below may run after this method has returned.
                final String clientVersion = authResponse.hasClientVersion() ? authResponse.getClientVersion() : PulsarVersion.getVersion();
                final int protocolVersion = authResponse.hasProtocolVersion() ? authResponse.getProtocolVersion() : Commands.getCurrentProtocolVersion();
                final String forwardedAuthMethod = this.clientAuthMethod;
                this.connectionPool.getConnections().forEach(toBrokerCnxFuture ->
                    toBrokerCnxFuture.thenAccept(toBrokerCnx -> {
                        // One buffer per broker connection; ownership passes to the write.
                        ByteBuf cmd = Commands.newAuthResponse(forwardedAuthMethod, clientData, protocolVersion, clientVersion);
                        toBrokerCnx.ctx().writeAndFlush((Object)cmd).addListener(writeFuture -> {
                            if (writeFuture.isSuccess()) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("{} authentication is refreshed successfully by {}, auth method: {} ", new Object[]{toBrokerCnx.ctx().channel(), this.ctx.channel(), this.clientAuthMethod});
                                }
                            } else {
                                LOG.error("Failed to forward the auth response from the proxy to the broker through the proxy client, proxy: {}, proxy client: {}", new Object[]{this.ctx.channel(), toBrokerCnx.ctx().channel(), writeFuture.cause()});
                                toBrokerCnx.ctx().channel().pipeline().fireExceptionCaught(writeFuture.cause());
                            }
                        });
                    }).whenComplete((__, ex) -> {
                        if (ex != null) {
                            LOG.error("Failed to forward the auth response from the proxy to the broker through the proxy client, proxy: {}", (Object)this.ctx().channel(), ex);
                        }
                    }));
            }
        }
        catch (Exception e) {
            String msg = "Unable to handleAuthResponse";
            LOG.warn("[{}] {} ", new Object[]{this.remoteAddress, msg, e});
            this.ctx.writeAndFlush((Object)Commands.newError(-1L, ServerError.AuthenticationError, msg)).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Delegates a partitioned-topic metadata request to the lookup proxy handler.
     * Only valid in {@link State#ProxyLookupRequests}.
     */
    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
        boolean lookupMode = this.state == State.ProxyLookupRequests;
        Preconditions.checkArgument(lookupMode);
        this.lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
    }

    /**
     * Delegates a get-topics-of-namespace request to the lookup proxy handler.
     * Only valid in {@link State#ProxyLookupRequests}.
     */
    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        boolean lookupMode = this.state == State.ProxyLookupRequests;
        Preconditions.checkArgument(lookupMode);
        this.lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
    }

    /**
     * Delegates a get-schema request to the lookup proxy handler.
     * Only valid in {@link State#ProxyLookupRequests}.
     */
    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
        boolean lookupMode = this.state == State.ProxyLookupRequests;
        Preconditions.checkArgument(lookupMode);
        this.lookupProxyHandler.handleGetSchema(commandGetSchema);
    }

    /**
     * Delegates a topic lookup request to the lookup proxy handler.
     * Only valid in {@link State#ProxyLookupRequests}.
     */
    protected void handleLookup(CommandLookupTopic lookup) {
        boolean lookupMode = this.state == State.ProxyLookupRequests;
        Preconditions.checkArgument(lookupMode);
        this.lookupProxyHandler.handleLookup(lookup);
    }

    /**
     * Builds the client configuration used for proxy-to-broker connections.
     *
     * <p>Starts from the service URL (TLS or plain, depending on the proxy configuration), applies
     * every proxy property prefixed with {@code brokerClient_} as an override, then sets the
     * proxy's client authentication and, when TLS towards the broker is enabled, the TLS settings.
     *
     * @return the assembled client configuration
     */
    ClientConfigurationData createClientConfiguration() {
        ProxyConfiguration proxyConfig = this.service.getConfiguration();

        ClientConfigurationData baseConf = new ClientConfigurationData();
        String serviceUrl = proxyConfig.isTlsEnabledWithBroker()
                ? this.service.getServiceUrlTls()
                : this.service.getServiceUrl();
        baseConf.setServiceUrl(serviceUrl);

        Map overrides = PropertiesUtils.filterAndMapProperties((Properties) proxyConfig.getProperties(), "brokerClient_");
        ClientConfigurationData brokerClientConf =
                (ClientConfigurationData) ConfigurationDataUtils.loadData((Map) overrides, (Object) baseConf, ClientConfigurationData.class);
        brokerClientConf.setAuthentication(this.getClientAuthentication());

        if (!proxyConfig.isTlsEnabledWithBroker()) {
            return brokerClientConf;
        }

        brokerClientConf.setUseTls(true);
        brokerClientConf.setTlsHostnameVerificationEnable(proxyConfig.isTlsHostnameVerificationEnabled());
        if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
            // Key-store based trust material.
            brokerClientConf.setUseKeyStoreTls(true);
            brokerClientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
            brokerClientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
            brokerClientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
        } else {
            // Certificate-file based trust material.
            brokerClientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
            brokerClientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
        }
        return brokerClientConf;
    }

    /**
     * Picks the protocol version to advertise: the lower of the client's version and the
     * proxy's current version.
     *
     * @param connect the client's CONNECT command
     * @return the smaller of the two protocol versions
     */
    private static int getProtocolVersionToAdvertise(CommandConnect connect) {
        int clientVersion = connect.getProtocolVersion();
        int proxyVersion = Commands.getCurrentProtocolVersion();
        return clientVersion <= proxyVersion ? clientVersion : proxyVersion;
    }

    /**
     * Hands out the next request id for this connection.
     *
     * @return the current generator value, which is then incremented atomically
     */
    long newRequestId() {
        return requestIdGenerator.getAndIncrement();
    }

    /**
     * Returns the authentication plugin the proxy uses for its own client connections,
     * as provided by the proxy service.
     */
    public Authentication getClientAuthentication() {
        return service.getProxyClientAuthenticationPlugin();
    }

    /**
     * Reports whether the connection has left its initial state.
     *
     * @return {@code true} for every state except {@link State#Init}
     */
    protected boolean isHandshakeCompleted() {
        return !(state == State.Init);
    }

    /**
     * Returns the remote address of the client channel.
     */
    SocketAddress clientAddress() {
        return remoteAddress;
    }

    /**
     * Returns the channel handler context of the client channel.
     */
    ChannelHandlerContext ctx() {
        return ctx;
    }

    /**
     * Reports whether an {@link HAProxyMessage} has been received on this connection.
     */
    public boolean hasHAProxyMessage() {
        return !(haProxyMessage == null);
    }

    /**
     * Returns the stored {@link HAProxyMessage}.
     *
     * @return the message, or {@code null} if none has been received
     */
    public HAProxyMessage getHAProxyMessage() {
        return haProxyMessage;
    }

    /**
     * Checks whether the requested target matches the plain or TLS service URL of any available broker.
     *
     * @param targetBrokerHostPort the {@code host:port} requested by the client
     * @return {@code true} if some available broker advertises that host and port
     */
    private boolean isBrokerActive(String targetBrokerHostPort) {
        for (ServiceLookupData broker : this.getAvailableBrokers()) {
            boolean plainMatch = ProxyConnection.matchesHostAndPort("pulsar://", broker.getPulsarServiceUrl(), targetBrokerHostPort);
            if (plainMatch || ProxyConnection.matchesHostAndPort("pulsar+ssl://", broker.getPulsarServiceUrlTls(), targetBrokerHostPort)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Fetches the brokers currently known to the discovery provider.
     *
     * @return the available brokers, or an empty list when no discovery provider is configured
     *         or the lookup fails
     */
    private List<? extends ServiceLookupData> getAvailableBrokers() {
        if (this.service.getDiscoveryProvider() != null) {
            try {
                return this.service.getDiscoveryProvider().getAvailableBrokers();
            } catch (PulsarServerException e) {
                LOG.error("Unable to get available brokers", e);
                return Collections.emptyList();
            }
        }
        LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null.zookeeperServers and configurationStoreServers must be configured in proxy configuration when checkActiveBrokers is enabled.");
        return Collections.emptyList();
    }

    /**
     * Tests whether {@code pulsarServiceUrl} is exactly {@code expectedPrefix} followed by
     * {@code brokerHostPort}, without building the concatenated string.
     *
     * @param expectedPrefix the scheme prefix, e.g. {@code "pulsar://"}
     * @param pulsarServiceUrl the broker's service URL; {@code null} never matches
     * @param brokerHostPort the {@code host:port} to look for
     * @return {@code true} only on an exact match
     */
    static boolean matchesHostAndPort(String expectedPrefix, String pulsarServiceUrl, String brokerHostPort) {
        if (pulsarServiceUrl == null) {
            return false;
        }
        int prefixLength = expectedPrefix.length();
        int hostPortLength = brokerHostPort.length();
        if (pulsarServiceUrl.length() != prefixLength + hostPortLength) {
            return false;
        }
        return pulsarServiceUrl.regionMatches(0, expectedPrefix, 0, prefixLength)
                && pulsarServiceUrl.regionMatches(prefixLength, brokerHostPort, 0, hostPortLength);
    }

    /**
     * Returns the handler for the direct broker connection.
     *
     * @return the handler, or {@code null} when no broker connection is attached
     */
    public DirectProxyHandler getDirectProxyHandler() {
        return directProxyHandler;
    }

    /**
     * Lifecycle states of a proxy connection, as assigned throughout {@code ProxyConnection}.
     */
    static enum State {
        // Initial state set by the constructor; the CONNECT command is only accepted here.
        Init,
        // An auth challenge has been sent and authentication is not complete yet.
        Connecting,
        // Handshake done without a target broker; lookup-type commands go to the LookupProxyHandler.
        ProxyLookupRequests,
        // Target broker is being validated/connected; inbound messages are dropped meanwhile.
        ProxyConnectingToBroker,
        // Broker connection established; inbound messages are forwarded to the broker channel.
        ProxyConnectionToBroker,
        // Connection is being shut down (limit exceeded, error, unsupported client, unavailable broker).
        Closing,
        // Client channel became inactive and resources were cleaned up.
        Closed;

    }
}

