/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.proxy.server.DirectProxyHandler;
import org.apache.pulsar.proxy.server.LookupProxyHandler;
import org.apache.pulsar.proxy.server.ProxyClientCnx;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyConnectionPool;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyConnection
extends PulsarHandler
implements FutureListener<Void> {
    private PulsarClientImpl client;
    private ProxyService service;
    private Authentication clientAuthentication;
    AuthenticationDataSource authenticationData;
    private State state;
    private final SslContext sslCtx;
    private LookupProxyHandler lookupProxyHandler = null;
    private DirectProxyHandler directProxyHandler = null;
    String clientAuthRole;
    String clientAuthData;
    String clientAuthMethod;
    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);

    ConnectionPool getConnectionPool() {
        return this.client.getCnxPool();
    }

    public ProxyConnection(ProxyService proxyService, SslContext sslCtx) {
        super(30, TimeUnit.SECONDS);
        this.service = proxyService;
        this.state = State.Init;
        this.sslCtx = sslCtx;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        ProxyService.activeConnections.inc();
        if (ProxyService.activeConnections.get() > (double)this.service.getConfiguration().getMaxConcurrentInboundConnections()) {
            ctx.close();
            ProxyService.rejectedConnections.inc();
            return;
        }
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        ProxyService.activeConnections.dec();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ProxyService.newConnections.inc();
        LOG.info("[{}] New connection opened", (Object)this.remoteAddress);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (this.directProxyHandler != null && this.directProxyHandler.outboundChannel != null) {
            this.directProxyHandler.outboundChannel.close();
        }
        if (this.client != null) {
            this.client.close();
        }
        LOG.info("[{}] Connection closed", (Object)this.remoteAddress);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        LOG.warn("[{}] Got exception {} : {}", new Object[]{this.remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), ClientCnx.isKnownException((Throwable)cause) ? null : cause});
        ctx.close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        switch (this.state) {
            case Init: 
            case ProxyLookupRequests: {
                super.channelRead(ctx, msg);
                break;
            }
            case ProxyConnectionToBroker: {
                ProxyService.opsCounter.inc();
                if (msg instanceof ByteBuf) {
                    ProxyService.bytesCounter.inc((double)((ByteBuf)msg).readableBytes());
                }
                this.directProxyHandler.outboundChannel.writeAndFlush(msg).addListener((GenericFutureListener)this);
                break;
            }
        }
    }

    public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
            this.ctx.read();
        } else {
            LOG.warn("[{}] Error in writing to inbound channel. Closing", (Object)this.remoteAddress, (Object)future.cause());
            this.directProxyHandler.outboundChannel.close();
        }
    }

    protected void handleConnect(PulsarApi.CommandConnect connect) {
        Preconditions.checkArgument((this.state == State.Init ? 1 : 0) != 0);
        this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received CONNECT from {} proxyToBroker={}", (Object)this.remoteAddress, (Object)(connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null"));
        }
        if (this.remoteEndpointProtocolVersion < 10) {
            LOG.warn("[{}] Client doesn't support connecting through proxy", (Object)this.remoteAddress);
            this.ctx.close();
            return;
        }
        int protocolVersionToAdvertise = ProxyConnection.getProtocolVersionToAdvertise(connect);
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}", new Object[]{this.remoteAddress, protocolVersionToAdvertise, this.remoteEndpointProtocolVersion, Commands.getCurrentProtocolVersion()});
        }
        if (!this.authenticateAndCreateClient(connect)) {
            this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.AuthenticationError, (String)"Failed to authenticate"));
            this.close();
            return;
        }
        if (connect.hasProxyToBrokerUrl()) {
            this.state = State.ProxyConnectionToBroker;
            this.directProxyHandler = new DirectProxyHandler(this.service, this, connect.getProxyToBrokerUrl(), protocolVersionToAdvertise, this.sslCtx);
            this.cancelKeepAliveTask();
        } else {
            this.state = State.ProxyLookupRequests;
            this.lookupProxyHandler = new LookupProxyHandler(this.service, this);
            this.ctx.writeAndFlush((Object)Commands.newConnected((int)protocolVersionToAdvertise));
        }
    }

    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
    }

    protected void handleGetTopicsOfNamespace(PulsarApi.CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
    }

    protected void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleLookup(lookup);
    }

    private void close() {
        this.state = State.Closed;
        this.ctx.close();
        try {
            this.client.close();
        }
        catch (PulsarClientException e) {
            LOG.error("Unable to close pulsar client - {}. Error - {}", (Object)this.client, (Object)e.getMessage());
        }
    }

    ClientConfigurationData createClientConfiguration() throws PulsarClientException.UnsupportedAuthenticationException {
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(this.service.getServiceUrl());
        ProxyConfiguration proxyConfig = this.service.getConfiguration();
        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
            clientConf.setAuthentication(AuthenticationFactory.create((String)proxyConfig.getBrokerClientAuthenticationPlugin(), (String)proxyConfig.getBrokerClientAuthenticationParameters()));
        }
        if (proxyConfig.isTlsEnabledWithBroker()) {
            clientConf.setUseTls(true);
            clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
            clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
        }
        return clientConf;
    }

    private boolean authenticateAndCreateClient(PulsarApi.CommandConnect connect) {
        try {
            ClientConfigurationData clientConf = this.createClientConfiguration();
            this.clientAuthentication = clientConf.getAuthentication();
            int protocolVersion = ProxyConnection.getProtocolVersionToAdvertise(connect);
            if (!this.service.getConfiguration().isAuthenticationEnabled()) {
                this.client = new PulsarClientImpl(clientConf, this.service.getWorkerGroup(), (ConnectionPool)new ProxyConnectionPool(clientConf, this.service.getWorkerGroup(), () -> new ClientCnx(clientConf, this.service.getWorkerGroup(), protocolVersion)));
                return true;
            }
            String authMethod = "none";
            if (connect.hasAuthMethodName()) {
                authMethod = connect.getAuthMethodName();
            } else if (connect.hasAuthMethod()) {
                authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
            }
            String authData = connect.getAuthData().toStringUtf8();
            ChannelHandler sslHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sslSession = null;
            if (sslHandler != null) {
                sslSession = ((SslHandler)sslHandler).engine().getSession();
            }
            this.authenticationData = new AuthenticationDataCommand(authData, this.remoteAddress, sslSession);
            this.clientAuthRole = this.service.getAuthenticationService().authenticate(this.authenticationData, authMethod);
            LOG.info("[{}] Client successfully authenticated with {} role {}", new Object[]{this.remoteAddress, authMethod, this.clientAuthRole});
            if (this.service.getConfiguration().isForwardAuthorizationCredentials()) {
                this.clientAuthData = authData;
                this.clientAuthMethod = authMethod;
            }
            this.client = this.createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersion);
            return true;
        }
        catch (Exception e) {
            LOG.warn("[{}] Unable to authenticate: {}", (Object)this.remoteAddress, (Object)e.getMessage());
            return false;
        }
    }

    private PulsarClientImpl createClient(ClientConfigurationData clientConf, String clientAuthData, String clientAuthMethod, int protocolVersion) throws PulsarClientException {
        return new PulsarClientImpl(clientConf, this.service.getWorkerGroup(), (ConnectionPool)new ProxyConnectionPool(clientConf, this.service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, this.service.getWorkerGroup(), this.clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)));
    }

    private static int getProtocolVersionToAdvertise(PulsarApi.CommandConnect connect) {
        return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
    }

    long newRequestId() {
        return this.client.newRequestId();
    }

    public Authentication getClientAuthentication() {
        return this.clientAuthentication;
    }

    protected boolean isHandshakeCompleted() {
        return this.state != State.Init;
    }

    SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    ChannelHandlerContext ctx() {
        return this.ctx;
    }

    static enum State {
        Init,
        ProxyLookupRequests,
        ProxyConnectionToBroker,
        Closed;

    }
}

