/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.net.ssl.SSLSession;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.proxy.server.ParserProxyHandler;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectProxyHandler {
    private Channel inboundChannel;
    Channel outboundChannel;
    protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<ChannelId, ChannelId>();
    private String originalPrincipal;
    private AuthData clientAuthData;
    private String clientAuthMethod;
    private int protocolVersion;
    public static final String TLS_HANDLER = "tls";
    private final Authentication authentication;
    private final Supplier<SslHandler> sslHandlerSupplier;
    private AuthenticationDataProvider authenticationDataProvider;
    private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);

    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, final int protocolVersion, final Supplier<SslHandler> sslHandlerSupplier) {
        URI targetBroker;
        this.authentication = proxyConnection.getClientAuthentication();
        this.inboundChannel = proxyConnection.ctx().channel();
        this.originalPrincipal = proxyConnection.clientAuthRole;
        this.clientAuthData = proxyConnection.clientAuthData;
        this.clientAuthMethod = proxyConnection.clientAuthMethod;
        this.protocolVersion = protocolVersion;
        this.sslHandlerSupplier = sslHandlerSupplier;
        final ProxyConfiguration config = service.getConfiguration();
        Bootstrap b = new Bootstrap();
        b.option(ChannelOption.ALLOCATOR, (Object)PulsarByteBufAllocator.DEFAULT);
        ((Bootstrap)((Bootstrap)b.group((EventLoopGroup)this.inboundChannel.eventLoop())).channel(this.inboundChannel.getClass())).option(ChannelOption.AUTO_READ, (Object)false);
        b.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                if (sslHandlerSupplier != null) {
                    ch.pipeline().addLast(DirectProxyHandler.TLS_HANDLER, (ChannelHandler)sslHandlerSupplier.get());
                }
                ch.pipeline().addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(5253120, 0, 4, 0, 4));
                ch.pipeline().addLast("proxyOutboundHandler", (ChannelHandler)new ProxyBackendHandler(config, protocolVersion));
            }
        });
        try {
            targetBroker = new URI("pulsar://" + targetBrokerUrl);
        }
        catch (URISyntaxException e) {
            log.warn("[{}] Failed to parse broker url '{}'", new Object[]{this.inboundChannel, targetBrokerUrl, e});
            this.inboundChannel.close();
            return;
        }
        ChannelFuture f = b.connect(targetBroker.getHost(), targetBroker.getPort());
        this.outboundChannel = f.channel();
        f.addListener(future -> {
            if (!future.isSuccess()) {
                this.inboundChannel.close();
                return;
            }
            ProxyBackendHandler cnx = (ProxyBackendHandler)this.outboundChannel.pipeline().get("proxyOutboundHandler");
            cnx.setRemoteHostName(targetBroker.getHost());
            if (ProxyService.proxyLogLevel == 2) {
                inboundOutboundChannelMap.put(this.outboundChannel.id(), this.inboundChannel.id());
            }
        });
    }

    public class ProxyBackendHandler
    extends PulsarDecoder
    implements FutureListener<Void> {
        private BackendState state = BackendState.Init;
        private String remoteHostName;
        protected ChannelHandlerContext ctx;
        private ProxyConfiguration config;
        private int protocolVersion;

        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
            this.config = config;
            this.protocolVersion = protocolVersion;
        }

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.ctx = ctx;
            DirectProxyHandler.this.authenticationDataProvider = DirectProxyHandler.this.authentication.getAuthData(this.remoteHostName);
            AuthData authData = DirectProxyHandler.this.authenticationDataProvider.authenticate(AuthData.of((byte[])AuthData.INIT_AUTH_DATA));
            ByteBuf command = null;
            command = Commands.newConnect((String)DirectProxyHandler.this.authentication.getAuthMethodName(), (AuthData)authData, (int)this.protocolVersion, (String)"Pulsar proxy", null, (String)DirectProxyHandler.this.originalPrincipal, (AuthData)DirectProxyHandler.this.clientAuthData, (String)DirectProxyHandler.this.clientAuthMethod);
            DirectProxyHandler.this.outboundChannel.writeAndFlush((Object)command);
            DirectProxyHandler.this.outboundChannel.read();
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            switch (this.state) {
                case Init: {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Received msg on broker connection: {}", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, msg.getClass()});
                    }
                    super.channelRead(ctx, msg);
                    break;
                }
                case HandshakeCompleted: {
                    ProxyService.opsCounter.inc();
                    if (msg instanceof ByteBuf) {
                        ProxyService.bytesCounter.inc((double)((ByteBuf)msg).readableBytes());
                    }
                    DirectProxyHandler.this.inboundChannel.writeAndFlush(msg).addListener((GenericFutureListener)this);
                    break;
                }
            }
        }

        protected void handleAuthChallenge(PulsarApi.CommandAuthChallenge authChallenge) {
            Preconditions.checkArgument((boolean)authChallenge.hasChallenge());
            Preconditions.checkArgument((authChallenge.getChallenge().hasAuthData() && authChallenge.getChallenge().hasAuthData() ? 1 : 0) != 0);
            try {
                AuthData authData = DirectProxyHandler.this.authenticationDataProvider.authenticate(AuthData.of((byte[])authChallenge.getChallenge().getAuthData().toByteArray()));
                Preconditions.checkState((!authData.isComplete() ? 1 : 0) != 0);
                ByteBuf request = Commands.newAuthResponse((String)DirectProxyHandler.this.authentication.getAuthMethodName(), (AuthData)authData, (int)this.protocolVersion, (String)PulsarVersion.getVersion());
                if (log.isDebugEnabled()) {
                    log.debug("{} Mutual auth {}", (Object)this.ctx.channel(), (Object)DirectProxyHandler.this.authentication.getAuthMethodName());
                }
                DirectProxyHandler.this.outboundChannel.writeAndFlush((Object)request);
                DirectProxyHandler.this.outboundChannel.read();
            }
            catch (Exception e) {
                log.error("Error mutual verify: {}", (Throwable)e);
                return;
            }
        }

        public void operationComplete(Future<Void> future) throws Exception {
            if (future.isSuccess()) {
                DirectProxyHandler.this.outboundChannel.read();
            } else {
                log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, future.cause()});
                DirectProxyHandler.this.inboundChannel.close();
            }
        }

        protected void messageReceived() {
        }

        protected void handleConnected(PulsarApi.CommandConnected connected) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received Connected from broker", (Object)DirectProxyHandler.this.inboundChannel, (Object)DirectProxyHandler.this.outboundChannel);
            }
            if (this.config.isTlsHostnameVerificationEnabled() && this.remoteHostName != null && !this.verifyTlsHostName(this.remoteHostName, this.ctx)) {
                log.warn("[{}] Failed to verify hostname of {}", (Object)this.ctx.channel(), (Object)this.remoteHostName);
                this.ctx.close();
                return;
            }
            this.state = BackendState.HandshakeCompleted;
            ChannelFuture channelFuture = connected.hasMaxMessageSize() ? DirectProxyHandler.this.inboundChannel.writeAndFlush((Object)Commands.newConnected((int)connected.getProtocolVersion(), (int)connected.getMaxMessageSize())) : DirectProxyHandler.this.inboundChannel.writeAndFlush((Object)Commands.newConnected((int)connected.getProtocolVersion()));
            channelFuture.addListener(future -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Removing decoder from pipeline", (Object)DirectProxyHandler.this.inboundChannel, (Object)DirectProxyHandler.this.outboundChannel);
                }
                if (ProxyService.proxyLogLevel == 0) {
                    DirectProxyHandler.this.inboundChannel.pipeline().remove("frameDecoder");
                    DirectProxyHandler.this.outboundChannel.pipeline().remove("frameDecoder");
                } else if (connected.hasMaxMessageSize()) {
                    DirectProxyHandler.this.inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() + 10240, 0, 4, 0, 4));
                    DirectProxyHandler.this.outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() + 10240, 0, 4, 0, 4));
                    DirectProxyHandler.this.inboundChannel.pipeline().addBefore("handler", "inboundParser", (ChannelHandler)new ParserProxyHandler(DirectProxyHandler.this.inboundChannel, "frontendconn", connected.getMaxMessageSize()));
                    DirectProxyHandler.this.outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", (ChannelHandler)new ParserProxyHandler(DirectProxyHandler.this.outboundChannel, "backendconn", connected.getMaxMessageSize()));
                } else {
                    DirectProxyHandler.this.inboundChannel.pipeline().addBefore("handler", "inboundParser", (ChannelHandler)new ParserProxyHandler(DirectProxyHandler.this.inboundChannel, "frontendconn", 0x500000));
                    DirectProxyHandler.this.outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", (ChannelHandler)new ParserProxyHandler(DirectProxyHandler.this.outboundChannel, "backendconn", 0x500000));
                }
                DirectProxyHandler.this.inboundChannel.read();
                DirectProxyHandler.this.outboundChannel.read();
            });
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            DirectProxyHandler.this.inboundChannel.close();
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.warn("[{}] [{}] Caught exception: {}", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, cause.getMessage(), cause});
            ctx.close();
        }

        public void setRemoteHostName(String remoteHostName) {
            this.remoteHostName = remoteHostName;
        }

        private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
            ChannelHandler sslHandler = ctx.channel().pipeline().get(DirectProxyHandler.TLS_HANDLER);
            SSLSession sslSession = null;
            if (sslHandler != null) {
                sslSession = ((SslHandler)sslHandler).engine().getSession();
                return new DefaultHostnameVerifier().verify(hostname, sslSession);
            }
            return false;
        }
    }

    static enum BackendState {
        Init,
        HandshakeCompleted;

    }
}

