package io.kroxylicious.proxy.internal;

import io.kroxylicious.proxy.bootstrap.FilterChainFactory;
import io.kroxylicious.proxy.config.PluginFactoryRegistry;
import io.kroxylicious.proxy.filter.FilterAndInvoker;
import io.kroxylicious.proxy.filter.NetFilter;
import io.kroxylicious.proxy.internal.KafkaAuthnHandler;
import io.kroxylicious.proxy.internal.codec.KafkaRequestDecoder;
import io.kroxylicious.proxy.internal.codec.KafkaResponseEncoder;
import io.kroxylicious.proxy.internal.filter.ApiVersionsIntersectFilter;
import io.kroxylicious.proxy.internal.filter.BrokerAddressFilter;
import io.kroxylicious.proxy.internal.filter.EagerMetadataLearner;
import io.kroxylicious.proxy.internal.filter.NettyFilterContext;
import io.kroxylicious.proxy.internal.net.Endpoint;
import io.kroxylicious.proxy.internal.net.EndpointReconciler;
import io.kroxylicious.proxy.internal.net.VirtualClusterBinding;
import io.kroxylicious.proxy.internal.net.VirtualClusterBindingResolver;
import io.kroxylicious.proxy.model.VirtualCluster;
import io.kroxylicious.proxy.service.HostPort;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/proxy/internal/KafkaProxyInitializer.class */
public class KafkaProxyInitializer extends ChannelInitializer<SocketChannel> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxyInitializer.class);
    private final boolean haproxyProtocol;
    private final Map<KafkaAuthnHandler.SaslMechanism, AuthenticateCallbackHandler> authnHandlers;
    private final boolean tls;
    private final VirtualClusterBindingResolver virtualClusterBindingResolver;
    private final EndpointReconciler endpointReconciler;
    private final PluginFactoryRegistry pfr;
    private final FilterChainFactory filterChainFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/kroxylicious/proxy/internal/KafkaProxyInitializer$InitalizerNetFilter.class */
    /**
     * {@link NetFilter} that assembles the filter chain for a single downstream connection
     * and initiates the connection to the upstream target of the connection's
     * {@link VirtualClusterBinding}.
     */
    public static class InitalizerNetFilter implements NetFilter {
        private final SaslDecodePredicate decodePredicate;
        private final ApiVersionsServiceImpl apiVersionService;
        private final SocketChannel ch;
        private final VirtualCluster virtualCluster;
        private final VirtualClusterBinding binding;
        private final PluginFactoryRegistry pfr;
        private final FilterChainFactory filterChainFactory;
        private final EndpointReconciler endpointReconciler;

        /**
         * Captures the collaborators needed to build the filter chain later, in
         * {@link #selectServer(NetFilter.NetFilterContext)}.
         *
         * @param saslDecodePredicate    consulted to decide whether the API versions intersect filter is installed
         * @param apiVersionsServiceImpl service handed to the {@link ApiVersionsIntersectFilter}
         * @param socketChannel          downstream channel; its event loop is given to the filter context
         * @param virtualClusterBinding  binding supplying the virtual cluster and the upstream target
         * @param pluginFactoryRegistry  registry handed to the {@link NettyFilterContext}
         * @param filterChainFactory     factory producing the configured filters
         * @param endpointReconciler     reconciler handed to the {@link BrokerAddressFilter}
         */
        InitalizerNetFilter(SaslDecodePredicate saslDecodePredicate, ApiVersionsServiceImpl apiVersionsServiceImpl, SocketChannel socketChannel, VirtualClusterBinding virtualClusterBinding, PluginFactoryRegistry pluginFactoryRegistry, FilterChainFactory filterChainFactory, EndpointReconciler endpointReconciler) {
            this.decodePredicate = saslDecodePredicate;
            this.apiVersionService = apiVersionsServiceImpl;
            this.ch = socketChannel;
            this.virtualCluster = virtualClusterBinding.virtualCluster();
            this.binding = virtualClusterBinding;
            this.pfr = pluginFactoryRegistry;
            this.filterChainFactory = filterChainFactory;
            this.endpointReconciler = endpointReconciler;
        }

        /**
         * Builds the ordered filter list and asks the context to connect to the binding's upstream target.
         * <p>
         * Filter order is:
         * <ol>
         * <li>the {@link ApiVersionsIntersectFilter}, omitted when authentication offload is enabled;</li>
         * <li>the filters created by the {@link FilterChainFactory};</li>
         * <li>the {@link EagerMetadataLearner}, only when the binding restricts upstream use to metadata discovery;</li>
         * <li>the {@link BrokerAddressFilter}.</li>
         * </ol>
         *
         * @param netFilterContext context used to initiate the upstream connection
         * @throws IllegalStateException if the binding has no upstream target
         */
        @Override // io.kroxylicious.proxy.filter.NetFilter
        public void selectServer(NetFilter.NetFilterContext netFilterContext) {
            // The three groups are constructed in this order before being combined below.
            List<FilterAndInvoker> apiVersionFilters = this.decodePredicate.isAuthenticationOffloadEnabled()
                    ? List.of()
                    : FilterAndInvoker.build(new ApiVersionsIntersectFilter(this.apiVersionService));
            List<FilterAndInvoker> configuredFilters = this.filterChainFactory.createFilters(new NettyFilterContext(this.ch.eventLoop(), this.pfr));
            List<FilterAndInvoker> brokerAddressFilters = FilterAndInvoker.build(new BrokerAddressFilter(this.virtualCluster, this.endpointReconciler));

            // Typed list (previously a raw ArrayList) so the compiler checks what goes into the chain.
            List<FilterAndInvoker> filters = new ArrayList<>(apiVersionFilters);
            filters.addAll(configuredFilters);
            if (this.binding.restrictUpstreamToMetadataDiscovery()) {
                filters.addAll(FilterAndInvoker.build(new EagerMetadataLearner()));
            }
            filters.addAll(brokerAddressFilters);

            HostPort upstreamTarget = this.binding.upstreamTarget();
            if (upstreamTarget == null) {
                throw new IllegalStateException("A target address for binding %s is not known.".formatted(this.binding));
            }
            netFilterContext.initiateConnect(upstreamTarget, filters);
        }
    }

    /**
     * Creates an initializer for downstream (client-facing) channels.
     *
     * @param filterChainFactory            factory passed on to each connection's {@link InitalizerNetFilter}
     * @param pluginFactoryRegistry         registry passed on to each connection's {@link InitalizerNetFilter}
     * @param z                             stored as the {@code tls} flag; selects the TLS/SNI path in {@code initChannel}
     * @param virtualClusterBindingResolver resolves an endpoint (and optional SNI hostname) to a binding
     * @param endpointReconciler            reconciler passed on to each connection's {@link InitalizerNetFilter}
     * @param z2                            stored as the {@code haproxyProtocol} flag; when set, an
     *                                      {@link HAProxyMessageDecoder} is added to the pipeline
     * @param map                           authentication callback handlers keyed by SASL mechanism;
     *                                      {@code null} is treated as an empty map
     */
    public KafkaProxyInitializer(FilterChainFactory filterChainFactory, PluginFactoryRegistry pluginFactoryRegistry, boolean z, VirtualClusterBindingResolver virtualClusterBindingResolver, EndpointReconciler endpointReconciler, boolean z2, Map<KafkaAuthnHandler.SaslMechanism, AuthenticateCallbackHandler> map) {
        this.filterChainFactory = filterChainFactory;
        this.pfr = pluginFactoryRegistry;
        this.tls = z;
        this.virtualClusterBindingResolver = virtualClusterBindingResolver;
        this.endpointReconciler = endpointReconciler;
        this.haproxyProtocol = z2;
        if (map == null) {
            // Normalise "no handlers" to an empty map so later isEmpty() checks need no null guard.
            this.authnHandlers = Map.of();
        } else {
            this.authnHandlers = map;
        }
    }

    /**
     * Installs the first handler on a newly accepted channel, choosing the TLS/SNI path or the
     * plain path according to the {@code tls} flag.
     * <p>
     * The binding address passed on is empty when the parent channel is bound to the wildcard
     * (any-local) address; otherwise it is the host address of the accepted channel's local address.
     *
     * @param socketChannel the newly accepted channel
     */
    @Override
    public void initChannel(SocketChannel socketChannel) {
        LOGGER.trace("Connection from {} to my address {}", socketChannel.remoteAddress(), socketChannel.localAddress());
        ChannelPipeline pipeline = socketChannel.pipeline();
        int targetPort = socketChannel.localAddress().getPort();
        boolean boundToAnyAddress = socketChannel.parent().localAddress().getAddress().isAnyLocalAddress();
        Optional<String> bindingAddress = boundToAnyAddress
                ? Optional.empty()
                : Optional.of(socketChannel.localAddress().getAddress().getHostAddress());
        if (this.tls) {
            initTlsChannel(socketChannel, pipeline, bindingAddress, targetPort);
        } else {
            initPlainChannel(socketChannel, pipeline, bindingAddress, targetPort);
        }
    }

    /**
     * Adds a temporary handler that, once the channel becomes active, resolves the virtual cluster
     * binding for the (non-TLS) endpoint and then installs the real handlers.
     * <p>
     * If resolution fails, the failure is fired down the pipeline as an exception and nothing else
     * happens. Otherwise the handlers are added and {@code channelActive} is propagated; any
     * exception from that step is fired down the pipeline. In both of the latter cases the
     * temporary handler removes itself afterwards.
     *
     * @param socketChannel   the accepted channel
     * @param channelPipeline the channel's pipeline
     * @param optional        binding address of the endpoint, or empty
     * @param i               port of the endpoint
     */
    private void initPlainChannel(final SocketChannel socketChannel, final ChannelPipeline channelPipeline, final Optional<String> optional, final int i) {
        channelPipeline.addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelActive(ChannelHandlerContext channelHandlerContext) {
                // No SNI hostname is available on a plain connection, hence the null second argument.
                KafkaProxyInitializer.this.virtualClusterBindingResolver
                        .resolve(Endpoint.createEndpoint(optional, i, KafkaProxyInitializer.this.tls), null)
                        .handle((virtualClusterBinding, resolveFailure) -> {
                            if (resolveFailure != null) {
                                channelHandlerContext.fireExceptionCaught(resolveFailure);
                                return null;
                            }
                            try {
                                KafkaProxyInitializer.this.addHandlers(socketChannel, virtualClusterBinding);
                                channelHandlerContext.fireChannelActive();
                            } catch (Throwable setupFailure) {
                                channelHandlerContext.fireExceptionCaught(setupFailure);
                            } finally {
                                // Inside the lambda, 'this' is the enclosing anonymous handler.
                                channelPipeline.remove(this);
                            }
                            return null;
                        });
            }
        });
    }

    /**
     * Adds an {@link SniHandler} whose asynchronous lookup resolves the virtual cluster binding
     * from the endpoint and the SNI hostname, installs the real handlers, and completes the lookup
     * promise with the virtual cluster's downstream {@link SslContext}.
     * <p>
     * The promise is failed when resolution fails, when the virtual cluster provides no SSL
     * context, or when any step throws. After the lookup completes, the handler additionally
     * fires {@code channelActive} down the pipeline.
     *
     * @param socketChannel   the accepted channel
     * @param channelPipeline the channel's pipeline
     * @param optional        binding address of the endpoint, or empty
     * @param i               port of the endpoint
     */
    private void initTlsChannel(SocketChannel socketChannel, ChannelPipeline channelPipeline, Optional<String> optional, int i) {
        LOGGER.debug("Adding SSL/SNI handler");
        channelPipeline.addLast(new SniHandler((sniHostname, promise) -> {
            try {
                this.virtualClusterBindingResolver.resolve(Endpoint.createEndpoint(optional, i, this.tls), sniHostname).handle((virtualClusterBinding, resolveFailure) -> {
                    try {
                        if (resolveFailure != null) {
                            promise.setFailure(resolveFailure);
                            return null;
                        }
                        VirtualCluster virtualCluster = virtualClusterBinding.virtualCluster();
                        Optional<SslContext> downstreamSslContext = virtualCluster.getDownstreamSslContext();
                        if (downstreamSslContext.isPresent()) {
                            // Handlers are installed before the promise is completed.
                            addHandlers(socketChannel, virtualClusterBinding);
                            promise.setSuccess(downstreamSslContext.get());
                        } else {
                            promise.setFailure(new IllegalStateException("Virtual cluster %s does not provide SSL context".formatted(virtualCluster)));
                        }
                        return null;
                    } catch (Throwable setupFailure) {
                        promise.setFailure(setupFailure);
                        return null;
                    }
                });
                return promise;
            } catch (Throwable lookupFailure) {
                // resolve(...) itself threw before returning a stage.
                return promise.setFailure(lookupFailure);
            }
        }) {
            @Override
            protected void onLookupComplete(ChannelHandlerContext channelHandlerContext, Future<SslContext> future) throws Exception {
                super.onLookupComplete(channelHandlerContext, future);
                // NOTE(review): presumably re-fired so handlers added during the lookup observe channelActive; confirm.
                channelHandlerContext.fireChannelActive();
            }
        });
    }

    /**
     * Appends the downstream handlers for a resolved binding to the channel's pipeline.
     * <p>
     * In order: an optional network logger, an optional HAProxy decoder, the Kafka request
     * decoder, the response encoder, the response orderer, an optional frame logger, an optional
     * authentication handler (only when authentication handlers are configured), and finally the
     * frontend handler wrapping an {@link InitalizerNetFilter}.
     *
     * @param socketChannel         the downstream channel whose pipeline is populated
     * @param virtualClusterBinding the binding resolved for this connection
     */
    void addHandlers(SocketChannel socketChannel, VirtualClusterBinding virtualClusterBinding) {
        final VirtualCluster cluster = virtualClusterBinding.virtualCluster();
        final ChannelPipeline channelPipeline = socketChannel.pipeline();

        if (cluster.isLogNetwork()) {
            LoggingHandler networkLogger = new LoggingHandler("io.kroxylicious.proxy.internal.DownstreamNetworkLogger", LogLevel.INFO);
            channelPipeline.addLast("networkLogger", networkLogger);
        }

        if (this.haproxyProtocol) {
            LOGGER.debug("Adding haproxy handler");
            channelPipeline.addLast("HAProxyMessageDecoder", new HAProxyMessageDecoder());
        }

        // The predicate is shared by the request decoder, the net filter and the frontend handler.
        final SaslDecodePredicate decodePredicate = new SaslDecodePredicate(!this.authnHandlers.isEmpty());
        KafkaRequestDecoder requestDecoder = new KafkaRequestDecoder(decodePredicate, cluster.socketFrameMaxSizeBytes());
        channelPipeline.addLast("requestDecoder", requestDecoder);
        channelPipeline.addLast("responseEncoder", new KafkaResponseEncoder());
        channelPipeline.addLast("responseOrderer", new ResponseOrderer());

        if (cluster.isLogFrames()) {
            LoggingHandler frameLogger = new LoggingHandler("io.kroxylicious.proxy.internal.DownstreamFrameLogger", LogLevel.INFO);
            channelPipeline.addLast("frameLogger", frameLogger);
        }

        if (!this.authnHandlers.isEmpty()) {
            LOGGER.debug("Adding authn handler for handlers {}", this.authnHandlers);
            channelPipeline.addLast(new KafkaAuthnHandler(socketChannel, this.authnHandlers));
        }

        InitalizerNetFilter netFilter = new InitalizerNetFilter(decodePredicate, new ApiVersionsServiceImpl(), socketChannel, virtualClusterBinding, this.pfr, this.filterChainFactory, this.endpointReconciler);
        channelPipeline.addLast("netHandler", new KafkaProxyFrontendHandler(netFilter, decodePredicate, cluster));

        LOGGER.debug("{}: Initial pipeline: {}", socketChannel, channelPipeline);
    }
}
