/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.proxy.server.BrokerDiscoveryProvider;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ServiceChannelInitializer;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Binary-protocol listener of the Pulsar proxy.
 *
 * <p>An instance owns two Netty event loop groups (one acceptor thread and one worker
 * thread per available processor), optionally binds a plain-text and/or a TLS port as
 * configured in {@link ProxyConfiguration}, and exposes the collaborating services
 * (authentication, authorization, broker discovery, configuration cache) through getters.
 *
 * <p>Lifecycle: construct, call {@link #start()} once, and call {@link #close()} to
 * release the discovery provider and the event loop groups.
 */
public class ProxyService implements Closeable {
    private final ProxyConfiguration proxyConfig;
    /** Plain-text service URL, or {@code null} when no plain-text port is configured. */
    private final String serviceUrl;
    /** TLS service URL, or {@code null} when no TLS port is configured. */
    private final String serviceUrlTls;
    /** Created in {@link #start()} only when ZooKeeper servers are configured; otherwise unset. */
    private ConfigurationCacheService configurationCacheService;
    private final AuthenticationService authenticationService;
    /** Created in {@link #start()} only when ZooKeeper servers are configured; otherwise {@code null}. */
    private AuthorizationService authorizationService;
    /** Lazily created by {@link #getZooKeeperClientFactory()}. */
    private ZooKeeperClientFactory zkClientFactory = null;
    private final EventLoopGroup acceptorGroup;
    private final EventLoopGroup workerGroup;
    private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-discovery-acceptor");
    private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-discovery-io");
    /** Created in {@link #start()} only when ZooKeeper servers are configured; otherwise {@code null}. */
    private BrokerDiscoveryProvider discoveryProvider;
    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
    /** Process-wide log level; overwritten by every {@code ProxyService} constructed. */
    protected static int proxyLogLevel;
    /** Size of the worker event loop group: one thread per available processor. */
    private static final int numThreads = Runtime.getRuntime().availableProcessors();
    static final Gauge activeConnections = Gauge
            .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy")
            .create().register();
    static final Counter newConnections = Counter
            .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy")
            .create().register();
    static final Counter rejectedConnections = Counter
            .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling")
            .create().register();
    static final Counter opsCounter = Counter
            .build("pulsar_proxy_binary_ops", "Counter of proxy operations")
            .create().register();
    static final Counter bytesCounter = Counter
            .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes")
            .create().register();
    private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);

    /**
     * Creates the service and its event loop groups; no port is bound until {@link #start()}.
     *
     * @param proxyConfig proxy configuration; must not be {@code null}
     * @param authenticationService authentication service handed out via
     *        {@link #getAuthenticationService()}
     * @throws IOException declared for callers; not thrown directly by this body
     * @throws RuntimeException wrapping {@link UnknownHostException} when the local host name
     *         cannot be resolved
     */
    public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService) throws IOException {
        Preconditions.checkNotNull(proxyConfig);
        this.proxyConfig = proxyConfig;
        this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
                new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false));

        String hostname;
        try {
            hostname = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }

        this.serviceUrl = proxyConfig.getServicePort().isPresent()
                ? String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort().get())
                : null;
        // The TLS endpoint must be advertised with the "pulsar+ssl" scheme; advertising it as
        // "pulsar://" would make clients attempt a plain-text connection to the TLS port.
        this.serviceUrlTls = proxyConfig.getServicePortTls().isPresent()
                ? String.format("pulsar+ssl://%s:%d/", hostname, proxyConfig.getServicePortTls().get())
                : null;

        proxyLogLevel = proxyConfig.getproxyLogLevel().isPresent()
                ? Integer.valueOf(proxyConfig.getproxyLogLevel().get())
                : 0;

        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, this.acceptorThreadFactory);
        this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, this.workersThreadFactory);
        this.authenticationService = authenticationService;
    }

    /**
     * Initializes the ZooKeeper-backed services (when both ZooKeeper and configuration-store
     * servers are configured) and binds the configured plain-text and TLS ports.
     *
     * @throws IOException when binding the plain-text port fails
     * @throws Exception when creating the ZooKeeper-backed services or binding the TLS port fails
     */
    public void start() throws Exception {
        boolean hasZookeeper = !StringUtils.isBlank(this.proxyConfig.getZookeeperServers());
        boolean hasConfigurationStore = !StringUtils.isBlank(this.proxyConfig.getConfigurationStoreServers());
        if (hasZookeeper && hasConfigurationStore) {
            this.discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, this.getZooKeeperClientFactory());
            this.configurationCacheService =
                    new ConfigurationCacheService((ZooKeeperCache) this.discoveryProvider.globalZkCache);
            this.authorizationService = new AuthorizationService(
                    PulsarConfigurationLoader.convertFrom((PulsarConfiguration) this.proxyConfig),
                    this.configurationCacheService);
        }

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        bootstrap.group(this.acceptorGroup, this.workerGroup);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        // Receive buffer adapts between 1 KiB and 1 MiB, starting at 16 KiB.
        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16384, 0x100000));
        bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(this.workerGroup));
        EventLoopUtil.enableTriggeredMode(bootstrap);
        bootstrap.childHandler(new ServiceChannelInitializer(this, this.proxyConfig, false));

        if (this.proxyConfig.getServicePort().isPresent()) {
            try {
                bootstrap.bind(this.proxyConfig.getServicePort().get().intValue()).sync();
                // Logged only after a successful bind, so the message is never emitted for an
                // unbound (null) service URL.
                LOG.info("Started Pulsar Proxy at {}", this.serviceUrl);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for the caller before wrapping.
                    Thread.currentThread().interrupt();
                }
                throw new IOException("Failed to bind Pulsar Proxy on port " + this.proxyConfig.getServicePort().get(), e);
            }
        }

        if (this.proxyConfig.getServicePortTls().isPresent()) {
            // Same transport settings as the plain-text listener, different channel initializer.
            ServerBootstrap tlsBootstrap = bootstrap.clone();
            tlsBootstrap.childHandler(new ServiceChannelInitializer(this, this.proxyConfig, true));
            tlsBootstrap.bind(this.proxyConfig.getServicePortTls().get().intValue()).sync();
            LOG.info("Started Pulsar TLS Proxy on port {}", this.proxyConfig.getServicePortTls().get());
        }
    }

    /**
     * Returns the ZooKeeper client factory, creating a {@link ZookeeperClientFactoryImpl} on
     * first use. The lazy initialization is not synchronized.
     *
     * @return the ZooKeeper client factory, never {@code null}
     */
    public ZooKeeperClientFactory getZooKeeperClientFactory() {
        if (this.zkClientFactory == null) {
            this.zkClientFactory = new ZookeeperClientFactoryImpl();
        }
        return this.zkClientFactory;
    }

    /**
     * @return the broker discovery provider, or {@code null} if {@link #start()} did not create one
     */
    public BrokerDiscoveryProvider getDiscoveryProvider() {
        return this.discoveryProvider;
    }

    /**
     * Closes the discovery provider (if one was created) and requests a graceful shutdown of
     * both event loop groups. Does not wait for the shutdown to complete.
     *
     * @throws IOException declared by {@link Closeable}
     */
    @Override
    public void close() throws IOException {
        if (this.discoveryProvider != null) {
            this.discoveryProvider.close();
        }
        this.acceptorGroup.shutdownGracefully();
        this.workerGroup.shutdownGracefully();
    }

    /**
     * @return the plain-text service URL ({@code pulsar://host:port/}), or {@code null} when no
     *         plain-text port is configured
     */
    public String getServiceUrl() {
        return this.serviceUrl;
    }

    /**
     * @return the TLS service URL ({@code pulsar+ssl://host:port/}), or {@code null} when no
     *         TLS port is configured
     */
    public String getServiceUrlTls() {
        return this.serviceUrlTls;
    }

    /**
     * @return the configuration this service was constructed with
     */
    public ProxyConfiguration getConfiguration() {
        return this.proxyConfig;
    }

    /**
     * @return the authentication service passed to the constructor
     */
    public AuthenticationService getAuthenticationService() {
        return this.authenticationService;
    }

    /**
     * @return the authorization service, or {@code null} if {@link #start()} did not create one
     */
    public AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    /**
     * @return the configuration cache service, or {@code null} if none has been created or set
     */
    public ConfigurationCacheService getConfigurationCacheService() {
        return this.configurationCacheService;
    }

    /**
     * Replaces the configuration cache service.
     *
     * @param configurationCacheService the instance to use from now on
     */
    public void setConfigurationCacheService(ConfigurationCacheService configurationCacheService) {
        this.configurationCacheService = configurationCacheService;
    }

    /**
     * @return the semaphore currently held in {@link #lookupRequestSemaphore}, sized at
     *         construction from {@code getMaxConcurrentLookupRequests()}
     */
    public Semaphore getLookupRequestSemaphore() {
        return this.lookupRequestSemaphore.get();
    }

    /**
     * @return the worker event loop group used for accepted connections
     */
    public EventLoopGroup getWorkerGroup() {
        return this.workerGroup;
    }
}

