/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.manager.RedirectManager;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NamespaceService
implements AutoCloseable {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
    // Broker configuration; assigned from pulsar.getConfiguration() in the constructor.
    private final ServiceConfiguration config;
    // Load manager reference; assigned from pulsar.getLoadManager() in the constructor.
    private final AtomicReference<LoadManager> loadManager;
    // The broker service this namespace service belongs to.
    private final PulsarService pulsar;
    // Ownership cache; the lookup methods query it for bundle owners and acquire ownership through it.
    private final OwnershipCache ownershipCache;
    // Cache over the local metadata store; createLookupResult reads "/loadbalance/brokers/<broker>" entries from it.
    private final MetadataCache<LocalBrokerData> localBrokerDataCache;
    // Bundle factory, constructed with the CRC32 hash function.
    private final NamespaceBundleFactory bundleFactory;
    // Advertised address of the broker (pulsar.getAdvertisedAddress()).
    private final String host;
    // NOTE(review): the code that consumes this retry limit is not visible in this part of the file.
    public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
    // Property name "sla-monitor"; the same text prefixes the SLA namespace pattern and format below.
    public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
    // Pattern "pulsar/<segment>/<text>:<digits>"; group 1 captures the trailing "<text>:<digits>" part.
    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
    // Pattern "pulsar/<text>:<digits>"; group 1 captures the "<text>:<digits>" part.
    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");
    // Pattern "sla-monitor/<segment>/<text>:<digits>"; group 1 captures the trailing "<text>:<digits>" part.
    public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile("sla-monitor/[^/]+/([^:]+:\\d+)");
    // String.format templates whose shapes mirror the three patterns above.
    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
    public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
    public static final String SLA_NAMESPACE_FMT = "sla-monitor/%s/%s";
    // Pulsar clients keyed by cluster data. NOTE(review): usage is not visible in this part of the file.
    private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;
    // Registered listeners; both lists are CopyOnWriteArrayList instances created in the constructor.
    private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners;
    private final List<NamespaceBundleSplitListener> bundleSplitListeners;
    // Consulted first by the lookup paths to decide whether a request must be redirected.
    private final RedirectManager redirectManager;
    // Lookup metrics updated by getBrokerServiceUrlAsync: redirects, failures, direct answers and latency.
    private static final Counter lookupRedirects = (Counter)Counter.build((String)"pulsar_broker_lookup_redirects", (String)"-").register();
    private static final Counter lookupFailures = (Counter)Counter.build((String)"pulsar_broker_lookup_failures", (String)"-").register();
    private static final Counter lookupAnswers = (Counter)Counter.build((String)"pulsar_broker_lookup_answers", (String)"-").register();
    private static final Summary lookupLatency = (Summary)Summary.build("pulsar_broker_lookup", "-").quantile(0.5).quantile(0.99).quantile(0.999).quantile(1.0).register();
    // In-flight lookups per bundle. findBrokerServiceUrl picks the map by options.isAuthoritative(),
    // registers the pending future here and removes the entry once that future completes.
    private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesAuthoritative = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesNotAuthoritative = ConcurrentOpenHashMap.newBuilder().build();

    /**
     * Creates the namespace service for the given broker.
     *
     * <p>Captures the advertised address, configuration and load-manager reference from
     * {@code pulsar}, then builds the bundle factory, ownership cache, listener lists,
     * local broker-data cache and redirect manager.
     *
     * @param pulsar the broker service this instance belongs to
     */
    public NamespaceService(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.host = pulsar.getAdvertisedAddress();
        this.config = pulsar.getConfiguration();
        this.loadManager = pulsar.getLoadManager();
        this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
        // `this` is handed to OwnershipCache here, before the fields assigned below are initialized.
        this.ownershipCache = new OwnershipCache(pulsar, this.bundleFactory, this);
        this.namespaceClients = ConcurrentOpenHashMap.newBuilder().build();
        this.bundleOwnershipListeners = new CopyOnWriteArrayList<NamespaceBundleOwnershipListener>();
        this.bundleSplitListeners = new CopyOnWriteArrayList<NamespaceBundleSplitListener>();
        this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
        this.redirectManager = new RedirectManager(pulsar);
    }

    /**
     * Asks the ownership cache to refresh this broker's own owner info.
     *
     * @throws RuntimeException if the ownership cache reports that the refresh did not succeed
     */
    public void initialize() {
        boolean refreshed = this.getOwnershipCache().refreshSelfOwnerInfo();
        if (refreshed) {
            return;
        }
        throw new RuntimeException("Failed to refresh self owner info.");
    }

    /**
     * Looks up the broker that serves (or should serve) {@code topic}.
     *
     * <p>The topic's bundle is resolved first. If the redirect manager yields a result for that
     * bundle, it is returned as-is. Otherwise the lookup is delegated to the extensible load
     * manager when that extension is enabled, or to {@code findBrokerServiceUrl} when it is not.
     *
     * <p>Lookup metrics are recorded on a side chain: latency plus either the redirect or the
     * answer counter on success, and the failure counter when the lookup (or the metric
     * callback itself) fails. The returned future is not affected by that side chain.
     *
     * @param topic   the topic being looked up
     * @param options lookup options forwarded to the ownership-based lookup
     * @return a future holding the lookup result, or an empty {@code Optional} when none is available
     */
    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
        long startTime = System.nanoTime();
        // Fully typed: the previous raw CompletionStage could not be returned as a CompletableFuture.
        CompletableFuture<Optional<LookupResult>> future = this.getBundleAsync(topic)
                .thenCompose(bundle -> this.findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
                    if (optResult.isPresent()) {
                        LOG.info("[{}] Redirect lookup request to {} for topic {}", this.pulsar.getBrokerId(), optResult.get(), topic);
                        return CompletableFuture.completedFuture(optResult);
                    }
                    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                        // Explicit type witness instead of casting Optional<TopicName> to Optional<ServiceUnitId>.
                        return this.loadManager.get().findBrokerServiceUrl(Optional.<ServiceUnitId>of(topic), bundle);
                    }
                    return this.findBrokerServiceUrl(bundle, options);
                }));
        future.thenAccept(optResult -> {
            lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            if (optResult.isPresent()) {
                if (optResult.get().isRedirect()) {
                    lookupRedirects.inc();
                } else {
                    lookupAnswers.inc();
                }
            }
        }).exceptionally(ex -> {
            lookupFailures.inc();
            return null;
        });
        return future;
    }

    /**
     * Asks the redirect manager for a redirect result, except for SLA-monitor and heartbeat
     * namespaces, which always get an already-completed empty result.
     *
     * @param bundle the service unit whose namespace decides whether the redirect manager is consulted
     * @return a future holding the redirect lookup result, if any
     */
    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
        String namespace = bundle.getNamespaceObject().toString();
        boolean internalNamespace = NamespaceService.isSLAOrHeartbeatNamespace(namespace);
        if (!internalNamespace) {
            return this.redirectManager.findRedirectLookupResultAsync();
        }
        return CompletableFuture.completedFuture(Optional.empty());
    }

    /**
     * Asynchronously resolves the bundle that contains {@code topic}.
     *
     * @param topic the topic to locate
     * @return a future holding the bundle found among the bundles of the topic's namespace
     */
    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
        NamespaceName namespace = topic.getNamespaceObject();
        CompletableFuture<NamespaceBundles> bundlesFuture = this.bundleFactory.getBundlesAsync(namespace);
        return bundlesFuture.thenApply(namespaceBundles -> namespaceBundles.findBundle(topic));
    }

    /**
     * Resolves the bundle containing {@code topicName} only if the bundle factory reports
     * the namespace's bundles as present.
     *
     * @param topicName the topic to locate
     * @return the matching bundle, or an empty {@code Optional} when the bundles are not present
     */
    public Optional<NamespaceBundle> getBundleIfPresent(TopicName topicName) {
        return this.bundleFactory
                .getBundlesIfPresent(topicName.getNamespaceObject())
                .map(namespaceBundles -> namespaceBundles.findBundle(topicName));
    }

    /**
     * Synchronously resolves the bundle that contains {@code topicName}.
     *
     * @param topicName the topic to locate
     * @return the bundle found among the bundles of the topic's namespace
     */
    public NamespaceBundle getBundle(TopicName topicName) {
        NamespaceBundles namespaceBundles = this.bundleFactory.getBundles(topicName.getNamespaceObject());
        return namespaceBundles.findBundle(topicName);
    }

    /**
     * Returns how many bundles the bundle factory reports for {@code namespace}.
     *
     * @param namespace the namespace to inspect
     * @return the size of the namespace's bundle set
     * @throws Exception declared by the signature; propagated from the bundle factory
     */
    public int getBundleCount(NamespaceName namespace) throws Exception {
        NamespaceBundles namespaceBundles = this.bundleFactory.getBundles(namespace);
        return namespaceBundles.size();
    }

    /**
     * Returns the full bundle of the given namespace, as produced by the bundle factory.
     *
     * @param fqnn the namespace name
     * @return the namespace's full bundle
     * @throws Exception declared by the signature; propagated from the bundle factory
     */
    private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
        NamespaceBundle fullBundle = this.bundleFactory.getFullBundle(fqnn);
        return fullBundle;
    }

    /**
     * Asynchronous variant of {@code getFullBundle}.
     *
     * @param fqnn the namespace name
     * @return a future holding the namespace's full bundle
     */
    private CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) {
        CompletableFuture<NamespaceBundle> fullBundleFuture = this.bundleFactory.getFullBundleAsync(fqnn);
        return fullBundleFuture;
    }

    /**
     * Resolves the web service URL of the broker responsible for a service unit.
     *
     * <p>A {@code TopicName} is mapped to its bundle, a {@code NamespaceName} to the namespace's
     * full bundle, and a {@code NamespaceBundle} is used directly. Only the topic case forwards
     * the service unit itself to the internal lookup; the other two pass {@code null}.
     *
     * @param suName  the topic, namespace or bundle to look up
     * @param options lookup options
     * @return a future holding the URL, or an empty {@code Optional} when none could be determined
     * @throws IllegalArgumentException if {@code suName} is none of the three supported types
     */
    public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) {
        final ServiceUnitId lookupTopic;
        final CompletableFuture<NamespaceBundle> bundleFuture;
        if (suName instanceof TopicName) {
            TopicName topicName = (TopicName) suName;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Getting web service URL of topic: {} - options: {}", topicName, options);
            }
            lookupTopic = topicName;
            bundleFuture = this.getBundleAsync(topicName);
        } else if (suName instanceof NamespaceName) {
            lookupTopic = null;
            bundleFuture = this.getFullBundleAsync((NamespaceName) suName);
        } else if (suName instanceof NamespaceBundle) {
            // A bundle needs no resolution step, so the internal lookup is invoked directly.
            return this.internalGetWebServiceUrl(null, (NamespaceBundle) suName, options);
        } else {
            throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
        }
        return bundleFuture.thenCompose(resolved -> this.internalGetWebServiceUrl(lookupTopic, resolved, options));
    }

    /**
     * Blocking variant of {@code getWebServiceUrlAsync}; waits at most the configured
     * metadata-store operation timeout (in seconds).
     *
     * @param suName  the topic, namespace or bundle to look up
     * @param options lookup options
     * @return the URL, or an empty {@code Optional} when none could be determined
     * @throws Exception if the lookup fails, is interrupted or times out
     */
    public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
        CompletableFuture<Optional<URL>> urlFuture = this.getWebServiceUrlAsync(suName, options);
        long timeoutSeconds = this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        return urlFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Resolves the HTTP or HTTPS URL of the broker responsible for {@code bundle}.
     *
     * <p>A redirect result from the redirect manager takes precedence. Otherwise the owner is
     * looked up through the extensible load manager (when enabled) or {@code findBrokerServiceUrl}.
     * Any exception raised while turning the lookup data into a {@link URL} is logged and
     * reported as an empty {@code Optional}.
     *
     * @param topic   the topic the lookup is for; may be {@code null}
     * @param bundle  the bundle to look up
     * @param options lookup options; {@code isRequestHttps()} selects the TLS URL
     * @return a future holding the web service URL, if one could be determined
     */
    private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable ServiceUnitId topic, NamespaceBundle bundle, LookupOptions options) {
        return this.findRedirectLookupResultAsync(bundle).thenCompose(redirect -> {
            if (redirect.isEmpty()) {
                CompletableFuture<Optional<LookupResult>> ownerLookup;
                if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                    ownerLookup = this.loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle);
                } else {
                    ownerLookup = this.findBrokerServiceUrl(bundle, options);
                }
                return ownerLookup.thenApply(found -> {
                    if (found.isEmpty()) {
                        return Optional.empty();
                    }
                    try {
                        LookupData data = found.get().getLookupData();
                        String target = options.isRequestHttps() ? data.getHttpUrlTls() : data.getHttpUrl();
                        return Optional.of(new URL(target));
                    }
                    catch (Exception e) {
                        LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
                        return Optional.empty();
                    }
                });
            }
            LOG.info("[{}] Redirect lookup request to {} for topic {}", this.pulsar.getBrokerId(), redirect.get(), topic);
            try {
                LookupData data = redirect.get().getLookupData();
                String target = options.isRequestHttps() ? data.getHttpUrlTls() : data.getHttpUrl();
                return CompletableFuture.completedFuture(Optional.of(new URL(target)));
            }
            catch (Exception e) {
                LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
                return CompletableFuture.completedFuture(Optional.empty());
            }
        });
    }

    /**
     * Registers this broker's two heartbeat namespaces (ownership required) and then every
     * configured bootstrap namespace (ownership optional), logging each successful registration.
     *
     * @throws PulsarServerException if a registration fails, including a heartbeat namespace
     *                               being owned by another broker
     */
    public void registerBootstrapNamespaces() throws PulsarServerException {
        final String localBrokerId = this.pulsar.getBrokerId();

        boolean heartbeatRegistered = this.registerNamespace(NamespaceService.getHeartbeatNamespace(localBrokerId, this.config), true);
        if (heartbeatRegistered) {
            LOG.info("added heartbeat namespace name in local cache: ns={}", NamespaceService.getHeartbeatNamespace(localBrokerId, this.config));
        }

        boolean heartbeatV2Registered = this.registerNamespace(NamespaceService.getHeartbeatNamespaceV2(localBrokerId, this.config), true);
        if (heartbeatV2Registered) {
            LOG.info("added heartbeat namespace name in local cache: ns={}", NamespaceService.getHeartbeatNamespaceV2(localBrokerId, this.config));
        }

        for (String bootstrapNamespace : this.config.getBootstrapNamespaces()) {
            if (this.registerNamespace(NamespaceName.get(bootstrapNamespace), false)) {
                LOG.info("added bootstrap namespace name in local cache: ns={}", bootstrapNamespace);
            }
        }
    }

    /**
     * Tries to acquire ownership of the full bundle of {@code nsname} for this broker.
     *
     * <p>Ownership is requested through the extensible load manager when that extension is
     * enabled, otherwise through the ownership cache. The call blocks until the request completes.
     * When the resulting owner matches this broker's service URL (plain or TLS), the namespace's
     * topics are loaded and {@code true} is returned.
     *
     * @param nsname      the namespace to register
     * @param ensureOwned when {@code true}, ownership by another broker is treated as an error;
     *                    when {@code false}, it is logged and {@code false} is returned
     * @return {@code true} if this broker owns the namespace's full bundle, {@code false} otherwise
     * @throws PulsarServerException wrapping any failure, including the {@code IllegalStateException}
     *                               raised when {@code ensureOwned} is set and another broker is the owner
     */
    public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) throws PulsarServerException {
        try {
            NamespaceEphemeralData otherData;
            NamespaceBundle nsFullBundle = this.bundleFactory.getFullBundle(nsname);
            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
                otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
            } else {
                otherData = this.ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
            }
            if (StringUtils.equals(this.pulsar.getBrokerServiceUrl(), otherData.getNativeUrl()) || StringUtils.equals(this.pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
                if (nsFullBundle != null) {
                    this.pulsar.loadNamespaceTopics(nsFullBundle);
                }
                return true;
            }
            String msg = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s", nsname, StringUtils.defaultString(this.pulsar.getBrokerServiceUrl(), this.pulsar.getBrokerServiceUrlTls()), StringUtils.defaultString(otherData.getNativeUrl(), otherData.getNativeUrlTls()));
            if (!ensureOwned) {
                LOG.info(msg);
                return false;
            }
            throw new IllegalStateException(msg);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The blocking get() consumed the interrupt; restore the flag so callers can observe it.
                Thread.currentThread().interrupt();
            }
            LOG.error(e.getMessage(), (Throwable)e);
            throw new PulsarServerException((Throwable)e);
        }
    }

    /**
     * Finds the broker owning {@code bundle}, starting a candidate search when there is no owner.
     *
     * <p>Concurrent lookups for the same bundle share one future. It is registered in the
     * authoritative or non-authoritative in-flight map (chosen by {@code options.isAuthoritative()})
     * and removed again, via the broker executor, once it completes.
     *
     * <p>Outcomes of the owner check:
     * <ul>
     *   <li>no owner and read-only lookup: completes with an empty {@code Optional};</li>
     *   <li>no owner otherwise: {@code searchForCandidateBroker} is scheduled on the broker executor;</li>
     *   <li>owner marked disabled: fails with {@code IllegalStateException};</li>
     *   <li>owner present: completes with a result built from the owner data, using the requested
     *       advertised listener's URLs when a listener name is given (failing with
     *       {@code PulsarServerException} if the owner has no such listener).</li>
     * </ul>
     *
     * @param bundle  the bundle to look up
     * @param options lookup options
     * @return the shared future for this bundle's lookup
     */
    private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, LookupOptions options) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options);
        }
        ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap = options.isAuthoritative() ? this.findingBundlesAuthoritative : this.findingBundlesNotAuthoritative;
        // Typed key and future: the map is keyed by NamespaceBundle, so no Object or raw casts are needed.
        return targetMap.computeIfAbsent(bundle, k -> {
            CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>();
            this.ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
                if (nsData.isEmpty()) {
                    if (options.isReadOnly()) {
                        future.complete(Optional.empty());
                    } else {
                        this.pulsar.getExecutor().execute(() -> this.searchForCandidateBroker(bundle, future, options));
                    }
                    return;
                }
                NamespaceEphemeralData owner = nsData.get();
                if (owner.isDisabled()) {
                    future.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
                }
                if (!options.hasAdvertisedListenerName()) {
                    future.complete(Optional.of(new LookupResult(owner)));
                    return;
                }
                AdvertisedListener listener = owner.getAdvertisedListeners().get(options.getAdvertisedListenerName());
                if (listener == null) {
                    future.completeExceptionally(new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
                    return;
                }
                URI url = listener.getBrokerServiceUrl();
                URI urlTls = listener.getBrokerServiceUrlTls();
                future.complete(Optional.of(new LookupResult(owner, url == null ? null : url.toString(), urlTls == null ? null : urlTls.toString())));
            }).exceptionally(exception -> {
                LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
                future.completeExceptionally(exception);
                return null;
            });
            // Registered after the owner check is wired up; the removal is hopped onto the broker executor.
            future.whenComplete((r, t) -> this.pulsar.getExecutor().execute(() -> targetMap.remove(bundle)));
            return future;
        });
    }

    /**
     * Picks a candidate broker for an unowned {@code bundle} and completes {@code lookupFuture}.
     *
     * <p>Candidate selection, in order:
     * <ol>
     *   <li>the broker named by a heartbeat namespace (v1, then v2);</li>
     *   <li>the broker named by an SLA-monitor namespace, if that broker is active;</li>
     *   <li>this broker, when the lookup is authoritative;</li>
     *   <li>the least-loaded broker from the load manager, when the decision may be taken here
     *       (load manager not centralized, this broker is leader, or the leader is unknown or
     *       inactive); the redirect is then marked authoritative;</li>
     *   <li>otherwise the current leader broker.</li>
     * </ol>
     *
     * <p>If the candidate is this broker, ownership is acquired through the ownership cache and
     * the future is completed with the owner data. Otherwise it is completed with a redirect
     * built by {@code createLookupResult}. Every failure completes the future exceptionally;
     * nothing is thrown to the caller.
     *
     * @param bundle       the bundle that currently has no owner
     * @param lookupFuture the future to complete with the outcome
     * @param options      lookup options
     */
    private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<Optional<LookupResult>> lookupFuture, LookupOptions options) {
        // Read the service once, so the null check and every later use see the same instance.
        LeaderElectionService les = this.pulsar.getLeaderElectionService();
        if (les == null) {
            LOG.warn("The leader election has not yet been completed! NamespaceBundle[{}]", bundle);
            lookupFuture.completeExceptionally(new IllegalStateException("The leader election has not yet been completed!"));
            return;
        }
        String candidateBroker;
        boolean authoritativeRedirect = les.isLeader();
        try {
            candidateBroker = NamespaceService.checkHeartbeatNamespace(bundle);
            if (candidateBroker == null) {
                candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(bundle);
            }
            if (candidateBroker == null) {
                String broker = NamespaceService.getSLAMonitorBrokerName(bundle);
                if (broker != null && this.isBrokerActive(broker)) {
                    candidateBroker = broker;
                }
            }
            if (candidateBroker == null) {
                Optional<LeaderBroker> currentLeader = les.getCurrentLeader();
                if (options.isAuthoritative()) {
                    candidateBroker = this.pulsar.getBrokerId();
                } else {
                    LoadManager loadManager = this.loadManager.get();
                    boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
                    if (!makeLoadManagerDecisionOnThisBroker) {
                        boolean leaderBrokerActive = currentLeader.isPresent() && this.isBrokerActive(currentLeader.get().getBrokerId());
                        if (!leaderBrokerActive) {
                            // Without a usable leader, fall back to deciding locally.
                            makeLoadManagerDecisionOnThisBroker = true;
                            if (currentLeader.isEmpty()) {
                                LOG.warn("The information about the current leader broker wasn't available. Handling load manager decisions in a decentralized way. NamespaceBundle[{}]", bundle);
                            } else {
                                LOG.warn("The current leader broker {} isn't active. Handling load manager decisions in a decentralized way. NamespaceBundle[{}]", currentLeader.get(), bundle);
                            }
                        }
                    }
                    if (makeLoadManagerDecisionOnThisBroker) {
                        Optional<String> availableBroker = this.getLeastLoadedFromLoadManager(bundle);
                        if (availableBroker.isEmpty()) {
                            LOG.warn("Load manager didn't return any available broker. Returning empty result to lookup. NamespaceBundle[{}]", bundle);
                            lookupFuture.complete(Optional.empty());
                            return;
                        }
                        candidateBroker = availableBroker.get();
                        authoritativeRedirect = true;
                    } else {
                        candidateBroker = currentLeader.get().getBrokerId();
                    }
                }
            }
        }
        catch (Exception e) {
            LOG.warn("Error when searching for candidate broker to acquire {}: {}", bundle, e.getMessage(), e);
            lookupFuture.completeExceptionally(e);
            return;
        }
        try {
            Objects.requireNonNull(candidateBroker);
            if (candidateBroker.equals(this.pulsar.getBrokerId())) {
                // This broker is the candidate: try to take ownership of the bundle.
                this.ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
                    if (ownerInfo.isDisabled()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Namespace bundle {} is currently being unloaded", bundle);
                        }
                        lookupFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is currently being unloaded", bundle)));
                    } else {
                        if (options.isLoadTopicsInBundle()) {
                            this.pulsar.loadNamespaceTopics(bundle);
                        }
                        if (options.hasAdvertisedListenerName()) {
                            AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(options.getAdvertisedListenerName());
                            if (listener == null) {
                                lookupFuture.completeExceptionally(new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
                            } else {
                                URI url = listener.getBrokerServiceUrl();
                                URI urlTls = listener.getBrokerServiceUrlTls();
                                lookupFuture.complete(Optional.of(new LookupResult(ownerInfo, url == null ? null : url.toString(), urlTls == null ? null : urlTls.toString())));
                            }
                        } else {
                            lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
                        }
                    }
                }).exceptionally(exception -> {
                    LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", bundle, exception);
                    lookupFuture.completeExceptionally(new PulsarServerException("Failed to acquire ownership for namespace bundle " + bundle, exception));
                    return null;
                });
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", candidateBroker, bundle);
                }
                this.createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName()).thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult))).exceptionally(ex -> {
                    lookupFuture.completeExceptionally(ex);
                    return null;
                });
            }
        }
        catch (Exception e) {
            LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", bundle, e.getMessage(), e);
            lookupFuture.completeExceptionally(e);
        }
    }

    /**
     * Builds a lookup result that points at {@code candidateBroker}, using the broker data
     * stored under {@code /loadbalance/brokers/<candidateBroker>} in the local broker-data cache.
     *
     * <p>When {@code advertisedListenerName} is not blank, the broker service URLs come from that
     * listener; otherwise the broker's default Pulsar service URLs are used. The returned future
     * fails if the broker name is blank, no broker data exists, the requested listener is
     * missing, or the cache read fails.
     *
     * @param candidateBroker        id of the broker to point at; must not be blank
     * @param authoritativeRedirect  flag passed through to the created {@code LookupResult}
     * @param advertisedListenerName optional listener name; blank or {@code null} selects the defaults
     * @return a future holding the lookup result
     */
    public CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect, String advertisedListenerName) {
        CompletableFuture<LookupResult> result = new CompletableFuture<>();
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
            String brokerPath = "/loadbalance/brokers/" + candidateBroker;
            this.localBrokerDataCache.get(brokerPath).thenAccept(reportData -> {
                if (reportData.isEmpty()) {
                    result.completeExceptionally(new MetadataStoreException.NotFoundException(brokerPath));
                    return;
                }
                LocalBrokerData brokerData = reportData.get();
                if (StringUtils.isBlank(advertisedListenerName)) {
                    result.complete(new LookupResult(brokerData.getWebServiceUrl(), brokerData.getWebServiceUrlTls(), brokerData.getPulsarServiceUrl(), brokerData.getPulsarServiceUrlTls(), authoritativeRedirect));
                    return;
                }
                AdvertisedListener listener = (AdvertisedListener) brokerData.getAdvertisedListeners().get(advertisedListenerName);
                if (listener == null) {
                    result.completeExceptionally(new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
                    return;
                }
                URI serviceUrl = listener.getBrokerServiceUrl();
                URI serviceUrlTls = listener.getBrokerServiceUrlTls();
                result.complete(new LookupResult(brokerData.getWebServiceUrl(), brokerData.getWebServiceUrlTls(), serviceUrl == null ? null : serviceUrl.toString(), serviceUrlTls == null ? null : serviceUrlTls.toString(), authoritativeRedirect));
            }).exceptionally(ex -> {
                result.completeExceptionally(ex);
                return null;
            });
        }
        catch (Exception e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Tells whether {@code candidateBroker} is among the brokers the load manager currently
     * reports as available.
     *
     * @param candidateBroker the broker id to check
     * @return {@code true} if the broker is in the available set; {@code false} (with a warning
     *         logged) otherwise
     */
    public boolean isBrokerActive(String candidateBroker) {
        Set<String> activeBrokers = this.getAvailableBrokers();
        boolean active = activeBrokers.contains(candidateBroker);
        if (!active) {
            LOG.warn("Broker {} couldn't be found in available brokers {}", candidateBroker, String.join(",", activeBrokers));
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Broker {} is available for.", candidateBroker);
        }
        return true;
    }

    /**
     * Returns the load manager's set of available brokers, rethrowing any failure unchecked.
     *
     * @return the available broker ids
     * @throws RuntimeException wrapping whatever the load manager call throws
     */
    private Set<String> getAvailableBrokers() {
        final LoadManager manager = this.loadManager.get();
        try {
            return manager.getAvailableBrokers();
        }
        catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Asks the load manager for the least-loaded broker for {@code serviceUnit}.
     *
     * @param serviceUnit the service unit that needs a broker
     * @return the resource id of the chosen broker, or an empty {@code Optional} (with a warning
     *         logged) when the load manager offers none
     * @throws Exception declared by the signature; propagated from the load manager
     */
    private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
        Optional<ResourceUnit> candidate = this.loadManager.get().getLeastLoaded(serviceUnit);
        if (candidate.isPresent()) {
            String address = candidate.get().getResourceId();
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", this.pulsar.getBrokerId(), address);
            }
            return Optional.of(address);
        }
        LOG.warn("No broker is available for {}", serviceUnit);
        return Optional.empty();
    }

    /**
     * Unloads {@code bundle} without naming a destination broker.
     *
     * @param bundle the bundle to unload
     * @return a future that completes when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
        Optional<String> noDestination = Optional.empty();
        return this.unloadNamespaceBundle(bundle, noDestination);
    }

    /**
     * Unloads {@code bundle} using the configured namespace-bundle unloading timeout (milliseconds).
     *
     * @param bundle            the bundle to unload
     * @param destinationBroker optional destination broker
     * @return a future that completes when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker) {
        long timeoutMs = this.config.getNamespaceBundleUnloadingTimeoutMs();
        return this.unloadNamespaceBundle(bundle, destinationBroker, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Unloads {@code bundle} with an explicit timeout, closing without waiting for clients to
     * disconnect.
     *
     * @param bundle            the bundle to unload
     * @param destinationBroker optional destination broker
     * @param timeout           the unload timeout
     * @param timeoutUnit       the unit of {@code timeout}
     * @return a future that completes when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker, long timeout, TimeUnit timeoutUnit) {
        final boolean closeWithoutWaiting = true;
        return this.unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit, closeWithoutWaiting);
    }

    /**
     * Unloads {@code bundle} with an explicit timeout and no destination broker, closing without
     * waiting for clients to disconnect.
     *
     * @param bundle      the bundle to unload
     * @param timeout     the unload timeout
     * @param timeoutUnit the unit of {@code timeout}
     * @return a future that completes when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) {
        Optional<String> noDestination = Optional.empty();
        final boolean closeWithoutWaiting = true;
        return this.unloadNamespaceBundle(bundle, noDestination, timeout, timeoutUnit, closeWithoutWaiting);
    }

    /**
     * Unloads {@code bundle} with an explicit timeout and no destination broker.
     *
     * @param bundle                              the bundle to unload
     * @param timeout                             the unload timeout
     * @param timeoutUnit                         the unit of {@code timeout}
     * @param closeWithoutWaitingClientDisconnect forwarded unchanged to the full overload
     * @return a future that completes when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit, boolean closeWithoutWaitingClientDisconnect) {
        Optional<String> noDestination = Optional.empty();
        return this.unloadNamespaceBundle(bundle, noDestination, timeout, timeoutUnit, closeWithoutWaitingClientDisconnect);
    }

    /**
     * Unloads a bundle.
     *
     * <p>When the load manager extension is enabled, the request is delegated to
     * {@code ExtensibleLoadManagerImpl.unloadNamespaceBundleAsync(bundle, destinationBroker)};
     * the timeout and close arguments are not used on that path. Otherwise the bundle is
     * looked up in the ownership cache and asked to handle the unload request.
     *
     * @param bundle the bundle to unload
     * @param destinationBroker optional destination broker, used only on the extension path
     * @param timeout the timeout value, used only on the ownership-cache path
     * @param timeoutUnit the unit of {@code timeout}
     * @param closeWithoutWaitingClientDisconnect passed to {@code OwnedBundle.handleUnloadRequest}
     * @return a future for the unload; failed with {@link IllegalStateException} when the
     *         ownership cache has no entry for the bundle
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker, long timeout, TimeUnit timeoutUnit, boolean closeWithoutWaitingClientDisconnect) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            OwnedBundle owned = this.ownershipCache.getOwnedBundle(bundle);
            if (owned != null) {
                return owned.handleUnloadRequest(this.pulsar, timeout, timeoutUnit, closeWithoutWaitingClientDisconnect);
            }
            return FutureUtil.failedFuture((Throwable)new IllegalStateException("Bundle " + bundle + " is not currently owned"));
        }
        return ExtensibleLoadManagerImpl.get(this.loadManager.get()).unloadNamespaceBundleAsync(bundle, destinationBroker);
    }

    /**
     * Checks whether the local metadata store has an entry at the bundle's service-unit path.
     *
     * @param bundle the bundle to check
     * @return the future returned by the metadata store's {@code exists} call
     */
    public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
        CompletableFuture<Boolean> exists = this.pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
        return exists;
    }

    /**
     * Builds a map from bundle name to ownership status for the bundles owned by this broker.
     *
     * <p>The isolation policies of the configured cluster are loaded first (an empty
     * {@code NamespaceIsolationPolicies} is used when none are stored). With the load manager
     * extension enabled, the owned service units come from the extensible load manager and are
     * all reported as active; otherwise they come from the ownership cache and each bundle's
     * own {@code isActive()} value is used.
     *
     * @return a future holding the status map keyed by the bundle's string form
     */
    public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpacesStatusAsync() {
        CompletableFuture<NamespaceIsolationPolicies> isolationPolicies = this.pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies().getIsolationDataPoliciesAsync(this.pulsar.getConfiguration().getClusterName()).thenApply(stored -> stored.orElseGet(NamespaceIsolationPolicies::new));
        return isolationPolicies.thenCompose(policies -> {
            if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                Collection<CompletableFuture<OwnedBundle>> pending = this.ownershipCache.getOwnedBundlesAsync().values();
                // join() is safe here: it only runs after waitForAll has completed.
                return FutureUtil.waitForAll(pending).thenApply(__ -> pending.stream().map(CompletableFuture::join).collect(Collectors.toMap(owned -> owned.getNamespaceBundle().toString(), owned -> this.getNamespaceOwnershipStatus(owned.isActive(), policies.getPolicyByNamespace(owned.getNamespaceBundle().getNamespaceObject())))));
            }
            ExtensibleLoadManagerImpl extensible = ExtensibleLoadManagerImpl.get(this.loadManager.get());
            Map<String, NamespaceOwnershipStatus> byBundle = extensible.getOwnedServiceUnits().stream().collect(Collectors.toMap(NamespaceBundle::toString, owned -> this.getNamespaceOwnershipStatus(true, policies.getPolicyByNamespace(owned.getNamespaceObject()))));
            return CompletableFuture.completedFuture(byBundle);
        });
    }

    /**
     * Creates the ownership status for a bundle given its isolation policy.
     *
     * <p>Without a policy the status is {@code shared} and not controlled. With a policy the
     * status is marked controlled, and the assignment becomes {@code primary} or
     * {@code secondary} when the policy lists this broker's advertised address as such;
     * otherwise it stays {@code shared}.
     *
     * @param isActive the active flag to store in the status
     * @param nsIsolationPolicy the isolation policy for the namespace, or {@code null}
     * @return the populated status object
     */
    private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive, NamespaceIsolationPolicy nsIsolationPolicy) {
        NamespaceOwnershipStatus status = new NamespaceOwnershipStatus(BrokerAssignment.shared, false, isActive);
        if (nsIsolationPolicy != null) {
            status.is_controlled = true;
            if (nsIsolationPolicy.isPrimaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.primary;
            } else if (nsIsolationPolicy.isSecondaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.secondary;
            }
        }
        return status;
    }

    /**
     * Reports whether the bundle's owner data, if already available, is marked disabled.
     *
     * <p>The owner future is read with {@code getNow(null)}, so this method does not wait; an
     * absent future, an incomplete future, or an empty owner all yield {@code false}. Any
     * exception is logged as a warning and also yields {@code false}.
     *
     * @param bundle the bundle to check
     * @return {@code true} only when owner data is present and {@code isDisabled()} is true
     * @throws Exception declared by the signature; the body catches every {@link Exception}
     */
    public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
        try {
            CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = this.ownershipCache.getOwnerAsync(bundle);
            if (ownerFuture == null) {
                return false;
            }
            Optional<NamespaceEphemeralData> owner = ownerFuture.getNow(null);
            return owner != null && owner.isPresent() && owner.get().isDisabled();
        }
        catch (Exception failure) {
            LOG.warn("Exception in getting ownership info for service unit {}: {}", new Object[]{bundle, failure.getMessage(), failure});
            return false;
        }
    }

    /**
     * Splits a bundle and takes ownership of the resulting bundles.
     *
     * <p>With the load manager extension enabled, the split is delegated to
     * {@code ExtensibleLoadManagerImpl.splitNamespaceBundleAsync}; {@code unload} is not used
     * on that path. Otherwise {@link #splitAndOwnBundleOnceAndRetry} is started with a
     * counter initialised to 7.
     *
     * @param bundle the bundle to split
     * @param unload whether the split bundles should be unloaded afterwards
     * @param splitAlgorithm the algorithm that provides split boundaries
     * @param boundaries boundaries passed to the split option
     * @return a future completed when the split finishes or fails
     */
    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload, NamespaceBundleSplitAlgorithm splitAlgorithm, List<Long> boundaries) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            CompletableFuture<Void> splitResult = new CompletableFuture<Void>();
            AtomicInteger attemptsLeft = new AtomicInteger(7);
            this.splitAndOwnBundleOnceAndRetry(bundle, unload, attemptsLeft, splitResult, splitAlgorithm, boundaries);
            return splitResult;
        }
        return ExtensibleLoadManagerImpl.get(this.loadManager.get()).splitNamespaceBundleAsync(bundle, splitAlgorithm, boundaries);
    }

    /**
     * Runs one split attempt for {@code bundle} and reports the outcome on {@code completionFuture}.
     *
     * <p>The attempt asks {@code splitAlgorithm} for boundaries, splits the bundle through the
     * bundle factory, tries to acquire ownership of every resulting bundle, and then persists
     * the new layout via {@link #updateNamespaceBundles} followed by
     * {@link #updateNamespaceBundlesForPolicies}. On success the original bundle's state is
     * updated, its ownership is removed, the new bundles are unloaded when {@code unload} is
     * set, and split listeners are notified.
     *
     * <p>When the attempt fails and the failure's cause is a
     * {@code MetadataStoreException.BadVersionException}, the attempt is re-scheduled after
     * 100 ms as long as {@code counter} is still non-negative after being decremented.
     *
     * @param bundle the bundle to split
     * @param unload whether to unload the split bundles after a successful split
     * @param counter remaining retry budget, decremented on each bad-version failure
     * @param completionFuture completed with {@code null} on success (or when no boundary is
     *        found) and exceptionally on failure
     * @param splitAlgorithm the algorithm that provides split boundaries
     * @param boundaries boundaries passed to the split option
     */
    void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, boolean unload, AtomicInteger counter, CompletableFuture<Void> completionFuture, NamespaceBundleSplitAlgorithm splitAlgorithm, List<Long> boundaries) {
        BundleSplitOption bundleSplitOption = this.getBundleSplitOption(bundle, boundaries, this.config);
        splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
            CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
            if (ex == null) {
                if (splitBoundaries == null || splitBoundaries.size() == 0) {
                    LOG.info("[{}] No valid boundary found in {} to split bundle {}", new Object[]{bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange()});
                    completionFuture.complete(null);
                    return;
                }
                try {
                    this.bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries).thenAccept(splitBundles -> {
                        if (splitBundles == null) {
                            String msg = String.format("bundle %s not found under namespace", bundle.toString());
                            LOG.warn(msg);
                            updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
                            return;
                        }
                        Objects.requireNonNull(splitBundles.getLeft());
                        Objects.requireNonNull(splitBundles.getRight());
                        Preconditions.checkArgument(splitBundles.getRight().size() == splitBoundaries.size() + 1, (Object)("bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles"));
                        NamespaceName nsname = bundle.getNamespaceObject();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, bundles: {}", new Object[]{nsname.toString(), bundle.getBundleRange(), counter.get(), splitBundles.getRight()});
                        }
                        try {
                            for (NamespaceBundle sBundle : splitBundles.getRight()) {
                                Objects.requireNonNull(this.ownershipCache.tryAcquiringOwnership(sBundle));
                            }
                            this.updateNamespaceBundles(nsname, splitBundles.getLeft()).thenCompose(__ -> this.updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft())).thenRun(() -> {
                                this.bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
                                updateFuture.complete(splitBundles.getRight());
                            }).exceptionally(ex1 -> {
                                String msg = String.format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", nsname.toString(), bundle.getBundleRange(), ex1.getMessage());
                                LOG.warn(msg);
                                // Keep the original cause so the retry check below can see a BadVersionException.
                                updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg, ex1.getCause()));
                                return null;
                            });
                        }
                        catch (Exception e) {
                            String msg = String.format("failed to acquire ownership of split bundle for namespace [%s], %s", nsname.toString(), e.getMessage());
                            LOG.warn(msg, (Throwable)e);
                            updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg, e));
                        }
                    }).exceptionally(splitEx -> {
                        // A failed splitBundles future, or an exception thrown by the callback above
                        // (requireNonNull / checkArgument), would otherwise leave updateFuture, and
                        // therefore completionFuture, incomplete forever. This is a no-op when
                        // updateFuture has already been completed.
                        updateFuture.completeExceptionally(splitEx);
                        return null;
                    });
                }
                catch (Exception e) {
                    updateFuture.completeExceptionally(e);
                }
            } else {
                updateFuture.completeExceptionally(ex);
            }
            updateFuture.whenCompleteAsync((r, t) -> {
                if (t != null) {
                    if (t.getCause() instanceof MetadataStoreException.BadVersionException && counter.decrementAndGet() >= 0) {
                        this.pulsar.getExecutor().schedule(() -> this.pulsar.getOrderedExecutor().execute(() -> this.splitAndOwnBundleOnceAndRetry(bundle, unload, counter, completionFuture, splitAlgorithm, boundaries)), 100L, TimeUnit.MILLISECONDS);
                    } else if (t instanceof IllegalArgumentException) {
                        completionFuture.completeExceptionally(t);
                    } else {
                        String msg2 = String.format(" %s not success update nsBundles, counter %d, reason %s", bundle.toString(), counter.get(), t.getMessage());
                        LOG.warn(msg2);
                        completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg2));
                    }
                    return;
                }
                this.getOwnershipCache().updateBundleState(bundle, false).thenRun(() -> {
                    this.pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                    this.loadManager.get().setLoadReportForceUpdateFlag();
                    this.pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
                    completionFuture.complete(null);
                    if (unload) {
                        r.forEach(this::unloadNamespaceBundle);
                    }
                    this.onNamespaceBundleSplit(bundle);
                }).exceptionally(e -> {
                    // Report the failure of this stage. The outer `ex` is always null on this path,
                    // so dereferencing it here would throw and leave completionFuture incomplete.
                    String msg1 = String.format("failed to disable bundle %s under namespace [%s] with error %s", bundle, bundle.getNamespaceObject().toString(), e.getMessage());
                    LOG.warn(msg1, e);
                    completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg1));
                    return null;
                });
            }, (Executor)this.pulsar.getOrderedExecutor());
        });
    }

    /**
     * Computes split boundaries for a bundle and splits it through the bundle factory.
     *
     * @param bundle the bundle to split
     * @param nsBundleSplitAlgorithm the algorithm that provides split boundaries
     * @param boundaries boundaries passed to the split option
     * @return a future holding the factory's split result, or {@code null} (after an info
     *         log) when the algorithm yields no boundaries
     */
    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(NamespaceBundle bundle, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm, List<Long> boundaries) {
        return this.getSplitBoundary(bundle, boundaries, nsBundleSplitAlgorithm).thenCompose(found -> {
            boolean hasBoundaries = found != null && found.size() != 0;
            if (hasBoundaries) {
                return this.pulsar.getNamespaceService().getNamespaceBundleFactory().splitBundles(bundle, found.size() + 1, (List<Long>)found);
            }
            LOG.info("[{}] No valid boundary found in {} to split bundle {}", new Object[]{bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange()});
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Asks the given algorithm for split boundaries of a bundle.
     *
     * @param bundle the bundle to split
     * @param boundaries boundaries passed to the split option
     * @param nsBundleSplitAlgorithm the algorithm to query
     * @return the future returned by the algorithm's {@code getSplitBoundary}
     */
    public CompletableFuture<List<Long>> getSplitBoundary(NamespaceBundle bundle, List<Long> boundaries, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) {
        return nsBundleSplitAlgorithm.getSplitBoundary(this.getBundleSplitOption(bundle, boundaries, this.config));
    }

    /**
     * Builds the split option handed to a split algorithm.
     *
     * <p>When the configured default algorithm equals {@code "flow_or_qps_equally_divide"},
     * the option also carries the bundle's topic stats and the related load-balancer
     * thresholds from {@code config}; otherwise a plain {@code BundleSplitOption} is created.
     *
     * @param bundle the bundle to split
     * @param boundaries boundaries stored in the option
     * @param config the configuration to read the algorithm name and thresholds from
     * @return the split option
     */
    private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle, List<Long> boundaries, ServiceConfiguration config) {
        if (!config.getDefaultNamespaceBundleSplitAlgorithm().equals("flow_or_qps_equally_divide")) {
            return new BundleSplitOption(this, bundle, boundaries);
        }
        Map<String, TopicStatsImpl> statsByTopic = this.pulsar.getBrokerService().getTopicStats(bundle);
        return new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, statsByTopic, config.getLoadBalancerNamespaceBundleMaxMsgRate(), config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), config.getFlowOrQpsDifferenceThresholdPercentage());
    }

    /**
     * Resolves a split algorithm by name with two fallbacks.
     *
     * <p>Lookup order: the requested name, then the configured default algorithm name, then
     * {@code NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO}.
     *
     * @param algorithmName the requested algorithm name
     * @return the first algorithm in the lookup order that is non-null
     */
    public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
        NamespaceBundleSplitAlgorithm requested = NamespaceBundleSplitAlgorithm.of(algorithmName);
        if (requested != null) {
            return requested;
        }
        NamespaceBundleSplitAlgorithm configured = NamespaceBundleSplitAlgorithm.of(this.pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
        return configured != null ? configured : NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
    }

    /**
     * Writes the bundle layout into the namespace's policies.
     *
     * <p>Existing policies have their {@code bundles} field replaced. When no policies are
     * stored, an error is logged and new policies holding only the bundle data are created.
     *
     * @param nsname the namespace; must not be {@code null}
     * @param nsBundles the bundles whose data is written; must not be {@code null}
     * @return the future of the set or create operation
     * @throws NullPointerException if either argument is {@code null}
     */
    public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname, NamespaceBundles nsBundles) {
        Objects.requireNonNull(nsname);
        Objects.requireNonNull(nsBundles);
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(stored -> {
            if (!stored.isPresent()) {
                LOG.error("Policies of namespace {} is not exist!", (Object)nsname);
                Policies created = new Policies();
                created.bundles = nsBundles.getBundlesData();
                return this.pulsar.getPulsarResources().getNamespaceResources().createPoliciesAsync(nsname, created);
            }
            return this.pulsar.getPulsarResources().getNamespaceResources().setPoliciesAsync(nsname, current -> {
                current.bundles = nsBundles.getBundlesData();
                return current;
            });
        });
    }

    /**
     * Stores the bundle layout as local policies, passing along the bundles' version.
     *
     * @param nsname the namespace; must not be {@code null}
     * @param nsBundles the bundles to store; must not be {@code null}
     * @return the future returned by {@code setLocalPoliciesWithVersion}
     * @throws NullPointerException if either argument is {@code null}
     */
    public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
        Objects.requireNonNull(nsname);
        Objects.requireNonNull(nsBundles);
        LocalPolicies converted = nsBundles.toLocalPolicies();
        CompletableFuture<Void> stored = this.pulsar.getPulsarResources().getLocalPolicies().setLocalPoliciesWithVersion(nsname, converted, nsBundles.getVersion());
        return stored;
    }

    /**
     * @return the ownership cache held by this service
     */
    public OwnershipCache getOwnershipCache() {
        OwnershipCache cache = this.ownershipCache;
        return cache;
    }

    /**
     * Returns the bundles owned by this broker.
     *
     * <p>With the load manager extension enabled, the set comes from the extensible load
     * manager; otherwise it is collected from the ownership cache's owned bundles.
     *
     * @return the set of owned bundles
     */
    public Set<NamespaceBundle> getOwnedServiceUnits() {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle).collect(Collectors.toSet());
        }
        return ExtensibleLoadManagerImpl.get(this.loadManager.get()).getOwnedServiceUnits();
    }

    /**
     * Blocking variant of {@link #isServiceUnitOwnedAsync}, bounded by
     * {@code config.getMetadataStoreOperationTimeoutSeconds()} seconds.
     *
     * @param suName the service unit to check
     * @return the result of the asynchronous check
     * @throws Exception if the asynchronous check fails, times out, or is interrupted
     */
    public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception {
        CompletableFuture<Boolean> owned = this.isServiceUnitOwnedAsync(suName);
        return owned.get(this.config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
    }

    /**
     * Checks ownership of a service unit, dispatching on its concrete type.
     *
     * <p>{@code TopicName} and {@code NamespaceName} go to their dedicated helpers. A
     * {@code NamespaceBundle} is checked through the load manager when the extension is
     * enabled, otherwise through the ownership cache. Any other type yields a failed future.
     *
     * @param suName the service unit to check
     * @return a future holding the ownership result, or failed with
     *         {@link IllegalArgumentException} for an unsupported type
     */
    public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName) {
        if (suName instanceof TopicName) {
            return this.isTopicOwnedAsync((TopicName)suName);
        } else if (suName instanceof NamespaceName) {
            return this.isNamespaceOwnedAsync((NamespaceName)suName);
        } else if (!(suName instanceof NamespaceBundle)) {
            return FutureUtil.failedFuture((Throwable)new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
        }
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.loadManager.get().checkOwnershipAsync(Optional.empty(), suName);
        }
        return CompletableFuture.completedFuture(this.ownershipCache.isNamespaceBundleOwned((NamespaceBundle)suName));
    }

    /**
     * Blocking variant of {@link #isServiceUnitActiveAsync}, bounded by the configured
     * metadata-store operation timeout in seconds.
     *
     * @param topicName the topic whose bundle is checked
     * @return the result of the asynchronous check
     * @throws RuntimeException wrapping the interruption, execution failure, or timeout; when
     *         the wait is interrupted, the thread's interrupt flag is restored before throwing
     */
    @Deprecated
    public boolean isServiceUnitActive(TopicName topicName) {
        try {
            return this.isServiceUnitActiveAsync(topicName).get(this.pulsar.getConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Future.get() clears the interrupt flag when it throws; restore it so callers
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", (Object)topicName, (Object)e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Checks whether the bundle serving a topic is owned and active on this broker.
     *
     * <p>With the load manager extension enabled, the check is delegated to the load
     * manager's {@code checkOwnershipAsync}. Otherwise the ownership cache is consulted: a
     * bundle with no cached future, or whose owned bundle is {@code null}, is reported as not
     * active.
     *
     * @param topicName the topic whose bundle is checked
     * @return a future holding the result
     */
    public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.getBundleAsync(topicName).thenCompose(bundle -> {
                Optional<CompletableFuture<OwnedBundle>> cached = this.ownershipCache.getOwnedBundleAsync((NamespaceBundle)bundle);
                if (cached.isPresent()) {
                    return cached.get().thenApply(owned -> owned != null && owned.isActive());
                }
                return CompletableFuture.completedFuture(false);
            });
        }
        return this.getBundleAsync(topicName).thenCompose(bundle -> this.loadManager.get().checkOwnershipAsync((Optional<ServiceUnitId>)Optional.of(topicName), (ServiceUnitId)bundle));
    }

    /**
     * Checks ownership of a namespace through its full bundle.
     *
     * @param fqnn the namespace to check
     * @return a future holding the load manager's ownership answer when the extension is
     *         enabled, otherwise whether the ownership cache has an owned bundle for it
     */
    private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.getFullBundleAsync(fqnn).thenApply(fullBundle -> this.ownershipCache.getOwnedBundle((NamespaceBundle)fullBundle) != null);
        }
        return this.getFullBundleAsync(fqnn).thenCompose(fullBundle -> this.loadManager.get().checkOwnershipAsync(Optional.empty(), (ServiceUnitId)fullBundle));
    }

    /**
     * Checks ownership of the bundle that serves a topic.
     *
     * @param topic the topic to check
     * @return a future holding the load manager's ownership answer when the extension is
     *         enabled, otherwise the ownership cache's {@code isNamespaceBundleOwned} result
     */
    private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.getBundleAsync(topic).thenApply(this.ownershipCache::isNamespaceBundleOwned);
        }
        return this.getBundleAsync(topic).thenCompose(owningBundle -> this.loadManager.get().checkOwnershipAsync((Optional<ServiceUnitId>)Optional.of(topic), (ServiceUnitId)owningBundle));
    }

    /**
     * Checks ownership of a topic's bundle.
     *
     * @param topicName the topic to check
     * @return a future holding the load manager's {@code checkOwnershipAsync} result when the
     *         extension is enabled, otherwise the ownership cache's
     *         {@code checkOwnershipAsync} result
     */
    public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.getBundleAsync(topicName).thenCompose(this.ownershipCache::checkOwnershipAsync);
        }
        return this.getBundleAsync(topicName).thenCompose(owningBundle -> this.loadManager.get().checkOwnershipAsync((Optional<ServiceUnitId>)Optional.of(topicName), (ServiceUnitId)owningBundle));
    }

    /**
     * Releases ownership of a bundle and then invalidates the bundle cache of its namespace.
     *
     * <p>With the load manager extension enabled, the release is an unload through the
     * extensible load manager with no destination; otherwise it is
     * {@code ownershipCache.removeOwnership}.
     *
     * @param nsBundle the bundle to release
     * @return a future completed after the cache invalidation has run
     */
    public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
        CompletableFuture<Void> released = ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)
                ? ExtensibleLoadManagerImpl.get(this.loadManager.get()).unloadNamespaceBundleAsync(nsBundle, Optional.empty())
                : this.ownershipCache.removeOwnership(nsBundle);
        return released.thenRun(() -> this.bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
    }

    /**
     * Notifies every registered ownership listener that a bundle has been loaded.
     *
     * @param bundle the bundle that is now owned
     */
    public void onNamespaceBundleOwned(NamespaceBundle bundle) {
        this.bundleOwnershipListeners.forEach(registered -> this.notifyNamespaceBundleOwnershipListener(bundle, registered));
    }

    /**
     * Calls {@code unLoad} on every registered ownership listener whose {@code test} accepts
     * the bundle. A failure in one listener is logged and does not stop the others.
     *
     * @param bundle the bundle being unloaded
     */
    public void onNamespaceBundleUnload(NamespaceBundle bundle) {
        for (NamespaceBundleOwnershipListener registered : this.bundleOwnershipListeners) {
            try {
                if (registered.test(bundle)) {
                    registered.unLoad(bundle);
                }
            }
            catch (Throwable failure) {
                LOG.error("Call bundle {} ownership listener error", (Object)bundle, (Object)failure);
            }
        }
    }

    /**
     * Calls {@code onSplit} on every registered split listener whose {@code test} accepts
     * the bundle. A failure in one listener is logged and does not stop the others.
     *
     * @param bundle the bundle that was split
     */
    public void onNamespaceBundleSplit(NamespaceBundle bundle) {
        for (NamespaceBundleSplitListener registered : this.bundleSplitListeners) {
            try {
                if (registered.test(bundle)) {
                    registered.onSplit(bundle);
                }
            }
            catch (Throwable failure) {
                LOG.error("Call bundle {} split listener {} error", new Object[]{bundle, registered, failure});
            }
        }
    }

    /**
     * Registers ownership listeners and replays the currently owned bundles to them.
     *
     * <p>Null entries are not registered. After registration, every bundle returned by
     * {@link #getOwnedServiceUnits()} is passed, together with the given listener array, to
     * the ownership-listener notification helper.
     *
     * @param listeners the listeners to add; the array itself must not be {@code null}
     * @throws NullPointerException if {@code listeners} is {@code null}
     */
    public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener ... listeners) {
        Objects.requireNonNull(listeners);
        for (NamespaceBundleOwnershipListener candidate : listeners) {
            if (candidate != null) {
                this.bundleOwnershipListeners.add(candidate);
            }
        }
        for (NamespaceBundle owned : this.getOwnedServiceUnits()) {
            this.notifyNamespaceBundleOwnershipListener(owned, listeners);
        }
    }

    /**
     * Registers split listeners, ignoring null entries.
     *
     * @param listeners the listeners to add; the array itself must not be {@code null}
     * @throws NullPointerException if {@code listeners} is {@code null}
     */
    public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener ... listeners) {
        Objects.requireNonNull(listeners);
        for (NamespaceBundleSplitListener candidate : listeners) {
            if (candidate != null) {
                this.bundleSplitListeners.add(candidate);
            }
        }
    }

    /**
     * Calls {@code onLoad} on each given listener whose {@code test} accepts the bundle.
     *
     * <p>A {@code null} array and {@code null} entries are skipped. A failure in one listener
     * is logged and does not stop the others.
     *
     * @param bundle the bundle that was loaded
     * @param listeners the listeners to notify; may be {@code null} or contain {@code null}
     */
    private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle, NamespaceBundleOwnershipListener ... listeners) {
        if (listeners == null) {
            return;
        }
        for (NamespaceBundleOwnershipListener listener : listeners) {
            // addNamespaceBundleOwnershipListener accepts arrays with null entries and forwards
            // the same array here; skip them instead of logging an NPE for every owned bundle.
            if (listener == null) {
                continue;
            }
            try {
                if (listener.test(bundle)) {
                    listener.onLoad(bundle);
                }
            }
            catch (Throwable t) {
                LOG.error("Call bundle {} ownership listener error", (Object)bundle, (Object)t);
            }
        }
    }

    /**
     * @return the bundle factory held by this service
     */
    public NamespaceBundleFactory getNamespaceBundleFactory() {
        NamespaceBundleFactory factory = this.bundleFactory;
        return factory;
    }

    /**
     * Returns the service unit for a topic, which is the result of {@code getBundle(topicName)}.
     *
     * @param topicName the topic to resolve
     * @return the bundle returned by {@code getBundle}
     * @throws Exception if the bundle lookup fails
     */
    public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception {
        ServiceUnitId unit = this.getBundle(topicName);
        return unit;
    }

    /**
     * Lists the persistent and non-persistent topics of a namespace.
     *
     * @param namespaceName the namespace to list
     * @return a future holding the {@code ListUtils.union} of both topic lists
     */
    public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespaceName) {
        CompletableFuture<List<String>> persistent = this.getListOfPersistentTopics(namespaceName);
        CompletableFuture<List<String>> nonPersistent = this.getListOfNonPersistentTopics(namespaceName);
        return persistent.thenCombine(nonPersistent, ListUtils::union);
    }

    /**
     * Lists the persistent and non-persistent partitioned topics of a namespace.
     *
     * @param namespaceName the namespace to list
     * @return a future holding the {@code ListUtils.union} of both partitioned-topic lists
     */
    public CompletableFuture<List<String>> getFullListOfPartitionedTopic(NamespaceName namespaceName) {
        NamespaceResources.PartitionedTopicResources resources = this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
        CompletableFuture<List<String>> persistent = resources.listPartitionedTopicsAsync(namespaceName, TopicDomain.persistent);
        CompletableFuture<List<String>> nonPersistent = resources.listPartitionedTopicsAsync(namespaceName, TopicDomain.non_persistent);
        return persistent.thenCombine((CompletionStage)nonPersistent, ListUtils::union);
    }

    /**
     * Lists the topics and topic partitions of the bundle's namespace that fall inside the bundle.
     *
     * <p>Two lists are computed: the namespace's full topic list and its partition list, each
     * filtered by {@code bundle.includes}. The partitions are appended to the topic list,
     * skipping names that are already in it.
     *
     * @param bundle the bundle whose topics are listed
     * @return a future holding the merged list
     */
    public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(NamespaceBundle bundle) {
        CompletableFuture<List<String>> ownedTopics = this.getFullListOfTopics(bundle.getNamespaceObject()).thenApply(topics -> topics.stream().filter(topic -> bundle.includes(TopicName.get(topic))).collect(Collectors.toList()));
        CompletableFuture<List<String>> ownedPartitions = this.getAllPartitions(bundle.getNamespaceObject()).thenApply(topics -> topics.stream().filter(topic -> bundle.includes(TopicName.get(topic))).collect(Collectors.toList()));
        return ownedTopics.thenCombine(ownedPartitions, (left, right) -> {
            // Track membership in a hash set so the merge is linear rather than calling
            // List.contains for every partition; the resulting list content and order are the same.
            Set<String> seen = new java.util.HashSet<>(left);
            for (String topic : right) {
                if (seen.add(topic)) {
                    left.add(topic);
                }
            }
            return left;
        });
    }

    /**
     * Checks whether a topic exists.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>For a persistent topic name that is a partition, ask the topic resources whether
     *       that persistent topic exists; a positive answer ends the check.</li>
     *   <li>Fetch the partitioned-topic metadata of the topic's partitioned name; more than
     *       zero partitions means the topic exists.</li>
     *   <li>Otherwise a persistent topic is looked up in the topic resources, and a
     *       non-persistent topic in the broker service's topic map.</li>
     * </ol>
     *
     * @param topic the topic to check
     * @return a future holding {@code true} when the topic is found by one of the steps
     */
    public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
        CompletableFuture<Boolean> partitionLookup;
        if (topic.isPersistent() && topic.isPartitioned()) {
            partitionLookup = this.pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
        } else {
            partitionLookup = CompletableFuture.completedFuture(false);
        }
        return partitionLookup.thenCompose(partitionFound -> {
            if (partitionFound != null && partitionFound.booleanValue()) {
                return CompletableFuture.completedFuture(true);
            }
            return this.pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get((String)topic.getPartitionedTopicName())).thenCompose(partitionedMetadata -> {
                if (partitionedMetadata.partitions > 0) {
                    return CompletableFuture.completedFuture(true);
                }
                if (!topic.isPersistent()) {
                    CompletableFuture<Optional<Topic>> loaded = this.pulsar.getBrokerService().getTopics().get(topic.toString());
                    return loaded == null ? CompletableFuture.completedFuture(false) : loaded.thenApply(Optional::isPresent);
                }
                return this.pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
            });
        });
    }

    /**
     * Lists the topics of a namespace according to the requested mode.
     *
     * @param namespaceName the namespace to list
     * @param mode {@code ALL} for every topic, {@code NON_PERSISTENT} for non-persistent
     *        topics; any other mode lists persistent topics
     * @return a future holding the topic names
     */
    public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, CommandGetTopicsOfNamespace.Mode mode) {
        switch (mode) {
            case ALL:
                return this.getFullListOfTopics(namespaceName);
            case NON_PERSISTENT:
                return this.getListOfNonPersistentTopics(namespaceName);
            default:
                return this.getListOfPersistentTopics(namespaceName);
        }
    }

    /**
     * Lists the partitions of every persistent and non-persistent partitioned topic in a namespace.
     *
     * @param namespaceName the namespace to list
     * @return a future holding the {@code ListUtils.union} of both partition lists
     */
    public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
        CompletableFuture<List<String>> persistent = this.getPartitions(namespaceName, TopicDomain.persistent);
        CompletableFuture<List<String>> nonPersistent = this.getPartitions(namespaceName, TopicDomain.non_persistent);
        return persistent.thenCombine(nonPersistent, ListUtils::union);
    }

    /**
     * Lists the partition names of every partitioned topic of one domain in a namespace.
     *
     * <p>Each partitioned topic is expanded through {@code getPartitionsForTopic}; the
     * per-topic results are appended to one synchronized list as their futures complete.
     *
     * @param namespaceName the namespace to list
     * @param topicDomain the topic domain to list
     * @return a future holding all partition names; it fails if any per-topic lookup fails,
     *         and holds an empty list when the namespace has no partitioned topics
     */
    public CompletableFuture<List<String>> getPartitions(NamespaceName namespaceName, TopicDomain topicDomain) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting children from partitioned-topics now: {} - {}", (Object)namespaceName, (Object)topicDomain);
        }
        return this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().listPartitionedTopicsAsync(namespaceName, topicDomain).thenCompose(partitionedTopics -> {
            CompletableFuture<List<String>> outcome = new CompletableFuture<>();
            List<String> collected = Collections.synchronizedList(new ArrayList<>());
            if (!CollectionUtils.isNotEmpty((Collection)partitionedTopics)) {
                outcome.complete(collected);
                return outcome;
            }
            List<CompletableFuture<List<String>>> lookups = new ArrayList<>();
            for (String partitionedTopic : partitionedTopics) {
                CompletableFuture<List<String>> lookup = this.getPartitionsForTopic(TopicName.get((String)partitionedTopic));
                lookups.add(lookup);
                lookup.thenAccept(collected::addAll);
            }
            FutureUtil.waitForAll(lookups).whenComplete((ignored, failure) -> {
                if (failure == null) {
                    outcome.complete(collected);
                } else {
                    outcome.completeExceptionally((Throwable)failure);
                }
            });
            return outcome;
        });
    }

    /**
     * Expands a partitioned topic into the names of its partitions.
     *
     * @param topicName the partitioned topic
     * @return a future holding one name per partition index from 0 up to the partition count
     *         in the fetched metadata; empty when that count is not positive
     */
    private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicName) {
        return this.pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> {
            List<String> partitionNames = new ArrayList<String>();
            int index = 0;
            while (index < metadata.partitions) {
                partitionNames.add(topicName.getPartition(index).toString());
                ++index;
            }
            return partitionNames;
        });
    }

    /**
     * Lists the persistent topics of a namespace through the topic resources.
     *
     * @param namespaceName the namespace to list
     * @return the future returned by {@code listPersistentTopicsAsync}
     */
    public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
        CompletableFuture<List<String>> persistentTopics = this.pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
        return persistentTopics;
    }

    /**
     * Lists the active non-persistent topics of a namespace.
     *
     * <p>When {@code checkLocalOrGetPeerReplicationCluster} yields peer cluster data, the list
     * is fetched from that peer cluster. Otherwise the broker's multi-layer topic map is
     * scanned, while synchronized on that map, for {@code NonPersistentTopic} entries whose
     * {@code isActive()} is true; the names are returned in natural sort order.
     *
     * @param namespaceName the namespace to list
     * @return a future holding the topic names
     */
    public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
        return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(this.pulsar, namespaceName, true).thenCompose(peerClusterData -> {
            if (peerClusterData != null) {
                return this.getNonPersistentTopicsFromPeerCluster((ClusterDataImpl)peerClusterData, namespaceName);
            }
            ArrayList activeTopics = new ArrayList();
            ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> topicMapLock = this.pulsar.getBrokerService().getMultiLayerTopicMap();
            synchronized (topicMapLock) {
                boolean namespaceLoaded = this.pulsar.getBrokerService().getMultiLayerTopicMap().containsKey((Object)namespaceName.toString());
                if (namespaceLoaded) {
                    ((ConcurrentOpenHashMap)this.pulsar.getBrokerService().getMultiLayerTopicMap().get((Object)namespaceName.toString())).forEach((bundleName, topicsInBundle) -> topicsInBundle.forEach((name, loadedTopic) -> {
                        if (!(loadedTopic instanceof NonPersistentTopic)) {
                            return;
                        }
                        if (((NonPersistentTopic)loadedTopic).isActive()) {
                            activeTopics.add(name);
                        }
                    }));
                }
            }
            activeTopics.sort(null);
            return CompletableFuture.completedFuture(activeTopics);
        });
    }

    /**
     * Fetches the non-persistent topics of a namespace from a peer cluster.
     *
     * @param peerClusterData the peer cluster to query
     * @param namespace the namespace to list
     * @return a future holding the topics from the peer's lookup result
     */
    private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterDataImpl peerClusterData, NamespaceName namespace) {
        return this.getNamespaceClient(peerClusterData).getLookup().getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT, null, null).thenApply(GetTopicsResult::getTopics);
    }

    /**
     * Returns the client used to talk to the given cluster, creating it on first use.
     *
     * <p>Clients are kept in {@code namespaceClients}, keyed by cluster data. A new client is
     * built with a memory limit of 0 bytes, TCP no-delay disabled, a stats interval of 0
     * seconds, and a max idle setting of -1, plus any broker properties prefixed with
     * {@code brokerClient_}. Broker-client authentication is applied when authentication is
     * enabled. With TLS enabled, the cluster's TLS broker service URL is used (falling back
     * to its TLS service URL when blank); otherwise the non-TLS equivalents are used.
     *
     * @param cluster the cluster to connect to
     * @return the cached or newly created client
     * @throws RuntimeException wrapping any exception raised while building the client
     */
    public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
        PulsarClientImpl existing = (PulsarClientImpl)this.namespaceClients.get((Object)cluster);
        if (existing == null) {
            return (PulsarClientImpl)this.namespaceClients.computeIfAbsent((Object)cluster, key -> {
                try {
                    ClientBuilder builder = PulsarClient.builder().memoryLimit(0L, SizeUnit.BYTES).enableTcpNoDelay(false).statsInterval(0L, TimeUnit.SECONDS);
                    builder.loadConf(PropertiesUtils.filterAndMapProperties((Properties)this.config.getProperties(), (String)"brokerClient_"));
                    builder.connectionMaxIdleSeconds(-1);
                    if (this.pulsar.getConfiguration().isAuthenticationEnabled()) {
                        builder.authentication(this.pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), this.pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                    }
                    if (!this.pulsar.getConfiguration().isTlsEnabled()) {
                        String plainUrl = StringUtils.isNotBlank((CharSequence)cluster.getBrokerServiceUrl()) ? cluster.getBrokerServiceUrl() : cluster.getServiceUrl();
                        builder.serviceUrl(plainUrl);
                    } else {
                        String tlsUrl = StringUtils.isNotBlank((CharSequence)cluster.getBrokerServiceUrlTls()) ? cluster.getBrokerServiceUrlTls() : cluster.getServiceUrlTls();
                        builder.serviceUrl(tlsUrl).enableTls(true).tlsTrustCertsFilePath(this.pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()).allowTlsInsecureConnection(this.pulsar.getConfiguration().isTlsAllowInsecureConnection()).enableTlsHostnameVerification(this.pulsar.getConfiguration().isTlsHostnameVerificationEnabled());
                    }
                    ClientConfigurationData clientConf = ((ClientBuilderImpl)builder).getClientConfigurationData();
                    return this.pulsar.createClientImpl(clientConf);
                }
                catch (Exception cause) {
                    throw new RuntimeException(cause);
                }
            });
        }
        return existing;
    }

    /**
     * Resolves the current owner of a bundle.
     *
     * <p>When the extensible load manager is enabled the owner is derived from its lookup data; otherwise the
     * ownership cache is consulted.
     *
     * @param bundle bundle whose owner is requested
     * @return future completing with the owner data, or an empty {@link Optional} when no lookup data exists
     */
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.ownershipCache.getOwnerAsync(bundle);
        }
        ExtensibleLoadManagerImpl manager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
        return manager.getOwnershipWithLookupDataAsync(bundle).thenCompose(lookupData -> {
            if (lookupData.isPresent()) {
                return CompletableFuture.completedFuture(Optional.of(lookupData.get().toNamespaceEphemeralData()));
            }
            return CompletableFuture.completedFuture(Optional.empty());
        });
    }

    /**
     * Blocking variant of {@link #checkOwnershipPresentAsync(NamespaceBundle)}.
     *
     * <p>Waits at most the configured metadata-store operation timeout (in seconds) for the result.
     *
     * @param bundle bundle to check
     * @return {@code true} when the asynchronous check reports an owner
     * @throws Exception if the wait times out, is interrupted, or the underlying future fails
     */
    public boolean checkOwnershipPresent(NamespaceBundle bundle) throws Exception {
        CompletableFuture<Boolean> ownershipCheck = this.checkOwnershipPresentAsync(bundle);
        long timeoutSeconds = this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        return ownershipCheck.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Asynchronously reports whether a bundle currently has an owner.
     *
     * <p>With the extensible load manager enabled the answer comes from its ownership lookup; otherwise it is
     * derived from {@link #getOwnerAsync(NamespaceBundle)}.
     *
     * @param bundle bundle to check
     * @return future completing with {@code true} when an owner is present
     */
    public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.getOwnerAsync(bundle).thenApply(owner -> owner.isPresent());
        }
        ExtensibleLoadManagerImpl manager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
        return manager.getOwnershipAsync(Optional.empty(), bundle).thenApply(owner -> owner.isPresent());
    }

    /**
     * Unloads this broker's SLA monitor namespace through the admin client, if its full bundle has an owner.
     *
     * <p>Does nothing beyond logging when no owner is present.
     *
     * @throws Exception if the ownership check or the admin unload call fails
     */
    public void unloadSLANamespace() throws Exception {
        NamespaceName slaNamespace = NamespaceService.getSLAMonitorNamespace(this.pulsar.getBrokerId(), this.config);
        LOG.info("Checking owner for SLA namespace {}", slaNamespace);
        if (this.checkOwnershipPresent(this.getFullBundle(slaNamespace))) {
            LOG.info("Trying to unload SLA namespace {}", slaNamespace);
            this.pulsar.getAdminClient().namespaces().unload(slaNamespace.toString());
            LOG.info("Namespace {} unloaded successfully", slaNamespace);
        }
    }

    /**
     * Builds the heartbeat namespace name for a broker.
     *
     * @param lookupBroker broker identifier substituted into {@code HEARTBEAT_NAMESPACE_FMT}
     * @param config configuration supplying the cluster name
     * @return the namespace obtained by formatting the cluster name and broker into the heartbeat format
     */
    public static NamespaceName getHeartbeatNamespace(String lookupBroker, ServiceConfiguration config) {
        String heartbeatName = String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), lookupBroker);
        return NamespaceName.get(heartbeatName);
    }

    /**
     * Builds the V2 heartbeat namespace name for a broker.
     *
     * @param lookupBroker broker identifier substituted into {@code HEARTBEAT_NAMESPACE_FMT_V2}
     * @param config not read by this method; kept so the signature parallels {@code getHeartbeatNamespace}
     * @return the namespace obtained by formatting the broker into the V2 heartbeat format
     */
    public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker, ServiceConfiguration config) {
        String heartbeatName = String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupBroker);
        return NamespaceName.get(heartbeatName);
    }

    /**
     * Builds the SLA monitor namespace name for a broker.
     *
     * @param lookupBroker broker identifier substituted into {@code SLA_NAMESPACE_FMT}
     * @param config configuration supplying the cluster name
     * @return the namespace obtained by formatting the cluster name and broker into the SLA format
     */
    public static NamespaceName getSLAMonitorNamespace(String lookupBroker, ServiceConfiguration config) {
        String slaName = String.format(SLA_NAMESPACE_FMT, config.getClusterName(), lookupBroker);
        return NamespaceName.get(slaName);
    }

    /**
     * Tests a service unit's namespace against {@code HEARTBEAT_NAMESPACE_PATTERN}.
     *
     * @param ns service unit whose namespace is examined
     * @return the pattern's first capture group on a match (presumably the broker identifier; confirm against
     *         the pattern definition), or {@code null} when the namespace does not match
     */
    public static String checkHeartbeatNamespace(ServiceUnitId ns) {
        String namespace = ns.getNamespaceObject().toString();
        Matcher matcher = HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace);
        if (!matcher.matches()) {
            return null;
        }
        LOG.debug("Heartbeat namespace matched the lookup namespace {}", namespace);
        return matcher.group(1);
    }

    /**
     * Tests a service unit's namespace against {@code HEARTBEAT_NAMESPACE_PATTERN_V2}.
     *
     * @param ns service unit whose namespace is examined
     * @return the pattern's first capture group on a match (presumably the broker identifier; confirm against
     *         the pattern definition), or {@code null} when the namespace does not match
     */
    public static String checkHeartbeatNamespaceV2(ServiceUnitId ns) {
        String namespace = ns.getNamespaceObject().toString();
        Matcher matcher = HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace);
        if (!matcher.matches()) {
            return null;
        }
        LOG.debug("Heartbeat namespace v2 matched the lookup namespace {}", namespace);
        return matcher.group(1);
    }

    /**
     * Extracts the first capture group of {@code SLA_NAMESPACE_PATTERN} from a service unit's namespace.
     *
     * @param ns service unit whose namespace is examined
     * @return the captured text when the namespace matches the SLA pattern, otherwise {@code null}
     */
    public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
        Matcher matcher = SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
        return matcher.matches() ? matcher.group(1) : null;
    }

    /**
     * Reports whether a namespace name is the system namespace or matches the SLA, heartbeat, or heartbeat V2
     * pattern.
     *
     * @param namespace namespace name to classify
     * @return {@code true} for the system namespace and for any SLA/heartbeat namespace
     */
    public static boolean isSystemServiceNamespace(String namespace) {
        if (NamespaceName.SYSTEM_NAMESPACE.toString().equals(namespace)) {
            return true;
        }
        // The remaining checks are the SLA / heartbeat / heartbeat-V2 patterns, in that order.
        return NamespaceService.isSLAOrHeartbeatNamespace(namespace);
    }

    /**
     * Reports whether a namespace name matches the SLA, heartbeat, or heartbeat V2 pattern.
     *
     * @param namespace namespace name to classify
     * @return {@code true} when any of the three patterns matches the whole name
     */
    public static boolean isSLAOrHeartbeatNamespace(String namespace) {
        if (SLA_NAMESPACE_PATTERN.matcher(namespace).matches()) {
            return true;
        }
        if (HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()) {
            return true;
        }
        return HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
    }

    /**
     * Reports whether a service unit's namespace matches the heartbeat or heartbeat V2 pattern.
     *
     * @param ns service unit whose namespace is examined
     * @return {@code true} when either heartbeat pattern matches the whole namespace name
     */
    public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
        String name = ns.getNamespaceObject().toString();
        if (HEARTBEAT_NAMESPACE_PATTERN.matcher(name).matches()) {
            return true;
        }
        return HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(name).matches();
    }

    /**
     * Registers this broker's SLA monitor namespace and logs the outcome at debug level.
     *
     * @return the result of {@code registerNamespace} for the SLA monitor namespace
     * @throws PulsarServerException if registration fails
     */
    public boolean registerSLANamespace() throws PulsarServerException {
        NamespaceName slaNamespace = NamespaceService.getSLAMonitorNamespace(this.pulsar.getBrokerId(), this.config);
        boolean registered = this.registerNamespace(slaNamespace, false);
        if (LOG.isDebugEnabled()) {
            if (registered) {
                LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}", slaNamespace);
            } else {
                LOG.debug("SLA Monitoring not owned by the broker: ns={}", slaNamespace);
            }
        }
        return registered;
    }

    /**
     * Shuts down every cached namespace client.
     *
     * <p>A {@code PulsarClientException} from one client is logged as a warning and the remaining clients are
     * still processed.
     */
    @Override
    public void close() {
        this.namespaceClients.forEach((clusterData, namespaceClient) -> {
            try {
                namespaceClient.shutdown();
            }
            catch (PulsarClientException shutdownFailure) {
                LOG.warn("Error shutting down namespace client for cluster {}", clusterData, shutdownFailure);
            }
        });
    }
}

