/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLSession;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectProxyHandler {
    private Channel inboundChannel;
    Channel outboundChannel;
    private String originalPrincipal;
    private String clientAuthData;
    private String clientAuthMethod;
    public static final String TLS_HANDLER = "tls";
    private final Authentication authentication;
    private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);

    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
        URI targetBroker;
        this.authentication = proxyConnection.getClientAuthentication();
        this.inboundChannel = proxyConnection.ctx().channel();
        this.originalPrincipal = proxyConnection.clientAuthRole;
        this.clientAuthData = proxyConnection.clientAuthData;
        this.clientAuthMethod = proxyConnection.clientAuthMethod;
        final ProxyConfiguration config = service.getConfiguration();
        Bootstrap b = new Bootstrap();
        b.option(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
        ((Bootstrap)((Bootstrap)b.group((EventLoopGroup)this.inboundChannel.eventLoop())).channel(this.inboundChannel.getClass())).option(ChannelOption.AUTO_READ, (Object)false);
        b.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                if (config.isTlsEnabledWithBroker()) {
                    AuthenticationDataProvider authData = DirectProxyHandler.this.authentication.getAuthData();
                    SslContext sslCtx = authData.hasDataForTls() ? SecurityUtility.createNettySslContextForClient((boolean)config.isTlsAllowInsecureConnection(), (String)config.getBrokerClientTrustCertsFilePath(), (Certificate[])((X509Certificate[])authData.getTlsCertificates()), (PrivateKey)authData.getTlsPrivateKey()) : SecurityUtility.createNettySslContextForClient((boolean)config.isTlsAllowInsecureConnection(), (String)config.getBrokerClientTrustCertsFilePath());
                    ch.pipeline().addLast(DirectProxyHandler.TLS_HANDLER, (ChannelHandler)sslCtx.newHandler(ch.alloc()));
                }
                ch.pipeline().addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(0x500000, 0, 4, 0, 4));
                ch.pipeline().addLast("proxyOutboundHandler", (ChannelHandler)new ProxyBackendHandler(config));
            }
        });
        try {
            targetBroker = new URI("pulsar://" + targetBrokerUrl);
        }
        catch (URISyntaxException e) {
            log.warn("[{}] Failed to parse broker url '{}'", new Object[]{this.inboundChannel, targetBrokerUrl, e});
            this.inboundChannel.close();
            return;
        }
        ChannelFuture f = b.connect(targetBroker.getHost(), targetBroker.getPort());
        this.outboundChannel = f.channel();
        f.addListener(future -> {
            if (!future.isSuccess()) {
                this.inboundChannel.close();
                return;
            }
            ProxyBackendHandler cnx = (ProxyBackendHandler)this.outboundChannel.pipeline().get("proxyOutboundHandler");
            cnx.setRemoteHostName(targetBroker.getHost());
        });
    }

    public class ProxyBackendHandler
    extends PulsarDecoder
    implements FutureListener<Void> {
        private BackendState state = BackendState.Init;
        private String remoteHostName;
        protected ChannelHandlerContext ctx;
        private ProxyConfiguration config;

        public ProxyBackendHandler(ProxyConfiguration config) {
            this.config = config;
        }

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.ctx = ctx;
            String authData = "";
            if (DirectProxyHandler.this.authentication.getAuthData().hasDataFromCommand()) {
                authData = DirectProxyHandler.this.authentication.getAuthData().getCommandData();
            }
            ByteBuf command = null;
            command = Commands.newConnect((String)DirectProxyHandler.this.authentication.getAuthMethodName(), (String)authData, (String)"Pulsar proxy", null, (String)DirectProxyHandler.this.originalPrincipal, (String)DirectProxyHandler.this.clientAuthData, (String)DirectProxyHandler.this.clientAuthMethod);
            DirectProxyHandler.this.outboundChannel.writeAndFlush((Object)command);
            DirectProxyHandler.this.outboundChannel.read();
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            switch (this.state) {
                case Init: {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Received msg on broker connection: {}", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, msg.getClass()});
                    }
                    super.channelRead(ctx, msg);
                    break;
                }
                case HandshakeCompleted: {
                    DirectProxyHandler.this.inboundChannel.writeAndFlush(msg).addListener((GenericFutureListener)this);
                    break;
                }
            }
        }

        public void operationComplete(Future<Void> future) throws Exception {
            if (future.isSuccess()) {
                DirectProxyHandler.this.outboundChannel.read();
            } else {
                log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, future.cause()});
                DirectProxyHandler.this.inboundChannel.close();
            }
        }

        protected void messageReceived() {
        }

        protected void handleConnected(PulsarApi.CommandConnected connected) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received Connected from broker", (Object)DirectProxyHandler.this.inboundChannel, (Object)DirectProxyHandler.this.outboundChannel);
            }
            if (this.config.isTlsHostnameVerificationEnabled() && this.remoteHostName != null && !this.verifyTlsHostName(this.remoteHostName, this.ctx)) {
                log.warn("[{}] Failed to verify hostname of {}", (Object)this.ctx.channel(), (Object)this.remoteHostName);
                this.ctx.close();
                return;
            }
            this.state = BackendState.HandshakeCompleted;
            DirectProxyHandler.this.inboundChannel.writeAndFlush((Object)Commands.newConnected((int)connected.getProtocolVersion())).addListener(future -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Removing decoder from pipeline", (Object)DirectProxyHandler.this.inboundChannel, (Object)DirectProxyHandler.this.outboundChannel);
                }
                DirectProxyHandler.this.inboundChannel.pipeline().remove("frameDecoder");
                DirectProxyHandler.this.outboundChannel.pipeline().remove("frameDecoder");
                DirectProxyHandler.this.inboundChannel.read();
                DirectProxyHandler.this.outboundChannel.read();
            });
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            DirectProxyHandler.this.inboundChannel.close();
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.warn("[{}] [{}] Caught exception: {}", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, cause.getMessage(), cause});
            ctx.close();
        }

        public void setRemoteHostName(String remoteHostName) {
            this.remoteHostName = remoteHostName;
        }

        private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
            ChannelHandler sslHandler = ctx.channel().pipeline().get(DirectProxyHandler.TLS_HANDLER);
            SSLSession sslSession = null;
            if (sslHandler != null) {
                sslSession = ((SslHandler)sslHandler).engine().getSession();
                return new DefaultHostnameVerifier().verify(hostname, sslSession);
            }
            return false;
        }
    }

    static enum BackendState {
        Init,
        HandshakeCompleted;

    }
}

