/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.proxy.server.DirectProxyHandler;
import org.apache.pulsar.proxy.server.LookupProxyHandler;
import org.apache.pulsar.proxy.server.ProxyClientCnx;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyConnectionPool;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyConnection
extends PulsarHandler
implements FutureListener<Void> {
    private PulsarClientImpl client;
    private ProxyService service;
    private Authentication clientAuthentication;
    AuthenticationDataSource authenticationData;
    private State state;
    private final Supplier<SslHandler> sslHandlerSupplier;
    private LookupProxyHandler lookupProxyHandler = null;
    private DirectProxyHandler directProxyHandler = null;
    String clientAuthRole;
    AuthData clientAuthData;
    String clientAuthMethod;
    private String authMethod = "none";
    AuthenticationProvider authenticationProvider;
    AuthenticationState authState;
    private ClientConfigurationData clientConf;
    private boolean hasProxyToBrokerUrl;
    private int protocolVersionToAdvertise;
    private String proxyToBrokerUrl;
    private HAProxyMessage haProxyMessage;
    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);

    ConnectionPool getConnectionPool() {
        return this.client.getCnxPool();
    }

    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
        super(30, TimeUnit.SECONDS);
        this.service = proxyService;
        this.state = State.Init;
        this.sslHandlerSupplier = sslHandlerSupplier;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        ProxyService.activeConnections.inc();
        if (ProxyService.activeConnections.get() > (double)this.service.getConfiguration().getMaxConcurrentInboundConnections()) {
            ctx.close();
            ProxyService.rejectedConnections.inc();
            return;
        }
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        ProxyService.activeConnections.dec();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ProxyService.newConnections.inc();
        this.service.getClientCnxs().add(this);
        LOG.info("[{}] New connection opened", (Object)this.remoteAddress);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (this.directProxyHandler != null && this.directProxyHandler.outboundChannel != null) {
            this.directProxyHandler.outboundChannel.close();
        }
        if (this.client != null) {
            this.client.close();
        }
        this.service.getClientCnxs().remove((Object)this);
        LOG.info("[{}] Connection closed", (Object)this.remoteAddress);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        LOG.warn("[{}] Got exception {} : {} {}", new Object[]{this.remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), ClientCnx.isKnownException((Throwable)cause) ? null : cause});
        ctx.close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HAProxyMessage) {
            this.haProxyMessage = (HAProxyMessage)msg;
            return;
        }
        switch (this.state) {
            case Init: 
            case Connecting: 
            case ProxyLookupRequests: {
                super.channelRead(ctx, msg);
                break;
            }
            case ProxyConnectionToBroker: {
                ProxyService.opsCounter.inc();
                if (msg instanceof ByteBuf) {
                    int bytes = ((ByteBuf)msg).readableBytes();
                    this.directProxyHandler.getInboundChannelRequestsRate().recordEvent((long)bytes);
                    ProxyService.bytesCounter.inc((double)bytes);
                }
                this.directProxyHandler.outboundChannel.writeAndFlush(msg).addListener((GenericFutureListener)this);
                break;
            }
        }
    }

    public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
            this.ctx.read();
        } else {
            LOG.warn("[{}] Error in writing to inbound channel. Closing", (Object)this.remoteAddress, (Object)future.cause());
            this.directProxyHandler.outboundChannel.close();
        }
    }

    private void completeConnect() {
        LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", new Object[]{this.remoteAddress, this.authMethod, this.clientAuthRole, this.hasProxyToBrokerUrl});
        if (this.hasProxyToBrokerUrl) {
            this.state = State.ProxyConnectionToBroker;
            this.directProxyHandler = new DirectProxyHandler(this.service, this, this.proxyToBrokerUrl, this.protocolVersionToAdvertise, this.sslHandlerSupplier);
            this.cancelKeepAliveTask();
        } else {
            this.state = State.ProxyLookupRequests;
            this.lookupProxyHandler = new LookupProxyHandler(this.service, this);
            this.ctx.writeAndFlush((Object)Commands.newConnected((int)this.protocolVersionToAdvertise));
        }
    }

    private void createClientAndCompleteConnect(AuthData clientData) throws PulsarClientException {
        if (this.service.getConfiguration().isForwardAuthorizationCredentials()) {
            this.clientAuthData = clientData;
            this.clientAuthMethod = this.authMethod;
        }
        this.client = this.createClient(this.clientConf, this.clientAuthData, this.clientAuthMethod, this.protocolVersionToAdvertise);
        this.completeConnect();
    }

    private void doAuthentication(AuthData clientData) throws Exception {
        AuthData brokerData = this.authState.authenticate(clientData);
        if (this.authState.isComplete()) {
            this.clientAuthRole = this.authState.getAuthRole();
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Client successfully authenticated with {} role {}", new Object[]{this.remoteAddress, this.authMethod, this.clientAuthRole});
            }
            this.createClientAndCompleteConnect(clientData);
            return;
        }
        this.ctx.writeAndFlush((Object)Commands.newAuthChallenge((String)this.authMethod, (AuthData)brokerData, (int)this.protocolVersionToAdvertise));
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Authentication in progress client by method {}.", (Object)this.remoteAddress, (Object)this.authMethod);
        }
        this.state = State.Connecting;
    }

    protected void handleConnect(PulsarApi.CommandConnect connect) {
        Preconditions.checkArgument((this.state == State.Init ? 1 : 0) != 0);
        this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
        this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
        this.protocolVersionToAdvertise = ProxyConnection.getProtocolVersionToAdvertise(connect);
        String string = this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null";
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received CONNECT from {} proxyToBroker={}", (Object)this.remoteAddress, (Object)this.proxyToBrokerUrl);
            LOG.debug("[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}", new Object[]{this.remoteAddress, this.protocolVersionToAdvertise, this.remoteEndpointProtocolVersion, Commands.getCurrentProtocolVersion()});
        }
        if (this.remoteEndpointProtocolVersion < 10) {
            LOG.warn("[{}] Client doesn't support connecting through proxy", (Object)this.remoteAddress);
            this.ctx.close();
            return;
        }
        try {
            this.clientConf = this.createClientConfiguration();
            this.clientAuthentication = this.clientConf.getAuthentication();
            int protocolVersion = ProxyConnection.getProtocolVersionToAdvertise(connect);
            if (!this.service.getConfiguration().isAuthenticationEnabled()) {
                this.client = new PulsarClientImpl(this.clientConf, this.service.getWorkerGroup(), (ConnectionPool)new ProxyConnectionPool(this.clientConf, this.service.getWorkerGroup(), () -> new ClientCnx(this.clientConf, this.service.getWorkerGroup(), protocolVersion)), this.service.getTimer());
                this.completeConnect();
                return;
            }
            AuthData clientData = AuthData.of((byte[])connect.getAuthData().toByteArray());
            this.authMethod = connect.hasAuthMethodName() ? connect.getAuthMethodName() : (connect.hasAuthMethod() ? connect.getAuthMethod().name().substring(10).toLowerCase() : "none");
            this.authenticationProvider = this.service.getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                this.clientAuthRole = (String)this.service.getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured"));
                this.createClientAndCompleteConnect(clientData);
                return;
            }
            ChannelHandler sslHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sslSession = null;
            if (sslHandler != null) {
                sslSession = ((SslHandler)sslHandler).engine().getSession();
            }
            this.authState = this.authenticationProvider.newAuthState(clientData, this.remoteAddress, sslSession);
            this.authenticationData = this.authState.getAuthDataSource();
            this.doAuthentication(clientData);
        }
        catch (Exception e) {
            LOG.warn("[{}] Unable to authenticate: ", (Object)this.remoteAddress, (Object)e);
            this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.AuthenticationError, (String)"Failed to authenticate"));
            this.close();
            return;
        }
    }

    protected void handleAuthResponse(PulsarApi.CommandAuthResponse authResponse) {
        Preconditions.checkArgument((this.state == State.Connecting ? 1 : 0) != 0);
        Preconditions.checkArgument((boolean)authResponse.hasResponse());
        Preconditions.checkArgument((authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName() ? 1 : 0) != 0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received AuthResponse from {}, auth method: {}", (Object)this.remoteAddress, (Object)authResponse.getResponse().getAuthMethodName());
        }
        try {
            AuthData clientData = AuthData.of((byte[])authResponse.getResponse().getAuthData().toByteArray());
            this.doAuthentication(clientData);
        }
        catch (Exception e) {
            String msg = "Unable to handleAuthResponse";
            LOG.warn("[{}] {} ", new Object[]{this.remoteAddress, msg, e});
            this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.AuthenticationError, (String)msg));
            this.close();
        }
    }

    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
    }

    protected void handleGetTopicsOfNamespace(PulsarApi.CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
    }

    protected void handleGetSchema(PulsarApi.CommandGetSchema commandGetSchema) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleGetSchema(commandGetSchema);
    }

    protected void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleLookup(lookup);
    }

    private void close() {
        this.state = State.Closed;
        this.ctx.close();
        try {
            this.client.close();
        }
        catch (PulsarClientException e) {
            LOG.error("Unable to close pulsar client - {}. Error - {}", (Object)this.client, (Object)e.getMessage());
        }
    }

    ClientConfigurationData createClientConfiguration() throws PulsarClientException.UnsupportedAuthenticationException {
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(this.service.getServiceUrl());
        ProxyConfiguration proxyConfig = this.service.getConfiguration();
        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
            clientConf.setAuthentication(AuthenticationFactory.create((String)proxyConfig.getBrokerClientAuthenticationPlugin(), (String)proxyConfig.getBrokerClientAuthenticationParameters()));
        }
        if (proxyConfig.isTlsEnabledWithBroker()) {
            clientConf.setUseTls(true);
            if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
                clientConf.setUseKeyStoreTls(true);
                clientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
                clientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
                clientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
            } else {
                clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
                clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
            }
        }
        return clientConf;
    }

    private PulsarClientImpl createClient(ClientConfigurationData clientConf, AuthData clientAuthData, String clientAuthMethod, int protocolVersion) throws PulsarClientException {
        return new PulsarClientImpl(clientConf, this.service.getWorkerGroup(), (ConnectionPool)new ProxyConnectionPool(clientConf, this.service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, this.service.getWorkerGroup(), this.clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), this.service.getTimer());
    }

    private static int getProtocolVersionToAdvertise(PulsarApi.CommandConnect connect) {
        return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
    }

    long newRequestId() {
        return this.client.newRequestId();
    }

    public Authentication getClientAuthentication() {
        return this.clientAuthentication;
    }

    protected boolean isHandshakeCompleted() {
        return this.state != State.Init;
    }

    SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    ChannelHandlerContext ctx() {
        return this.ctx;
    }

    public boolean hasHAProxyMessage() {
        return this.haProxyMessage != null;
    }

    public HAProxyMessage getHAProxyMessage() {
        return this.haProxyMessage;
    }

    public DirectProxyHandler getDirectProxyHandler() {
        return this.directProxyHandler;
    }

    static enum State {
        Init,
        Connecting,
        ProxyLookupRequests,
        ProxyConnectionToBroker,
        Closed;

    }
}

