/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.ServiceUnitZkUtils;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks which namespace bundles are owned by this broker.
 *
 * <p>Ownership of a bundle is acquired by creating an ephemeral z-node at the bundle's path
 * (see {@link OwnedServiceUnitCacheLoader}). Bundles owned by this broker are kept in
 * {@code ownedBundlesCache}; ownership information for any bundle is read through
 * {@code ownershipReadOnlyCache}.
 */
public class OwnershipCache {
    private static final Logger LOG = LoggerFactory.getLogger(OwnershipCache.class);

    /** Broker service URL captured at construction time. */
    private final String ownerBrokerUrl;

    /** TLS broker service URL captured at construction time. */
    private final String ownerBrokerUrlTls;

    /**
     * Owner data advertised for this broker. Reassigned by {@link #refreshSelfOwnerInfo()} under the
     * instance lock but read without it from asynchronous callbacks, hence {@code volatile}.
     */
    private volatile NamespaceEphemeralData selfOwnerInfo;

    /** Same as {@code selfOwnerInfo} but built with the "disabled" flag set to {@code true}. */
    private final NamespaceEphemeralData selfOwnerInfoDisabled;

    /** Read-through cache of the owner data stored in ZooKeeper, for any bundle path. */
    private final ZooKeeperDataCache<NamespaceEphemeralData> ownershipReadOnlyCache;

    /** Bundles owned by this broker, keyed by z-node path; loading an entry creates the z-node. */
    private final AsyncLoadingCache<String, OwnedBundle> ownedBundlesCache;

    private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
    private final ZooKeeperCache localZkCache;
    private final NamespaceBundleFactory bundleFactory;
    private final NamespaceService namespaceService;
    private final PulsarService pulsar;

    /**
     * Creates the cache and registers it with the Caffeine metrics collector under the name
     * {@code "owned-bundles"}.
     *
     * @param pulsar           source of the broker URLs, advertised listeners and local ZooKeeper caches
     * @param bundleFactory    used to turn a z-node path back into a {@link NamespaceBundle}
     * @param namespaceService notified when a bundle becomes owned or is unloaded
     */
    public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory,
            NamespaceService namespaceService) {
        this.namespaceService = namespaceService;
        this.pulsar = pulsar;
        this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
        this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
        this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false,
                pulsar.getAdvertisedListeners());
        this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true,
                pulsar.getAdvertisedListeners());
        this.bundleFactory = bundleFactory;
        this.localZkCache = pulsar.getLocalZkCache();
        this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
        // Loads run on the calling thread (direct executor); the loader itself is asynchronous.
        this.ownedBundlesCache = Caffeine.newBuilder()
                .executor(MoreExecutors.directExecutor())
                .recordStats()
                .buildAsync(new OwnedServiceUnitCacheLoader());
        CacheMetricsCollector.CAFFEINE.addCache("owned-bundles", this.ownedBundlesCache);
    }

    /**
     * Reads the owner data and z-node stat for {@code path}. When the z-node's ephemeral owner is the
     * current ZooKeeper session, local ownership is re-established: the bundle is put into
     * {@code ownedBundlesCache} (only if the stored native URL matches this broker's), the read-only
     * cache entry is invalidated and the namespace service is notified.
     *
     * @param path z-node path of the bundle
     * @return the owner data and stat as read, or an empty optional if none was found
     */
    private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> resolveOwnership(String path) {
        return ownershipReadOnlyCache.getWithStatAsync(path).thenApply(optionalOwnerDataWithStat -> {
            if (optionalOwnerDataWithStat.isPresent()) {
                Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat = optionalOwnerDataWithStat.get();
                Stat stat = ownerDataWithStat.getValue();
                if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
                    LOG.info("Successfully reestablish ownership of {}", path);
                    OwnedBundle ownedBundle =
                            new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(path, bundleFactory));
                    // Read the field once; the native URL may still be null if it has not been
                    // refreshed yet, in which case it cannot match the stored owner.
                    NamespaceEphemeralData self = selfOwnerInfo;
                    if (self.getNativeUrl() != null
                            && self.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
                        ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
                    }
                    ownershipReadOnlyCache.invalidate(path);
                    namespaceService.onNamespaceBundleOwned(ownedBundle.getNamespaceBundle());
                }
            }
            return optionalOwnerDataWithStat;
        });
    }

    /**
     * Checks whether this broker owns {@code bundle}.
     *
     * @param bundle the bundle to check
     * @return {@code true} if the bundle is already in the local owned cache, or if its z-node exists
     *         and its ephemeral owner is the current ZooKeeper session; {@code false} otherwise
     */
    public CompletableFuture<Boolean> checkOwnership(NamespaceBundle bundle) {
        if (getOwnedBundle(bundle) != null) {
            return CompletableFuture.completedFuture(true);
        }
        String bundlePath = ServiceUnitZkUtils.path(bundle);
        return resolveOwnership(bundlePath).thenApply(optionalOwnedDataWithStat -> {
            if (!optionalOwnedDataWithStat.isPresent()) {
                return false;
            }
            Stat stat = optionalOwnedDataWithStat.get().getValue();
            return stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId();
        });
    }

    /**
     * Looks up the owner of a bundle.
     *
     * @param suname the bundle to look up
     * @return this broker's owner data (the disabled variant when the owned bundle is inactive) if the
     *         bundle is present in the local owned cache; otherwise whatever owner data is stored in
     *         ZooKeeper, or an empty optional if there is none
     */
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle suname) {
        String path = ServiceUnitZkUtils.path(suname);
        CompletableFuture<OwnedBundle> ownedBundleFuture = ownedBundlesCache.getIfPresent(path);
        if (ownedBundleFuture != null) {
            return ownedBundleFuture.thenApply(
                    serviceUnit -> Optional.of(serviceUnit.isActive() ? selfOwnerInfo : selfOwnerInfoDisabled));
        }
        return resolveOwnership(path).thenApply(optional -> optional.map(Map.Entry::getKey));
    }

    /**
     * Attempts to become the owner of {@code bundle}.
     *
     * <p>The returned future completes with this broker's owner data on success. If the z-node
     * already exists, it completes with the owner data currently stored there (which may be this
     * broker's). It completes exceptionally when the broker URL is not available yet, on any other
     * failure, or when the existing z-node can no longer be read.
     *
     * @param bundle the bundle to acquire
     * @return a future holding the owner data of whichever broker owns the bundle
     * @throws Exception kept in the signature for existing callers; the code in this method reports
     *                   failures through the returned future
     */
    public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle bundle) throws Exception {
        String path = ServiceUnitZkUtils.path(bundle);
        CompletableFuture<NamespaceEphemeralData> future = new CompletableFuture<>();

        if (!refreshSelfOwnerInfo()) {
            future.completeExceptionally(
                    new RuntimeException("Namespace service does not ready for acquiring ownership"));
            return future;
        }

        LOG.info("Trying to acquire ownership of {}", bundle);

        // A get() on the loading cache triggers the loader, which creates the ephemeral z-node.
        ownedBundlesCache.get(path).thenAccept(namespaceBundle -> {
            LOG.info("Successfully acquired ownership of {}", path);
            namespaceService.onNamespaceBundleOwned(bundle);
            future.complete(selfOwnerInfo);
        }).exceptionally(exception -> {
            if (exception instanceof CompletionException
                    && exception.getCause() instanceof KeeperException.NodeExistsException) {
                // Somebody already holds the z-node: find out who.
                resolveOwnership(path).thenAccept(optionalOwnerDataWithStat -> {
                    if (optionalOwnerDataWithStat.isPresent()) {
                        Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat =
                                optionalOwnerDataWithStat.get();
                        NamespaceEphemeralData ownerData = ownerDataWithStat.getKey();
                        Stat stat = ownerDataWithStat.getValue();
                        if (stat.getEphemeralOwner() != localZkCache.getZooKeeper().getSessionId()) {
                            LOG.info("Failed to acquire ownership of {} -- Already owned by broker {}",
                                    path, ownerData);
                        }
                        future.complete(ownerData);
                    } else {
                        // The z-node existed at creation time but is gone by the time it is read.
                        LOG.info("Failed to acquire ownership of {} -- Already owned by unknown broker", path);
                        future.completeExceptionally(exception);
                    }
                }).exceptionally(ex -> {
                    LOG.warn("Failed to check ownership of {}: {}", bundle, ex.getMessage(), ex);
                    // Report the original acquisition failure, not the lookup failure.
                    future.completeExceptionally(exception);
                    return null;
                });
            } else {
                LOG.warn("Failed to acquire ownership of {}: {}", bundle, exception.getMessage(), exception);
                future.completeExceptionally(exception);
            }
            return null;
        });

        return future;
    }

    /**
     * Deletes the bundle's z-node and drops the bundle from both local caches.
     *
     * <p>The caches are invalidated and the namespace service is notified regardless of the ZooKeeper
     * result code. The returned future succeeds on {@code OK} or {@code NONODE} and fails with the
     * corresponding {@link KeeperException} otherwise.
     *
     * @param bundle the bundle to release
     * @return a future completed once the delete callback has run
     */
    public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        String key = ServiceUnitZkUtils.path(bundle);
        localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
            ownedBundlesCache.synchronous().invalidate(key);
            ownershipReadOnlyCache.invalidate(key);
            namespaceService.onNamespaceBundleUnload(bundle);
            if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) {
                LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc));
                result.complete(null);
            } else {
                LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key,
                        KeeperException.Code.get(rc));
                result.completeExceptionally(KeeperException.create(rc));
            }
        }, null);
        return result;
    }

    /**
     * Releases every bundle in {@code bundles} that is currently present in the local owned cache.
     *
     * @param bundles the bundles to release
     * @return a future that waits for all individual removals
     */
    public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
        List<CompletableFuture<Void>> allFutures = Lists.newArrayList();
        for (NamespaceBundle bundle : bundles.getBundles()) {
            if (getOwnedBundle(bundle) == null) {
                // Not owned locally; nothing to remove.
                continue;
            }
            allFutures.add(removeOwnership(bundle));
        }
        return FutureUtil.waitForAll(allFutures);
    }

    /**
     * @return the map view exposed by the synchronous facade of the owned-bundles cache, keyed by
     *         z-node path
     */
    public Map<String, OwnedBundle> getOwnedBundles() {
        return ownedBundlesCache.synchronous().asMap();
    }

    /**
     * @param bundle the bundle to check
     * @return {@code true} only if the bundle is in the local owned cache and is active
     */
    public boolean isNamespaceBundleOwned(NamespaceBundle bundle) {
        OwnedBundle ownedBundle = getOwnedBundle(bundle);
        return ownedBundle != null && ownedBundle.isActive();
    }

    /**
     * Returns the locally cached {@link OwnedBundle} without triggering a load.
     *
     * @param bundle the bundle to look up
     * @return the owned bundle, or {@code null} if it is absent, still loading, or failed to load
     */
    public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
        CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(ServiceUnitZkUtils.path(bundle));
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            // Already completed normally, so join() returns immediately.
            return future.join();
        }
        return null;
    }

    /**
     * Marks the bundle inactive locally, then overwrites its z-node with the disabled owner data.
     *
     * @param bundle the bundle to disable
     * @return a future completed when the z-node write succeeds, or failed with the serialization
     *         error, the ZooKeeper error, or the failure of the local state update
     */
    public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
        String path = ServiceUnitZkUtils.path(bundle);
        CompletableFuture<Void> future = new CompletableFuture<>();

        updateBundleState(bundle, false).thenRun(() -> {
            byte[] value;
            try {
                value = jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled);
            } catch (JsonProcessingException e) {
                future.completeExceptionally(e);
                return;
            }
            localZkCache.getZooKeeper().setData(path, value, -1, (rc, path1, ctx, stat) -> {
                if (rc == KeeperException.Code.OK.intValue()) {
                    // Drop the stale read-only entry so the next read sees the disabled data.
                    ownershipReadOnlyCache.invalidate(path1);
                    future.complete(null);
                } else {
                    future.completeExceptionally(KeeperException.create(rc));
                }
            }, null);
        }).exceptionally(ex -> {
            LOG.warn("Failed to update state on namespace bundle {}: {}", bundle, ex.getMessage(), ex);
            future.completeExceptionally(ex);
            return null;
        });

        return future;
    }

    /**
     * Sets the active flag on the locally cached bundle, if one has been loaded successfully.
     *
     * @param bundle   the bundle to update
     * @param isActive the new active state
     * @return a future completed after the flag is set, or an already-completed future when the bundle
     *         is absent, still loading, or failed to load
     */
    public CompletableFuture<Void> updateBundleState(NamespaceBundle bundle, boolean isActive) {
        String path = ServiceUnitZkUtils.path(bundle);
        CompletableFuture<OwnedBundle> f = ownedBundlesCache.getIfPresent(path);
        if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
            return f.thenAccept(ob -> ob.setActive(isActive));
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Drops every entry from the local owned-bundles cache; no ZooKeeper call is made here. */
    public void invalidateLocalOwnerCache() {
        ownedBundlesCache.synchronous().invalidateAll();
    }

    /** @return the owner data currently advertised for this broker */
    public NamespaceEphemeralData getSelfOwnerInfo() {
        return selfOwnerInfo;
    }

    /**
     * Rebuilds {@code selfOwnerInfo} from the {@link PulsarService} if its native URL is still
     * {@code null}.
     *
     * @return {@code true} if the native URL is non-null after the refresh
     */
    public synchronized boolean refreshSelfOwnerInfo() {
        if (selfOwnerInfo.getNativeUrl() == null) {
            this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(),
                    pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
                    pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
        }
        return selfOwnerInfo.getNativeUrl() != null;
    }

    /**
     * Cache loader that acquires ownership by creating an ephemeral z-node, at the requested path,
     * holding this broker's serialized owner data.
     */
    private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<String, OwnedBundle> {

        /**
         * @param namespaceBundleZNode z-node path of the bundle to acquire
         * @param executor             not used; the ZooKeeper client invokes the callback
         * @return a future completed with the new {@link OwnedBundle} when the z-node is created, or
         *         failed with the serialization error or the {@link KeeperException} for the result code
         */
        @Override
        public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Executor executor) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Acquiring zk lock on namespace {}", namespaceBundleZNode);
            }

            byte[] znodeContent;
            try {
                znodeContent = jsonMapper.writeValueAsBytes(selfOwnerInfo);
            } catch (JsonProcessingException e) {
                return FutureUtil.failedFuture(e);
            }

            CompletableFuture<OwnedBundle> future = new CompletableFuture<>();
            ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
                        if (rc == KeeperException.Code.OK.intValue()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode);
                            }
                            // The read-only cache may still hold a "no owner" result for this path.
                            ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
                            future.complete(new OwnedBundle(
                                    ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));
                        } else {
                            future.completeExceptionally(KeeperException.create(rc));
                        }
                    }, null);
            return future;
        }
    }
}

