/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import com.beust.jcommander.internal.Lists;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.stats.JvmMetrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadManagerShared {
    /** Logger shared by every static helper in this class. */
    public static final Logger log = LoggerFactory.getLogger(LoadManagerShared.class);
    /** Number of bytes in one mebibyte (0x100000 == 1,048,576). */
    public static final int MIBI = 0x100000;
    /**
     * Per-thread scratch set that {@code applyNamespacePolicies} clears on entry and then fills with
     * the brokers it classifies as primary, so no new set is allocated on each call.
     */
    private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<Set<String>>(){

        protected Set<String> initialValue() throws Exception {
            return new HashSet<String>();
        }
    };
    /**
     * Per-thread scratch set that {@code applyNamespacePolicies} clears on entry and then fills with
     * the brokers it classifies as secondary or shared.
     */
    private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<Set<String>>(){

        protected Set<String> initialValue() throws Exception {
            return new HashSet<String>();
        }
    };
    /**
     * Five seconds expressed in milliseconds. The identifier is misspelled ("MIMIMUM") but it is
     * public, so it is left as-is for callers that may reference it.
     */
    public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5L);
    /** Failure-domain name used for a broker that has no entry in the broker-to-domain map. */
    private static final String DEFAULT_DOMAIN = "default";

    /** Not instantiable: this class only exposes static helpers. */
    private LoadManagerShared() {
    }

    /**
     * Adds to {@code brokerCandidateCache} the brokers from {@code availableBrokers} that are allowed
     * to own {@code serviceUnit} according to the namespace isolation policies.
     *
     * <p>When isolation policies exist for the namespace, all primary brokers are added, and the
     * secondary brokers are added as well if
     * {@link SimpleResourceAllocationPolicies#shouldFailoverToSecondaries} says the primaries are
     * insufficient. When no isolation policies exist, only shared brokers that pass the
     * persistent / non-persistent topic predicate are added.
     *
     * <p>Brokers whose name cannot be parsed as {@code http://<broker>} are logged and skipped.
     *
     * @param serviceUnit                 the service unit being placed; if it is a {@link NamespaceBundle}
     *                                    its non-persistent-topic flag selects which predicate is applied
     * @param policies                    isolation policy lookup
     * @param brokerCandidateCache        output set; candidates are added to it, it is not cleared here
     * @param availableBrokers            brokers to consider, each formatted into {@code http://%s}
     * @param brokerTopicLoadingPredicate tells whether a broker URL accepts persistent / non-persistent topics
     */
    public static void applyNamespacePolicies(ServiceUnitId serviceUnit, SimpleResourceAllocationPolicies policies, Set<String> brokerCandidateCache, Set<String> availableBrokers, BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
        // Thread-local scratch sets are reused across calls, so they must be emptied first.
        Set<String> primariesCache = localPrimariesCache.get();
        primariesCache.clear();
        Set<String> secondaryCache = localSecondaryCache.get();
        secondaryCache.clear();

        NamespaceName namespace = serviceUnit.getNamespaceObject();
        boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace);
        boolean isNonPersistentTopic = serviceUnit instanceof NamespaceBundle
                && ((NamespaceBundle) serviceUnit).hasNonPersistentTopic();
        if (isIsolationPoliciesPresent) {
            log.debug("Isolation Policies Present for namespace - [{}]", namespace);
        }

        for (String broker : availableBrokers) {
            String brokerUrlString = String.format("http://%s", broker);
            URL brokerUrl;
            try {
                brokerUrl = new URL(brokerUrlString);
            } catch (MalformedURLException e) {
                // Pass the URL as the placeholder argument and the exception last, so that both the
                // offending value and the stack trace end up in the log.
                log.error("Unable to parse brokerUrl from ResourceUnitId - [{}]", brokerUrlString, e);
                continue;
            }
            String host = brokerUrl.getHost();

            if (isIsolationPoliciesPresent) {
                if (policies.isPrimaryBroker(namespace, host)) {
                    primariesCache.add(broker);
                    log.debug("Added Primary Broker - [{}] as possible Candidates for namespace - [{}] with policies", host, namespace);
                } else if (policies.isSecondaryBroker(namespace, host)) {
                    secondaryCache.add(broker);
                    log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}] with policies", host, namespace);
                } else {
                    log.debug("Skipping Broker - [{}] not primary broker and not shared for namespace - [{}] ", host, namespace);
                }
                continue;
            }

            // No isolation policies: the broker must accept the kind of topics this unit carries.
            if (isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
                log.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]", host, namespace);
                continue;
            }
            if (!isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
                log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]", host, namespace);
                continue;
            }
            if (policies.isSharedBroker(host)) {
                secondaryCache.add(broker);
                log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]", host, namespace);
            }
        }

        if (isIsolationPoliciesPresent) {
            brokerCandidateCache.addAll(primariesCache);
            if (policies.shouldFailoverToSecondaries(namespace, primariesCache.size())) {
                log.debug("Not enough of primaries [{}] available for namespace - [{}], adding shared [{}] as possible candidate owners", primariesCache.size(), namespace, secondaryCache.size());
                brokerCandidateCache.addAll(secondaryCache);
            }
        } else {
            log.debug("Policies not present for namespace - [{}] so only considering shared [{}] brokers for possible owner", namespace, secondaryCache.size());
            brokerCandidateCache.addAll(secondaryCache);
        }
    }

    /**
     * Splits every bundle name into its namespace and bundle-range parts and records the range in
     * {@code target} under that namespace, creating the per-namespace set on first use.
     *
     * @param bundles bundle names of the form {@code <namespace>/<bundle-range>}
     * @param target  namespace to bundle-range map that is filled in place
     */
    public static void fillNamespaceToBundlesMap(Set<String> bundles, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> target) {
        for (String bundle : bundles) {
            String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
            String range = LoadManagerShared.getBundleRangeFromBundleName(bundle);
            ConcurrentOpenHashSet<String> rangesOfNamespace = target.computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>());
            rangesOfNamespace.add(range);
        }
    }

    /**
     * Returns the bundle-range part of a bundle name, i.e. everything after the last {@code '/'}.
     *
     * @param bundleName bundle name of the form {@code <namespace>/<bundle-range>}
     * @return the text following the last {@code '/'}; empty if the name ends with {@code '/'}
     * @throws IllegalArgumentException if {@code bundleName} contains no {@code '/'}
     */
    public static String getBundleRangeFromBundleName(String bundleName) {
        int pos = bundleName.lastIndexOf('/');
        if (pos == -1) {
            // Same exception type as before, but now it says which input was rejected.
            throw new IllegalArgumentException(
                    "Invalid bundle name [" + bundleName + "]: expected <namespace>/<bundle-range>");
        }
        return bundleName.substring(pos + 1);
    }

    /**
     * Returns the namespace part of a bundle name, i.e. everything before the last {@code '/'}.
     *
     * @param bundleName bundle name of the form {@code <namespace>/<bundle-range>}
     * @return the text preceding the last {@code '/'}; empty if the name starts with its only {@code '/'}
     * @throws IllegalArgumentException if {@code bundleName} contains no {@code '/'}
     */
    public static String getNamespaceNameFromBundleName(String bundleName) {
        int pos = bundleName.lastIndexOf('/');
        if (pos == -1) {
            // Same exception type as before, but now it says which input was rejected.
            throw new IllegalArgumentException(
                    "Invalid bundle name [" + bundleName + "]: expected <namespace>/<bundle-range>");
        }
        return bundleName.substring(0, pos);
    }

    /**
     * Takes the host usage reported by {@code brokerHostUsage} and overwrites its heap and direct
     * memory figures with values read from the running JVM, expressed in MiB.
     *
     * @param brokerHostUsage source of the base usage object
     * @return the same usage object obtained from {@code brokerHostUsage}, with memory fields updated
     * @throws IOException declared for callers; not thrown explicitly by this method
     */
    public static SystemResourceUsage getSystemResourceUsage(BrokerHostUsage brokerHostUsage) throws IOException {
        final long bytesPerMib = 1L << 20;
        final SystemResourceUsage usage = brokerHostUsage.getBrokerHostUsage();

        // Heap: used = total - free, limit = max heap; both converted with floating-point division.
        final Runtime runtime = Runtime.getRuntime();
        final long heapLimitBytes = runtime.maxMemory();
        final long heapUsedBytes = runtime.totalMemory() - runtime.freeMemory();
        usage.memory.usage = heapUsedBytes / (double) bytesPerMib;
        usage.memory.limit = heapLimitBytes / (double) bytesPerMib;

        // Direct memory: divided by an integral MiB constant.
        // NOTE(review): if both operands are integral this truncates to whole MiB, unlike the heap
        // figures above - confirm whether that is intended.
        usage.directMemory.usage = JvmMetrics.getJvmDirectMemoryUsed() / bytesPerMib;
        usage.directMemory.limit = PlatformDependent.maxDirectMemory() / bytesPerMib;
        return usage;
    }

    /**
     * Tells whether load shedding may run: both the load balancer and load-balancer shedding must
     * be enabled in the broker configuration.
     *
     * @param pulsar service whose configuration is consulted
     * @return {@code true} only if both configuration flags are set
     */
    public static boolean isLoadSheddingEnabled(PulsarService pulsar) {
        if (!pulsar.getConfiguration().isLoadBalancerEnabled()) {
            return false;
        }
        return pulsar.getConfiguration().isLoadBalancerSheddingEnabled();
    }

    /**
     * Keeps in {@code candidates} only the brokers that currently hold the fewest bundles of the
     * namespace that {@code assignedBundleName} belongs to.
     *
     * <p>Missing broker or namespace entries in {@code brokerToNamespaceToBundleRange} are created
     * (empty) as a side effect of the lookup.
     *
     * @param assignedBundleName             bundle being assigned, {@code <namespace>/<bundle-range>}
     * @param candidates                     candidate brokers; filtered in place
     * @param brokerToNamespaceToBundleRange broker to namespace to bundle-range ownership map
     */
    public static void removeMostServicingBrokersForNamespace(String assignedBundleName, Set<String> candidates, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
        if (candidates.isEmpty()) {
            return;
        }
        final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(assignedBundleName);

        int fewest = Integer.MAX_VALUE;
        for (String candidate : candidates) {
            int owned = (int) ownedBundleCount(brokerToNamespaceToBundleRange, candidate, namespaceName);
            fewest = Math.min(fewest, owned);
            if (fewest == 0) {
                // Nothing can be lower than zero, so the scan can stop early.
                break;
            }
        }

        final int threshold = fewest;
        candidates.removeIf(candidate -> ownedBundleCount(brokerToNamespaceToBundleRange, candidate, namespaceName) > threshold);
    }

    /** Number of bundle ranges of {@code namespaceName} recorded for {@code broker}, creating empty entries if absent. */
    private static long ownedBundleCount(ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange, String broker, String namespaceName) {
        return brokerToNamespaceToBundleRange
                .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
                .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
                .size();
    }

    /**
     * Narrows {@code candidates} to the brokers that own the fewest namespaces belonging to the same
     * anti-affinity group as the namespace of {@code assignedBundleName}.
     *
     * <p>Steps: look up the per-broker anti-affinity namespace counts (waiting up to 30 seconds); if
     * failure domains are enabled, first drop brokers in domains that do not have the least total;
     * then drop every broker whose own count is above the minimum among the remaining candidates.
     * A broker that is absent from the count map is treated as owning zero such namespaces.
     *
     * <p>This is best effort: if the lookup fails, times out or is interrupted, the error is logged
     * and {@code candidates} is left as it was at that point. On interruption the thread's
     * interrupt flag is restored.
     *
     * @param pulsar                         service used for configuration and policy lookup
     * @param assignedBundleName             bundle being assigned, {@code <namespace>/<bundle-range>}
     * @param candidates                     candidate brokers; filtered in place
     * @param brokerToNamespaceToBundleRange broker to namespace to bundle-range ownership map
     * @param brokerToDomainMap              broker to failure-domain map; may be {@code null} or empty
     */
    public static void filterAntiAffinityGroupOwnedBrokers(PulsarService pulsar, String assignedBundleName, Set<String> candidates, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange, Map<String, String> brokerToDomainMap) {
        if (candidates.isEmpty()) {
            return;
        }
        final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(assignedBundleName);
        try {
            final Map<String, Integer> brokerToAntiAffinityNamespaceCount = LoadManagerShared.getAntiAffinityNamespaceOwnedBrokers(pulsar, namespaceName, brokerToNamespaceToBundleRange).get(30L, TimeUnit.SECONDS);
            if (brokerToAntiAffinityNamespaceCount == null) {
                // The namespace is not part of any anti-affinity group (or the lookup failed).
                return;
            }
            if (pulsar.getConfiguration().isFailureDomainsEnabled()) {
                LoadManagerShared.filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(brokerToAntiAffinityNamespaceCount, candidates, brokerToDomainMap);
            }

            // Smallest count among the remaining candidates; a broker without an entry counts as 0.
            int leastNamespaceCount = Integer.MAX_VALUE;
            for (String candidate : candidates) {
                Integer namespaceCount = brokerToAntiAffinityNamespaceCount.get(candidate);
                if (namespaceCount == null) {
                    leastNamespaceCount = 0;
                    break;
                }
                leastNamespaceCount = Math.min(leastNamespaceCount, namespaceCount);
            }

            if (leastNamespaceCount == 0) {
                // Keep only brokers owning none: those missing from the map or mapped to zero.
                candidates.removeIf(candidate -> {
                    Integer namespaceCount = brokerToAntiAffinityNamespaceCount.get(candidate);
                    return namespaceCount != null && namespaceCount > 0;
                });
            } else {
                final int finalLeastNamespaceCount = leastNamespaceCount;
                candidates.removeIf(candidate -> brokerToAntiAffinityNamespaceCount.getOrDefault(candidate, 0) != finalLeastNamespaceCount);
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Failed to filter anti-affinity group namespace {}", namespaceName, e);
        } catch (Exception e) {
            // Log the namespace and the full exception; the message alone is often null for timeouts.
            log.error("Failed to filter anti-affinity group namespace {}", namespaceName, e);
        }
    }

    /**
     * Removes from {@code candidates} every broker whose failure domain does not have the smallest
     * total of anti-affinity namespaces, where a domain's total is the sum of the counts of the
     * candidate brokers in it. Brokers without a domain entry are grouped under the default domain.
     * Does nothing when {@code brokerToDomainMap} is {@code null} or empty.
     *
     * @param brokerToAntiAffinityNamespaceCount per-broker anti-affinity namespace count; missing means 0
     * @param candidates                         candidate brokers; filtered in place
     * @param brokerToDomainMap                  broker to failure-domain map
     */
    private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(Map<String, Integer> brokerToAntiAffinityNamespaceCount, Set<String> candidates, Map<String, String> brokerToDomainMap) {
        if (brokerToDomainMap == null || brokerToDomainMap.isEmpty()) {
            return;
        }

        // Sum the per-broker counts by domain.
        final Map<String, Integer> totalPerDomain = new HashMap<>();
        for (String candidate : candidates) {
            String domain = brokerToDomainMap.getOrDefault(candidate, DEFAULT_DOMAIN);
            int owned = brokerToAntiAffinityNamespaceCount.getOrDefault(candidate, 0);
            totalPerDomain.merge(domain, owned, Integer::sum);
        }

        // Find the smallest domain total.
        int smallestTotal = Integer.MAX_VALUE;
        for (int total : totalPerDomain.values()) {
            smallestTotal = Math.min(smallestTotal, total);
        }

        // Drop brokers that live in any domain above that minimum.
        final int leastTotal = smallestTotal;
        candidates.removeIf(candidate -> {
            Integer domainTotal = totalPerDomain.get(brokerToDomainMap.getOrDefault(candidate, DEFAULT_DOMAIN));
            return domainTotal != null && domainTotal != leastTotal;
        });
    }

    /**
     * Asynchronously counts, per broker, how many of the namespaces it holds belong to the same
     * anti-affinity group as {@code namespaceName}.
     *
     * <p>The returned future completes with:
     * <ul>
     *   <li>{@code null} when the policies of {@code namespaceName} are absent, its
     *       {@code antiAffinityGroup} is blank, or the policy lookup fails;</li>
     *   <li>otherwise a map from broker to the number of its namespaces (with a non-empty bundle-range
     *       set) whose {@code antiAffinityGroup} equals, ignoring case, that of {@code namespaceName}.
     *       Brokers with no matching namespace have no entry in the map.</li>
     * </ul>
     * A failed policy lookup for an individual namespace is ignored: that namespace is not counted.
     *
     * @param pulsar                         service providing the policies cache
     * @param namespaceName                  namespace whose anti-affinity group is the reference
     * @param brokerToNamespaceToBundleRange broker to namespace to bundle-range ownership map
     * @return future described above; this method never completes it exceptionally itself
     */
    public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(PulsarService pulsar, String namespaceName, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
        CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<Map<String, Integer>>();
        ZooKeeperDataCache policiesCache = pulsar.getConfigurationCache().policiesCache();
        ((CompletableFuture)policiesCache.getAsync(PulsarWebResource.path("policies", namespaceName)).thenAccept(policies -> {
            // No policies or no anti-affinity group configured: there is nothing to count.
            if (!policies.isPresent() || StringUtils.isBlank((CharSequence)((Policies)policies.get()).antiAffinityGroup)) {
                antiAffinityNsBrokersResult.complete(null);
                return;
            }
            String antiAffinityGroup = ((Policies)policies.get()).antiAffinityGroup;
            ConcurrentHashMap brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap();
            // One future per (broker, namespace) policy lookup started below.
            List futures = Lists.newArrayList();
            brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> nsToBundleRange.forEach((ns, bundleRange) -> {
                // Namespaces for which the broker holds no bundle range are skipped.
                if (bundleRange.isEmpty()) {
                    return;
                }
                CompletableFuture future = new CompletableFuture();
                futures.add(future);
                ((CompletableFuture)policiesCache.getAsync(PulsarWebResource.path("policies", ns)).thenAccept(nsPolicies -> {
                    // Count the namespace for this broker when it is in the same group (case-insensitive).
                    if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(((Policies)nsPolicies.get()).antiAffinityGroup)) {
                        brokerToAntiAffinityNamespaceCount.compute(broker, (brokerName, count) -> count == null ? 1 : count + 1);
                    }
                    future.complete(null);
                })).exceptionally(ex -> {
                    // A failed lookup still completes normally, so it cannot fail the aggregate wait.
                    future.complete(null);
                    return null;
                });
            }));
            // Publish the counts once every per-namespace lookup has finished.
            FutureUtil.waitForAll((List)futures).thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
        })).exceptionally(ex -> {
            // Failure in the lookup or in the callback above is reported to the caller as null.
            antiAffinityNsBrokersResult.complete(null);
            return null;
        });
        return antiAffinityNsBrokersResult;
    }

    /**
     * Decides whether a bundle of an anti-affinity namespace may be unloaded from {@code currentBroker}.
     *
     * <p>Returns {@code true} when no anti-affinity counts are available (null or empty), when some
     * candidate owns zero namespaces of the group, when the current broker owns more than the
     * minimum, or when the candidates do not all share the minimum count. Returns {@code false}
     * only when every candidate owns the same, non-zero number of such namespaces.
     *
     * @param namespace                      namespace whose anti-affinity group is evaluated
     * @param bundle                         bundle considered for unloading (not used in the decision)
     * @param currentBroker                  broker currently owning the bundle
     * @param pulsar                         service used for the policy lookup
     * @param brokerToNamespaceToBundleRange broker to namespace to bundle-range ownership map
     * @param candidateBroekrs               brokers that could take the bundle
     * @return whether unloading is allowed
     * @throws Exception if the anti-affinity lookup fails, is interrupted or exceeds 10 seconds
     */
    public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker, PulsarService pulsar, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange, Set<String> candidateBroekrs) throws Exception {
        final Map<String, Integer> countsByBroker = LoadManagerShared.getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace, brokerToNamespaceToBundleRange).get(10L, TimeUnit.SECONDS);
        if (countsByBroker == null || countsByBroker.isEmpty()) {
            return true;
        }

        int minimum = Integer.MAX_VALUE;
        int ownedByCurrent = 0;
        for (String candidate : candidateBroekrs) {
            int owned = countsByBroker.getOrDefault(candidate, 0);
            if (currentBroker.equals(candidate)) {
                ownedByCurrent = owned;
            }
            minimum = Math.min(minimum, owned);
        }

        // Another broker is strictly better placed (owns none, or fewer than the current broker).
        if (minimum == 0 || ownedByCurrent > minimum) {
            return true;
        }

        // If every candidate sits at the minimum, moving the bundle gains nothing.
        final int least = minimum;
        long brokersAtMinimum = candidateBroekrs.stream()
                .filter(candidate -> countsByBroker.getOrDefault(candidate, 0) == least)
                .count();
        return candidateBroekrs.size() != brokersAtMinimum;
    }

    /**
     * Restricts {@code brokerCandidateCache} to brokers whose topic total does not exceed
     * {@code loadBalancerBrokerMaxTopics}. If no broker satisfies the limit, the set is left
     * untouched.
     *
     * @param brokerCandidateCache        candidate brokers; replaced in place when any broker qualifies
     * @param loadData                    source of per-broker data
     * @param loadBalancerBrokerMaxTopics maximum number of topics allowed on a broker (inclusive)
     */
    public static void filterBrokersWithLargeTopicCount(Set<String> brokerCandidateCache, LoadData loadData, int loadBalancerBrokerMaxTopics) {
        final Set<String> withinLimit = new HashSet<>();
        for (String broker : brokerCandidateCache) {
            if (totalTopicCount(loadData.getBrokerData().get(broker)) <= (long) loadBalancerBrokerMaxTopics) {
                withinLimit.add(broker);
            }
        }
        if (withinLimit.isEmpty()) {
            return;
        }
        brokerCandidateCache.clear();
        brokerCandidateCache.addAll(withinLimit);
    }

    /**
     * Topics in preallocated bundles plus topics in the broker's local data; 0 when the broker data
     * or its preallocated bundle data is {@code null}.
     */
    private static long totalTopicCount(BrokerData brokerData) {
        if (brokerData == null || brokerData.getPreallocatedBundleData() == null) {
            return 0L;
        }
        long preallocatedTopics = brokerData.getPreallocatedBundleData().values().stream()
                .mapToLong(preAllocatedBundle -> preAllocatedBundle.getTopics())
                .sum();
        return preallocatedTopics + (long) brokerData.getLocalData().getNumTopics();
    }

    /**
     * Answers which kinds of topics a broker is willing to load. {@code applyNamespacePolicies}
     * consults it with the broker's {@code http://<broker>} URL string.
     */
    public interface BrokerTopicLoadingPredicate {
        /**
         * @param brokerUrl broker URL string
         * @return whether the broker accepts persistent topics
         */
        boolean isEnablePersistentTopics(String brokerUrl);

        /**
         * @param brokerUrl broker URL string
         * @return whether the broker accepts non-persistent topics
         */
        boolean isEnableNonPersistentTopics(String brokerUrl);
    }
}

