package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/proxy/server/DirectProxyHandler.class */
public class DirectProxyHandler {
    private Channel inboundChannel;
    Channel outboundChannel;
    private String originalPrincipal;
    private AuthData clientAuthData;
    private String clientAuthMethod;
    private int protocolVersion;
    public static final String TLS_HANDLER = "tls";
    private final Authentication authentication;
    private final Supplier<SslHandler> sslHandlerSupplier;
    private AuthenticationDataProvider authenticationDataProvider;
    protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap();
    private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);

    /* loaded from: input_file:org/apache/pulsar/proxy/server/DirectProxyHandler$BackendState.class */
    enum BackendState {
        Init,
        HandshakeCompleted
    }

    /* loaded from: input_file:org/apache/pulsar/proxy/server/DirectProxyHandler$ProxyBackendHandler.class */
    public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> {
        private BackendState state = BackendState.Init;
        private String remoteHostName;
        protected ChannelHandlerContext ctx;
        private ProxyConfiguration config;
        private int protocolVersion;

        public ProxyBackendHandler(ProxyConfiguration proxyConfiguration, int i) {
            this.config = proxyConfiguration;
            this.protocolVersion = i;
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.ctx = channelHandlerContext;
            DirectProxyHandler.this.authenticationDataProvider = DirectProxyHandler.this.authentication.getAuthData(this.remoteHostName);
            DirectProxyHandler.this.outboundChannel.writeAndFlush(Commands.newConnect(DirectProxyHandler.this.authentication.getAuthMethodName(), DirectProxyHandler.this.authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA)), this.protocolVersion, "Pulsar proxy", (String) null, DirectProxyHandler.this.originalPrincipal, DirectProxyHandler.this.clientAuthData, DirectProxyHandler.this.clientAuthMethod));
            DirectProxyHandler.this.outboundChannel.read();
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            switch (this.state) {
                case Init:
                    if (DirectProxyHandler.log.isDebugEnabled()) {
                        DirectProxyHandler.log.debug("[{}] [{}] Received msg on broker connection: {}", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, obj.getClass()});
                    }
                    super.channelRead(channelHandlerContext, obj);
                    return;
                case HandshakeCompleted:
                    ProxyService.opsCounter.inc();
                    if (obj instanceof ByteBuf) {
                        ProxyService.bytesCounter.inc(((ByteBuf) obj).readableBytes());
                    }
                    DirectProxyHandler.this.inboundChannel.writeAndFlush(obj).addListener(this);
                    return;
                default:
                    return;
            }
        }

        protected void handleAuthChallenge(PulsarApi.CommandAuthChallenge commandAuthChallenge) {
            Preconditions.checkArgument(commandAuthChallenge.hasChallenge());
            Preconditions.checkArgument(commandAuthChallenge.getChallenge().hasAuthData() && commandAuthChallenge.getChallenge().hasAuthData());
            try {
                AuthData authenticate = DirectProxyHandler.this.authenticationDataProvider.authenticate(AuthData.of(commandAuthChallenge.getChallenge().getAuthData().toByteArray()));
                Preconditions.checkState(!authenticate.isComplete());
                ByteBuf newAuthResponse = Commands.newAuthResponse(DirectProxyHandler.this.authentication.getAuthMethodName(), authenticate, this.protocolVersion, PulsarVersion.getVersion());
                if (DirectProxyHandler.log.isDebugEnabled()) {
                    DirectProxyHandler.log.debug("{} Mutual auth {}", this.ctx.channel(), DirectProxyHandler.this.authentication.getAuthMethodName());
                }
                DirectProxyHandler.this.outboundChannel.writeAndFlush(newAuthResponse);
                DirectProxyHandler.this.outboundChannel.read();
            } catch (Exception e) {
                DirectProxyHandler.log.error("Error mutual verify: {}", e);
            }
        }

        public void operationComplete(Future<Void> future) throws Exception {
            if (future.isSuccess()) {
                DirectProxyHandler.this.outboundChannel.read();
            } else {
                DirectProxyHandler.log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, future.cause()});
                DirectProxyHandler.this.inboundChannel.close();
            }
        }

        protected void messageReceived() {
        }

        protected void handleConnected(PulsarApi.CommandConnected commandConnected) {
            if (DirectProxyHandler.log.isDebugEnabled()) {
                DirectProxyHandler.log.debug("[{}] [{}] Received Connected from broker", DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel);
            }
            if (!this.config.isTlsHostnameVerificationEnabled() || this.remoteHostName == null || verifyTlsHostName(this.remoteHostName, this.ctx)) {
                this.state = BackendState.HandshakeCompleted;
                (commandConnected.hasMaxMessageSize() ? DirectProxyHandler.this.inboundChannel.writeAndFlush(Commands.newConnected(commandConnected.getProtocolVersion(), commandConnected.getMaxMessageSize())) : DirectProxyHandler.this.inboundChannel.writeAndFlush(Commands.newConnected(commandConnected.getProtocolVersion()))).addListener(future -> {
                    if (DirectProxyHandler.log.isDebugEnabled()) {
                        DirectProxyHandler.log.debug("[{}] [{}] Removing decoder from pipeline", DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel);
                    }
                    if (ProxyService.proxyLogLevel == 0) {
                        DirectProxyHandler.this.inboundChannel.pipeline().remove("frameDecoder");
                        DirectProxyHandler.this.outboundChannel.pipeline().remove("frameDecoder");
                    } else if (commandConnected.hasMaxMessageSize()) {
                        DirectProxyHandler.this.inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", new LengthFieldBasedFrameDecoder(commandConnected.getMaxMessageSize() + 10240, 0, 4, 0, 4));
                        DirectProxyHandler.this.outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", new LengthFieldBasedFrameDecoder(commandConnected.getMaxMessageSize() + 10240, 0, 4, 0, 4));
                        DirectProxyHandler.this.inboundChannel.pipeline().addBefore("handler", "inboundParser", new ParserProxyHandler(DirectProxyHandler.this.inboundChannel, "frontendconn", commandConnected.getMaxMessageSize()));
                        DirectProxyHandler.this.outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", new ParserProxyHandler(DirectProxyHandler.this.outboundChannel, "backendconn", commandConnected.getMaxMessageSize()));
                    } else {
                        DirectProxyHandler.this.inboundChannel.pipeline().addBefore("handler", "inboundParser", new ParserProxyHandler(DirectProxyHandler.this.inboundChannel, "frontendconn", 5242880));
                        DirectProxyHandler.this.outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", new ParserProxyHandler(DirectProxyHandler.this.outboundChannel, "backendconn", 5242880));
                    }
                    DirectProxyHandler.this.inboundChannel.read();
                    DirectProxyHandler.this.outboundChannel.read();
                });
            } else {
                DirectProxyHandler.log.warn("[{}] Failed to verify hostname of {}", this.ctx.channel(), this.remoteHostName);
                this.ctx.close();
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            DirectProxyHandler.this.inboundChannel.close();
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            DirectProxyHandler.log.warn("[{}] [{}] Caught exception: {}", new Object[]{DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, th.getMessage(), th});
            channelHandlerContext.close();
        }

        public void setRemoteHostName(String str) {
            this.remoteHostName = str;
        }

        private boolean verifyTlsHostName(String str, ChannelHandlerContext channelHandlerContext) {
            SslHandler sslHandler = channelHandlerContext.channel().pipeline().get("tls");
            if (sslHandler == null) {
                return false;
            }
            return new DefaultHostnameVerifier().verify(str, sslHandler.engine().getSession());
        }
    }

    public DirectProxyHandler(ProxyService proxyService, ProxyConnection proxyConnection, String str, final int i, final Supplier<SslHandler> supplier) {
        this.authentication = proxyConnection.getClientAuthentication();
        this.inboundChannel = proxyConnection.ctx().channel();
        this.originalPrincipal = proxyConnection.clientAuthRole;
        this.clientAuthData = proxyConnection.clientAuthData;
        this.clientAuthMethod = proxyConnection.clientAuthMethod;
        this.protocolVersion = i;
        this.sslHandlerSupplier = supplier;
        final ProxyConfiguration configuration = proxyService.getConfiguration();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        bootstrap.group(this.inboundChannel.eventLoop()).channel(this.inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.pulsar.proxy.server.DirectProxyHandler.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) throws Exception {
                if (supplier != null) {
                    socketChannel.pipeline().addLast("tls", (ChannelHandler) supplier.get());
                }
                socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(5253120, 0, 4, 0, 4));
                socketChannel.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(configuration, i));
            }
        });
        try {
            URI uri = new URI("pulsar://" + str);
            ChannelFuture connect = bootstrap.connect(uri.getHost(), uri.getPort());
            this.outboundChannel = connect.channel();
            connect.addListener(future -> {
                if (!future.isSuccess()) {
                    this.inboundChannel.close();
                    return;
                }
                this.outboundChannel.pipeline().get("proxyOutboundHandler").setRemoteHostName(uri.getHost());
                if (ProxyService.proxyLogLevel == 2) {
                    inboundOutboundChannelMap.put(this.outboundChannel.id(), this.inboundChannel.id());
                }
            });
        } catch (URISyntaxException e) {
            log.warn("[{}] Failed to parse broker url '{}'", new Object[]{this.inboundChannel, str, e});
            this.inboundChannel.close();
        }
    }
}
