/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BookKeeperClientFactory} that builds {@link BookKeeper} clients from a broker
 * {@link ServiceConfiguration}.
 *
 * <p>The factory lazily creates up to three {@link ZooKeeperCache} instances (one per placement-policy
 * flavour) and hands them to the BookKeeper client through the {@code zk_cache_instance} configuration
 * property. Each cache is created at most once per factory and is stopped by {@link #close()}.
 */
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
    private static final Logger log = LoggerFactory.getLogger(BookKeeperClientFactoryImpl.class);

    /** Configuration key under which the shared {@link ZooKeeperCache} is published to the client. */
    private static final String ZK_CACHE_INSTANCE = "zk_cache_instance";

    /** Configuration key naming the DNS resolver class used by the rack/region aware policies. */
    private static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";

    /**
     * Bytes added on top of the broker's max message size when sizing the Netty frame limit.
     * NOTE(review): presumably headroom for protocol framing/metadata - confirm against the wire format.
     */
    private static final int NETTY_FRAME_PADDING_BYTES = 10240;

    private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
    private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
    private final AtomicReference<ZooKeeperCache> zkCache = new AtomicReference<>();

    /**
     * Creates a client that reports no statistics; delegates to the five-argument overload with
     * {@link NullStatsLogger#INSTANCE}.
     */
    @Override
    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
            Map<String, Object> properties) throws IOException {
        return this.create(conf, zkClient, ensemblePlacementPolicyClass, properties, NullStatsLogger.INSTANCE);
    }

    /**
     * Creates a BookKeeper client.
     *
     * @param conf broker configuration the client settings are derived from
     * @param zkClient ZooKeeper handle used to back the placement-policy caches
     * @param ensemblePlacementPolicyClass explicit placement policy; when absent the default policy is
     *        derived from {@code conf}
     * @param properties extra client properties applied on top of the derived configuration; may be
     *        {@code null}
     * @param statsLogger stats sink handed to the client builder
     * @return the newly built client
     * @throws IOException if the client cannot be built, or if the calling thread is interrupted while
     *         building it (the thread's interrupt status is restored before throwing)
     */
    @Override
    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
            Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
        ClientConfiguration bkConf = this.createBkClientConfiguration(conf);
        if (properties != null) {
            properties.forEach((key, value) -> bkConf.setProperty(key, value));
        }
        if (ensemblePlacementPolicyClass.isPresent()) {
            this.setEnsemblePlacementPolicy(bkConf, conf, zkClient, ensemblePlacementPolicyClass.get());
        } else {
            setDefaultEnsemblePlacementPolicy(this.rackawarePolicyZkCache, this.clientIsolationZkCache,
                    bkConf, conf, zkClient);
        }
        try {
            return BookKeeper.forConfig(bkConf)
                    .allocator(PulsarByteBufAllocator.DEFAULT)
                    .statsLogger(statsLogger)
                    .build();
        } catch (InterruptedException e) {
            // Wrapping would otherwise hide the interruption from code further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (BKException e) {
            throw new IOException(e);
        }
    }

    /**
     * Translates broker settings into a BookKeeper {@link ClientConfiguration}.
     *
     * <p>Explicit settings are applied first; afterwards every broker property selected by
     * {@code PropertiesUtils.filterAndMapProperties(..., "bookkeeper_")} is applied, so those entries
     * can override the values set above.
     *
     * @param conf broker configuration to read from
     * @return a fresh client configuration
     */
    @VisibleForTesting
    ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
        ClientConfiguration bkConf = new ClientConfiguration();

        String authPlugin = conf.getBookkeeperClientAuthenticationPlugin();
        if (authPlugin != null && authPlugin.trim().length() > 0) {
            bkConf.setClientAuthProviderFactoryClass(authPlugin);
            bkConf.setProperty(conf.getBookkeeperClientAuthenticationParametersName(),
                    conf.getBookkeeperClientAuthenticationParameters());
        }

        if (conf.isBookkeeperTLSClientAuthentication()) {
            bkConf.setTLSClientAuthentication(true);
            bkConf.setTLSCertificatePath(conf.getBookkeeperTLSCertificateFilePath());
            bkConf.setTLSKeyStore(conf.getBookkeeperTLSKeyFilePath());
            bkConf.setTLSKeyStoreType(conf.getBookkeeperTLSKeyFileType());
            bkConf.setTLSKeyStorePasswordPath(conf.getBookkeeperTLSKeyStorePasswordPath());
            bkConf.setTLSProviderFactoryClass(conf.getBookkeeperTLSProviderFactoryClass());
            bkConf.setTLSTrustStore(conf.getBookkeeperTLSTrustCertsFilePath());
            bkConf.setTLSTrustStoreType(conf.getBookkeeperTLSTrustCertTypes());
            bkConf.setTLSTrustStorePasswordPath(conf.getBookkeeperTLSTrustStorePasswordPath());
        }

        bkConf.setThrottleValue(conf.getBookkeeperClientThrottleValue());
        // The same broker timeout drives both the add and the read entry timeouts.
        bkConf.setAddEntryTimeout((int) conf.getBookkeeperClientTimeoutInSeconds());
        bkConf.setReadEntryTimeout((int) conf.getBookkeeperClientTimeoutInSeconds());
        bkConf.setSpeculativeReadTimeout(conf.getBookkeeperClientSpeculativeReadTimeoutInMillis());
        bkConf.setNumChannelsPerBookie(conf.getBookkeeperNumberOfChannelsPerBookie());
        bkConf.setUseV2WireProtocol(conf.isBookkeeperUseV2WireProtocol());
        bkConf.setEnableDigestTypeAutodetection(true);
        bkConf.setStickyReadsEnabled(conf.isBookkeeperEnableStickyReads());
        bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + NETTY_FRAME_PADDING_BYTES);
        bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled());

        // An explicitly configured metadata service URI wins; otherwise derive one from the broker config.
        if (StringUtils.isNotBlank(conf.getBookkeeperMetadataServiceUri())) {
            bkConf.setMetadataServiceUri(conf.getBookkeeperMetadataServiceUri());
        } else {
            bkConf.setMetadataServiceUri(PulsarService.bookieMetadataServiceUri(conf));
        }

        if (conf.isBookkeeperClientHealthCheckEnabled()) {
            bkConf.enableBookieHealthCheck();
            bkConf.setBookieHealthCheckInterval(conf.getBookkeeperHealthCheckIntervalSec(), TimeUnit.SECONDS);
            bkConf.setBookieErrorThresholdPerInterval(
                    conf.getBookkeeperClientHealthCheckErrorThresholdPerInterval());
            bkConf.setBookieQuarantineTime(
                    (int) conf.getBookkeeperClientHealthCheckQuarantineTimeInSeconds(), TimeUnit.SECONDS);
            bkConf.setBookieQuarantineRatio(conf.getBookkeeperClientQuarantineRatio());
        }

        bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled());
        bkConf.setExplictLacInterval(conf.getBookkeeperExplicitLacIntervalInMills());
        bkConf.setGetBookieInfoIntervalSeconds(
                conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
        bkConf.setGetBookieInfoRetryIntervalSeconds(
                conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);

        PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_").forEach((key, value) -> {
            log.info("Applying BookKeeper client configuration setting {}={}", key, value);
            bkConf.setProperty(key, value);
        });
        return bkConf;
    }

    /**
     * Configures the placement policy implied by the broker configuration.
     *
     * <p>When the rack-aware or region-aware policy is enabled, the matching policy class and its
     * settings are applied and the rack cache is published to the client. When isolation groups are
     * configured, {@link ZkIsolatedBookieEnsemblePlacementPolicy} replaces the policy class and, if no
     * cache has been published yet, the isolation cache is published.
     *
     * @param rackawarePolicyZkCache holder for the lazily created rack-mapping cache
     * @param clientIsolationZkCache holder for the lazily created isolation cache
     * @param bkConf client configuration to mutate
     * @param conf broker configuration to read from
     * @param zkClient ZooKeeper handle backing any cache created here
     */
    public static void setDefaultEnsemblePlacementPolicy(AtomicReference<ZooKeeperCache> rackawarePolicyZkCache,
            AtomicReference<ZooKeeperCache> clientIsolationZkCache, ClientConfiguration bkConf,
            ServiceConfiguration conf, ZooKeeper zkClient) {
        if (conf.isBookkeeperClientRackawarePolicyEnabled() || conf.isBookkeeperClientRegionawarePolicyEnabled()) {
            if (conf.isBookkeeperClientRegionawarePolicyEnabled()) {
                bkConf.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
                bkConf.setProperty("reppEnableValidation",
                        conf.getProperties().getProperty("reppEnableValidation", "true"));
                bkConf.setProperty("reppRegionsToWrite",
                        conf.getProperties().getProperty("reppRegionsToWrite", null));
                bkConf.setProperty("reppMinimumRegionsForDurability",
                        conf.getProperties().getProperty("reppMinimumRegionsForDurability", "2"));
                bkConf.setProperty("reppEnableDurabilityEnforcementInReplace",
                        conf.getProperties().getProperty("reppEnableDurabilityEnforcementInReplace", "true"));
            } else {
                bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
            }
            bkConf.setMinNumRacksPerWriteQuorum(conf.getBookkeeperClientMinNumRacksPerWriteQuorum());
            bkConf.setEnforceMinNumRacksPerWriteQuorum(conf.isBookkeeperClientEnforceMinNumRacksPerWriteQuorum());
            bkConf.setProperty(REPP_DNS_RESOLVER_CLASS, conf.getProperties().getProperty(
                    REPP_DNS_RESOLVER_CLASS, ZkBookieRackAffinityMapping.class.getName()));
            publishZkCache(rackawarePolicyZkCache, "bookies-racks", bkConf, conf, zkClient);
        }

        if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
            bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
            bkConf.setProperty("isolationBookieGroups", conf.getBookkeeperClientIsolationGroups());
            bkConf.setProperty("secondaryIsolationBookieGroups", conf.getBookkeeperClientSecondaryIsolationGroups());
            // Reuse whatever cache is already published (e.g. the rack cache set just above).
            if (bkConf.getProperty(ZK_CACHE_INSTANCE) == null) {
                publishZkCache(clientIsolationZkCache, "bookies-isolation", bkConf, conf, zkClient);
            }
        }
    }

    /**
     * Applies an explicitly requested placement policy, publishing this factory's general-purpose
     * cache when the configuration does not already carry one.
     */
    private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf,
            ZooKeeper zkClient, Class<? extends EnsemblePlacementPolicy> policyClass) {
        bkConf.setEnsemblePlacementPolicy(policyClass);
        if (bkConf.getProperty(ZK_CACHE_INSTANCE) == null) {
            publishZkCache(this.zkCache, "bookies-rackaware", bkConf, conf, zkClient);
        }
        if (conf.isBookkeeperClientRackawarePolicyEnabled() || conf.isBookkeeperClientRegionawarePolicyEnabled()) {
            bkConf.setProperty(REPP_DNS_RESOLVER_CLASS, conf.getProperties().getProperty(
                    REPP_DNS_RESOLVER_CLASS, ZkBookieRackAffinityMapping.class.getName()));
        }
    }

    /**
     * Ensures {@code holder} contains a cache and publishes it under {@code zk_cache_instance}.
     *
     * <p>A cache is only constructed when the holder is still empty. If another thread installs one
     * first, the losing candidate is stopped and the winner is published instead.
     */
    private static void publishZkCache(AtomicReference<ZooKeeperCache> holder, String cacheName,
            ClientConfiguration bkConf, ServiceConfiguration conf, ZooKeeper zkClient) {
        ZooKeeperCache cache = holder.get();
        if (cache == null) {
            ZooKeeperCache candidate =
                    new ZooKeeperCache(cacheName, zkClient, conf.getZooKeeperOperationTimeoutSeconds()) { };
            if (holder.compareAndSet(null, candidate)) {
                cache = candidate;
            } else {
                candidate.stop();
                cache = holder.get();
            }
        }
        bkConf.setProperty(ZK_CACHE_INSTANCE, cache);
    }

    /** Stops every cache this factory has created; holders that were never populated are skipped. */
    @Override
    public void close() {
        stopIfPresent(this.rackawarePolicyZkCache);
        stopIfPresent(this.clientIsolationZkCache);
        stopIfPresent(this.zkCache);
    }

    /** Reads the holder once and stops the cache it contains, if any. */
    private static void stopIfPresent(AtomicReference<ZooKeeperCache> holder) {
        ZooKeeperCache cache = holder.get();
        if (cache != null) {
            cache.stop();
        }
    }
}

