package org.apache.pulsar.broker.namespace;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/namespace/NamespaceService.class */
public class NamespaceService {
    /** Broker configuration, read from the owning {@link PulsarService} in the constructor. */
    private final ServiceConfiguration config;
    /** Shared load manager reference, obtained from the owning {@link PulsarService}. */
    private final AtomicReference<LoadManager> loadManager;
    /** The broker service this namespace service belongs to. */
    private final PulsarService pulsar;
    /** Cache of bundle ownership; created in the constructor on top of {@link #bundleFactory}. */
    private final OwnershipCache ownershipCache;
    /** Factory for namespace bundles; created in the constructor with a CRC32 hash function. */
    private final NamespaceBundleFactory bundleFactory;
    /**
     * Number of owned namespaces that {@link #getTotalServiceUnitsLoaded()} subtracts from its result.
     * Incremented when the heartbeat or the SLA monitor namespace is registered successfully.
     */
    private int uncountedNamespaces;
    /** Advertised address of this broker; used to build the heartbeat and SLA monitor namespace names. */
    private final String host;
    /**
     * Retry limit for bundle splits.
     * NOTE(review): splitAndOwnBundle seeds its retry counter with the same literal value rather than
     * reading this constant - confirm the two are meant to stay in sync.
     */
    private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
    /** Same literal as the leading path component of {@link #SLA_NAMESPACE_FMT}. */
    public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
    /** Heartbeat namespace format; filled with cluster name, host and web service port. */
    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
    /** SLA monitor namespace format; filled with cluster name, host and web service port. */
    public static final String SLA_NAMESPACE_FMT = "sla-monitor/%s/%s:%s";
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
    /** Matches a heartbeat namespace; group 1 captures the {@code host:port} part. */
    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
    /** Matches an SLA monitor namespace; group 1 captures the {@code host:port} part. */
    public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile("sla-monitor/[^/]+/([^:]+:\\d+)");

    /* loaded from: input_file:org/apache/pulsar/broker/namespace/NamespaceService$AddressType.class */
    public enum AddressType {
        BROKER_URL,
        LOOKUP_URL;

        /**
         * Returns a newly allocated array containing every constant in declaration order.
         * Exists next to the compiler-generated {@code values()} because of a naming
         * conflict with that method; callers may freely modify the returned array.
         */
        public static AddressType[] valuesCustom() {
            return values().clone();
        }
    }

    /**
     * Creates the namespace service for the given broker.
     *
     * @param pulsarService the broker service supplying the configuration, advertised address,
     *                      load manager and local ZooKeeper cache
     */
    public NamespaceService(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.host = pulsarService.getAdvertisedAddress();
        this.config = pulsarService.getConfiguration();
        this.loadManager = pulsarService.getLoadManager();
        // Hand the local ZooKeeper client and this broker's service URL to the service-unit ZK utilities.
        ServiceUnitZkUtils.initZK(pulsarService.getLocalZkCache().getZooKeeper(), pulsarService.getBrokerServiceUrl());
        this.bundleFactory = new NamespaceBundleFactory(pulsarService, Hashing.crc32());
        // The ownership cache is built on the bundle factory, so the factory must exist first.
        this.ownershipCache = new OwnershipCache(pulsarService, this.bundleFactory);
    }

    /**
     * Resolves the bundle that contains the destination and then looks up the broker serving it.
     * The lookup is never read-only, so a candidate broker is searched for when nobody owns the bundle.
     *
     * @param destinationName destination to look up
     * @param z               forwarded to the broker search; when this broker is neither leader nor
     *                        decentralized it selects this broker's own address instead of the leader's
     * @return future holding the lookup result, or empty when no broker could be selected
     */
    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(DestinationName destinationName, boolean z) {
        return getBundleAsync(destinationName)
                .thenCompose(bundle -> findBrokerServiceUrl(bundle, z, false));
    }

    /**
     * Asynchronously finds the bundle of the destination's namespace that contains the destination.
     *
     * @param destinationName destination to locate
     * @return future completed with the matching bundle
     */
    public CompletableFuture<NamespaceBundle> getBundleAsync(DestinationName destinationName) {
        return this.bundleFactory
                .getBundlesAsync(destinationName.getNamespaceObject())
                .thenApply(bundles -> bundles.findBundle(destinationName));
    }

    /**
     * Synchronous counterpart of {@link #getBundleAsync(DestinationName)}.
     *
     * @param destinationName destination to locate
     * @return the bundle of the destination's namespace that contains the destination
     * @throws Exception if the namespace bundles cannot be obtained
     */
    public NamespaceBundle getBundle(DestinationName destinationName) throws Exception {
        return this.bundleFactory
                .getBundles(destinationName.getNamespaceObject())
                .findBundle(destinationName);
    }

    /**
     * Reports how many bundles the namespace currently has.
     *
     * @param namespaceName namespace to inspect
     * @return the size reported by the namespace's bundles
     * @throws Exception if the namespace bundles cannot be obtained
     */
    public int getBundleCount(NamespaceName namespaceName) throws Exception {
        return bundleFactory.getBundles(namespaceName).size();
    }

    /**
     * Delegates to the bundle factory to obtain the full bundle of a namespace.
     *
     * @param namespaceName namespace whose full bundle is requested
     * @return the full bundle as produced by the bundle factory
     * @throws Exception if the factory fails to produce the bundle
     */
    private NamespaceBundle getFullBundle(NamespaceName namespaceName) throws Exception {
        return bundleFactory.getFullBundle(namespaceName);
    }

    /**
     * Resolves the web service URL of the broker responsible for a service unit, blocking until
     * the lookup completes.
     *
     * @param serviceUnitId a {@link DestinationName}, {@link NamespaceName} or {@link NamespaceBundle}
     * @param z             forwarded to the broker search (logged as "auth" for destinations)
     * @param z2            when {@code true} the TLS HTTP URL is returned, otherwise the plain one
     * @param z3            when {@code true} the lookup is read-only and never triggers a broker search
     * @return the URL, or empty when no broker is known or its URL cannot be parsed
     * @throws IllegalArgumentException if the service unit is of any other type
     * @throws Exception                if resolving the bundle or waiting for the lookup fails
     */
    public Optional<URL> getWebServiceUrl(ServiceUnitId serviceUnitId, boolean z, boolean z2, boolean z3) throws Exception {
        final NamespaceBundle bundle;
        if (serviceUnitId instanceof DestinationName) {
            DestinationName destination = (DestinationName) serviceUnitId;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Getting web service URL of destination: {} - auth: {}", destination, z);
            }
            bundle = getBundle(destination);
        } else if (serviceUnitId instanceof NamespaceName) {
            bundle = getFullBundle((NamespaceName) serviceUnitId);
        } else if (serviceUnitId instanceof NamespaceBundle) {
            bundle = (NamespaceBundle) serviceUnitId;
        } else {
            throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + serviceUnitId.getClass().getName());
        }
        return internalGetWebServiceUrl(bundle, z, z2, z3).get();
    }

    /**
     * Looks up the broker for a bundle and converts its HTTP address into a {@link URL}.
     *
     * @param namespaceBundle bundle to look up
     * @param z               forwarded to the broker search
     * @param z2              selects the TLS HTTP URL when {@code true}
     * @param z3              read-only flag forwarded to the lookup
     * @return future holding the URL; empty when there is no lookup result or the URL is malformed
     */
    private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle namespaceBundle, boolean z, boolean z2, boolean z3) {
        return findBrokerServiceUrl(namespaceBundle, z, z3).thenApply(lookup -> {
            if (!lookup.isPresent()) {
                return Optional.empty();
            }
            try {
                LookupData data = lookup.get().getLookupData();
                String httpAddress = z2 ? data.getHttpUrlTls() : data.getHttpUrl();
                return Optional.of(new URL(httpAddress));
            } catch (Exception e) {
                // A broken address is reported but deliberately treated like "no broker found".
                LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
                return Optional.empty();
            }
        });
    }

    /**
     * Acquires ownership of this broker's heartbeat namespace and of every configured bootstrap
     * namespace. The heartbeat namespace must be owned by this broker; bootstrap namespaces owned
     * by another broker are merely skipped.
     *
     * @throws PulsarServerException if any registration attempt fails
     */
    public void registerBootstrapNamespaces() throws PulsarServerException {
        String heartbeatNamespace = getHeartbeatNamespace(this.host, this.config);
        if (registerNamespace(heartbeatNamespace, true)) {
            // The heartbeat namespace is not reported as a loaded service unit.
            this.uncountedNamespaces++;
            LOG.info("added heartbeat namespace name in local cache: ns={}", heartbeatNamespace);
        }

        for (String bootstrapNamespace : this.config.getBootstrapNamespaces()) {
            boolean owned = registerNamespace(bootstrapNamespace, false);
            if (owned) {
                LOG.info("added bootstrap namespace name in local cache: ns={}", bootstrapNamespace);
            }
        }
    }

    /**
     * Tries to take ownership of the full bundle of a namespace for this broker.
     *
     * @param str namespace name
     * @param z   when {@code true}, ownership by another broker is an error instead of a
     *            {@code false} result
     * @return {@code true} if this broker owns the namespace afterwards, {@code false} if another
     *         broker owns it and {@code z} is {@code false}
     * @throws PulsarServerException wrapping any failure, including the ownership conflict raised
     *                               when {@code z} is {@code true}
     */
    private boolean registerNamespace(String str, boolean z) throws PulsarServerException {
        String myUrl = this.pulsar.getBrokerServiceUrl();
        try {
            NamespaceBundle fullBundle = this.bundleFactory.getFullBundle(NamespaceName.get(str));
            String ownerUrl = this.ownershipCache.tryAcquiringOwnership(fullBundle).get().getNativeUrl();

            if (!myUrl.equals(ownerUrl)) {
                String conflict = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s", str, myUrl, ownerUrl);
                if (z) {
                    throw new IllegalStateException(conflict);
                }
                LOG.info(conflict);
                return false;
            }

            if (fullBundle != null) {
                // Preload the destinations of the namespace we now own.
                this.pulsar.loadNamespaceDestinations(fullBundle);
            }
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new PulsarServerException(e);
        }
    }

    /**
     * Determines which broker serves a bundle.
     * <ul>
     *   <li>If an owner is recorded and not disabled, its data becomes the lookup result.</li>
     *   <li>If the recorded owner is disabled, the future fails with {@link IllegalStateException}.</li>
     *   <li>If nobody owns the bundle, a read-only lookup yields empty; otherwise a candidate broker
     *       search is scheduled on the broker's executor.</li>
     * </ul>
     *
     * @param namespaceBundle bundle to look up
     * @param z               forwarded to the candidate broker search
     * @param z2              read-only flag
     * @return future completed with the lookup result
     */
    private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle namespaceBundle, boolean z, boolean z2) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("findBrokerServiceUrl: {} - read-only: {}", namespaceBundle, z2);
        }
        CompletableFuture<Optional<LookupResult>> lookupFuture = new CompletableFuture<>();
        this.ownershipCache.getOwnerAsync(namespaceBundle).thenAccept(owner -> {
            if (owner.isPresent()) {
                NamespaceEphemeralData ownerData = owner.get();
                if (ownerData.isDisabled()) {
                    lookupFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is being unloaded", namespaceBundle)));
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Namespace bundle {} already owned by {} ", namespaceBundle, owner);
                    }
                    lookupFuture.complete(Optional.of(new LookupResult(ownerData)));
                }
            } else if (z2) {
                // Read-only lookups must not trigger an ownership acquisition.
                lookupFuture.complete(Optional.empty());
            } else {
                this.pulsar.getExecutor().execute(() -> searchForCandidateBroker(namespaceBundle, lookupFuture, z));
            }
        }).exceptionally(failure -> {
            LOG.warn("Failed to check owner for bundle {}: {}", namespaceBundle, failure.getMessage(), failure);
            lookupFuture.completeExceptionally(failure);
            return null;
        });
        return lookupFuture;
    }

    /**
     * Picks a broker for an unowned bundle and either acquires ownership locally or redirects.
     *
     * <p>Candidate selection order: the broker encoded in a heartbeat namespace; the broker encoded
     * in an SLA monitor namespace if it is active; the least loaded broker when the load manager is
     * not centralized or this broker is the leader; otherwise this broker's own address ({@code z}
     * is {@code true}) or the current leader's service URL.
     *
     * <p>If the candidate is this broker, ownership is acquired and the namespace destinations are
     * loaded; otherwise the future is completed with a lookup result pointing at the candidate.
     *
     * @param namespaceBundle   bundle that needs an owner
     * @param completableFuture future to complete with the outcome; completed with empty when the
     *                          load manager offers no broker
     * @param z                 selects this broker over the leader in the non-leader, centralized case
     */
    private void searchForCandidateBroker(NamespaceBundle namespaceBundle, CompletableFuture<Optional<LookupResult>> completableFuture, boolean z) {
        String candidateBroker = null;

        // Phase 1: choose the candidate broker.
        try {
            candidateBroker = checkHeartbeatNamespace(namespaceBundle);
            if (candidateBroker == null) {
                String slaMonitorBroker = getSLAMonitorBrokerName(namespaceBundle);
                if (slaMonitorBroker != null && isBrokerActive(slaMonitorBroker)) {
                    candidateBroker = slaMonitorBroker;
                }
            }
            if (candidateBroker == null) {
                if (!this.loadManager.get().isCentralized() || this.pulsar.getLeaderElectionService().isLeader()) {
                    Optional<String> leastLoaded = getLeastLoadedFromLoadManager(namespaceBundle);
                    if (!leastLoaded.isPresent()) {
                        completableFuture.complete(Optional.empty());
                        return;
                    }
                    candidateBroker = leastLoaded.get();
                } else {
                    candidateBroker = z ? this.pulsar.getWebServiceAddress() : this.pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
                }
            }
        } catch (Exception e) {
            LOG.warn("Error when searching for candidate broker to acquire {}: {}", namespaceBundle, e.getMessage(), e);
            completableFuture.completeExceptionally(e);
            return;
        }

        // Phase 2: acquire ownership locally or redirect to the candidate.
        try {
            Preconditions.checkNotNull(candidateBroker);
            if (this.pulsar.getWebServiceAddress().equals(candidateBroker)) {
                this.ownershipCache.tryAcquiringOwnership(namespaceBundle).thenAccept(ownerInfo -> {
                    if (ownerInfo.isDisabled()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Namespace bundle {} is currently being unloaded", namespaceBundle);
                        }
                        completableFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is currently being unloaded", namespaceBundle)));
                    } else {
                        this.pulsar.loadNamespaceDestinations(namespaceBundle);
                        completableFuture.complete(Optional.of(new LookupResult(ownerInfo)));
                    }
                }).exceptionally(failure -> {
                    // Two placeholders: one for the bundle, one for the message; the throwable supplies the stack trace.
                    LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", namespaceBundle, failure.getMessage(), failure);
                    completableFuture.completeExceptionally(new PulsarServerException("Failed to acquire ownership for namespace bundle " + namespaceBundle, failure));
                    return null;
                });
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", candidateBroker, namespaceBundle);
            }
            createLookupResult(candidateBroker).thenAccept(lookupResult -> {
                completableFuture.complete(Optional.of(lookupResult));
            }).exceptionally(failure -> {
                completableFuture.completeExceptionally(failure);
                return null;
            });
        } catch (Exception e) {
            LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", namespaceBundle, e.getMessage(), e);
            completableFuture.completeExceptionally(e);
        }
    }

    /**
     * Builds a lookup result for another broker from the data stored under that broker's node
     * below {@code LoadManager.LOADBALANCE_BROKERS_ROOT} in the local ZooKeeper cache.
     *
     * @param str address of the candidate broker, parsed as a URI to extract host and port
     * @return future with the lookup result; fails with {@code NoNodeException} when the broker node
     *         is absent, or with the validation/parsing error when the address is unusable
     * @throws Exception declared for callers; failures are reported through the returned future
     */
    protected CompletableFuture<LookupResult> createLookupResult(String str) throws Exception {
        CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(str), "Lookup broker can't be null " + str);
            URI brokerUri = new URI(str);
            String brokerNodePath = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, brokerUri.getHost(), brokerUri.getPort());
            this.pulsar.getLocalZkCache().getDataAsync(brokerNodePath, this.pulsar.getLoadManager().get().getLoadReportDeserializer()).thenAccept(brokerData -> {
                if (brokerData.isPresent()) {
                    ServiceLookupData lookupData = (ServiceLookupData) brokerData.get();
                    lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(), lookupData.getWebServiceUrlTls(), lookupData.getPulsarServiceUrl(), lookupData.getPulsarServiceUrlTls()));
                } else {
                    lookupFuture.completeExceptionally(new KeeperException.NoNodeException(brokerNodePath));
                }
            }).exceptionally(failure -> {
                lookupFuture.completeExceptionally(failure);
                return null;
            });
        } catch (Exception e) {
            lookupFuture.completeExceptionally(e);
        }
        return lookupFuture;
    }

    /**
     * Checks whether a broker is registered below {@code LoadManager.LOADBALANCE_BROKERS_ROOT}.
     *
     * @param str broker address in the form {@code http://<child node name>}
     * @return {@code true} if a child node matches the address
     * @throws KeeperException      if the children cannot be read
     * @throws InterruptedException if interrupted while reading the children
     */
    private boolean isBrokerActive(String str) throws KeeperException, InterruptedException {
        for (String activeBroker : this.pulsar.getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
            if (str.equals("http://" + activeBroker)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Broker {} found for SLA Monitoring Namespace", activeBroker);
                }
                return true;
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Broker not found for SLA Monitoring Namespace {}", str + ":" + this.config.getWebServicePort());
        }
        return false;
    }

    /**
     * Asks the load manager for the least loaded broker for a service unit.
     *
     * @param serviceUnitId service unit that needs a broker
     * @return the resource id of the chosen broker, or empty when the load manager offers none
     * @throws Exception if the load manager fails
     */
    private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnitId) throws Exception {
        Optional<ResourceUnit> candidate = this.loadManager.get().getLeastLoaded(serviceUnitId);
        if (candidate.isPresent()) {
            String lookupAddress = candidate.get().getResourceId();
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", this.pulsar.getWebServiceAddress(), lookupAddress);
            }
            return Optional.of(lookupAddress);
        }
        LOG.warn("No broker is available for {}", serviceUnitId);
        return Optional.empty();
    }

    /**
     * Unloads a bundle owned by this broker using a five minute timeout.
     *
     * @param namespaceBundle bundle to unload
     * @throws Exception if the bundle is not owned or the unload fails
     */
    public void unloadNamespaceBundle(NamespaceBundle namespaceBundle) throws Exception {
        unloadNamespaceBundle(namespaceBundle, 5, TimeUnit.MINUTES);
    }

    /**
     * Hands an unload request to the owned bundle.
     *
     * @param namespaceBundle bundle to unload; must currently be in the ownership cache
     * @param j               timeout value passed to the unload request
     * @param timeUnit        unit of {@code j}
     * @throws NullPointerException if this broker has no owned-bundle entry for the bundle
     * @throws Exception            if the unload request fails
     */
    public void unloadNamespaceBundle(NamespaceBundle namespaceBundle, long j, TimeUnit timeUnit) throws Exception {
        OwnedBundle ownedBundle = Preconditions.checkNotNull(this.ownershipCache.getOwnedBundle(namespaceBundle));
        ownedBundle.handleUnloadRequest(this.pulsar, j, timeUnit);
    }

    /**
     * Builds the ownership status of every bundle owned by this broker.
     *
     * @return map from the bundle's string form to its ownership status
     * @throws Exception if the namespace isolation policies cannot be read
     */
    public Map<String, NamespaceOwnershipStatus> getOwnedNameSpacesStatus() throws Exception {
        NamespaceIsolationPolicies isolationPolicies = getLocalNamespaceIsolationPolicies();
        Map<String, NamespaceOwnershipStatus> statusByBundle = new HashMap<>();
        for (OwnedBundle ownedBundle : this.ownershipCache.getOwnedBundles().values()) {
            String bundleKey = ownedBundle.getNamespaceBundle().toString();
            NamespaceOwnershipStatus status = getNamespaceOwnershipStatus(
                    ownedBundle,
                    isolationPolicies.getPolicyByNamespace(ownedBundle.getNamespaceBundle().getNamespaceObject()));
            statusByBundle.put(bundleKey, status);
        }
        return statusByBundle;
    }

    /**
     * Derives the ownership status of one owned bundle.
     *
     * @param ownedBundle              bundle owned by this broker
     * @param namespaceIsolationPolicy isolation policy of the bundle's namespace, or {@code null}
     * @return a status that starts as shared/uncontrolled; with a policy it is marked controlled and
     *         the assignment becomes primary or secondary when this broker's advertised address
     *         matches the policy
     */
    private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle ownedBundle, NamespaceIsolationPolicy namespaceIsolationPolicy) {
        NamespaceOwnershipStatus status = new NamespaceOwnershipStatus(BrokerAssignment.shared, false, ownedBundle.isActive());
        if (namespaceIsolationPolicy != null) {
            status.is_controlled = true;
            if (namespaceIsolationPolicy.isPrimaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.primary;
            } else if (namespaceIsolationPolicy.isSecondaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.secondary;
            }
        }
        return status;
    }

    /**
     * Reads the namespace isolation policies of this broker's cluster from the configuration cache.
     *
     * @return the cached policies, or a fresh empty instance when the cache has no entry
     * @throws Exception if the cache lookup fails
     */
    private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception {
        return (NamespaceIsolationPolicies) this.pulsar.getConfigurationCache()
                .namespaceIsolationPoliciesCache()
                .get(AdminResource.path("clusters", this.pulsar.getConfiguration().getClusterName(), "namespaceIsolationPolicies"))
                .orElseGet(NamespaceIsolationPolicies::new);
    }

    /**
     * Reports whether the bundle's owner is known right now and flagged as disabled.
     * The check never blocks: an owner lookup that has not completed yet counts as "not disabled".
     *
     * @param namespaceBundle bundle to check
     * @return {@code true} only if owner data is immediately available and disabled; {@code false}
     *         otherwise, including when the lookup throws
     * @throws Exception declared for callers; lookup failures are logged and reported as {@code false}
     */
    public boolean isNamespaceBundleDisabled(NamespaceBundle namespaceBundle) throws Exception {
        try {
            CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = this.ownershipCache.getOwnerAsync(namespaceBundle);
            if (ownerFuture == null) {
                return false;
            }
            Optional<NamespaceEphemeralData> owner = ownerFuture.getNow(null);
            return owner != null && owner.isPresent() && owner.get().isDisabled();
        } catch (Exception e) {
            LOG.warn("Exception in getting ownership info for service unit {}: {}", namespaceBundle, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Starts splitting a bundle in two, with a retry budget of seven further attempts.
     *
     * @param namespaceBundle bundle to split
     * @param z               when {@code true} the resulting bundles are unloaded after the split
     * @return future completed when the split has finished or failed
     * @throws Exception if the first split attempt fails before it can report through the future
     */
    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle namespaceBundle, boolean z) throws Exception {
        CompletableFuture<Void> splitResult = new CompletableFuture<>();
        AtomicInteger remainingRetries = new AtomicInteger(7);
        splitAndOwnBundleOnceAndRetry(namespaceBundle, z, remainingRetries, splitResult);
        return splitResult;
    }

    /**
     * Performs one attempt at splitting a bundle in two and re-schedules itself on a metadata
     * version conflict while the retry counter allows it.
     *
     * <p>Steps of one attempt: compute the split, request ownership of both new bundles, write the
     * new bundle layout to the namespace's local policies, and - once that write succeeds - disable
     * the old bundle, refresh topic stats, force a load report update and optionally unload the
     * bundles of the new layout.
     *
     * @param namespaceBundle   bundle to split
     * @param z                 when {@code true} the bundles of the new layout are unloaded afterwards
     * @param atomicInteger     remaining retries; decremented on every BADVERSION failure
     * @param completableFuture completed with {@code null} on success, or exceptionally with
     *                          {@code ServiceUnitNotReadyException} when the split finally fails
     */
    void splitAndOwnBundleOnceAndRetry(NamespaceBundle namespaceBundle, boolean z, AtomicInteger atomicInteger, CompletableFuture<Void> completableFuture) {
        CompletableFuture<NamespaceBundles> updateFuture = new CompletableFuture<>();
        Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = this.bundleFactory.splitBundles(namespaceBundle, 2);
        if (splitBundles != null) {
            Preconditions.checkNotNull(splitBundles.getLeft());
            Preconditions.checkNotNull(splitBundles.getRight());
            Preconditions.checkArgument(splitBundles.getRight().size() == 2, "bundle has to be split in two bundles");
            NamespaceName namespaceObject = namespaceBundle.getNamespaceObject();
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {},  2 bundles: {}, {}",
                        namespaceObject.toString(),
                        namespaceBundle.getBundleRange(),
                        atomicInteger.get(),
                        splitBundles.getRight().get(0).getBundleRange(),
                        splitBundles.getRight().get(1).getBundleRange());
            }
            try {
                // Request ownership of both halves before publishing the new layout.
                for (NamespaceBundle childBundle : splitBundles.getRight()) {
                    Preconditions.checkNotNull(this.ownershipCache.tryAcquiringOwnership(childBundle));
                }
                updateNamespaceBundles(namespaceObject, splitBundles.getLeft(), (rc, path, ctx, stat) -> {
                    if (rc == KeeperException.Code.OK.intValue()) {
                        this.bundleFactory.invalidateBundleCache(namespaceBundle.getNamespaceObject());
                        updateFuture.complete(splitBundles.getLeft());
                    } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
                        // Version conflict: reported as ServerMetadataException so the attempt is retried.
                        KeeperException versionConflict = KeeperException.create(KeeperException.Code.get(rc));
                        LOG.warn(String.format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s, counter: %d", namespaceObject.toString(), namespaceBundle.getBundleRange(), versionConflict.getMessage(), atomicInteger.get()));
                        updateFuture.completeExceptionally(new BrokerServiceException.ServerMetadataException(versionConflict));
                    } else {
                        String updateFailure = String.format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", namespaceObject.toString(), namespaceBundle.getBundleRange(), KeeperException.create(KeeperException.Code.get(rc)).getMessage());
                        LOG.warn(updateFailure);
                        updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(updateFailure));
                    }
                });
            } catch (Exception e) {
                String ownershipFailure = String.format("failed to acquire ownership of split bundle for namespace [%s], %s", namespaceObject.toString(), e.getMessage());
                LOG.warn(ownershipFailure, e);
                updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(ownershipFailure));
            }
        } else {
            String notFound = String.format("bundle %s not found under namespace", namespaceBundle.toString());
            LOG.warn(notFound);
            updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(notFound));
        }

        updateFuture.whenComplete((updatedBundles, failure) -> {
            if (failure != null) {
                if ((failure instanceof BrokerServiceException.ServerMetadataException) && atomicInteger.decrementAndGet() >= 0) {
                    this.pulsar.getOrderedExecutor().submit(new SafeRunnable() {
                        public void safeRun() {
                            NamespaceService.this.splitAndOwnBundleOnceAndRetry(namespaceBundle, z, atomicInteger, completableFuture);
                        }
                    });
                    return;
                }
                String giveUp = String.format(" %s not success update nsBundles, counter %d, reason %s", namespaceBundle.toString(), atomicInteger.get(), failure.getMessage());
                LOG.warn(giveUp);
                completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(giveUp));
                return;
            }
            try {
                // The new layout is published: retire the old bundle and refresh dependent state.
                getOwnershipCache().updateBundleState(namespaceBundle, false);
                this.pulsar.getBrokerService().refreshTopicToStatsMaps(namespaceBundle);
                this.loadManager.get().setLoadReportForceUpdateFlag();
                if (z) {
                    updatedBundles.getBundles().forEach(splitBundle -> {
                        try {
                            unloadNamespaceBundle(splitBundle);
                        } catch (Exception unloadError) {
                            LOG.warn("Failed to unload split bundle {}", splitBundle, unloadError);
                            throw new RuntimeException("Failed to unload split bundle " + splitBundle, unloadError);
                        }
                    });
                }
                completableFuture.complete(null);
            } catch (Exception e) {
                // Argument order follows the message: bundle first, then its namespace.
                String disableFailure = String.format("failed to disable bundle %s under namespace [%s] with error %s", namespaceBundle.toString(), namespaceBundle.getNamespaceObject().toString(), e.getMessage());
                LOG.warn(disableFailure, e);
                completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(disableFailure));
            }
        });
    }

    /**
     * Writes a new bundle layout into the local policies node of a namespace.
     * The node is created first when the policies cache has no entry for it; the write is
     * conditional on the version carried by the bundles, and the cache entry is invalidated
     * right after the asynchronous write has been issued.
     *
     * @param namespaceName    namespace whose local policies are updated
     * @param namespaceBundles new bundle layout, including the expected node version
     * @param statCallback     callback receiving the result code of the write
     * @throws Exception if creating the node, serializing the policies or issuing the write fails
     */
    private void updateNamespaceBundles(NamespaceName namespaceName, NamespaceBundles namespaceBundles, AsyncCallback.StatCallback statCallback) throws Exception {
        Preconditions.checkNotNull(namespaceName);
        Preconditions.checkNotNull(namespaceBundles);

        String policiesPath = PulsarWebResource.joinPath(LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT, namespaceName.toString());
        boolean policiesExist = this.pulsar.getLocalZkCacheService().policiesCache().get(policiesPath).isPresent();
        if (!policiesExist) {
            this.pulsar.getLocalZkCacheService().createPolicies(policiesPath, false).get(30, TimeUnit.SECONDS);
        }

        long expectedVersion = namespaceBundles.getVersion();
        LocalPolicies updatedPolicies = new LocalPolicies();
        updatedPolicies.bundles = NamespaceBundleFactory.getBundlesData(namespaceBundles);

        this.pulsar.getLocalZkCache().getZooKeeper().setData(
                policiesPath,
                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(updatedPolicies),
                Math.toIntExact(expectedVersion),
                statCallback,
                null);
        this.pulsar.getLocalZkCacheService().policiesCache().invalidate(policiesPath);
    }

    /**
     * @return the ownership cache created by this service
     */
    public OwnershipCache getOwnershipCache() {
        return ownershipCache;
    }

    /**
     * @return the number of owned bundles, not counting the namespaces tracked as uncounted
     *         (heartbeat and SLA monitor registrations)
     */
    public int getTotalServiceUnitsLoaded() {
        int ownedBundles = ownershipCache.getOwnedBundles().size();
        return ownedBundles - uncountedNamespaces;
    }

    /**
     * @return the namespace bundles of all bundles currently held in the ownership cache
     */
    public Set<NamespaceBundle> getOwnedServiceUnits() {
        return (Set) this.ownershipCache.getOwnedBundles()
                .values()
                .stream()
                .map(owned -> owned.getNamespaceBundle())
                .collect(Collectors.toSet());
    }

    /**
     * Reports whether this broker owns the given service unit.
     *
     * @param serviceUnitId a {@link DestinationName}, {@link NamespaceName} or {@link NamespaceBundle}
     * @return {@code true} if the corresponding bundle is owned by this broker
     * @throws IllegalArgumentException if the service unit is of any other type
     * @throws Exception                if the bundle cannot be resolved
     */
    public boolean isServiceUnitOwned(ServiceUnitId serviceUnitId) throws Exception {
        if (serviceUnitId instanceof DestinationName) {
            return isDestinationOwned((DestinationName) serviceUnitId);
        } else if (serviceUnitId instanceof NamespaceName) {
            return isNamespaceOwned((NamespaceName) serviceUnitId);
        } else if (serviceUnitId instanceof NamespaceBundle) {
            return this.ownershipCache.isNamespaceBundleOwned((NamespaceBundle) serviceUnitId);
        } else {
            throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + serviceUnitId.getClass().getName());
        }
    }

    /**
     * Reports whether the owned bundle containing the destination is active.
     *
     * @param destinationName destination to check
     * @return the owned bundle's active flag; {@code false} when the bundle cannot be resolved or
     *         is not owned by this broker
     */
    public boolean isServiceUnitActive(DestinationName destinationName) {
        try {
            return this.ownershipCache
                    .getOwnedBundle(getBundle(destinationName))
                    .isActive();
        } catch (Exception ignored) {
            // Any failure here (including a missing owned bundle) is reported as "not active".
            LOG.warn("Unable to find OwnedBundle for fqdn - [{}]", destinationName.toString());
            return false;
        }
    }

    /**
     * @param namespaceName namespace to check
     * @return {@code true} if the ownership cache holds the namespace's full bundle
     * @throws Exception if the full bundle cannot be resolved
     */
    private boolean isNamespaceOwned(NamespaceName namespaceName) throws Exception {
        boolean owned = ownershipCache.getOwnedBundle(getFullBundle(namespaceName)) != null;
        return owned;
    }

    /**
     * Asynchronously checks whether the bundle containing the destination is owned by this broker.
     *
     * @param destinationName destination to check
     * @return future holding the ownership flag reported by the ownership cache
     */
    private CompletableFuture<Boolean> isDestinationOwnedAsync(DestinationName destinationName) {
        return getBundleAsync(destinationName)
                .thenApply(bundle -> this.ownershipCache.isNamespaceBundleOwned(bundle));
    }

    /**
     * @param destinationName destination to check
     * @return {@code true} if the ownership cache holds the bundle containing the destination
     * @throws Exception if the bundle cannot be resolved
     */
    private boolean isDestinationOwned(DestinationName destinationName) throws Exception {
        boolean owned = ownershipCache.getOwnedBundle(getBundle(destinationName)) != null;
        return owned;
    }

    /**
     * Gives up ownership of a namespace's full bundle, waiting up to 30 seconds, and then drops
     * the namespace from the bundle cache.
     *
     * @param namespaceName namespace to release
     * @throws Exception if the removal fails or times out
     */
    public void removeOwnedServiceUnit(NamespaceName namespaceName) throws Exception {
        ownershipCache.removeOwnership(getFullBundle(namespaceName)).get(30, TimeUnit.SECONDS);
        bundleFactory.invalidateBundleCache(namespaceName);
    }

    /**
     * Gives up ownership of a single bundle, waiting up to 30 seconds, and then drops the
     * bundle's namespace from the bundle cache.
     *
     * @param namespaceBundle bundle to release
     * @throws Exception if the removal fails or times out
     */
    public void removeOwnedServiceUnit(NamespaceBundle namespaceBundle) throws Exception {
        ownershipCache.removeOwnership(namespaceBundle).get(30, TimeUnit.SECONDS);
        bundleFactory.invalidateBundleCache(namespaceBundle.getNamespaceObject());
    }

    /**
     * Gives up ownership of all bundles described by the bundles data, waiting up to 30 seconds,
     * and then drops the namespace from the bundle cache.
     *
     * @param namespaceName namespace the bundles belong to
     * @param bundlesData   description of the bundles to release
     * @throws Exception if the removal fails or times out
     */
    public void removeOwnedServiceUnits(NamespaceName namespaceName, BundlesData bundlesData) throws Exception {
        ownershipCache
                .removeOwnership(bundleFactory.getBundles(namespaceName, bundlesData))
                .get(30, TimeUnit.SECONDS);
        bundleFactory.invalidateBundleCache(namespaceName);
    }

    /**
     * @return the bundle factory created by this service
     */
    public NamespaceBundleFactory getNamespaceBundleFactory() {
        return bundleFactory;
    }

    /**
     * Returns the bundle containing the destination, typed as a generic service unit.
     *
     * @param destinationName destination to locate
     * @return the bundle containing the destination
     * @throws Exception if the bundle cannot be resolved
     */
    public ServiceUnitId getServiceUnitId(DestinationName destinationName) throws Exception {
        ServiceUnitId bundle = getBundle(destinationName);
        return bundle;
    }

    /**
     * Lists the persistent destinations stored under
     * {@code /managed-ledgers/<str>/<str2>/<str3>/persistent}.
     *
     * @param str  first path component of the namespace
     * @param str2 second path component of the namespace
     * @param str3 third path component of the namespace
     * @return sorted list of {@code persistent://} destination names; empty when the node is absent
     * @throws Exception if reading the managed ledger list fails for any other reason
     */
    public List<String> getListOfDestinations(String str, String str2, String str3) throws Exception {
        List<String> destinations = Lists.newArrayList();
        try {
            String ledgersPath = String.format("/managed-ledgers/%s/%s/%s/persistent", str, str2, str3);
            LOG.debug("Getting children from managed-ledgers now: {}", ledgersPath);
            for (Object encodedName : this.pulsar.getLocalZkCacheService().managedLedgerListCache().get(ledgersPath)) {
                destinations.add(String.format("persistent://%s/%s/%s/%s", str, str2, str3, Codec.decode((String) encodedName)));
            }
        } catch (KeeperException.NoNodeException ignored) {
            // A missing node simply means there are no destinations to report.
        }
        destinations.sort(null);
        return destinations;
    }

    /**
     * Blocking variant of {@link #getOwnerAsync(NamespaceBundle)} with a 30 second timeout.
     *
     * @param namespaceBundle bundle whose owner is requested
     * @return the owner data, or empty when the bundle has no owner
     * @throws Exception if the lookup fails or times out
     */
    public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle namespaceBundle) throws Exception {
        return getOwnerAsync(namespaceBundle).get(30, TimeUnit.SECONDS);
    }

    /**
     * Delegates the owner lookup to the ownership cache.
     *
     * @param namespaceBundle bundle whose owner is requested
     * @return future holding the owner data, or empty when the bundle has no owner
     */
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle namespaceBundle) {
        return ownershipCache.getOwnerAsync(namespaceBundle);
    }

    /**
     * Unloads this broker's SLA monitor namespace through the admin client, but only when the
     * namespace's full bundle currently has an owner.
     *
     * @throws Exception if the owner lookup or the unload request fails
     */
    public void unloadSLANamespace() throws Exception {
        String slaNamespace = getSLAMonitorNamespace(this.host, this.config);
        LOG.info("Checking owner for SLA namespace {}", slaNamespace);

        boolean hasOwner = getOwner(getFullBundle(NamespaceName.get(slaNamespace))).isPresent();
        if (!hasOwner) {
            return;
        }

        LOG.info("Trying to unload SLA namespace {}", slaNamespace);
        this.pulsar.getAdminClient().namespaces().unload(slaNamespace);
        LOG.info("Namespace {} unloaded successfully", slaNamespace);
    }

    /**
     * Builds the heartbeat namespace name of a broker.
     *
     * @param str                  broker host
     * @param serviceConfiguration configuration supplying the cluster name and web service port
     * @return {@code pulsar/<cluster>/<host>:<port>}
     */
    public static String getHeartbeatNamespace(String str, ServiceConfiguration serviceConfiguration) {
        return String.format(
                HEARTBEAT_NAMESPACE_FMT,
                serviceConfiguration.getClusterName(),
                str,
                serviceConfiguration.getWebServicePort());
    }

    /**
     * Builds the SLA monitor namespace name of a broker.
     *
     * @param str                  broker host
     * @param serviceConfiguration configuration supplying the cluster name and web service port
     * @return {@code sla-monitor/<cluster>/<host>:<port>}
     */
    public static String getSLAMonitorNamespace(String str, ServiceConfiguration serviceConfiguration) {
        return String.format(
                SLA_NAMESPACE_FMT,
                serviceConfiguration.getClusterName(),
                str,
                serviceConfiguration.getWebServicePort());
    }

    /**
     * Extracts the broker address from a heartbeat namespace.
     *
     * @param serviceUnitId service unit whose namespace is tested against
     *                      {@link #HEARTBEAT_NAMESPACE_PATTERN}
     * @return {@code http://<host>:<port>} taken from the namespace name, or {@code null} when the
     *         namespace is not a heartbeat namespace
     */
    public static String checkHeartbeatNamespace(ServiceUnitId serviceUnitId) {
        String namespace = serviceUnitId.getNamespaceObject().toString();
        Matcher matcher = HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace);
        if (!matcher.matches()) {
            return null;
        }
        // This method matches the heartbeat pattern, so the log must say so (not "SLAMonitoring").
        LOG.debug("Heartbeat namespace matched the lookup namespace {}", namespace);
        return String.format("http://%s", matcher.group(1));
    }

    /**
     * Extracts the broker address from an SLA monitor namespace.
     *
     * @param serviceUnitId service unit whose namespace is tested against
     *                      {@link #SLA_NAMESPACE_PATTERN}
     * @return {@code http://<host>:<port>} taken from the namespace name, or {@code null} when the
     *         namespace is not an SLA monitor namespace
     */
    public static String getSLAMonitorBrokerName(ServiceUnitId serviceUnitId) {
        Matcher slaMatcher = SLA_NAMESPACE_PATTERN.matcher(serviceUnitId.getNamespaceObject().toString());
        return slaMatcher.matches() ? String.format("http://%s", slaMatcher.group(1)) : null;
    }

    /**
     * Tries to take ownership of this broker's SLA monitor namespace. Ownership by another broker
     * is tolerated and reported through the return value.
     *
     * @return {@code true} if this broker owns the SLA monitor namespace afterwards
     * @throws PulsarServerException if the registration attempt fails
     */
    public boolean registerSLANamespace() throws PulsarServerException {
        String slaNamespace = getSLAMonitorNamespace(this.host, this.config);
        boolean registered = registerNamespace(slaNamespace, false);
        if (registered) {
            // The SLA monitor namespace is not reported as a loaded service unit.
            this.uncountedNamespaces++;
        }
        if (LOG.isDebugEnabled()) {
            if (registered) {
                LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}", slaNamespace);
            } else {
                LOG.debug("SLA Monitoring not owned by the broker: ns={}", slaNamespace);
            }
        }
        return registered;
    }
}
