/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.proxy;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.kroxylicious.proxy.bootstrap.FilterChainFactory;
import io.kroxylicious.proxy.config.Configuration;
import io.kroxylicious.proxy.config.MicrometerDefinition;
import io.kroxylicious.proxy.config.PluginFactoryRegistry;
import io.kroxylicious.proxy.config.admin.AdminHttpConfiguration;
import io.kroxylicious.proxy.internal.KafkaProxyInitializer;
import io.kroxylicious.proxy.internal.MeterRegistries;
import io.kroxylicious.proxy.internal.PortConflictDetector;
import io.kroxylicious.proxy.internal.admin.AdminHttpInitializer;
import io.kroxylicious.proxy.internal.net.DefaultNetworkBindingOperationProcessor;
import io.kroxylicious.proxy.internal.net.EndpointRegistry;
import io.kroxylicious.proxy.internal.net.NetworkBindingOperationProcessor;
import io.kroxylicious.proxy.internal.util.Metrics;
import io.kroxylicious.proxy.model.VirtualCluster;
import io.kroxylicious.proxy.service.HostPort;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaProxy
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxy.class);
    private static final Logger STARTUP_SHUTDOWN_LOGGER = LoggerFactory.getLogger((String)"io.kroxylicious.proxy.StartupShutdownLogger");
    @NonNull
    private final Configuration config;
    @Nullable
    private final AdminHttpConfiguration adminHttpConfig;
    @NonNull
    private final List<MicrometerDefinition> micrometerConfig;
    @NonNull
    private final List<VirtualCluster> virtualClusters;
    private final AtomicBoolean running = new AtomicBoolean();
    private final CompletableFuture<Void> shutdown = new CompletableFuture();
    private final NetworkBindingOperationProcessor bindingOperationProcessor = new DefaultNetworkBindingOperationProcessor();
    private final EndpointRegistry endpointRegistry = new EndpointRegistry(this.bindingOperationProcessor);
    @NonNull
    private final PluginFactoryRegistry pfr;
    @Nullable
    private MeterRegistries meterRegistries;
    @Nullable
    private FilterChainFactory filterChainFactory;
    @Nullable
    private EventGroupConfig adminEventGroup;
    @Nullable
    private EventGroupConfig serverEventGroup;
    @Nullable
    private Channel metricsChannel;

    public KafkaProxy(PluginFactoryRegistry pfr, Configuration config) {
        this.pfr = Objects.requireNonNull(pfr);
        this.config = Objects.requireNonNull(config);
        this.virtualClusters = config.virtualClusterModel();
        this.adminHttpConfig = config.adminHttpConfig();
        this.micrometerConfig = config.getMicrometer();
    }

    public KafkaProxy startup() throws InterruptedException {
        if (this.running.getAndSet(true)) {
            throw new IllegalStateException("This proxy is already running");
        }
        try {
            STARTUP_SHUTDOWN_LOGGER.info("Kroxylicious is starting");
            PortConflictDetector portConflictDefector = new PortConflictDetector();
            Optional<HostPort> adminHttpHostPort = Optional.ofNullable(this.shouldBindAdminEndpoint() ? new HostPort(this.adminHttpConfig.host(), this.adminHttpConfig.port()) : null);
            portConflictDefector.validate(this.virtualClusters, adminHttpHostPort);
            int availableCores = Runtime.getRuntime().availableProcessors();
            this.meterRegistries = new MeterRegistries(this.micrometerConfig);
            this.adminEventGroup = this.buildNettyEventGroups("admin", availableCores, this.config.isUseIoUring());
            this.serverEventGroup = this.buildNettyEventGroups("server", availableCores, this.config.isUseIoUring());
            this.maybeStartMetricsListener(this.adminEventGroup, this.meterRegistries);
            this.filterChainFactory = new FilterChainFactory(this.pfr, this.config.filters());
            ServerBootstrap tlsServerBootstrap = this.buildServerBootstrap(this.serverEventGroup, new KafkaProxyInitializer(this.filterChainFactory, this.pfr, true, this.endpointRegistry, this.endpointRegistry, false, Map.of()));
            ServerBootstrap plainServerBootstrap = this.buildServerBootstrap(this.serverEventGroup, new KafkaProxyInitializer(this.filterChainFactory, this.pfr, false, this.endpointRegistry, this.endpointRegistry, false, Map.of()));
            this.bindingOperationProcessor.start(plainServerBootstrap, tlsServerBootstrap);
            CompletableFuture.allOf((CompletableFuture[])this.virtualClusters.stream().map(vc -> this.endpointRegistry.registerVirtualCluster((VirtualCluster)vc).toCompletableFuture()).toArray(CompletableFuture[]::new)).join();
            Metrics.inboundDownstreamMessagesCounter();
            Metrics.inboundDownstreamDecodedMessagesCounter();
            return this;
        }
        catch (InterruptedException | RuntimeException e) {
            this.shutdown();
            throw e;
        }
    }

    private ServerBootstrap buildServerBootstrap(EventGroupConfig virtualHostEventGroup, KafkaProxyInitializer kafkaProxyInitializer) {
        return ((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().group(virtualHostEventGroup.bossGroup(), virtualHostEventGroup.workerGroup()).channel(virtualHostEventGroup.clazz())).option(ChannelOption.SO_REUSEADDR, (Object)true)).childHandler((ChannelHandler)kafkaProxyInitializer).childOption(ChannelOption.TCP_NODELAY, (Object)true);
    }

    private EventGroupConfig buildNettyEventGroups(String name, int availableCores, boolean useIoUring) {
        Class<IOUringServerSocketChannel> channelClass;
        IOUringEventLoopGroup workerGroup;
        IOUringEventLoopGroup bossGroup;
        if (useIoUring) {
            if (!IOUring.isAvailable()) {
                throw new IllegalStateException("io_uring not available due to: " + String.valueOf(IOUring.unavailabilityCause()));
            }
            bossGroup = new IOUringEventLoopGroup(1);
            workerGroup = new IOUringEventLoopGroup(availableCores);
            channelClass = IOUringServerSocketChannel.class;
        } else if (Epoll.isAvailable()) {
            bossGroup = new EpollEventLoopGroup(1);
            workerGroup = new EpollEventLoopGroup(availableCores);
            channelClass = EpollServerSocketChannel.class;
        } else if (KQueue.isAvailable()) {
            bossGroup = new KQueueEventLoopGroup(1);
            workerGroup = new KQueueEventLoopGroup(availableCores);
            channelClass = KQueueServerSocketChannel.class;
        } else {
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup(availableCores);
            channelClass = NioServerSocketChannel.class;
        }
        return new EventGroupConfig(name, (EventLoopGroup)bossGroup, (EventLoopGroup)workerGroup, channelClass);
    }

    private void maybeStartMetricsListener(EventGroupConfig eventGroupConfig, MeterRegistries meterRegistries) throws InterruptedException {
        if (this.shouldBindAdminEndpoint()) {
            ServerBootstrap metricsBootstrap = ((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().group(eventGroupConfig.bossGroup(), eventGroupConfig.workerGroup()).option(ChannelOption.SO_REUSEADDR, (Object)true)).channel(eventGroupConfig.clazz())).childHandler((ChannelHandler)new AdminHttpInitializer(meterRegistries, this.adminHttpConfig));
            LOGGER.info("Binding metrics endpoint: {}:{}", (Object)this.adminHttpConfig.host(), (Object)this.adminHttpConfig.port());
            this.metricsChannel = metricsBootstrap.bind(this.adminHttpConfig.host(), this.adminHttpConfig.port().intValue()).sync().channel();
        }
    }

    private boolean shouldBindAdminEndpoint() {
        return this.adminHttpConfig != null && this.adminHttpConfig.endpoints().maybePrometheus().isPresent();
    }

    public void block() throws Exception {
        if (!this.running.get()) {
            throw new IllegalStateException("This proxy is not running");
        }
        this.shutdown.join();
    }

    public void shutdown() throws InterruptedException {
        if (!this.running.getAndSet(false)) {
            throw new IllegalStateException("This proxy is not running");
        }
        try {
            STARTUP_SHUTDOWN_LOGGER.info("Shutting down");
            this.endpointRegistry.shutdown().handle((u, t) -> {
                this.bindingOperationProcessor.close();
                ArrayList<Object> closeFutures = new ArrayList<Object>();
                if (this.serverEventGroup != null) {
                    closeFutures.addAll(this.serverEventGroup.shutdownGracefully());
                }
                if (this.adminEventGroup != null) {
                    closeFutures.addAll(this.adminEventGroup.shutdownGracefully());
                }
                closeFutures.forEach((Consumer<Object>)((Consumer<Future>)Future::syncUninterruptibly));
                if (this.filterChainFactory != null) {
                    this.filterChainFactory.close();
                }
                if (t != null) {
                    if (t instanceof RuntimeException) {
                        RuntimeException re = (RuntimeException)t;
                        throw re;
                    }
                    throw new RuntimeException((Throwable)t);
                }
                return null;
            }).toCompletableFuture().join();
            if (this.meterRegistries != null) {
                this.meterRegistries.close();
            }
        }
        finally {
            this.adminEventGroup = null;
            this.serverEventGroup = null;
            this.metricsChannel = null;
            this.meterRegistries = null;
            this.filterChainFactory = null;
            this.shutdown.complete(null);
            LOGGER.info("Shut down completed.");
        }
    }

    @Override
    public void close() throws Exception {
        if (this.running.get()) {
            this.shutdown();
        }
    }

    private record EventGroupConfig(String name, EventLoopGroup bossGroup, EventLoopGroup workerGroup, Class<? extends ServerChannel> clazz) {
        public List<Future<?>> shutdownGracefully() {
            return List.of(this.bossGroup.shutdownGracefully(), this.workerGroup.shutdownGracefully());
        }
    }
}

