/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OwnershipCache {
    private static final Logger log = LoggerFactory.getLogger(OwnershipCache.class);
    private static final Logger LOG = LoggerFactory.getLogger(OwnershipCache.class);
    private final String ownerBrokerUrl;
    private final String ownerBrokerUrlTls;
    private NamespaceEphemeralData selfOwnerInfo;
    private final NamespaceEphemeralData selfOwnerInfoDisabled;
    private final LockManager<NamespaceEphemeralData> lockManager;
    private final Map<NamespaceBundle, ResourceLock<NamespaceEphemeralData>> locallyAcquiredLocks;
    private final AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache;
    private final NamespaceBundleFactory bundleFactory;
    private final NamespaceService namespaceService;
    private final PulsarService pulsar;

    public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory, NamespaceService namespaceService) {
        this.namespaceService = namespaceService;
        this.pulsar = pulsar;
        this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
        this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
        this.selfOwnerInfo = new NamespaceEphemeralData(this.ownerBrokerUrl, this.ownerBrokerUrlTls, pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
        this.selfOwnerInfoDisabled = new NamespaceEphemeralData(this.ownerBrokerUrl, this.ownerBrokerUrlTls, pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners());
        this.bundleFactory = bundleFactory;
        this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
        this.locallyAcquiredLocks = new ConcurrentHashMap<NamespaceBundle, ResourceLock<NamespaceEphemeralData>>();
        this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.directExecutor()).recordStats().buildAsync((AsyncCacheLoader)new OwnedServiceUnitCacheLoader());
        CacheMetricsCollector.CAFFEINE.addCache("owned-bundles", this.ownedBundlesCache);
    }

    public CompletableFuture<Boolean> checkOwnershipAsync(NamespaceBundle bundle) {
        Optional<CompletableFuture<OwnedBundle>> ownedBundleFuture = this.getOwnedBundleAsync(bundle);
        if (!ownedBundleFuture.isPresent()) {
            return CompletableFuture.completedFuture(false);
        }
        return ownedBundleFuture.get().thenApply(bd -> bd != null && bd.isActive());
    }

    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle suName) {
        CompletableFuture ownedBundleFuture = this.ownedBundlesCache.getIfPresent((Object)suName);
        if (ownedBundleFuture != null) {
            return ownedBundleFuture.thenApply(serviceUnit -> Optional.of(serviceUnit.isActive() ? this.selfOwnerInfo : this.selfOwnerInfoDisabled));
        }
        String path = ServiceUnitUtils.path(suName);
        return this.lockManager.readLock(path);
    }

    public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle bundle) throws Exception {
        if (!this.refreshSelfOwnerInfo()) {
            return FutureUtil.failedFuture((Throwable)new RuntimeException("Namespace service is not ready for acquiring ownership"));
        }
        LOG.info("Trying to acquire ownership of {}", (Object)bundle);
        return this.ownedBundlesCache.get((Object)bundle).thenApply(namespaceBundle -> {
            LOG.info("Successfully acquired ownership of {}", namespaceBundle);
            this.namespaceService.onNamespaceBundleOwned(bundle);
            return this.selfOwnerInfo;
        });
    }

    public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
        ResourceLock<NamespaceEphemeralData> lock = this.locallyAcquiredLocks.remove(bundle);
        if (lock == null) {
            return CompletableFuture.completedFuture(null);
        }
        return lock.release();
    }

    public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
        ArrayList allFutures = Lists.newArrayList();
        for (NamespaceBundle bundle : bundles.getBundles()) {
            if (this.getOwnedBundle(bundle) == null) continue;
            allFutures.add(this.removeOwnership(bundle));
        }
        return FutureUtil.waitForAll((Collection)allFutures);
    }

    public Map<NamespaceBundle, OwnedBundle> getOwnedBundles() {
        return this.ownedBundlesCache.synchronous().asMap();
    }

    public Map<NamespaceBundle, CompletableFuture<OwnedBundle>> getOwnedBundlesAsync() {
        return this.ownedBundlesCache.asMap();
    }

    public boolean isNamespaceBundleOwned(NamespaceBundle bundle) {
        OwnedBundle ownedBundle = this.getOwnedBundle(bundle);
        return ownedBundle != null && ownedBundle.isActive();
    }

    public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
        CompletableFuture future = this.ownedBundlesCache.getIfPresent((Object)bundle);
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            try {
                return (OwnedBundle)future.get(this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            }
            catch (InterruptedException | TimeoutException e) {
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
        return null;
    }

    public Optional<CompletableFuture<OwnedBundle>> getOwnedBundleAsync(NamespaceBundle bundle) {
        return Optional.ofNullable(this.ownedBundlesCache.getIfPresent((Object)bundle));
    }

    @Deprecated
    public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
        return this.updateBundleState(bundle, false).thenCompose(__ -> {
            ResourceLock<NamespaceEphemeralData> lock = this.locallyAcquiredLocks.get(bundle);
            if (lock == null) {
                return CompletableFuture.completedFuture(null);
            }
            return lock.updateValue((Object)this.selfOwnerInfoDisabled);
        });
    }

    public CompletableFuture<Void> updateBundleState(NamespaceBundle bundle, boolean isActive) {
        CompletableFuture f = this.ownedBundlesCache.getIfPresent((Object)bundle);
        if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
            return f.thenAccept(ob -> ob.setActive(isActive));
        }
        return CompletableFuture.completedFuture(null);
    }

    public void invalidateLocalOwnerCache() {
        this.ownedBundlesCache.synchronous().invalidateAll();
    }

    public void invalidateLocalOwnerCache(NamespaceBundle namespaceBundle) {
        this.ownedBundlesCache.synchronous().invalidate((Object)namespaceBundle);
    }

    @VisibleForTesting
    public Map<NamespaceBundle, ResourceLock<NamespaceEphemeralData>> getLocallyAcquiredLocks() {
        return this.locallyAcquiredLocks;
    }

    public synchronized boolean refreshSelfOwnerInfo() {
        this.selfOwnerInfo = new NamespaceEphemeralData(this.pulsar.getBrokerServiceUrl(), this.pulsar.getBrokerServiceUrlTls(), this.pulsar.getSafeWebServiceAddress(), this.pulsar.getWebServiceAddressTls(), false, this.pulsar.getAdvertisedListeners());
        return this.selfOwnerInfo.getNativeUrl() != null || this.selfOwnerInfo.getNativeUrlTls() != null;
    }

    private class OwnedServiceUnitCacheLoader
    implements AsyncCacheLoader<NamespaceBundle, OwnedBundle> {
        private OwnedServiceUnitCacheLoader() {
        }

        public CompletableFuture<OwnedBundle> asyncLoad(NamespaceBundle namespaceBundle, Executor executor) {
            return OwnershipCache.this.lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), (Object)OwnershipCache.this.selfOwnerInfo).thenApply(rl -> {
                OwnershipCache.this.locallyAcquiredLocks.put(namespaceBundle, (ResourceLock<NamespaceEphemeralData>)rl);
                rl.getLockExpiredFuture().thenRun(() -> {
                    log.info("Resource lock for {} has expired", (Object)rl.getPath());
                    OwnershipCache.this.namespaceService.unloadNamespaceBundle(namespaceBundle);
                    OwnershipCache.this.ownedBundlesCache.synchronous().invalidate((Object)namespaceBundle);
                    OwnershipCache.this.namespaceService.onNamespaceBundleUnload(namespaceBundle);
                });
                return new OwnedBundle(namespaceBundle);
            });
        }
    }
}

