/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.ServiceUnitZkUtils;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OwnershipCache {
    private static final Logger LOG = LoggerFactory.getLogger(OwnershipCache.class);
    private final String ownerBrokerUrl;
    private final String ownerBrokerUrlTls;
    private final NamespaceEphemeralData selfOwnerInfo;
    private final NamespaceEphemeralData selfOwnerInfoDisabled;
    private final ZooKeeperDataCache<NamespaceEphemeralData> ownershipReadOnlyCache;
    private final AsyncLoadingCache<String, OwnedBundle> ownedBundlesCache;
    private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
    private final ZooKeeperCache localZkCache;
    private final NamespaceBundleFactory bundleFactory;

    public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
        this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
        this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
        this.selfOwnerInfo = new NamespaceEphemeralData(this.ownerBrokerUrl, this.ownerBrokerUrlTls, pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
        this.selfOwnerInfoDisabled = new NamespaceEphemeralData(this.ownerBrokerUrl, this.ownerBrokerUrlTls, pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
        this.bundleFactory = bundleFactory;
        this.localZkCache = pulsar.getLocalZkCache();
        this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
        this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.directExecutor()).buildAsync((AsyncCacheLoader)new OwnedServiceUnitCacheLoader());
    }

    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle suname) {
        String path = ServiceUnitZkUtils.path(suname);
        CompletableFuture ownedBundleFuture = this.ownedBundlesCache.getIfPresent((Object)path);
        if (ownedBundleFuture != null) {
            return ownedBundleFuture.thenApply(serviceUnit -> Optional.of(serviceUnit.isActive() ? this.selfOwnerInfo : this.selfOwnerInfoDisabled));
        }
        return this.ownershipReadOnlyCache.getAsync(path);
    }

    public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle bundle) throws Exception {
        String path = ServiceUnitZkUtils.path(bundle);
        CompletableFuture<NamespaceEphemeralData> future = new CompletableFuture<NamespaceEphemeralData>();
        LOG.info("Trying to acquire ownership of {}", (Object)bundle);
        ((CompletableFuture)this.ownedBundlesCache.get((Object)path).thenAccept(namespaceBundle -> {
            LOG.info("Successfully acquired ownership of {}", (Object)path);
            future.complete(this.selfOwnerInfo);
        })).exceptionally(exception -> {
            if (exception instanceof CompletionException && exception.getCause() instanceof KeeperException.NodeExistsException) {
                LOG.info("Failed to acquire ownership of {} -- Already owned by other broker", (Object)path);
                ((CompletableFuture)this.ownershipReadOnlyCache.getAsync(path).thenAccept(ownerData -> {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Found owner for {} at {}", (Object)bundle, ownerData);
                    }
                    if (ownerData.isPresent()) {
                        future.complete((NamespaceEphemeralData)ownerData.get());
                    } else {
                        future.completeExceptionally((Throwable)exception);
                    }
                })).exceptionally(ex -> {
                    LOG.warn("Failed to check ownership of {}: {}", new Object[]{bundle, ex.getMessage(), ex});
                    future.completeExceptionally((Throwable)exception);
                    return null;
                });
            } else {
                LOG.warn("Failed to acquire ownership of {}: {}", new Object[]{bundle, exception.getMessage(), exception});
                this.ownedBundlesCache.synchronous().invalidate((Object)path);
                future.completeExceptionally((Throwable)exception);
            }
            return null;
        });
        return future;
    }

    public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        String key = ServiceUnitZkUtils.path(bundle);
        this.localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
            if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) {
                LOG.info("[{}] Removed zk lock for service unit: {}", (Object)key, (Object)KeeperException.Code.get((int)rc));
                this.ownedBundlesCache.synchronous().invalidate((Object)key);
                this.ownershipReadOnlyCache.invalidate(key);
                result.complete(null);
            } else {
                LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", (Object)key, (Object)KeeperException.Code.get((int)rc));
                result.completeExceptionally(KeeperException.create((int)rc));
            }
        }, null);
        return result;
    }

    public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
        ArrayList allFutures = Lists.newArrayList();
        for (NamespaceBundle bundle : bundles.getBundles()) {
            if (this.getOwnedBundle(bundle) == null) continue;
            allFutures.add(this.removeOwnership(bundle));
        }
        return FutureUtil.waitForAll((List)allFutures);
    }

    public Map<String, OwnedBundle> getOwnedBundles() {
        return this.ownedBundlesCache.synchronous().asMap();
    }

    public boolean isNamespaceBundleOwned(NamespaceBundle bundle) {
        OwnedBundle ownedBundle = this.getOwnedBundle(bundle);
        return ownedBundle != null && ownedBundle.isActive();
    }

    public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
        CompletableFuture future = this.ownedBundlesCache.getIfPresent((Object)ServiceUnitZkUtils.path(bundle));
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            return (OwnedBundle)future.join();
        }
        return null;
    }

    public void disableOwnership(NamespaceBundle bundle) throws Exception {
        String path = ServiceUnitZkUtils.path(bundle);
        this.updateBundleState(bundle, false);
        this.localZkCache.getZooKeeper().setData(path, this.jsonMapper.writeValueAsBytes((Object)this.selfOwnerInfoDisabled), -1);
        this.ownershipReadOnlyCache.invalidate(path);
    }

    public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws Exception {
        String path = ServiceUnitZkUtils.path(bundle);
        CompletableFuture f = this.ownedBundlesCache.getIfPresent((Object)path);
        if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
            ((OwnedBundle)f.join()).setActive(isActive);
        }
    }

    public NamespaceEphemeralData getSelfOwnerInfo() {
        return this.selfOwnerInfo;
    }

    private class OwnedServiceUnitCacheLoader
    implements AsyncCacheLoader<String, OwnedBundle> {
        private OwnedServiceUnitCacheLoader() {
        }

        public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Executor executor) {
            byte[] znodeContent;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Acquiring zk lock on namespace {}", (Object)namespaceBundleZNode);
            }
            try {
                znodeContent = OwnershipCache.this.jsonMapper.writeValueAsBytes((Object)OwnershipCache.this.selfOwnerInfo);
            }
            catch (JsonProcessingException e) {
                return FutureUtil.failedFuture((Throwable)e);
            }
            CompletableFuture<OwnedBundle> future = new CompletableFuture<OwnedBundle>();
            ZkUtils.asyncCreateFullPathOptimistic((ZooKeeper)OwnershipCache.this.localZkCache.getZooKeeper(), (String)namespaceBundleZNode, (byte[])znodeContent, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
                if (rc == KeeperException.Code.OK.intValue()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Successfully acquired zk lock on {}", (Object)namespaceBundleZNode);
                    }
                    OwnershipCache.this.ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
                    future.complete(new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, OwnershipCache.this.bundleFactory)));
                } else {
                    future.completeExceptionally(KeeperException.create((int)rc));
                }
            }, null);
            return future;
        }
    }
}

