/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar;

import com.beust.jcommander.Parameter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarStandaloneBuilder;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
import org.apache.pulsar.packages.management.storage.filesystem.FileSystemPackagesStorageProvider;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarStandalone
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(PulsarStandalone.class);
    private static final String PULSAR_STANDALONE_USE_ZOOKEEPER = "PULSAR_STANDALONE_USE_ZOOKEEPER";
    PulsarService broker;
    LocalBookkeeperEnsemble bkEnsemble;
    BKCluster bkCluster;
    MetadataStoreExtended metadataStore;
    ServiceConfiguration config;
    WorkerService fnWorkerService;
    WorkerConfig workerConfig;
    @Parameter(names={"-c", "--config"}, description="Configuration file path")
    private String configFile;
    @Parameter(names={"--wipe-data"}, description="Clean up previous ZK/BK data")
    private boolean wipeData = false;
    @Parameter(names={"--num-bookies"}, description="Number of local Bookies")
    private int numOfBk = 1;
    @Parameter(names={"--metadata-dir"}, description="Directory for storing metadata")
    private String metadataDir = "data/metadata";
    @Parameter(names={"--metadata-url"}, description="Metadata store url")
    private String metadataStoreUrl = "";
    @Parameter(names={"--zookeeper-port"}, description="Local zookeeper's port", hidden=true)
    private int zkPort = 2181;
    @Parameter(names={"--bookkeeper-port"}, description="Local bookies base port")
    private int bkPort = 3181;
    @Parameter(names={"--zookeeper-dir"}, description="Local zooKeeper's data directory", hidden=true)
    private String zkDir = "data/standalone/zookeeper";
    @Parameter(names={"--bookkeeper-dir"}, description="Local bookies base data directory")
    private String bkDir = "data/standalone/bookkeeper";
    @Parameter(names={"--no-broker"}, description="Only start ZK and BK services, no broker")
    private boolean noBroker = false;
    @Parameter(names={"--only-broker"}, description="Only start Pulsar broker service (no ZK, BK)")
    private boolean onlyBroker = false;
    @Parameter(names={"-nfw", "--no-functions-worker"}, description="Run functions worker with Broker")
    private boolean noFunctionsWorker = false;
    @Parameter(names={"-fwc", "--functions-worker-conf"}, description="Configuration file for Functions Worker")
    private String fnWorkerConfigFile = "conf/functions_worker.yml";
    @Parameter(names={"-nss", "--no-stream-storage"}, description="Disable stream storage")
    private boolean noStreamStorage = false;
    @Parameter(names={"--stream-storage-port"}, description="Local bookies stream storage port")
    private int streamStoragePort = 4181;
    @Parameter(names={"-a", "--advertised-address"}, description="Standalone broker advertised address")
    private String advertisedAddress = null;
    @Parameter(names={"-h", "--help"}, description="Show this help message")
    private boolean help = false;
    private boolean usingNewDefaultsPIP117;

    public void setBroker(PulsarService broker) {
        this.broker = broker;
    }

    public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) {
        this.bkEnsemble = bkEnsemble;
    }

    public void setBkPort(int bkPort) {
        this.bkPort = bkPort;
    }

    public void setBkDir(String bkDir) {
        this.bkDir = bkDir;
    }

    public void setAdvertisedAddress(String advertisedAddress) {
        this.advertisedAddress = advertisedAddress;
    }

    public void setConfig(ServiceConfiguration config) {
        this.config = config;
    }

    public void setFnWorkerService(WorkerService fnWorkerService) {
        this.fnWorkerService = fnWorkerService;
    }

    public void setConfigFile(String configFile) {
        this.configFile = configFile;
    }

    public void setWipeData(boolean wipeData) {
        this.wipeData = wipeData;
    }

    public void setNumOfBk(int numOfBk) {
        this.numOfBk = numOfBk;
    }

    public void setZkPort(int zkPort) {
        this.zkPort = zkPort;
    }

    public void setZkDir(String zkDir) {
        this.zkDir = zkDir;
    }

    public void setNoBroker(boolean noBroker) {
        this.noBroker = noBroker;
    }

    public void setOnlyBroker(boolean onlyBroker) {
        this.onlyBroker = onlyBroker;
    }

    public void setNoFunctionsWorker(boolean noFunctionsWorker) {
        this.noFunctionsWorker = noFunctionsWorker;
    }

    public void setFnWorkerConfigFile(String fnWorkerConfigFile) {
        this.fnWorkerConfigFile = fnWorkerConfigFile;
    }

    public void setNoStreamStorage(boolean noStreamStorage) {
        this.noStreamStorage = noStreamStorage;
    }

    public void setStreamStoragePort(int streamStoragePort) {
        this.streamStoragePort = streamStoragePort;
    }

    public void setHelp(boolean help) {
        this.help = help;
    }

    public ServiceConfiguration getConfig() {
        return this.config;
    }

    public String getConfigFile() {
        return this.configFile;
    }

    public boolean isWipeData() {
        return this.wipeData;
    }

    public int getNumOfBk() {
        return this.numOfBk;
    }

    public int getZkPort() {
        return this.zkPort;
    }

    public int getBkPort() {
        return this.bkPort;
    }

    public String getZkDir() {
        return this.zkDir;
    }

    public String getBkDir() {
        return this.bkDir;
    }

    public boolean isNoBroker() {
        return this.noBroker;
    }

    public boolean isOnlyBroker() {
        return this.onlyBroker;
    }

    public boolean isNoFunctionsWorker() {
        return this.noFunctionsWorker;
    }

    public String getFnWorkerConfigFile() {
        return this.fnWorkerConfigFile;
    }

    public boolean isNoStreamStorage() {
        return this.noStreamStorage;
    }

    public int getStreamStoragePort() {
        return this.streamStoragePort;
    }

    public String getAdvertisedAddress() {
        return this.advertisedAddress;
    }

    public boolean isHelp() {
        return this.help;
    }

    public void start() throws Exception {
        NamespaceResources.PartitionedTopicResources partitionedTopicResources;
        Optional getResult;
        String forceUseZookeeperEnv;
        if (this.config == null) {
            log.error("Failed to load configuration");
            System.exit(1);
        }
        if (StringUtils.equalsAnyIgnoreCase((CharSequence)(forceUseZookeeperEnv = System.getenv(PULSAR_STANDALONE_USE_ZOOKEEPER)), (CharSequence[])new CharSequence[]{"1", "true"})) {
            this.usingNewDefaultsPIP117 = false;
            log.info("Forcing to chose ZooKeeper metadata through environment variable");
        } else if (Paths.get(this.zkDir, new String[0]).toFile().exists()) {
            log.info("Found existing ZooKeeper metadata. Continuing with ZooKeeper");
            this.usingNewDefaultsPIP117 = false;
        } else {
            this.usingNewDefaultsPIP117 = true;
        }
        log.debug("--- setup PulsarStandaloneStarter ---");
        if (!this.isOnlyBroker()) {
            if (this.usingNewDefaultsPIP117) {
                this.startBookieWithMetadataStore();
            } else {
                this.startBookieWithZookeeper();
            }
        }
        if (this.isNoBroker()) {
            return;
        }
        if (!this.isNoFunctionsWorker()) {
            String filepath = Path.of(this.getFnWorkerConfigFile(), new String[0]).toAbsolutePath().normalize().toString();
            this.workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(this.config, filepath);
            if (this.usingNewDefaultsPIP117) {
                this.workerConfig.setStateStorageProviderImplementation(PulsarMetadataStateStoreProviderImpl.class.getName());
                this.config.setEnablePackagesManagement(true);
                this.config.setFunctionsWorkerEnablePackageManagement(true);
                this.workerConfig.setFunctionsWorkerEnablePackageManagement(true);
                this.config.setPackagesManagementStorageProvider(FileSystemPackagesStorageProvider.class.getName());
            } else if (this.isNoStreamStorage()) {
                this.workerConfig.setStateStorageServiceUrl(null);
            } else if (this.workerConfig.getStateStorageServiceUrl() == null) {
                this.workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + this.getStreamStoragePort());
            }
            this.fnWorkerService = WorkerServiceLoader.load((WorkerConfig)this.workerConfig);
        } else {
            this.workerConfig = new WorkerConfig();
        }
        this.config.setRunningStandalone(true);
        if (!this.usingNewDefaultsPIP117) {
            String metadataStoreUrl = "zk:localhost:" + this.getZkPort();
            this.config.setMetadataStoreUrl(metadataStoreUrl);
            this.config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
            this.config.getProperties().setProperty("metadataStoreUrl", metadataStoreUrl);
            this.config.getProperties().setProperty("configurationMetadataStoreUrl", metadataStoreUrl);
        }
        this.broker = new PulsarService(this.config, this.workerConfig, Optional.ofNullable(this.fnWorkerService), PulsarStandalone::processTerminator);
        this.broker.start();
        String cluster = this.config.getClusterName();
        this.createNameSpace(cluster, "public", NamespaceName.get((String)"public", (String)"default"));
        this.createNameSpace(cluster, NamespaceName.SYSTEM_NAMESPACE.getTenant(), NamespaceName.SYSTEM_NAMESPACE);
        if (this.config.isTransactionCoordinatorEnabled() && !(getResult = (Optional)(partitionedTopicResources = this.broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()).getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get()).isPresent()) {
            partitionedTopicResources.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, new PartitionedTopicMetadata(1));
        }
        log.debug("--- setup completed ---");
    }

    private void createNameSpace(String cluster, String publicTenant, NamespaceName ns) throws Exception {
        PulsarAdmin admin = this.broker.getAdminClient();
        try {
            List namespaces;
            List tenants;
            List clusters = admin.clusters().getClusters();
            if (!clusters.contains(cluster)) {
                admin.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(this.broker.getWebServiceAddress()).serviceUrlTls(this.broker.getWebServiceAddressTls()).brokerServiceUrl(this.broker.getBrokerServiceUrl()).brokerServiceUrlTls(this.broker.getBrokerServiceUrlTls()).build());
            }
            if (!(tenants = admin.tenants().getTenants()).contains(publicTenant)) {
                admin.tenants().createTenant(publicTenant, TenantInfo.builder().adminRoles((Set)Sets.newHashSet((Iterable)this.config.getSuperUserRoles())).allowedClusters((Set)Sets.newHashSet((Object[])new String[]{cluster})).build());
            }
            if (!(namespaces = admin.namespaces().getNamespaces(publicTenant)).contains(ns.toString())) {
                admin.namespaces().createNamespace(ns.toString(), this.config.getDefaultNumberOfNamespaceBundles());
            }
        }
        catch (PulsarAdminException e) {
            log.error("Failed to create namespace {} on cluster {} and tenant {}", new Object[]{ns, cluster, publicTenant, e});
        }
    }

    public static PulsarStandaloneBuilder builder() {
        return PulsarStandaloneBuilder.instance();
    }

    @Override
    public void close() {
        try {
            if (this.fnWorkerService != null) {
                this.fnWorkerService.stop();
                this.fnWorkerService = null;
            }
            if (this.broker != null) {
                this.broker.close();
                this.broker = null;
            }
            if (this.bkCluster != null) {
                this.bkCluster.close();
                this.bkCluster = null;
            }
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
                this.bkEnsemble = null;
            }
        }
        catch (Exception e) {
            log.error("Shutdown failed: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    @VisibleForTesting
    void startBookieWithMetadataStore() throws Exception {
        if (StringUtils.isBlank((CharSequence)this.metadataStoreUrl)) {
            log.info("Starting BK with RocksDb metadata store");
            Path metadataDirPath = Paths.get(this.metadataDir, new String[0]);
            this.metadataStoreUrl = "rocksdb://" + metadataDirPath.toAbsolutePath();
            if (this.wipeData && Files.exists(metadataDirPath, new LinkOption[0])) {
                log.info("Wiping RocksDb metadata store at {}", (Object)this.metadataStoreUrl);
                FileUtils.cleanDirectory((File)metadataDirPath.toFile());
            }
        } else {
            log.info("Starting BK with metadata store: {}", (Object)this.metadataStoreUrl);
        }
        ServerConfiguration bkServerConf = new ServerConfiguration();
        bkServerConf.loadConf(new File(this.configFile).toURI().toURL());
        this.calculateCacheSize(bkServerConf);
        this.bkCluster = BKCluster.builder().baseServerConfiguration(bkServerConf).metadataServiceUri(this.metadataStoreUrl).bkPort(this.bkPort).numBookies(this.numOfBk).dataDir(this.bkDir).clearOldData(this.wipeData).build();
        this.config.setBookkeeperNumberOfChannelsPerBookie(1);
        this.config.setMetadataStoreUrl(this.metadataStoreUrl);
    }

    private void startBookieWithZookeeper() throws Exception {
        log.info("Starting BK & ZK cluster");
        ServerConfiguration bkServerConf = new ServerConfiguration();
        bkServerConf.loadConf(new File(this.configFile).toURI().toURL());
        this.calculateCacheSize(bkServerConf);
        this.bkEnsemble = new LocalBookkeeperEnsemble(this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(), this.getBkDir(), this.isWipeData(), "127.0.0.1");
        this.bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
        this.config.setMetadataStoreUrl("zk:127.0.0.1:" + this.zkPort);
    }

    private void calculateCacheSize(ServerConfiguration bkServerConf) {
        String writeCacheMaxSizeMb = "dbStorage_writeCacheMaxSizeMb";
        String readAheadCacheMaxSizeMb = "dbStorage_readAheadCacheMaxSizeMb";
        Object writeCache = bkServerConf.getProperty(writeCacheMaxSizeMb);
        Object readCache = bkServerConf.getProperty(readAheadCacheMaxSizeMb);
        int instanceCount = this.usingNewDefaultsPIP117 ? 1 + this.numOfBk : 2 + this.numOfBk;
        long defaultCacheMB = PlatformDependent.maxDirectMemory() / 0x100000L / (long)instanceCount / 4L;
        if (writeCache == null || writeCache.equals("")) {
            bkServerConf.setProperty(writeCacheMaxSizeMb, (Object)defaultCacheMB);
        }
        if (readCache == null || readCache.equals("")) {
            bkServerConf.setProperty(readAheadCacheMaxSizeMb, (Object)defaultCacheMB);
        }
    }

    private static void processTerminator(int exitCode) {
        log.info("Halting standalone process with code {}", (Object)exitCode);
        ShutdownUtil.triggerImmediateForcefulShutdown((int)exitCode);
    }

    public String getBrokerServiceUrl() {
        return this.broker.getBrokerServiceUrl();
    }

    public String getWebServiceUrl() {
        return this.broker.getWebServiceAddress();
    }
}

