/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.servlet.Servlet;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
import org.apache.pulsar.broker.ManagedLedgerClientFactory;
import org.apache.pulsar.broker.MessagingServiceShutdownHook;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarService
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);

    // Broker configuration handed to the constructor.
    private ServiceConfiguration config = null;

    // Services created in start() and reset to null in close().
    private NamespaceService nsService = null;
    private ManagedLedgerClientFactory managedLedgerClientFactory = null;
    private LeaderElectionService leaderElectionService = null;
    private BrokerService brokerService = null;
    private WebService webService = null;
    // Only created by start() when the WebSocket service is enabled in the configuration.
    private WebSocketService webSocketService = null;
    private ConfigurationCacheService configurationCacheService = null;
    private LocalZooKeeperCacheService localZkCacheService = null;
    private BookKeeperClientFactory bkClientFactory;
    private ZooKeeperCache localZkCache;
    private GlobalZooKeeperCache globalZkCache;
    private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
    // Created lazily by getCompactor().
    private Compactor compactor;

    // Executors owned by this service; all of them are shut down in close().
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20, (ThreadFactory)new DefaultThreadFactory("pulsar"));
    private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, (ThreadFactory)new DefaultThreadFactory("zk-cache-callback"));
    private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered").build();
    private final ScheduledExecutorService loadManagerExecutor;
    // Created lazily by getCompactorExecutor() / getOffloaderScheduler().
    private ScheduledExecutorService compactorExecutor;
    private OrderedScheduler offloaderScheduler;

    // Ledger offloading; offloaderManager is replaced in createManagedLedgerOffloader() when a driver is configured.
    private Offloaders offloaderManager = new Offloaders();
    private LedgerOffloader offloader;

    // Periodic load-manager tasks; the shedding and quota tasks are scheduled/cancelled by the leader-election listener.
    private ScheduledFuture<?> loadReportTask = null;
    private ScheduledFuture<?> loadSheddingTask = null;
    private ScheduledFuture<?> loadResourceQuotaTask = null;
    private final AtomicReference<LoadManager> loadManager = new AtomicReference();

    // Clients created lazily by getAdminClient() / getClient() and closed in close().
    private PulsarAdmin adminClient = null;
    private PulsarClient client = null;
    // Created lazily by getZooKeeperClientFactory().
    private ZooKeeperClientFactory zkClientFactory = null;

    // Addresses and URLs computed once from the configuration in the constructor.
    private final String bindAddress;
    private final String advertisedAddress;
    private final String webServiceAddress;
    private final String webServiceAddressTls;
    private final String brokerServiceUrl;
    private final String brokerServiceUrlTls;
    private final String brokerVersion;

    private SchemaRegistryService schemaRegistryService = null;
    private final Optional<WorkerService> functionWorkerService;
    private final MessagingServiceShutdownHook shutdownService;
    private MetricsGenerator metricsGenerator;

    // Lifecycle state; start() and close() change it while holding mutex.
    private volatile State state;
    private final ReentrantLock mutex = new ReentrantLock();
    // Awaited by waitUntilClosed().
    private final Condition isClosedCondition = this.mutex.newCondition();

    /**
     * Creates a broker service without a functions worker.
     *
     * @param config broker configuration, forwarded to the two-argument constructor
     */
    public PulsarService(ServiceConfiguration config) {
        this(config, Optional.<WorkerService>empty());
    }

    /**
     * Creates a broker service in the {@link State#Init} state.
     *
     * <p>The bind/advertised addresses and the web and broker service URLs are derived from
     * {@code config} once, here. No service is started until {@link #start()} is called.
     *
     * @param config broker configuration; passed to {@code PulsarConfigurationLoader.isComplete},
     *        whose return value is ignored here (presumably it throws on an incomplete
     *        configuration -- confirm against that class)
     * @param functionWorkerService optional functions worker to run alongside the broker
     */
    public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService) {
        PulsarConfigurationLoader.isComplete(config);
        state = State.Init;

        // Addresses and URLs are resolved once from the configuration.
        bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
        advertisedAddress = advertisedAddress(config);
        webServiceAddress = webAddress(config);
        webServiceAddressTls = webAddressTls(config);
        brokerServiceUrl = brokerUrl(config);
        brokerServiceUrlTls = brokerUrlTls(config);
        brokerVersion = PulsarVersion.getVersion();

        this.config = config;
        shutdownService = new MessagingServiceShutdownHook(this);
        loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(
                new DefaultThreadFactory("pulsar-load-manager"));
        this.functionWorkerService = functionWorkerService;
    }

    /**
     * Shuts the broker down and moves it to {@link State#Closed}.
     *
     * <p>Runs under {@code mutex}. Calling it again once the state is {@code Closed} is a no-op.
     * After the state changes, every thread blocked in {@link #waitUntilClosed()} is woken up.
     * If any step throws, the state is left unchanged and the remaining steps are skipped.
     *
     * @throws PulsarServerException wrapping any exception raised while closing a component
     */
    @Override
    public void close() throws PulsarServerException {
        this.mutex.lock();
        try {
            if (this.state == State.Closed) {
                return;
            }
            if (this.webService != null) {
                this.webService.close();
                this.webService = null;
            }
            if (this.brokerService != null) {
                this.brokerService.close();
                this.brokerService = null;
            }
            if (this.managedLedgerClientFactory != null) {
                this.managedLedgerClientFactory.close();
                this.managedLedgerClientFactory = null;
            }
            if (this.bkClientFactory != null) {
                this.bkClientFactory.close();
                this.bkClientFactory = null;
            }
            if (this.leaderElectionService != null) {
                this.leaderElectionService.stop();
                this.leaderElectionService = null;
            }
            this.loadManagerExecutor.shutdown();
            if (this.globalZkCache != null) {
                this.globalZkCache.close();
                this.globalZkCache = null;
            }
            // Checked independently of globalZkCache: start() opens the local ZooKeeper
            // connection before the caches exist, so a failed start can leave only this set.
            if (this.localZooKeeperConnectionProvider != null) {
                this.localZooKeeperConnectionProvider.close();
                this.localZooKeeperConnectionProvider = null;
            }
            this.configurationCacheService = null;
            this.localZkCacheService = null;
            if (this.localZkCache != null) {
                this.localZkCache.stop();
                this.localZkCache = null;
            }
            if (this.adminClient != null) {
                this.adminClient.close();
                this.adminClient = null;
            }
            if (this.client != null) {
                this.client.close();
                this.client = null;
            }
            this.nsService = null;
            if (this.compactorExecutor != null) {
                this.compactorExecutor.shutdown();
            }
            if (this.offloaderScheduler != null) {
                this.offloaderScheduler.shutdown();
            }
            this.executor.shutdown();
            this.orderedExecutor.shutdown();
            this.cacheExecutor.shutdown();
            LoadManager currentLoadManager = this.loadManager.get();
            if (currentLoadManager != null) {
                currentLoadManager.stop();
            }
            if (this.schemaRegistryService != null) {
                this.schemaRegistryService.close();
            }
            this.offloaderManager.close();
            this.state = State.Closed;
            // Wake up waitUntilClosed(); without this signal its await() never returns.
            this.isClosedCondition.signalAll();
        }
        catch (Exception e) {
            throw new PulsarServerException(e);
        }
        finally {
            this.mutex.unlock();
        }
    }

    /**
     * @return the broker configuration this service was constructed with
     */
    public ServiceConfiguration getConfiguration() {
        return config;
    }

    /**
     * @return the functions worker's configuration, or an empty {@code Optional} when this
     *         broker was created without a functions worker
     */
    public Optional<WorkerConfig> getWorkerConfig() {
        return functionWorkerService.map(WorkerService::getWorkerConfig);
    }

    /**
     * Starts the broker and all of its services, then moves it to {@link State#Started}.
     *
     * <p>Runs under {@code mutex}. It is only allowed from {@link State#Init}, and it requires
     * at least one web service port and at least one broker service port to be configured.
     * Components are created in the order they appear below.
     *
     * @throws PulsarServerException wrapping any failure raised during startup, including the
     *         state and port checks
     */
    public void start() throws PulsarServerException {
        this.mutex.lock();
        try {
            // Logging happens inside the try so that the finally block always releases the mutex.
            LOG.info("Starting Pulsar Broker service; version: '{}'", this.brokerVersion != null ? this.brokerVersion : "unknown");
            LOG.info("Git Revision {}", PulsarVersion.getGitSha());
            LOG.info("Built by {} on {} at {}", PulsarVersion.getBuildUser(), PulsarVersion.getBuildHost(), PulsarVersion.getBuildTime());

            if (this.state != State.Init) {
                throw new PulsarServerException("Cannot start the service once it was stopped");
            }
            if (!this.config.getWebServicePort().isPresent() && !this.config.getWebServicePortTls().isPresent()) {
                throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
            }
            if (!this.config.getBrokerServicePort().isPresent() && !this.config.getBrokerServicePortTls().isPresent()) {
                throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present");
            }

            // Local ZooKeeper connection and caches come first; the other services read them.
            this.localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(this.getZooKeeperClientFactory(), this.config.getZookeeperServers(), this.config.getZooKeeperSessionTimeoutMillis());
            this.localZooKeeperConnectionProvider.start((ZooKeeperSessionWatcher.ShutdownService)this.shutdownService);
            this.startZkCacheService();
            this.bkClientFactory = this.newBookKeeperClientFactory();
            this.managedLedgerClientFactory = new ManagedLedgerClientFactory(this.config, this.getZkClient(), this.bkClientFactory);
            this.brokerService = new BrokerService(this);
            this.loadManager.set(LoadManager.create(this));
            this.startLeaderElectionService();
            this.startNamespaceService();
            this.offloader = this.createManagedLedgerOffloader(this.getConfiguration());
            this.brokerService.start();

            // Web service: REST resources, metrics servlet and (optionally) WebSocket servlets.
            this.webService = new WebService(this);
            Map<String, Object> attributeMap = Maps.newHashMap();
            attributeMap.put("pulsar", this);
            Map<String, Object> vipAttributeMap = Maps.newHashMap();
            vipAttributeMap.put("statusFilePath", this.config.getStatusFilePath());
            vipAttributeMap.put("isReadyProbe", new Supplier<Boolean>(){

                @Override
                public Boolean get() {
                    // Reports ready only once start() has reached State.Started.
                    return PulsarService.this.state == State.Started;
                }
            });
            this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap);
            this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
            this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
            this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
            this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
            this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
            this.webService.addServlet("/metrics", new ServletHolder((Servlet)new PrometheusMetricsServlet(this, this.config.isExposeTopicLevelMetricsInPrometheus(), this.config.isExposeConsumerLevelMetricsInPrometheus())), false, attributeMap);
            if (this.config.isWebSocketServiceEnabled()) {
                this.webSocketService = new WebSocketService(new ClusterData(this.webServiceAddress, this.webServiceAddressTls, this.brokerServiceUrl, this.brokerServiceUrlTls), this.config);
                this.webSocketService.start();
                WebSocketProducerServlet producerWebSocketServlet = new WebSocketProducerServlet(this.webSocketService);
                this.webService.addServlet("/ws/producer", new ServletHolder((Servlet)producerWebSocketServlet), true, attributeMap);
                this.webService.addServlet("/ws/v2/producer", new ServletHolder((Servlet)producerWebSocketServlet), true, attributeMap);
                WebSocketConsumerServlet consumerWebSocketServlet = new WebSocketConsumerServlet(this.webSocketService);
                this.webService.addServlet("/ws/consumer", new ServletHolder((Servlet)consumerWebSocketServlet), true, attributeMap);
                this.webService.addServlet("/ws/v2/consumer", new ServletHolder((Servlet)consumerWebSocketServlet), true, attributeMap);
                WebSocketReaderServlet readerWebSocketServlet = new WebSocketReaderServlet(this.webSocketService);
                this.webService.addServlet("/ws/reader", new ServletHolder((Servlet)readerWebSocketServlet), true, attributeMap);
                this.webService.addServlet("/ws/v2/reader", new ServletHolder((Servlet)readerWebSocketServlet), true, attributeMap);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Attempting to add static directory");
            }
            this.webService.addStaticResources("/static", "/static");
            this.nsService.registerBootstrapNamespaces();
            this.schemaRegistryService = SchemaRegistryService.create(this);
            this.webService.start();
            this.metricsGenerator = new MetricsGenerator(this);
            this.startLoadManagementService();
            this.state = State.Started;
            this.acquireSLANamespace();
            this.startWorkerService(this.brokerService.getAuthenticationService(), this.brokerService.getAuthorizationService());

            // Summarise the configured endpoints. Optional values are unwrapped with get() so
            // the log shows the port number rather than "Optional[...]".
            List<String> endpoints = new ArrayList<>();
            if (this.config.getWebServicePort().isPresent()) {
                endpoints.add("port = " + this.config.getWebServicePort().get());
            }
            if (this.config.getWebServicePortTls().isPresent()) {
                endpoints.add("tls-port = " + this.config.getWebServicePortTls().get());
            }
            if (this.config.getBrokerServicePort().isPresent()) {
                endpoints.add("broker url= " + this.brokerServiceUrl);
            }
            if (this.config.getBrokerServicePortTls().isPresent()) {
                endpoints.add("broker tls url= " + this.brokerServiceUrlTls);
            }
            String bootstrapMessage = "bootstrap service " + String.join(", ", endpoints);
            LOG.info("messaging service is ready");
            LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage, this.config.getClusterName(), ReflectionToStringBuilder.toString(this.config));
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new PulsarServerException(e);
        }
        finally {
            this.mutex.unlock();
        }
    }

    /**
     * Creates the leader election service with a listener that drives the leader-only
     * load-manager tasks, then starts it.
     *
     * <p>On becoming leader, and only when the load balancer is enabled, the listener schedules
     * the load-shedding and resource-quota tasks on {@code loadManagerExecutor}. On becoming a
     * follower it cancels whichever of those tasks are currently scheduled.
     */
    private void startLeaderElectionService() {
        LeaderElectionService.LeaderListener listener = new LeaderElectionService.LeaderListener() {

            @Override
            public synchronized void brokerIsTheLeaderNow() {
                ServiceConfiguration conf = PulsarService.this.getConfiguration();
                if (!conf.isLoadBalancerEnabled()) {
                    return;
                }
                long sheddingMillis = TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
                long quotaMillis = TimeUnit.MINUTES.toMillis(conf.getLoadBalancerResourceQuotaUpdateIntervalMinutes());
                ScheduledExecutorService scheduler = PulsarService.this.loadManagerExecutor;
                // The first run is delayed by one full interval in both cases.
                PulsarService.this.loadSheddingTask = scheduler.scheduleAtFixedRate(
                        new LoadSheddingTask(PulsarService.this.loadManager),
                        sheddingMillis, sheddingMillis, TimeUnit.MILLISECONDS);
                PulsarService.this.loadResourceQuotaTask = scheduler.scheduleAtFixedRate(
                        new LoadResourceQuotaUpdaterTask(PulsarService.this.loadManager),
                        quotaMillis, quotaMillis, TimeUnit.MILLISECONDS);
            }

            @Override
            public synchronized void brokerIsAFollowerNow() {
                ScheduledFuture<?> shedding = PulsarService.this.loadSheddingTask;
                if (shedding != null) {
                    shedding.cancel(false);
                    PulsarService.this.loadSheddingTask = null;
                }
                ScheduledFuture<?> quota = PulsarService.this.loadResourceQuotaTask;
                if (quota != null) {
                    quota.cancel(false);
                    PulsarService.this.loadResourceQuotaTask = null;
                }
            }
        };
        leaderElectionService = new LeaderElectionService(this, listener);
        leaderElectionService.start();
    }

    /**
     * Tries to register this broker's SLA monitor namespace, unloading it when registration
     * does not succeed.
     *
     * <p>Does nothing when the namespace's policies node is absent from the global ZooKeeper
     * cache. Any {@link Exception} raised along the way reschedules this method on
     * {@code executor} after one minute; any other {@link Throwable} is logged and not retried.
     */
    private void acquireSLANamespace() {
        try {
            boolean acquiredSLANamespace;
            String nsName = NamespaceService.getSLAMonitorNamespace(this.getAdvertisedAddress(), this.config);
            if (!this.globalZkCache.exists(AdminResource.path("policies") + "/" + nsName)) {
                LOG.info("SLA Namespace = {} doesn't exist.", nsName);
                return;
            }
            try {
                acquiredSLANamespace = this.nsService.registerSLANamespace();
                LOG.info("Register SLA Namespace = {}, returned - {}.", nsName, acquiredSLANamespace);
            }
            catch (PulsarServerException e) {
                // A failed registration is treated as "not acquired"; log it rather than
                // dropping the cause silently.
                LOG.warn("Failed to register SLA Namespace = {}, will unload it.", nsName, e);
                acquiredSLANamespace = false;
            }
            if (!acquiredSLANamespace) {
                this.nsService.unloadSLANamespace();
            }
        }
        catch (Exception ex) {
            LOG.warn("Exception while trying to unload the SLA namespace, will try to unload the namespace again after 1 minute. Exception:", ex);
            this.executor.schedule(this::acquireSLANamespace, 1L, TimeUnit.MINUTES);
        }
        catch (Throwable ex) {
            LOG.warn("Exception while trying to unload the SLA namespace, will not try to unload the namespace again. Exception:", ex);
        }
    }

    /**
     * Blocks the calling thread until the service state is {@link State#Closed}.
     *
     * <p>Waits on {@code isClosedCondition} while holding {@code mutex}, re-checking the state
     * after every wake-up. It only returns if another thread signals that condition after the
     * state has changed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitUntilClosed() throws InterruptedException {
        mutex.lock();
        try {
            while (state != State.Closed) {
                isClosedCondition.await();
            }
        } finally {
            mutex.unlock();
        }
    }

    /**
     * Creates the local and global ZooKeeper caches, starts the global one, and builds the
     * configuration cache and local cache services on top of them.
     *
     * @throws PulsarServerException if starting the global ZooKeeper cache fails with an
     *         {@link IOException}
     */
    private void startZkCacheService() throws PulsarServerException {
        LOG.info("starting configuration cache service");

        localZkCache = new LocalZooKeeperCache(
                getZkClient(),
                config.getZooKeeperOperationTimeoutSeconds(),
                getOrderedExecutor());

        globalZkCache = new GlobalZooKeeperCache(
                getZooKeeperClientFactory(),
                (int) config.getZooKeeperSessionTimeoutMillis(),
                config.getZooKeeperOperationTimeoutSeconds(),
                config.getConfigurationStoreServers(),
                getOrderedExecutor(),
                cacheExecutor);
        try {
            globalZkCache.start();
        } catch (IOException startFailure) {
            throw new PulsarServerException(startFailure);
        }

        configurationCacheService = new ConfigurationCacheService(getGlobalZkCache(), config.getClusterName());
        localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), configurationCacheService);
    }

    /**
     * Creates the namespace service through {@link #getNamespaceServiceProvider()} and stores it.
     *
     * @throws PulsarServerException if obtaining the provider fails
     */
    private void startNamespaceService() throws PulsarServerException {
        LOG.info("Starting name space service, bootstrap namespaces=" + config.getBootstrapNamespaces());
        Supplier<NamespaceService> provider = getNamespaceServiceProvider();
        nsService = provider.get();
    }

    /**
     * Supplies the factory used to build this broker's {@link NamespaceService}.
     *
     * @return a supplier that creates a new {@code NamespaceService} bound to this service on
     *         every call to {@code get()}
     * @throws PulsarServerException declared for callers and overriders; this implementation
     *         does not throw it
     */
    public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException {
        Supplier<NamespaceService> provider = () -> new NamespaceService(this);
        return provider;
    }

    /**
     * Starts the load manager and, when the load balancer is enabled, schedules the periodic
     * load-report task if it is not already scheduled.
     *
     * @throws PulsarServerException if starting the load manager fails
     */
    private void startLoadManagementService() throws PulsarServerException {
        LOG.info("Starting load management service ...");
        loadManager.get().start();

        if (!config.isLoadBalancerEnabled()) {
            return;
        }
        LOG.info("Starting load balancer");
        if (loadReportTask != null) {
            // Already scheduled; nothing more to do.
            return;
        }
        long reportIntervalMillis = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
        loadReportTask = loadManagerExecutor.scheduleAtFixedRate(
                new LoadReportUpdaterTask(loadManager),
                reportIntervalMillis, reportIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronously pre-loads every persistent topic of the bundle's namespace that falls
     * inside {@code bundle}.
     *
     * <p>The work is submitted to {@code executor} and this method returns immediately. A
     * failure on one topic is logged and does not stop the others. A failure while listing the
     * namespace's topics is logged too, because nothing reads the submitted task's result.
     *
     * @param bundle the namespace bundle whose topics should be loaded
     */
    public void loadNamespaceTopics(NamespaceBundle bundle) {
        this.executor.submit(() -> {
            LOG.info("Loading all topics on bundle: {}", bundle);
            NamespaceName nsName = bundle.getNamespaceObject();
            List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
            long topicLoadStart = System.nanoTime();
            try {
                for (String topic : this.getNamespaceService().getListOfPersistentTopics(nsName)) {
                    try {
                        TopicName topicName = TopicName.get(topic);
                        if (!bundle.includes(topicName)) {
                            continue;
                        }
                        CompletableFuture<Topic> future = this.brokerService.getOrCreateTopic(topic);
                        if (future != null) {
                            persistentTopics.add(future);
                        }
                    }
                    catch (Throwable t) {
                        LOG.warn("Failed to preload topic {}", topic, t);
                    }
                }
            }
            catch (Exception e) {
                // Without this the exception would only live in the discarded Future.
                LOG.warn("Failed to list topics of namespace {} while loading bundle {}", nsName, bundle, e);
                return null;
            }
            if (!persistentTopics.isEmpty()) {
                FutureUtil.waitForAll(persistentTopics).thenRun(() -> {
                    double topicLoadTimeSeconds = (double)TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - topicLoadStart) / 1000.0;
                    LOG.info("Loaded {} topics on {} -- time taken: {} seconds", persistentTopics.size(), bundle, topicLoadTimeSeconds);
                });
            }
            return null;
        });
    }

    /**
     * @return the status file path from the configuration, or {@code null} when no
     *         configuration is set
     */
    public String getStatusFilePath() {
        return config == null ? null : config.getStatusFilePath();
    }

    /**
     * @return the local ZooKeeper client held by the local ZooKeeper connection service
     * @throws NullPointerException if the connection service has not been created yet by
     *         {@link #start()} or has been cleared by {@link #close()}
     */
    public ZooKeeper getZkClient() {
        return localZooKeeperConnectionProvider.getLocalZooKeeper();
    }

    /**
     * @return the configuration cache service; {@code null} before {@link #start()} has
     *         created it and after {@link #close()}
     */
    public ConfigurationCacheService getConfigurationCache() {
        return configurationCacheService;
    }

    /**
     * @return the current lifecycle state of this service
     */
    public State getState() {
        return state;
    }

    /**
     * @return the leader election service; {@code null} before {@link #start()} has created it
     *         and after {@link #close()}
     */
    public LeaderElectionService getLeaderElectionService() {
        return leaderElectionService;
    }

    /**
     * @return the namespace service; {@code null} before {@link #start()} has created it and
     *         after {@link #close()}
     */
    public NamespaceService getNamespaceService() {
        return nsService;
    }

    /**
     * @return the functions worker service, or {@code null} when this broker was created
     *         without one
     */
    public WorkerService getWorkerService() {
        return functionWorkerService.orElse(null);
    }

    /**
     * @return the broker service; {@code null} before {@link #start()} has created it and
     *         after {@link #close()}
     */
    public BrokerService getBrokerService() {
        return brokerService;
    }

    /**
     * @return the BookKeeper client held by the managed-ledger client factory
     * @throws NullPointerException if the factory has not been created yet by {@link #start()}
     *         or has been cleared by {@link #close()}
     */
    public BookKeeper getBookKeeperClient() {
        return managedLedgerClientFactory.getBookKeeperClient();
    }

    /**
     * @return the managed-ledger factory held by the managed-ledger client factory
     * @throws NullPointerException if the client factory has not been created yet by
     *         {@link #start()} or has been cleared by {@link #close()}
     */
    public ManagedLedgerFactory getManagedLedgerFactory() {
        return managedLedgerClientFactory.getManagedLedgerFactory();
    }

    /**
     * @return the managed-ledger client factory; {@code null} before {@link #start()} has
     *         created it and after {@link #close()}
     */
    public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
        return managedLedgerClientFactory;
    }

    /**
     * @return the ledger offloader assigned by {@link #start()}; {@code null} before that
     */
    public LedgerOffloader getManagedLedgerOffloader() {
        return offloader;
    }

    /**
     * Builds the ledger offloader selected by {@code conf}.
     *
     * <p>When no offload driver is configured, the shared {@link NullLedgerOffloader#INSTANCE}
     * is returned. Otherwise the offloaders directory is searched, {@code offloaderManager} is
     * replaced with the result, and the factory for the configured driver creates the
     * offloader. The broker version and git SHA are passed as metadata.
     *
     * @param conf broker configuration naming the offload driver and the offloaders directory
     * @return the configured offloader, or the null offloader when no driver is set
     * @throws PulsarServerException if the offloaders directory is missing or the offloader
     *         cannot be located or created; the underlying failure is kept as the cause
     */
    public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) throws PulsarServerException {
        try {
            String driver = conf.getManagedLedgerOffloadDriver();
            if (StringUtils.isNotBlank(driver)) {
                Preconditions.checkNotNull(conf.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", driver);
                this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory());
                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(driver);
                try {
                    return offloaderFactory.create(
                            conf.getProperties(),
                            ImmutableMap.of(
                                    "S3ManagedLedgerOffloaderSoftwareVersion".toLowerCase(), PulsarVersion.getVersion(),
                                    "S3ManagedLedgerOffloaderSoftwareGitSha".toLowerCase(), PulsarVersion.getGitSha()),
                            this.getOffloaderScheduler(conf));
                }
                catch (IOException ioe) {
                    // Keep the IOException itself as the cause so its stack trace is not lost.
                    throw new PulsarServerException(ioe.getMessage(), ioe);
                }
            }
            LOG.info("No ledger offloader configured, using NULL instance");
            return NullLedgerOffloader.INSTANCE;
        }
        catch (PulsarServerException e) {
            // Already the declared type; rethrow as-is instead of wrapping it a second time.
            throw e;
        }
        catch (Throwable t) {
            throw new PulsarServerException(t);
        }
    }

    /**
     * @return the local ZooKeeper cache; {@code null} before {@link #start()} has created it
     *         and after {@link #close()}
     */
    public ZooKeeperCache getLocalZkCache() {
        return localZkCache;
    }

    /**
     * @return the global ZooKeeper cache; {@code null} before {@link #start()} has created it
     *         and after {@link #close()}
     */
    public ZooKeeperCache getGlobalZkCache() {
        return globalZkCache;
    }

    /**
     * @return the general-purpose scheduled executor (threads named "pulsar")
     */
    public ScheduledExecutorService getExecutor() {
        return executor;
    }

    /**
     * @return the scheduled executor handed to the global ZooKeeper cache (threads named
     *         "zk-cache-callback")
     */
    public ScheduledExecutorService getCacheExecutor() {
        return cacheExecutor;
    }

    /**
     * @return the single-threaded scheduled executor that runs the load-manager tasks
     */
    public ScheduledExecutorService getLoadManagerExecutor() {
        return loadManagerExecutor;
    }

    /**
     * @return the ordered executor named "pulsar-ordered"
     */
    public OrderedExecutor getOrderedExecutor() {
        return orderedExecutor;
    }

    /**
     * @return the local ZooKeeper cache service; {@code null} before {@link #start()} has
     *         created it and after {@link #close()}
     */
    public LocalZooKeeperCacheService getLocalZkCacheService() {
        return localZkCacheService;
    }

    /**
     * Returns the ZooKeeper client factory, creating it on first use.
     *
     * <p>Unlike the other lazy getters in this class, this method is not {@code synchronized}.
     *
     * @return the ZooKeeper client factory backed by the ordered executor
     */
    public ZooKeeperClientFactory getZooKeeperClientFactory() {
        if (zkClientFactory != null) {
            return zkClientFactory;
        }
        zkClientFactory = new ZookeeperBkClientFactoryImpl(orderedExecutor);
        return zkClientFactory;
    }

    /**
     * Creates the BookKeeper client factory used by {@link #start()}.
     *
     * @return a new {@link BookKeeperClientFactoryImpl} on every call
     */
    public BookKeeperClientFactory newBookKeeperClientFactory() {
        BookKeeperClientFactory factory = new BookKeeperClientFactoryImpl();
        return factory;
    }

    /**
     * @return the BookKeeper client factory; {@code null} before {@link #start()} has created
     *         it and after {@link #close()}
     */
    public BookKeeperClientFactory getBookKeeperClientFactory() {
        return bkClientFactory;
    }

    /**
     * Returns the compaction executor, creating it on first use.
     *
     * @return a single-threaded scheduled executor whose thread factory is named "compaction"
     */
    protected synchronized ScheduledExecutorService getCompactorExecutor() {
        if (compactorExecutor != null) {
            return compactorExecutor;
        }
        compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
        return compactorExecutor;
    }

    /**
     * Returns the topic compactor, creating it on first use.
     *
     * <p>The compactor is a {@link TwoPhaseCompactor} built from the broker configuration, the
     * internal Pulsar client, the BookKeeper client and the compaction executor.
     *
     * @return the shared compactor instance
     * @throws PulsarServerException wrapping any exception raised while building the compactor
     */
    public synchronized Compactor getCompactor() throws PulsarServerException {
        if (compactor != null) {
            return compactor;
        }
        try {
            compactor = new TwoPhaseCompactor(
                    getConfiguration(), getClient(), getBookKeeperClient(), getCompactorExecutor());
        } catch (Exception buildFailure) {
            throw new PulsarServerException(buildFailure);
        }
        return compactor;
    }

    /**
     * Returns the scheduler used for ledger offloading, creating it on first use.
     *
     * @param conf configuration supplying the maximum number of offload threads; only read
     *        when the scheduler is first created
     * @return the shared scheduler named "offloader"
     */
    protected synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) {
        if (offloaderScheduler != null) {
            return offloaderScheduler;
        }
        offloaderScheduler = (OrderedScheduler) OrderedScheduler.newSchedulerBuilder()
                .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                .name("offloader")
                .build();
        return offloaderScheduler;
    }

    /**
     * Returns the broker's internal Pulsar client, creating it on first use.
     *
     * <p>The client targets this broker's own service URL, using the TLS URL when TLS is
     * enabled. Authentication is configured only when a broker-client authentication plugin
     * is set.
     *
     * @return the shared client instance
     * @throws PulsarServerException wrapping any exception raised while building the client
     */
    public synchronized PulsarClient getClient() throws PulsarServerException {
        if (client != null) {
            return client;
        }
        try {
            ServiceConfiguration conf = getConfiguration();
            boolean tlsEnabled = conf.isTlsEnabled();
            String serviceUrl = tlsEnabled ? brokerServiceUrlTls : brokerServiceUrl;
            // NOTE(review): the trust store is set from getTlsCertificateFilePath(), while
            // getAdminClient() uses getBrokerClientTrustCertsFilePath() -- confirm this is intended.
            ClientBuilder builder = PulsarClient.builder()
                    .serviceUrl(serviceUrl)
                    .enableTls(tlsEnabled)
                    .allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
                    .tlsTrustCertsFilePath(conf.getTlsCertificateFilePath());
            String authPlugin = conf.getBrokerClientAuthenticationPlugin();
            if (StringUtils.isNotBlank(authPlugin)) {
                builder.authentication(authPlugin, conf.getBrokerClientAuthenticationParameters());
            }
            client = builder.build();
        } catch (Exception buildFailure) {
            throw new PulsarServerException(buildFailure);
        }
        return client;
    }

    /**
     * Returns the admin client owned by this service, building it on first use.
     *
     * <p>The admin URL is the TLS web address when {@code isBrokerClientTlsEnabled()} is true and
     * the plain web address otherwise. In the TLS case the trust-certs path and the
     * allow-insecure flag are also applied. The read timeout is set to the configured ZooKeeper
     * operation timeout, in seconds.
     *
     * @return the cached or newly created admin client
     * @throws PulsarServerException wrapping any exception raised while building the client
     */
    public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
        if (this.adminClient != null) {
            return this.adminClient;
        }
        try {
            ServiceConfiguration conf = this.getConfiguration();
            String adminApiUrl;
            if (conf.isBrokerClientTlsEnabled()) {
                adminApiUrl = PulsarService.webAddressTls(this.config);
            } else {
                adminApiUrl = PulsarService.webAddress(this.config);
            }
            PulsarAdminBuilder adminBuilder = PulsarAdmin.builder()
                    .serviceHttpUrl(adminApiUrl)
                    .authentication(conf.getBrokerClientAuthenticationPlugin(), conf.getBrokerClientAuthenticationParameters());
            if (conf.isBrokerClientTlsEnabled()) {
                adminBuilder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
                adminBuilder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
            }
            adminBuilder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
            this.adminClient = adminBuilder.build();
            LOG.info("created admin with url {} ", (Object)adminApiUrl);
        }
        catch (Exception cause) {
            throw new PulsarServerException((Throwable)cause);
        }
        return this.adminClient;
    }

    /**
     * Returns the metrics generator held in the {@code metricsGenerator} field.
     *
     * @return the current field value; where it is assigned is not visible from this method
     */
    public MetricsGenerator getMetricsGenerator() {
        return this.metricsGenerator;
    }

    /**
     * Returns the shutdown hook held in the {@code shutdownService} field.
     *
     * @return the current field value
     */
    public MessagingServiceShutdownHook getShutdownService() {
        return this.shutdownService;
    }

    /**
     * Resolves the address this broker advertises.
     *
     * <p>Delegates to {@code ServiceConfigurationUtils.getDefaultOrConfiguredAddress} with the
     * value of {@code config.getAdvertisedAddress()}.
     *
     * @param config the broker configuration to read the advertised address from
     * @return whatever the delegate returns for the configured value
     */
    public static String advertisedAddress(ServiceConfiguration config) {
        final String configured = (String)config.getAdvertisedAddress();
        return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(configured);
    }

    /**
     * Builds the plain-text broker service URL from the configuration.
     *
     * @param config the broker configuration
     * @return the {@code pulsar://} URL for the advertised address and configured broker port, or
     *         {@code null} when no broker service port is configured
     */
    public static String brokerUrl(ServiceConfiguration config) {
        if (!config.getBrokerServicePort().isPresent()) {
            return null;
        }
        String host = PulsarService.advertisedAddress(config);
        Integer port = (Integer)config.getBrokerServicePort().get();
        return PulsarService.brokerUrl(host, port);
    }

    /**
     * Formats a plain-text broker service URL.
     *
     * <p>The port is rendered with locale-independent ASCII digits. The {@code %d} conversion of
     * {@code String.format} uses the default locale's digits, which would produce an unusable URL
     * under locales with a non-Latin numbering system.
     *
     * @param host host name or address; a {@code null} host is rendered as the text {@code "null"}
     * @param port broker service port
     * @return {@code "pulsar://<host>:<port>"}
     */
    public static String brokerUrl(String host, int port) {
        return "pulsar://" + host + ":" + port;
    }

    /**
     * Builds the TLS broker service URL from the configuration.
     *
     * @param config the broker configuration
     * @return the {@code pulsar+ssl://} URL for the advertised address and configured TLS broker
     *         port, or {@code null} when no TLS broker service port is configured
     */
    public static String brokerUrlTls(ServiceConfiguration config) {
        if (!config.getBrokerServicePortTls().isPresent()) {
            return null;
        }
        String host = PulsarService.advertisedAddress(config);
        Integer tlsPort = (Integer)config.getBrokerServicePortTls().get();
        return PulsarService.brokerUrlTls(host, tlsPort);
    }

    /**
     * Formats a TLS broker service URL.
     *
     * <p>The port is rendered with locale-independent ASCII digits. The {@code %d} conversion of
     * {@code String.format} uses the default locale's digits, which would produce an unusable URL
     * under locales with a non-Latin numbering system.
     *
     * @param host host name or address; a {@code null} host is rendered as the text {@code "null"}
     * @param port TLS broker service port
     * @return {@code "pulsar+ssl://<host>:<port>"}
     */
    public static String brokerUrlTls(String host, int port) {
        return "pulsar+ssl://" + host + ":" + port;
    }

    /**
     * Builds the HTTP web-service address from the configuration.
     *
     * @param config the broker configuration
     * @return the {@code http://} address for the advertised address and configured web port, or
     *         {@code null} when no web service port is configured
     */
    public static String webAddress(ServiceConfiguration config) {
        if (!config.getWebServicePort().isPresent()) {
            return null;
        }
        String host = PulsarService.advertisedAddress(config);
        Integer webPort = (Integer)config.getWebServicePort().get();
        return PulsarService.webAddress(host, webPort);
    }

    /**
     * Formats an HTTP web-service address.
     *
     * <p>The port is rendered with locale-independent ASCII digits. The {@code %d} conversion of
     * {@code String.format} uses the default locale's digits, which would produce an unusable URL
     * under locales with a non-Latin numbering system.
     *
     * @param host host name or address; a {@code null} host is rendered as the text {@code "null"}
     * @param port web service port
     * @return {@code "http://<host>:<port>"}
     */
    public static String webAddress(String host, int port) {
        return "http://" + host + ":" + port;
    }

    /**
     * Builds the HTTPS web-service address from the configuration.
     *
     * @param config the broker configuration
     * @return the {@code https://} address for the advertised address and configured TLS web
     *         port, or {@code null} when no TLS web service port is configured
     */
    public static String webAddressTls(ServiceConfiguration config) {
        if (!config.getWebServicePortTls().isPresent()) {
            return null;
        }
        String host = PulsarService.advertisedAddress(config);
        Integer webTlsPort = (Integer)config.getWebServicePortTls().get();
        return PulsarService.webAddressTls(host, webTlsPort);
    }

    /**
     * Formats an HTTPS web-service address.
     *
     * <p>The port is rendered with locale-independent ASCII digits. The {@code %d} conversion of
     * {@code String.format} uses the default locale's digits, which would produce an unusable URL
     * under locales with a non-Latin numbering system.
     *
     * @param host host name or address; a {@code null} host is rendered as the text {@code "null"}
     * @param port TLS web service port
     * @return {@code "https://<host>:<port>"}
     */
    public static String webAddressTls(String host, int port) {
        return "https://" + host + ":" + port;
    }

    /**
     * Returns the value of the {@code bindAddress} field.
     *
     * @return the stored bind address; may be {@code null} if the field was never set (assignment
     *         is not visible from this method)
     */
    public String getBindAddress() {
        return this.bindAddress;
    }

    /**
     * Returns the value of the {@code advertisedAddress} field.
     *
     * <p>This instance getter reads the stored field; it does not re-resolve the address from the
     * configuration the way the static {@code advertisedAddress(ServiceConfiguration)} does.
     *
     * @return the stored advertised address
     */
    public String getAdvertisedAddress() {
        return this.advertisedAddress;
    }

    /**
     * Returns a web-service address, preferring the non-TLS one.
     *
     * @return {@code webServiceAddress} when it is non-null, otherwise
     *         {@code webServiceAddressTls} (which may itself be {@code null})
     */
    public String getSafeWebServiceAddress() {
        if (this.webServiceAddress != null) {
            return this.webServiceAddress;
        }
        return this.webServiceAddressTls;
    }

    /**
     * Returns the value of the {@code webServiceAddress} field.
     *
     * @return the stored non-TLS web-service address; may be {@code null}, as
     *         {@code getSafeWebServiceAddress()} explicitly allows for
     */
    public String getWebServiceAddress() {
        return this.webServiceAddress;
    }

    /**
     * Returns the value of the {@code webServiceAddressTls} field.
     *
     * @return the stored TLS web-service address
     */
    public String getWebServiceAddressTls() {
        return this.webServiceAddressTls;
    }

    /**
     * Returns a broker service URL, preferring the non-TLS one.
     *
     * @return {@code brokerServiceUrl} when it is non-null, otherwise
     *         {@code brokerServiceUrlTls} (which may itself be {@code null})
     */
    public String getSafeBrokerServiceUrl() {
        if (this.brokerServiceUrl != null) {
            return this.brokerServiceUrl;
        }
        return this.brokerServiceUrlTls;
    }

    /**
     * Returns the value of the {@code brokerServiceUrl} field.
     *
     * @return the stored non-TLS broker service URL; may be {@code null}, as
     *         {@code getSafeBrokerServiceUrl()} explicitly allows for
     */
    public String getBrokerServiceUrl() {
        return this.brokerServiceUrl;
    }

    /**
     * Returns the value of the {@code brokerServiceUrlTls} field.
     *
     * @return the stored TLS broker service URL
     */
    public String getBrokerServiceUrlTls() {
        return this.brokerServiceUrlTls;
    }

    /**
     * Returns the {@link AtomicReference} that holds the load manager.
     *
     * <p>The holder itself is returned, not its current content, so callers share the same
     * reference object stored in the {@code loadManager} field.
     *
     * @return the load-manager reference holder
     */
    public AtomicReference<LoadManager> getLoadManager() {
        return this.loadManager;
    }

    /**
     * Returns the value of the {@code brokerVersion} field.
     *
     * @return the stored broker version string
     */
    public String getBrokerVersion() {
        return this.brokerVersion;
    }

    /**
     * Returns the schema registry service held in the {@code schemaRegistryService} field.
     *
     * @return the current field value
     */
    public SchemaRegistryService getSchemaRegistryService() {
        return this.schemaRegistryService;
    }

    /**
     * Prepares the metadata required by the embedded function worker and then starts it.
     *
     * <p>Does nothing when {@code functionWorkerService} is empty. Otherwise it performs, in order:
     * <ol>
     *   <li>creates the tenant ("property") node, named after the first {@code /}-separated segment
     *       of the configured functions namespace;</li>
     *   <li>creates the cluster node for the configured functions cluster;</li>
     *   <li>creates the namespace policies node with infinite retention ({@code -1, -1}), the
     *       functions cluster as the only replication cluster, and the default number of
     *       bundles;</li>
     *   <li>initializes the distributedlog namespace and starts the worker with the resulting
     *       URI.</li>
     * </ol>
     *
     * <p>In each creation step a {@code NodeExistsException} is logged at debug level and treated
     * as success; every other failure is logged at error level and rethrown.
     *
     * @param authenticationService passed through to the worker's {@code start}
     * @param authorizationService passed through to the worker's {@code start}
     * @throws InterruptedException if a ZooKeeper operation is interrupted
     * @throws IOException if serialization or the dlog namespace initialization fails
     * @throws KeeperException if a ZooKeeper operation fails for a reason other than the node
     *         already existing
     */
    private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws InterruptedException, IOException, KeeperException {
        if (!this.functionWorkerService.isPresent()) {
            return;
        }
        LOG.info("Starting function worker service");
        String namespace = this.functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace();
        // The tenant ("property") is the first path segment of the functions namespace.
        String property = namespace.split("/")[0];
        String cluster = this.functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster();

        // Step 1: tenant node. The log statements report the property name (not the cluster name),
        // since that is the entity this step creates.
        try {
            NamedEntity.checkName((String)property);
            this.getGlobalZkCache().getZooKeeper().create(AdminResource.path("policies", property), ObjectMapperFactory.getThreadLocal().writeValueAsBytes((Object)new TenantInfo((Set)Sets.newHashSet((Iterable)this.config.getSuperUserRoles()), (Set)Sets.newHashSet((Object[])new String[]{cluster}))), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info("Created property {} for function worker", (Object)property);
        }
        catch (KeeperException.NodeExistsException e) {
            LOG.debug("Failed to create already existing property {} for function worker service", (Object)property, (Object)e);
        }
        catch (IllegalArgumentException e) {
            LOG.error("Failed to create property with invalid name {} for function worker service", (Object)property, (Object)e);
            throw e;
        }
        catch (Exception e) {
            LOG.error("Failed to create property {} for function worker", (Object)property, (Object)e);
            throw e;
        }

        // Step 2: cluster node.
        try {
            NamedEntity.checkName((String)cluster);
            ClusterData clusterData = new ClusterData(this.getSafeWebServiceAddress(), null, this.brokerServiceUrl, null);
            this.getGlobalZkCache().getZooKeeper().create(AdminResource.path("clusters", cluster), ObjectMapperFactory.getThreadLocal().writeValueAsBytes((Object)clusterData), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info("Created cluster {} for function worker", (Object)cluster);
        }
        catch (KeeperException.NodeExistsException e) {
            LOG.debug("Failed to create already existing cluster {} for function worker service", (Object)cluster, (Object)e);
        }
        catch (IllegalArgumentException e) {
            LOG.error("Failed to create cluster with invalid name {} for function worker service", (Object)cluster, (Object)e);
            throw e;
        }
        catch (Exception e) {
            LOG.error("Failed to create cluster {} for function worker service", (Object)cluster, (Object)e);
            throw e;
        }

        // Step 3: namespace policies node.
        try {
            Policies policies = new Policies();
            policies.retention_policies = new RetentionPolicies(-1, -1);
            policies.replication_clusters = Collections.singleton(cluster);
            int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles();
            policies.bundles = NamespacesBase.getBundles(defaultNumberOfBundles);
            // The cached policies entry is invalidated before the node is written.
            this.getConfigurationCache().policiesCache().invalidate(AdminResource.path("policies", namespace));
            ZkUtils.createFullPathOptimistic((ZooKeeper)this.getGlobalZkCache().getZooKeeper(), (String)AdminResource.path("policies", namespace), (byte[])ObjectMapperFactory.getThreadLocal().writeValueAsBytes((Object)policies), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT);
            LOG.info("Created namespace {} for function worker service", (Object)namespace);
        }
        catch (KeeperException.NodeExistsException e) {
            LOG.debug("Failed to create already existing namespace {} for function worker service", (Object)namespace);
        }
        catch (Exception e) {
            LOG.error("Failed to create namespace {}", (Object)namespace, (Object)e);
            throw e;
        }

        // Step 4: dlog namespace for function packages, then start the worker.
        InternalConfigurationData internalConf = new InternalConfigurationData(this.getConfiguration().getZookeeperServers(), this.getConfiguration().getConfigurationStoreServers(), new ClientConfiguration().getZkLedgersRootPath(), (String)this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
        URI dlogURI;
        try {
            dlogURI = WorkerUtils.initializeDlogNamespace((String)internalConf.getZookeeperServers(), (String)internalConf.getLedgersRootPath());
        }
        catch (IOException ioe) {
            LOG.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", (Object)internalConf.getZookeeperServers(), (Object)ioe);
            throw ioe;
        }
        LOG.info("Function worker service setup completed");
        this.functionWorkerService.get().start(dlogURI, authenticationService, authorizationService);
        LOG.info("Function worker service started");
    }

    /**
     * Named states declared by the enclosing service class.
     *
     * <p>NOTE(review): the names suggest a lifecycle (initial, running, shut down), but the code
     * that reads or sets these values is not visible here - confirm against the enclosing class.
     */
    public static enum State {
        // Presumably the state before the service has been started.
        Init,
        // Presumably the state after a successful start.
        Started,
        // Presumably the state after the service has been closed.
        Closed;

    }
}

