/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.haproxy.HAProxyCommand;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.proxy.server.ParserProxyHandler;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectProxyHandler {
    // Channel of the proxy connection this handler serves (taken from proxyConnection.ctx().channel()).
    private final Channel inboundChannel;
    // The proxy connection that created this handler; notified once the broker answers with Connected.
    private final ProxyConnection proxyConnection;
    // Channel towards the broker; assigned in connect() from the bootstrap's connect future.
    Channel outboundChannel;
    // Assigned in ProxyBackendHandler.channelActive(); read in channelRead() as one of the
    // conditions that prevent splicing between the two channels.
    boolean isTlsOutboundChannel;
    // Rate instance created in the constructor and exposed through getInboundChannelRequestsRate().
    private final Rate inboundChannelRequestsRate;
    // Copied from proxyConnection.clientAuthRole; forwarded in the Connect command sent to the broker.
    private final String originalPrincipal;
    // Copied from proxyConnection.clientAuthData; forwarded in the Connect command.
    private final AuthData clientAuthData;
    // Copied from proxyConnection.clientAuthMethod; forwarded in the Connect command.
    private final String clientAuthMethod;
    // Pipeline name under which the SslHandler is registered on the outbound channel.
    public static final String TLS_HANDLER = "tls";
    // Authentication obtained from proxyConnection.getClientAuthentication().
    private final Authentication authentication;
    // Provider obtained from `authentication` in channelActive() and re-obtained when the broker
    // sends a challenge whose payload equals AuthData.REFRESH_AUTH_DATA_BYTES.
    private AuthenticationDataProvider authenticationDataProvider;
    // Owning proxy service; source of the ProxyConfiguration and of the zero-copy / log-level settings.
    private final ProxyService service;
    // Runs proxyConnection.cancelKeepAliveTask(); invoked from handleConnected().
    private final Runnable onHandshakeCompleteAction;
    // Value of ProxyConfiguration.isTlsHostnameVerificationEnabled() at construction time.
    private final boolean tlsHostnameVerificationEnabled;
    // Value of ProxyConfiguration.isTlsEnabledWithBroker() at construction time.
    final boolean tlsEnabledWithBroker;
    // Only created when tlsEnabledWithBroker is true; otherwise left null.
    private PulsarSslFactory sslFactory;
    // ASCII bytes of the literal "PROXY" that starts the text-format PROXY protocol line.
    static final byte[] TEXT_PREFIX = new byte[]{80, 82, 79, 88, 89};
    private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);

    /**
     * Creates a handler that relays traffic between a client-facing proxy connection and a broker.
     *
     * <p>The constructor snapshots the client's authentication details from {@code proxyConnection}
     * and the TLS settings from the service configuration. When TLS towards the broker is enabled,
     * the SSL factory class named by {@code getSslFactoryPlugin()} is instantiated reflectively,
     * initialized with the configuration from {@link #buildSslConfiguration}, and asked to create
     * its internal SSL context.
     *
     * @param service the proxy service supplying the configuration
     * @param proxyConnection the client-facing connection this handler belongs to
     * @throws RuntimeException if the broker-client authentication data cannot be obtained
     * @throws IllegalStateException if the SSL factory cannot be instantiated or initialized
     */
    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) {
        ProxyConfiguration config = service.getConfiguration();
        this.isTlsOutboundChannel = false;
        this.service = service;
        this.authentication = proxyConnection.getClientAuthentication();
        this.inboundChannel = proxyConnection.ctx().channel();
        this.proxyConnection = proxyConnection;
        this.inboundChannelRequestsRate = new Rate();
        this.originalPrincipal = proxyConnection.clientAuthRole;
        this.clientAuthData = proxyConnection.clientAuthData;
        this.clientAuthMethod = proxyConnection.clientAuthMethod;
        this.tlsEnabledWithBroker = config.isTlsEnabledWithBroker();
        this.tlsHostnameVerificationEnabled = config.isTlsHostnameVerificationEnabled();
        this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;

        if (this.tlsEnabledWithBroker) {
            AuthenticationDataProvider authData = null;
            // Auth data is only looked up when a broker-client authentication plugin is configured.
            if (!StringUtils.isEmpty(config.getBrokerClientAuthenticationPlugin())) {
                try {
                    authData = this.authentication.getAuthData();
                } catch (PulsarClientException e) {
                    throw new RuntimeException(e);
                }
            }
            PulsarSslConfiguration sslConfiguration = this.buildSslConfiguration(config, authData);
            try {
                this.sslFactory = (PulsarSslFactory) Class.forName(config.getSslFactoryPlugin())
                        .getConstructor()
                        .newInstance();
                this.sslFactory.initialize(sslConfiguration);
                this.sslFactory.createInternalSslContext();
            } catch (RuntimeException e) {
                // Unchecked failures keep their original type.
                throw e;
            } catch (Exception e) {
                // Reflection (and possibly the factory itself) raises checked exceptions that this
                // constructor does not declare; surface them with the plugin name for diagnosis.
                throw new IllegalStateException(
                        "Failed to initialize SSL factory '" + config.getSslFactoryPlugin() + "'", e);
            }
        }
    }

    /**
     * Starts the outbound connection to the broker and wires up its pipeline.
     *
     * <p>The bootstrap reuses the inbound channel's event loop and channel class. If the host part
     * of {@code brokerHostAndPort} cannot be parsed, or the connect attempt fails, the inbound
     * channel is closed.
     *
     * @param brokerHostAndPort broker address in {@code host:port} form; only the host part is
     *        extracted here and handed to the backend handler
     * @param targetBrokerAddress socket address the bootstrap connects to
     * @param protocolVersion protocol version passed on to the backend handler
     */
    public void connect(String brokerHostAndPort, final InetSocketAddress targetBrokerAddress, final int protocolVersion) {
        final ProxyConfiguration config = this.service.getConfiguration();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);

        int connectTimeoutMs = this.service.getConfiguration().getBrokerProxyConnectTimeoutMs();
        if (connectTimeoutMs > 0) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs);
        }

        bootstrap.group(this.inboundChannel.eventLoop()).channel(this.inboundChannel.getClass());
        if (this.service.proxyZeroCopyModeEnabled && this.inboundChannel instanceof EpollSocketChannel) {
            bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
        }

        final String remoteHost;
        try {
            remoteHost = DirectProxyHandler.parseHost(brokerHostAndPort);
        } catch (IllegalArgumentException e) {
            log.warn("[{}] Failed to parse broker host '{}'", this.inboundChannel, brokerHostAndPort, e);
            this.inboundChannel.close();
            return;
        }

        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            protected void initChannel(SocketChannel ch) {
                ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true));

                if (DirectProxyHandler.this.tlsEnabledWithBroker) {
                    String peerHost = targetBrokerAddress.getHostString();
                    int peerPort = targetBrokerAddress.getPort();
                    SslHandler sslHandler = new SslHandler(
                            DirectProxyHandler.this.sslFactory.createClientSslEngine(ch.alloc(), peerHost, peerPort));
                    if (DirectProxyHandler.this.tlsHostnameVerificationEnabled) {
                        SecurityUtility.configureSSLHandler(sslHandler);
                    }
                    ch.pipeline().addLast(DirectProxyHandler.TLS_HANDLER, sslHandler);
                }

                // The read-timeout handler is only installed for a positive timeout.
                int readTimeoutMs = DirectProxyHandler.this.service.getConfiguration().getBrokerProxyReadTimeoutMs();
                if (readTimeoutMs > 0) {
                    ch.pipeline().addLast("readTimeoutHandler",
                            new ReadTimeoutHandler((long) readTimeoutMs, TimeUnit.MILLISECONDS));
                }

                int maxFrameLength = DirectProxyHandler.this.service.getConfiguration().getMaxMessageSize() + 10240;
                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4));
                ch.pipeline().addLast("proxyOutboundHandler",
                        new ProxyBackendHandler(config, protocolVersion, remoteHost));
            }
        });

        ChannelFuture connectFuture = bootstrap.connect(targetBrokerAddress);
        this.outboundChannel = connectFuture.channel();
        connectFuture.addListener(future -> {
            if (future.isSuccess()) {
                return;
            }
            log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.",
                    this.inboundChannel, targetBrokerAddress, brokerHostAndPort, future.cause());
            this.inboundChannel.close();
        });
    }

    /**
     * Extracts the host part of a {@code host:port} string.
     *
     * <p>Everything before the last {@code ':'} is returned, so a bracketed IPv6 literal such as
     * {@code [::1]:6650} yields {@code [::1]}.
     *
     * @param brokerPortAndHost the broker address in {@code host:port} form
     * @return the text preceding the last colon
     * @throws IllegalArgumentException if there is no colon, or the colon is the first character
     */
    private static String parseHost(String brokerPortAndHost) {
        final int separatorIndex = brokerPortAndHost.lastIndexOf(':');
        if (separatorIndex <= 0) {
            throw new IllegalArgumentException("Illegal broker host:port '" + brokerPortAndHost + "'");
        }
        return brokerPortAndHost.substring(0, separatorIndex);
    }

    /**
     * Writes a text-format PROXY protocol line to the outbound channel.
     *
     * <p>If the proxy connection already carries an HAProxy message, that message is re-encoded and
     * forwarded. Otherwise a message is synthesized from the inbound channel's remote and local
     * socket addresses; nothing is written when either address is not an {@link InetSocketAddress}.
     *
     * <p>The proxied protocol of the synthesized message follows the client's address family
     * ({@code TCP6} for an IPv6 address, {@code TCP4} otherwise), because the message constructor
     * validates the address strings against the declared protocol.
     */
    private void writeHAProxyMessage() {
        if (this.proxyConnection.hasHAProxyMessage()) {
            ByteBuf forwarded = this.encodeProxyProtocolMessage(this.proxyConnection.getHAProxyMessage());
            this.writeAndFlush(forwarded);
            return;
        }

        SocketAddress remote = this.inboundChannel.remoteAddress();
        SocketAddress local = this.inboundChannel.localAddress();
        if (!(remote instanceof InetSocketAddress) || !(local instanceof InetSocketAddress)) {
            return;
        }

        InetSocketAddress clientAddress = (InetSocketAddress) remote;
        InetSocketAddress proxyAddress = (InetSocketAddress) local;
        String sourceAddress = clientAddress.getAddress().getHostAddress();
        String destinationAddress = proxyAddress.getAddress().getHostAddress();

        // A hard-coded TCP4 would be rejected by the message constructor for IPv6 clients.
        HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet6Address
                ? HAProxyProxiedProtocol.TCP6
                : HAProxyProxiedProtocol.TCP4;

        HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY, proxiedProtocol,
                sourceAddress, destinationAddress, clientAddress.getPort(), proxyAddress.getPort());
        try {
            this.writeAndFlush(this.encodeProxyProtocolMessage(msg));
        } finally {
            // Release the locally created message even if encoding or writing fails.
            msg.release();
        }
    }

    /**
     * Serializes an HAProxy message as a single text line:
     * {@code PROXY <protocol> <srcAddr> <dstAddr> <srcPort> <dstPort>\r\n}, encoded as US-ASCII.
     *
     * @param msg the message whose protocol, addresses and ports are written
     * @return a newly allocated unpooled buffer holding the encoded line
     */
    private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
        // Initial capacity of 108 bytes — presumably sized for the longest text-format header.
        ByteBuf encoded = Unpooled.buffer(108);
        encoded.writeBytes(TEXT_PREFIX);

        final CharSequence[] fields = {
            msg.proxiedProtocol().name(),
            msg.sourceAddress(),
            msg.destinationAddress(),
            String.valueOf(msg.sourcePort()),
            String.valueOf(msg.destinationPort()),
        };
        // Every field is preceded by a single space separator.
        for (CharSequence field : fields) {
            encoded.writeByte(' ');
            encoded.writeCharSequence(field, CharsetUtil.US_ASCII);
        }

        encoded.writeByte('\r');
        encoded.writeByte('\n');
        return encoded;
    }

    /**
     * Closes the outbound (broker-side) channel, if {@link #connect} has created one.
     */
    public void close() {
        if (this.outboundChannel == null) {
            // connect() has not assigned a channel yet; nothing to close.
            return;
        }
        this.outboundChannel.close();
    }

    /**
     * Writes {@code cmd} to the outbound channel and flushes it, delegating to
     * {@code NettyChannelUtil.writeAndFlushWithVoidPromise}.
     *
     * @param cmd the buffer to send to the broker
     */
    private void writeAndFlush(ByteBuf cmd) {
        final ChannelOutboundInvoker target = this.outboundChannel;
        NettyChannelUtil.writeAndFlushWithVoidPromise(target, cmd);
    }

    /**
     * Assembles the client-side SSL configuration used for connections to the broker.
     *
     * <p>All key-store, trust-store, cipher, protocol and certificate settings are read from the
     * broker-client options of {@code config}. The result is always built in client mode
     * ({@code serverMode(false)}) and never requires a trusted client certificate on connect.
     *
     * @param config the proxy configuration supplying the broker-client TLS settings
     * @param authData authentication data handed to the configuration; may be {@code null}
     * @return the populated SSL configuration
     */
    protected PulsarSslConfiguration buildSslConfiguration(ProxyConfiguration config, AuthenticationDataProvider authData) {
        return PulsarSslConfiguration.builder()
                .tlsProvider(config.getBrokerClientSslProvider())
                .tlsKeyStoreType(config.getBrokerClientTlsKeyStoreType())
                .tlsKeyStorePath(config.getBrokerClientTlsKeyStore())
                .tlsKeyStorePassword(config.getBrokerClientTlsKeyStorePassword())
                .tlsTrustStoreType(config.getBrokerClientTlsTrustStoreType())
                .tlsTrustStorePath(config.getBrokerClientTlsTrustStore())
                .tlsTrustStorePassword(config.getBrokerClientTlsTrustStorePassword())
                .tlsCiphers(config.getBrokerClientTlsCiphers())
                .tlsProtocols(config.getBrokerClientTlsProtocols())
                .tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath())
                .tlsCertificateFilePath(config.getBrokerClientCertificateFilePath())
                .tlsKeyFilePath(config.getBrokerClientKeyFilePath())
                .allowInsecureConnection(config.isTlsAllowInsecureConnection())
                .requireTrustedClientCertOnConnect(false)
                .tlsEnabledWithKeystore(config.isBrokerClientTlsEnabledWithKeyStore())
                .tlsCustomParams(config.getBrokerClientSslFactoryPluginParams())
                .authData(authData)
                .serverMode(false)
                .build();
    }

    /**
     * @return the channel of the proxy connection this handler was created for
     */
    public Channel getInboundChannel() {
        return inboundChannel;
    }

    /**
     * @return the channel towards the broker, or {@code null} if {@link #connect} has not assigned one
     */
    public Channel getOutboundChannel() {
        return outboundChannel;
    }

    /**
     * @return the {@link Rate} instance created together with this handler
     */
    public Rate getInboundChannelRequestsRate() {
        return inboundChannelRequestsRate;
    }

    public class ProxyBackendHandler
    extends PulsarDecoder {
        // Handshake progress; starts at Init and moves to HandshakeCompleted in handleConnected().
        private BackendState state = BackendState.Init;
        // Host name passed to authentication.getAuthData(...) when obtaining auth data.
        private final String remoteHostName;
        // Context of the outbound channel; captured in channelActive().
        protected ChannelHandlerContext ctx;
        // Proxy configuration; consulted for isHaProxyProtocolEnabled() in channelActive().
        private final ProxyConfiguration config;
        // Protocol version sent in the Connect and AuthResponse commands.
        private final int protocolVersion;

        /**
         * Creates the backend handler for one outbound broker connection.
         *
         * @param config proxy configuration consulted when the channel becomes active
         * @param protocolVersion protocol version to send in commands to the broker
         * @param remoteHostName broker host name used when requesting authentication data
         */
        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion, String remoteHostName) {
            this.remoteHostName = remoteHostName;
            this.protocolVersion = protocolVersion;
            this.config = config;
        }

        /**
         * Runs when the outbound channel to the broker becomes active.
         *
         * <p>Optionally writes the PROXY protocol line, then obtains authentication data for the
         * broker host and sends the Connect command, forwarding the client's principal, auth data
         * and auth method. Finally records whether this outbound channel carries TLS.
         *
         * @param ctx the context of the outbound channel
         * @throws Exception if obtaining or producing the authentication data fails
         */
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.ctx = ctx;
            if (this.config.isHaProxyProtocolEnabled()) {
                DirectProxyHandler.this.writeHAProxyMessage();
            }

            DirectProxyHandler.this.authenticationDataProvider =
                    DirectProxyHandler.this.authentication.getAuthData(this.remoteHostName);
            AuthData authData = DirectProxyHandler.this.authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
            ByteBuf command = Commands.newConnect(
                    DirectProxyHandler.this.authentication.getAuthMethodName(),
                    authData,
                    this.protocolVersion,
                    DirectProxyHandler.this.proxyConnection.clientVersion,
                    null,
                    DirectProxyHandler.this.originalPrincipal,
                    DirectProxyHandler.this.clientAuthData,
                    DirectProxyHandler.this.clientAuthMethod,
                    PulsarVersion.getVersion());
            DirectProxyHandler.this.writeAndFlush(command);

            // The flag describes the broker-side channel, so inspect this channel's own pipeline.
            // Checking the inbound channel would repeat proxyConnection.isTlsInboundChannel and
            // let splicing bypass the SslHandler when only the broker connection uses TLS.
            DirectProxyHandler.this.isTlsOutboundChannel =
                    ctx.pipeline().get(DirectProxyHandler.TLS_HANDLER) != null;
        }

        /**
         * Applies back-pressure: the inbound channel's auto-read is switched to match whether the
         * outbound channel is currently writable, then the event is passed on to the superclass.
         *
         * @param ctx the context of the outbound channel
         * @throws Exception if the superclass handler fails
         */
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            final boolean outboundWritable = ctx.channel().isWritable();
            DirectProxyHandler.this.inboundChannel.config().setAutoRead(outboundWritable);
            super.channelWritabilityChanged(ctx);
        }

        /**
         * Handles data arriving from the broker.
         *
         * <p>While the handshake is pending, messages are passed to the superclass for decoding.
         * Once it has completed, each message is counted and written straight to the inbound
         * channel. When zero-copy mode is enabled, the proxy log level is 0 and neither side uses
         * TLS, the read-timeout handler is removed and the two channels are spliced together.
         *
         * @param ctx the context of the outbound channel
         * @param msg the message read from the broker
         * @throws Exception if the superclass decoding fails
         */
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final Channel inbound = DirectProxyHandler.this.inboundChannel;

            if (this.state == BackendState.Init) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Received msg on broker connection: {}",
                            inbound, DirectProxyHandler.this.outboundChannel, msg.getClass());
                }
                super.channelRead(ctx, msg);
                return;
            }
            if (this.state != BackendState.HandshakeCompleted) {
                return;
            }

            ProxyService.OPS_COUNTER.inc();
            if (msg instanceof ByteBuf) {
                ProxyService.BYTES_COUNTER.inc((double) ((ByteBuf) msg).readableBytes());
            }
            inbound.writeAndFlush(msg, inbound.voidPromise());

            final boolean spliceAllowed = DirectProxyHandler.this.service.proxyZeroCopyModeEnabled
                    && DirectProxyHandler.this.service.proxyLogLevel == 0
                    && !DirectProxyHandler.this.isTlsOutboundChannel
                    && !DirectProxyHandler.this.proxyConnection.isTlsInboundChannel;
            if (!spliceAllowed) {
                return;
            }

            // The read-timeout handler is dropped before handing the channels over to splice.
            if (ctx.pipeline().get("readTimeoutHandler") != null) {
                ctx.pipeline().remove("readTimeoutHandler");
            }
            ProxyConnection.spliceNIC2NIC((EpollSocketChannel) ctx.channel(), (EpollSocketChannel) inbound,
                    ProxyConnection.SPLICE_BYTES).addListener(future -> {
                        if (!future.isSuccess()) {
                            return;
                        }
                        ProxyService.OPS_COUNTER.inc();
                        ProxyService.BYTES_COUNTER.inc((double) ProxyConnection.SPLICE_BYTES.intValue());
                    });
        }

        /**
         * Answers an authentication challenge from the broker.
         *
         * <p>When the challenge payload equals {@code AuthData.REFRESH_AUTH_DATA_BYTES}, the
         * authentication data provider is re-obtained first; if that fails the error is logged and
         * no response is sent. Otherwise the provider computes the response, which is sent back as
         * an AuthResponse command. Any failure while producing or sending the response is logged.
         *
         * @param authChallenge the challenge command; must carry a challenge with auth data
         * @throws IllegalArgumentException if the challenge or its auth data is missing
         */
        protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
            Preconditions.checkArgument(authChallenge.hasChallenge());
            Preconditions.checkArgument(authChallenge.getChallenge().hasAuthData());

            if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
                try {
                    DirectProxyHandler.this.authenticationDataProvider =
                            DirectProxyHandler.this.authentication.getAuthData(this.remoteHostName);
                } catch (PulsarClientException e) {
                    // The exception is the trailing argument so SLF4J logs it with its stack trace;
                    // a second "{}" placeholder would be left unfilled in the output.
                    log.error("{} Error when refreshing authentication data provider", this.ctx.channel(), e);
                    return;
                }
            }

            try {
                AuthData authData = DirectProxyHandler.this.authenticationDataProvider
                        .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
                Preconditions.checkState(!authData.isComplete());
                ByteBuf request = Commands.newAuthResponse(
                        DirectProxyHandler.this.authentication.getAuthMethodName(),
                        authData,
                        this.protocolVersion,
                        PulsarVersion.getVersion());
                if (log.isDebugEnabled()) {
                    log.debug("{} Mutual auth {}", this.ctx.channel(),
                            DirectProxyHandler.this.authentication.getAuthMethodName());
                }
                DirectProxyHandler.this.writeAndFlush(request);
            } catch (Exception e) {
                log.error("Error mutual verify", e);
            }
        }

        /**
         * Intentionally a no-op in this handler.
         */
        protected void messageReceived() {
            // nothing to do
        }

        /**
         * Completes the handshake once the broker answers with Connected.
         *
         * <p>Marks the handshake as completed, runs the handshake-complete action, reconfigures
         * both pipelines for direct proxying and finally notifies the proxy connection.
         *
         * @param connected the Connected command received from the broker
         * @throws IllegalArgumentException if the handler is not in the {@code Init} state
         */
        protected void handleConnected(CommandConnected connected) {
            final boolean awaitingHandshake = this.state == BackendState.Init;
            Preconditions.checkArgument(awaitingHandshake,
                    "Unexpected state %s. BackendState.Init was expected.", this.state);

            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received Connected from broker",
                        DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel);
            }

            this.state = BackendState.HandshakeCompleted;
            DirectProxyHandler.this.onHandshakeCompleteAction.run();
            this.startDirectProxying(connected);
            DirectProxyHandler.this.proxyConnection.brokerConnected(DirectProxyHandler.this, connected);
        }

        /**
         * Reconfigures the inbound and outbound pipelines after the handshake.
         *
         * <p>At proxy log level 0 the frame decoders are removed from both pipelines. At any other
         * level a parsing handler is inserted on each side; if the broker advertised a maximum
         * message size, the frame decoders are first replaced with ones sized for it, otherwise a
         * 5 MiB limit is passed to the parsers.
         *
         * @param connected the Connected command, consulted for the broker's max message size
         */
        private void startDirectProxying(CommandConnected connected) {
            final Channel inbound = DirectProxyHandler.this.inboundChannel;
            final Channel outbound = DirectProxyHandler.this.outboundChannel;

            if (DirectProxyHandler.this.service.getProxyLogLevel() == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Removing decoder from pipeline", inbound, outbound);
                }
                inbound.pipeline().remove("frameDecoder");
                outbound.pipeline().remove("frameDecoder");
                return;
            }

            final int maxMessageSize;
            if (connected.hasMaxMessageSize()) {
                maxMessageSize = connected.getMaxMessageSize();
                // Resize the frame decoders to the broker's limit plus 10 KiB of headroom.
                inbound.pipeline().replace("frameDecoder", "newFrameDecoder",
                        new LengthFieldBasedFrameDecoder(maxMessageSize + 10240, 0, 4, 0, 4));
                outbound.pipeline().replace("frameDecoder", "newFrameDecoder",
                        new LengthFieldBasedFrameDecoder(maxMessageSize + 10240, 0, 4, 0, 4));
            } else {
                maxMessageSize = 5 * 1024 * 1024;
            }

            inbound.pipeline().addBefore("handler", "inboundParser",
                    new ParserProxyHandler(DirectProxyHandler.this.service, "frontendconn", maxMessageSize, outbound.id()));
            outbound.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
                    new ParserProxyHandler(DirectProxyHandler.this.service, "backendconn", maxMessageSize, inbound.id()));
        }

        /**
         * Closes the inbound channel when the broker-side channel goes inactive.
         *
         * @param ctx the context of the outbound channel (unused)
         */
        public void channelInactive(ChannelHandlerContext ctx) {
            final Channel inbound = DirectProxyHandler.this.inboundChannel;
            inbound.close();
        }

        /**
         * Logs an exception raised on the outbound pipeline and closes the outbound channel.
         *
         * @param ctx the context of the outbound channel
         * @param cause the exception that was caught
         */
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            final String reason = cause.getMessage();
            log.warn("[{}] [{}] Caught exception: {}",
                    DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, reason, cause);
            ctx.close();
        }
    }

    /**
     * Handshake progress of the connection to the broker.
     */
    enum BackendState {
        /** Initial state; the broker has not yet answered with Connected. */
        Init,
        /** The broker answered with Connected; data is relayed directly from here on. */
        HandshakeCompleted
    }
}

