/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarClusterMetadataSetup {
    private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);

    /**
     * Creates a znode through {@link ZkUtils#createFullPathOptimistic}, treating an
     * already-existing node as success so the call can be repeated safely.
     *
     * @param zkc        ZooKeeper client used for the creation
     * @param path       path of the znode to create
     * @param data       payload to store in the new znode
     * @param acl        ACL applied to the new znode
     * @param createMode creation mode of the new znode
     * @throws KeeperException      on any ZooKeeper failure other than "node exists"
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void createZkNode(ZooKeeper zkc, String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException {
        try {
            ZkUtils.createFullPathOptimistic(zkc, path, data, acl, createMode);
        } catch (KeeperException.NodeExistsException ignored) {
            // Deliberately ignored: the node is already present, so there is nothing left to do.
        }
    }

    /**
     * Command-line entry point that initializes the metadata of a Pulsar cluster.
     *
     * <p>After parsing and validating the arguments, it connects to the local ZooKeeper and to the
     * configuration store and then:
     * <ol>
     *   <li>formats the BookKeeper metadata when no external BookKeeper metadata URI was given and
     *       {@code /ledgers} does not exist yet;</li>
     *   <li>initializes the stream-storage cluster when the number of storage containers is positive;</li>
     *   <li>creates the base znodes on the local ZooKeeper and on the configuration store;</li>
     *   <li>registers the cluster and the {@code global} cluster entry;</li>
     *   <li>creates the default tenants and namespaces and the transaction coordinator
     *       assignment topic.</li>
     * </ol>
     *
     * <p>Invalid argument combinations print a message to {@code System.err} and terminate the JVM
     * with exit code 1.
     *
     * @param args command-line arguments, see {@link Arguments}
     * @throws Exception if argument parsing fails or any metadata operation fails
     */
    public static void main(String[] args) throws Exception {
        Arguments arguments = new Arguments();
        JCommander jcommander = new JCommander();
        try {
            jcommander.addObject(arguments);
            jcommander.parse(args);
            if (arguments.help) {
                jcommander.usage();
                return;
            }
        } catch (Exception e) {
            // Show the usage text before propagating the parse failure.
            jcommander.usage();
            throw e;
        }

        // Exactly one of --configuration-store / --global-zookeeper must be supplied.
        if (arguments.configurationStore == null && arguments.globalZookeeper == null) {
            System.err.println("Configuration store address argument is required (--configuration-store)");
            jcommander.usage();
            System.exit(1);
        }
        if (arguments.configurationStore != null && arguments.globalZookeeper != null) {
            System.err.println("Configuration store argument (--configuration-store) supersedes the deprecated (--global-zookeeper) argument");
            jcommander.usage();
            System.exit(1);
        }
        if (arguments.configurationStore == null) {
            arguments.configurationStore = arguments.globalZookeeper;
        }
        if (arguments.numTransactionCoordinators <= 0) {
            System.err.println("Number of transaction coordinators must greater than 0");
            System.exit(1);
        }

        log.info("Setting up cluster {} with zk={} configuration-store={}", arguments.cluster, arguments.zookeeper, arguments.configurationStore);

        // Both clients are closed in finally blocks so that a failure part-way through the setup
        // does not leave ZooKeeper sessions open.
        ZooKeeper localZk = PulsarClusterMetadataSetup.initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
        try {
            ZooKeeper configStoreZk = PulsarClusterMetadataSetup.initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
            try {
                // Format BookKeeper metadata only when no external BookKeeper metadata URI is used.
                ServerConfiguration bkConf = new ServerConfiguration();
                if (arguments.existingBkMetadataServiceUri == null && arguments.bookieMetadataServiceUri == null) {
                    bkConf.setZkServers(arguments.zookeeper);
                    bkConf.setZkTimeout(arguments.zkSessionTimeoutMillis);
                    if (localZk.exists("/ledgers", false) == null && !BookKeeperAdmin.format(bkConf, false, false)) {
                        throw new IOException("Failed to initialize BookKeeper metadata");
                    }
                }

                if (arguments.numStreamStorageContainers > 0) {
                    // An explicitly supplied URI wins over the one derived from bkConf; the
                    // non-deprecated option takes precedence over the deprecated one.
                    String uriStr = bkConf.getMetadataServiceUri();
                    if (arguments.existingBkMetadataServiceUri != null) {
                        uriStr = arguments.existingBkMetadataServiceUri;
                    } else if (arguments.bookieMetadataServiceUri != null) {
                        uriStr = arguments.bookieMetadataServiceUri;
                    }
                    ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
                    ZkClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
                    initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
                }

                // Base znodes on the local ZooKeeper.
                if (localZk.exists("/bookies", false) == null) {
                    // Explicit charset so the stored bytes do not depend on the platform default.
                    PulsarClusterMetadataSetup.createZkNode(localZk, "/bookies", "{}".getBytes(java.nio.charset.StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                PulsarClusterMetadataSetup.createZkNode(localZk, "/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                PulsarClusterMetadataSetup.createZkNode(localZk, "/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                // Base znodes on the configuration store.
                PulsarClusterMetadataSetup.createZkNode(configStoreZk, "/admin/policies", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                PulsarClusterMetadataSetup.createZkNode(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                // Register this cluster and the "global" cluster entry.
                ClusterData clusterData = new ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls, arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls);
                byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);
                PulsarClusterMetadataSetup.createZkNode(configStoreZk, "/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                ClusterData globalClusterData = new ClusterData(null, null);
                byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);
                PulsarClusterMetadataSetup.createZkNode(configStoreZk, "/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                // Default tenants, namespaces and the transaction coordinator assignment topic.
                PulsarClusterMetadataSetup.createTenantIfAbsent(configStoreZk, "public", arguments.cluster);
                PulsarClusterMetadataSetup.createTenantIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);
                PulsarClusterMetadataSetup.createNamespaceIfAbsent(configStoreZk, NamespaceName.get("public", "default"), arguments.cluster);
                PulsarClusterMetadataSetup.createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
                PulsarClusterMetadataSetup.createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN, arguments.numTransactionCoordinators);
            } finally {
                configStoreZk.close();
            }
        } finally {
            localZk.close();
        }
        log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
    }

    /**
     * Ensures that the tenant exists under {@code /admin/policies/<tenant>} and that the given
     * cluster is among its allowed clusters.
     *
     * <p>If the tenant znode is missing, it is created with no admin roles and with {@code cluster}
     * as the only allowed cluster. Otherwise the stored tenant info is read and, when the cluster
     * is not yet allowed, it is added and written back using the version that was observed.
     *
     * @param configStoreZk ZooKeeper client connected to the configuration store
     * @param tenant        tenant name
     * @param cluster       cluster that must be allowed for the tenant
     * @throws IOException          if the tenant info cannot be serialized or deserialized
     * @throws KeeperException      on ZooKeeper failures
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    static void createTenantIfAbsent(ZooKeeper configStoreZk, String tenant, String cluster) throws IOException, KeeperException, InterruptedException {
        final String path = "/admin/policies/" + tenant;
        final Stat existing = configStoreZk.exists(path, false);

        if (existing == null) {
            TenantInfo fresh = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster));
            byte[] payload = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(fresh);
            PulsarClusterMetadataSetup.createZkNode(configStoreZk, path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return;
        }

        byte[] raw = configStoreZk.getData(path, false, null);
        TenantInfo stored = ObjectMapperFactory.getThreadLocal().readValue(raw, TenantInfo.class);
        if (stored.getAllowedClusters().contains(cluster)) {
            // Already allowed: nothing to update.
            return;
        }
        stored.getAllowedClusters().add(cluster);
        byte[] updated = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(stored);
        // Conditional write against the version seen by exists().
        configStoreZk.setData(path, updated, existing.getVersion());
    }

    /**
     * Ensures that the namespace exists under {@code /admin/policies/<namespace>} and that the
     * given cluster is part of its replication clusters.
     *
     * <p>If the namespace znode is missing, it is created with 16 bundles and {@code cluster} as
     * the only replication cluster. Otherwise the stored policies are read and, when the cluster
     * is not yet listed, it is added and written back using the version that was observed.
     *
     * @param configStoreZk ZooKeeper client connected to the configuration store
     * @param namespaceName namespace to create or update
     * @param cluster       cluster that must be a replication cluster of the namespace
     * @throws KeeperException      on ZooKeeper failures
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException          if the policies cannot be serialized or deserialized
     */
    static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster) throws KeeperException, InterruptedException, IOException {
        final String path = "/admin/policies/" + namespaceName.toString();
        final Stat existing = configStoreZk.exists(path, false);

        if (existing == null) {
            Policies fresh = new Policies();
            fresh.bundles = PulsarClusterMetadataSetup.getBundles(16);
            fresh.replication_clusters = Collections.singleton(cluster);
            byte[] payload = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(fresh);
            PulsarClusterMetadataSetup.createZkNode(configStoreZk, path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return;
        }

        byte[] raw = configStoreZk.getData(path, false, null);
        Policies stored = ObjectMapperFactory.getThreadLocal().readValue(raw, Policies.class);
        if (stored.replication_clusters.contains(cluster)) {
            // Already replicated to this cluster: nothing to update.
            return;
        }
        stored.replication_clusters.add(cluster);
        byte[] updated = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(stored);
        // Conditional write against the version seen by exists().
        configStoreZk.setData(path, updated, existing.getVersion());
    }

    /**
     * Ensures that partitioned-topic metadata exists for the topic with at least
     * {@code numPartitions} partitions.
     *
     * <p>If the metadata znode is missing, it is created with {@code numPartitions}. If it exists
     * with fewer partitions, it is overwritten with {@code numPartitions} using the version that
     * was observed. An existing partition count that is equal or larger is left untouched.
     *
     * @param configStoreZk ZooKeeper client connected to the configuration store
     * @param topicName     partitioned topic to create or grow
     * @param numPartitions desired minimum number of partitions
     * @throws KeeperException      on ZooKeeper failures
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException          if the metadata cannot be serialized or deserialized
     */
    static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions) throws KeeperException, InterruptedException, IOException {
        final String path = ZkAdminPaths.partitionedTopicPath(topicName);
        final Stat existing = configStoreZk.exists(path, false);
        final PartitionedTopicMetadata desired = new PartitionedTopicMetadata(numPartitions);

        if (existing != null) {
            byte[] raw = configStoreZk.getData(path, false, null);
            PartitionedTopicMetadata current = ObjectMapperFactory.getThreadLocal().readValue(raw, PartitionedTopicMetadata.class);
            // Only ever grow the partition count; never shrink an existing topic.
            if (current.partitions < numPartitions) {
                byte[] updated = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(desired);
                configStoreZk.setData(path, updated, existing.getVersion());
            }
            return;
        }

        byte[] payload = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(desired);
        PulsarClusterMetadataSetup.createZkNode(configStoreZk, path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * Opens a read-write ZooKeeper session for the given connection string, creating the chroot
     * path first when the connection string contains one.
     *
     * <p>Everything from the first {@code '/'} onwards is treated as the chroot path. When present,
     * a temporary client is connected to the part before it, the chroot znode is created if it
     * does not exist, and the temporary client is closed again (also when the check or the
     * creation fails) before the final client is opened.
     *
     * @param connection     ZooKeeper connection string, optionally followed by a chroot path
     * @param sessionTimeout ZooKeeper session timeout in milliseconds
     * @return a connected ZooKeeper client; the caller is responsible for closing it
     * @throws Exception if connecting fails or the chroot path cannot be checked or created
     */
    public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
        ZookeeperClientFactoryImpl zkfactory = new ZookeeperClientFactoryImpl();
        int chrootIndex = connection.indexOf("/");
        if (chrootIndex > 0) {
            String chrootPath = connection.substring(chrootIndex);
            String zkConnectForChrootCreation = connection.substring(0, chrootIndex);
            ZooKeeper chrootZk = zkfactory.create(zkConnectForChrootCreation, ZooKeeperClientFactory.SessionType.ReadWrite, sessionTimeout).get();
            try {
                if (chrootZk.exists(chrootPath, false) == null) {
                    PulsarClusterMetadataSetup.createZkNode(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    log.info("Created zookeeper chroot path {} successfully", chrootPath);
                }
            } finally {
                // Always release the temporary session, even if the chroot check or creation failed.
                chrootZk.close();
            }
        }
        return zkfactory.create(connection, ZooKeeperClientFactory.SessionType.ReadWrite, sessionTimeout).get();
    }

    /**
     * Builds the bundle boundaries that split the 32-bit hash range into {@code numBundles}
     * segments.
     *
     * <p>The result holds {@code numBundles + 1} boundaries formatted as {@code 0x%08x}: the first
     * is {@code 0x00000000}, the last is {@code 0xffffffff}, and the ones in between are multiples
     * of {@code 2^32 / numBundles}.
     *
     * @param numBundles number of bundles to split the range into
     * @return the bundle data wrapping the list of boundaries
     */
    private static BundlesData getBundles(int numBundles) {
        final long rangeEnd = 0x100000000L;
        final long step = rangeEnd / numBundles;

        List<String> boundaries = new ArrayList<>();
        boundaries.add(String.format("0x%08x", 0L));
        for (int idx = 1; idx <= numBundles; idx++) {
            // The final boundary is pinned to the largest 32-bit value instead of 2^32 itself.
            long boundary = (idx == numBundles) ? rangeEnd - 1L : step * idx;
            boundaries.add(String.format("0x%08x", boundary));
        }
        return new BundlesData(boundaries);
    }

    /**
     * Command-line options of the cluster metadata setup tool, populated by JCommander.
     */
    private static class Arguments {
        @Parameter(names={"-c", "--cluster"}, description="Cluster name", required=true)
        private String cluster;
        @Parameter(names={"-uw", "--web-service-url"}, description="Web-service URL for new cluster", required=true)
        private String clusterWebServiceUrl;
        @Parameter(names={"-tw", "--web-service-url-tls"}, description="Web-service URL for new cluster with TLS encryption", required=false)
        private String clusterWebServiceUrlTls;
        @Parameter(names={"-ub", "--broker-service-url"}, description="Broker-service URL for new cluster", required=false)
        private String clusterBrokerServiceUrl;
        @Parameter(names={"-tb", "--broker-service-url-tls"}, description="Broker-service URL for new cluster with TLS encryption", required=false)
        private String clusterBrokerServiceUrlTls;
        @Parameter(names={"-zk", "--zookeeper"}, description="Local ZooKeeper quorum connection string", required=true)
        private String zookeeper;
        @Parameter(names={"--zookeeper-session-timeout-ms"}, description="Local zookeeper session timeout ms")
        private int zkSessionTimeoutMillis = 30000;
        // Deprecated alias of --configuration-store, hidden from the usage text.
        @Parameter(names={"-gzk", "--global-zookeeper"}, description="Global ZooKeeper quorum connection string", required=false, hidden=true)
        private String globalZookeeper;
        // Not marked as required at the parser level: otherwise JCommander would reject command
        // lines that only pass the deprecated --global-zookeeper option. main() verifies that
        // exactly one of --configuration-store / --global-zookeeper is supplied.
        @Parameter(names={"-cs", "--configuration-store"}, description="Configuration Store connection string", required=false)
        private String configurationStore;
        @Parameter(names={"--initial-num-stream-storage-containers"}, description="Num storage containers of BookKeeper stream storage")
        private int numStreamStorageContainers = 16;
        @Parameter(names={"--initial-num-transaction-coordinators"}, description="Num transaction coordinators will assigned in cluster")
        private int numTransactionCoordinators = 16;
        @Parameter(names={"--existing-bk-metadata-service-uri"}, description="The metadata service URI of the existing BookKeeper cluster that you want to use")
        private String existingBkMetadataServiceUri;
        // Deprecated alias of --existing-bk-metadata-service-uri, hidden from the usage text.
        @Deprecated
        @Parameter(names={"--bookkeeper-metadata-service-uri"}, description="The metadata service URI of the existing BookKeeper cluster that you want to use", hidden=true)
        private String bookieMetadataServiceUri;
        @Parameter(names={"-h", "--help"}, description="Show this help message")
        private boolean help = false;

        private Arguments() {
        }
    }
}

