/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.impl;

import java.time.Instant;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.pulsar.functions.runtime.shaded.com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import org.apache.pulsar.functions.runtime.shaded.com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import org.apache.pulsar.functions.runtime.shaded.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.functions.runtime.shaded.com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMetadataStore
implements MetadataStoreExtended,
Consumer<Notification> {
    private static final Logger log = LoggerFactory.getLogger(AbstractMetadataStore.class);
    /** Refresh interval of the children/exists caches; the constructor expires entries after twice this. */
    private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    /** Notification listeners; this store registers itself here in the constructor. */
    private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
    /** Listeners that are handed every session event on the executor thread. */
    private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
    protected final String metadataStoreName;
    /** Single-threaded executor used to dispatch notifications and session events. */
    protected final ScheduledExecutorService executor;
    private final AsyncLoadingCache<String, List<String>> childrenCache;
    private final AsyncLoadingCache<String, Boolean> existsCache;
    /** Every cache handed out by {@code getMetadataCache}; kept so they can be invalidated/refreshed. */
    private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
    private final MetadataStoreStats metadataStoreStats;
    // volatile: written by receivedSessionEvent() and read by the cache loaders' asyncReload()
    // and by isConnected(), which can run on different threads.
    private volatile boolean isConnected = true;
    protected final AtomicBoolean isClosed = new AtomicBoolean(false);

    /**
     * Loads the children of a path directly from the backing store, bypassing the children cache.
     *
     * <p>Called by the children cache loader on a cache miss and, while the store is connected,
     * on refresh. The argument is the cache key, i.e. the path passed to {@link #getChildren}.
     * NOTE(review): {@link #deleteRecursive} joins each returned element to the parent with
     * {@code "/"}, so elements are presumably child names rather than full paths - confirm.
     */
    protected abstract CompletableFuture<List<String>> getChildrenFromStore(String var1);

    /**
     * Checks directly against the backing store whether a path exists, bypassing the exists cache.
     *
     * <p>Called by the exists cache loader on a cache miss and, while the store is connected,
     * on refresh. The argument is the cache key, i.e. the path passed to {@link #exists}.
     */
    protected abstract CompletableFuture<Boolean> existsFromStore(String var1);

    /**
     * Creates the single-threaded executor, registers this store as its own notification
     * listener and builds the children/exists caches.
     *
     * <p>Both caches refresh entries after {@code CACHE_REFRESH_TIME_MILLIS} and expire them
     * after twice that. A refresh only goes to the store while connected; otherwise the
     * previously cached value is kept.
     *
     * @param metadataStoreName name handed to the executor's thread factory and to the stats;
     *                          when blank, the thread factory gets the concrete class's simple name
     */
    protected AbstractMetadataStore(String metadataStoreName) {
        String threadPoolName = StringUtils.isNotBlank(metadataStoreName)
                ? metadataStoreName
                : this.getClass().getSimpleName();
        this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(threadPoolName));
        this.registerListener(this);

        this.childrenCache = Caffeine.newBuilder()
                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
                .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2L, TimeUnit.MILLISECONDS)
                .buildAsync(new AsyncCacheLoader<String, List<String>>() {
                    @Override
                    public CompletableFuture<List<String>> asyncLoad(String key, Executor loaderExecutor) {
                        return getChildrenFromStore(key);
                    }

                    @Override
                    public CompletableFuture<List<String>> asyncReload(String key, List<String> oldValue,
                            Executor loaderExecutor) {
                        if (!isConnected) {
                            // Not connected: keep serving the value we already have.
                            return CompletableFuture.completedFuture(oldValue);
                        }
                        return getChildrenFromStore(key);
                    }
                });

        this.existsCache = Caffeine.newBuilder()
                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
                .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2L, TimeUnit.MILLISECONDS)
                .buildAsync(new AsyncCacheLoader<String, Boolean>() {
                    @Override
                    public CompletableFuture<Boolean> asyncLoad(String key, Executor loaderExecutor) {
                        return existsFromStore(key);
                    }

                    @Override
                    public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue,
                            Executor loaderExecutor) {
                        if (!isConnected) {
                            // Not connected: keep serving the value we already have.
                            return CompletableFuture.completedFuture(oldValue);
                        }
                        return existsFromStore(key);
                    }
                });

        this.metadataStoreName = metadataStoreName;
        this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);
    }

    /**
     * Applies a metadata event to this store.
     *
     * <p>The current value at the event's path is read first. If a value exists and
     * {@link #shouldIgnoreEvent} rejects the event, the returned future completes normally without
     * touching the store. Otherwise a {@code Deleted} event is applied as a delete and any other
     * event as a put, using the event's expected version and options.
     *
     * <p>A {@code BadVersionException} from the update is treated as success. Any other failure,
     * including a failure to read the current value or an exception thrown while evaluating the
     * event, completes the returned future exceptionally, so callers are never left waiting on a
     * future that cannot complete.
     *
     * @param event the event to apply
     * @return a future completed once the event has been applied or deliberately ignored
     */
    @Override
    public CompletableFuture<Void> handleMetadataEvent(MetadataEvent event) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        this.get(event.getPath()).thenAccept(res -> {
            Set<CreateOption> options = event.getOptions() != null ? event.getOptions() : Collections.emptySet();
            if (res.isPresent() && this.shouldIgnoreEvent(event, res.get())) {
                result.complete(null);
                return;
            }
            CompletableFuture<?> updateResult = event.getType() == NotificationType.Deleted
                    ? this.deleteInternal(event.getPath(), Optional.ofNullable(event.getExpectedVersion()))
                    : this.putInternal(event.getPath(), event.getValue(),
                            Optional.ofNullable(event.getExpectedVersion()), options);
            updateResult.thenRun(() -> {
                if (log.isDebugEnabled()) {
                    log.debug("successfully updated {}", event.getPath());
                }
                result.complete(null);
            }).exceptionally(ex -> {
                log.warn("Failed to update metadata {}", event.getPath(), ex.getCause());
                if (ex.getCause() instanceof MetadataStoreException.BadVersionException) {
                    // A concurrent writer already moved the version on; nothing left to apply.
                    result.complete(null);
                } else {
                    result.completeExceptionally(ex);
                }
                return null;
            });
        }).exceptionally(ex -> {
            // Reached when the initial read fails or the callback above throws. Without this the
            // returned future would never complete. It is a no-op if result is already completed.
            log.warn("Failed to handle metadata event for {}", event.getPath(), ex);
            result.completeExceptionally(ex);
            return null;
        });
        return result;
    }

    /**
     * Misspelled alias kept for existing callers; simply forwards to
     * {@link #registerSyncListener(Optional)}.
     *
     * @param synchronizer the optional synchronizer to register with
     * @deprecated use {@link #registerSyncListener(Optional)}
     */
    @Deprecated
    protected void registerSyncLister(Optional<MetadataEventSynchronizer> synchronizer) {
        registerSyncListener(synchronizer);
    }

    /**
     * Registers {@link #handleMetadataEvent} as the sync listener of the given synchronizer,
     * when one is present. Does nothing for an empty optional.
     *
     * @param synchronizer the optional synchronizer to register with
     */
    protected void registerSyncListener(Optional<MetadataEventSynchronizer> synchronizer) {
        if (synchronizer.isPresent()) {
            synchronizer.get().registerSyncListener(this::handleMetadataEvent);
        }
    }

    /**
     * Decides whether an incoming event must be dropped instead of being applied on top of the
     * value currently stored at its path.
     *
     * <p>The event is ignored when any of the following holds:
     * <ul>
     *   <li>the source cluster or the local cluster name is unknown ({@code null});</li>
     *   <li>the event carries the {@code Ephemeral} or {@code Sequential} option;</li>
     *   <li>the event is older than the existing value's modification timestamp;</li>
     *   <li>the event originates from this cluster and carries a positive expected version that
     *       differs from the existing version;</li>
     *   <li>the event originates from another cluster, has the same timestamp as the existing
     *       value, and its cluster name sorts before the local one.</li>
     * </ul>
     *
     * @param event         the incoming event
     * @param existingValue the value currently stored; a missing stat is treated as
     *                      version/timestamp {@code -1}
     * @return {@code true} if the event must not be applied
     */
    @VisibleForTesting
    protected boolean shouldIgnoreEvent(MetadataEvent event, GetResult existingValue) {
        Stat existingStat = existingValue.getStat();
        long existingVersion = existingStat != null ? existingStat.getVersion() : -1L;
        long existingTimestamp = existingStat != null ? existingStat.getModificationTimestamp() : -1L;
        String sourceClusterName = event.getSourceCluster();
        Set<CreateOption> options = event.getOptions() != null ? event.getOptions() : Collections.emptySet();
        String currentClusterName = this.getMetadataEventSynchronizer().get().getClusterName();
        if (sourceClusterName == null || currentClusterName == null) {
            return true;
        }
        // Both checks go through the null-safe 'options'. The Sequential check used to
        // dereference event.getOptions() directly and threw NPE for events without options.
        if (options.contains(CreateOption.Ephemeral) || options.contains(CreateOption.Sequential)) {
            return true;
        }
        if (event.getLastUpdatedTimestamp() < existingTimestamp) {
            return true;
        }
        if (currentClusterName.equals(sourceClusterName)) {
            Long expectedVersion = event.getExpectedVersion();
            return expectedVersion != null && expectedVersion > 0L && expectedVersion != existingVersion;
        }
        // Equal timestamps from different clusters: the lexically larger cluster name wins.
        return event.getLastUpdatedTimestamp() == existingTimestamp
                && sourceClusterName.compareTo(currentClusterName) < 0;
    }

    /**
     * Creates a metadata cache for values of the given class and tracks it in this store so that
     * it receives later notifications, invalidations and refreshes.
     *
     * @param clazz       the value class; turned into a simple Jackson type
     * @param cacheConfig configuration passed through to the cache
     * @return the newly created cache
     */
    @Override
    public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
        MetadataCacheImpl<T> cache = new MetadataCacheImpl<T>(
                this, TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig);
        metadataCaches.add(cache);
        return cache;
    }

    /**
     * Creates a metadata cache for the type described by the given type reference and tracks it
     * in this store so that it receives later notifications, invalidations and refreshes.
     *
     * @param typeRef     type reference describing the cached value type
     * @param cacheConfig configuration passed through to the cache
     * @return the newly created cache
     */
    @Override
    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
        MetadataCacheImpl<T> cache = new MetadataCacheImpl<T>(this, typeRef, cacheConfig);
        metadataCaches.add(cache);
        return cache;
    }

    /**
     * Creates a metadata cache that uses the given serde and tracks it in this store so that it
     * receives later notifications, invalidations and refreshes.
     *
     * @param serde       serializer/deserializer for the cached values
     * @param cacheConfig configuration passed through to the cache
     * @return the newly created cache
     */
    @Override
    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
        MetadataCacheImpl<T> cache = new MetadataCacheImpl<T>(this, serde, cacheConfig);
        metadataCaches.add(cache);
        return cache;
    }

    /**
     * Reads the value at {@code path} from the store and records the latency of the operation as
     * a succeeded or failed get.
     *
     * @param path the path to read
     * @return the store's result; a failed future with {@code AlreadyClosedException} if the
     *         store is closed, or with {@code InvalidPathException} if the path is not valid
     */
    @Override
    public CompletableFuture<Optional<GetResult>> get(String path) {
        if (isClosed()) {
            return FutureUtil.failedFuture(new MetadataStoreException.AlreadyClosedException());
        }
        final long startedAt = System.currentTimeMillis();
        if (!isValidPath(path)) {
            metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - startedAt);
            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
        }
        CompletableFuture<Optional<GetResult>> lookup = storeGet(path);
        return lookup.whenComplete((value, failure) -> {
            long elapsedMillis = System.currentTimeMillis() - startedAt;
            if (failure == null) {
                metadataStoreStats.recordGetOpsSucceeded(elapsedMillis);
            } else {
                metadataStoreStats.recordGetOpsFailed(elapsedMillis);
            }
        });
    }

    /**
     * Store-specific read. {@link #get} calls this with the requested path after the closed
     * check and the path validation have passed.
     */
    protected abstract CompletableFuture<Optional<GetResult>> storeGet(String var1);

    /**
     * Writes {@code value} at {@code path} without any create options; delegates to the
     * four-argument {@code put}.
     *
     * @param path            the path to write
     * @param value           the payload
     * @param expectedVersion the version the caller expects, if any
     * @return the stat of the written value
     */
    @Override
    public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion) {
        EnumSet<CreateOption> noOptions = EnumSet.noneOf(CreateOption.class);
        return put(path, value, expectedVersion, noOptions);
    }

    /**
     * Returns the children of {@code path}, served through the children cache.
     *
     * @param path the parent path
     * @return the cached or freshly loaded children; a failed future with
     *         {@code AlreadyClosedException} if the store is closed, or with
     *         {@code InvalidPathException} if the path is not valid
     */
    @Override
    public final CompletableFuture<List<String>> getChildren(String path) {
        if (isClosed()) {
            return FutureUtil.failedFuture(new MetadataStoreException.AlreadyClosedException());
        }
        if (isValidPath(path)) {
            return childrenCache.get(path);
        }
        return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
    }

    /**
     * Tells whether {@code path} exists, served through the exists cache.
     *
     * @param path the path to check
     * @return the cached or freshly loaded answer; a failed future with
     *         {@code AlreadyClosedException} if the store is closed, or with
     *         {@code InvalidPathException} if the path is not valid
     */
    @Override
    public final CompletableFuture<Boolean> exists(String path) {
        if (isClosed()) {
            return FutureUtil.failedFuture(new MetadataStoreException.AlreadyClosedException());
        }
        if (isValidPath(path)) {
            return existsCache.get(path);
        }
        return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
    }

    /**
     * Adds a notification listener. The call is silently ignored once the store is closed.
     *
     * @param listener the listener to add
     */
    @Override
    public void registerListener(Consumer<Notification> listener) {
        if (isClosed()) {
            return;
        }
        listeners.add(listener);
    }

    /**
     * Delivers a notification to every registered listener on the store's executor.
     *
     * <p>A listener that throws is logged and does not stop delivery to the remaining listeners.
     *
     * @param notification the notification to deliver
     * @return a future completed once all listeners have been invoked, or a failed future when
     *         the executor rejects the task
     */
    protected CompletableFuture<Void> receivedNotification(Notification notification) {
        Runnable dispatch = () -> {
            for (Consumer<Notification> listener : this.listeners) {
                try {
                    listener.accept(notification);
                } catch (Throwable t) {
                    log.error("Failed to process metadata store notification", t);
                }
            }
        };
        try {
            return CompletableFuture.runAsync(dispatch, this.executor);
        } catch (RejectedExecutionException rejected) {
            return FutureUtil.failedFuture(rejected);
        }
    }

    /**
     * Reacts to a notification by dropping the cache entries it makes stale.
     *
     * <ul>
     *   <li>{@code Created}/{@code Deleted}: invalidates the exists and children entries of the
     *       path and the children entry of its parent, if it has one.</li>
     *   <li>{@code ChildrenChanged}: invalidates the children entry of the path.</li>
     *   <li>{@code Created}/{@code Deleted}/{@code Modified}: forwards the notification to every
     *       metadata cache created by this store.</li>
     * </ul>
     *
     * @param n the received notification
     */
    @Override
    public void accept(Notification n) {
        final String path = n.getPath();
        final NotificationType type = n.getType();
        final boolean createdOrDeleted = type == NotificationType.Created || type == NotificationType.Deleted;

        if (createdOrDeleted) {
            existsCache.synchronous().invalidate(path);
            childrenCache.synchronous().invalidate(path);
            String parentPath = parent(path);
            if (parentPath != null) {
                childrenCache.synchronous().invalidate(parentPath);
            }
        } else if (type == NotificationType.ChildrenChanged) {
            childrenCache.synchronous().invalidate(path);
        }

        if (createdOrDeleted || type == NotificationType.Modified) {
            for (MetadataCacheImpl<?> cache : metadataCaches) {
                cache.accept(n);
            }
        }
    }

    /**
     * Store-specific delete. {@code deleteInternal} calls this with the path and the optional
     * expected version, and invalidates the local caches only after it completes successfully.
     */
    protected abstract CompletableFuture<Void> storeDelete(String var1, Optional<Long> var2);

    /**
     * Deletes {@code path} and records the latency of the operation as a succeeded or failed
     * delete.
     *
     * <p>When a metadata event synchronizer is configured, a {@code Deleted} event is published
     * through it first and the store is only modified once that notification completes.
     *
     * @param path            the path to delete
     * @param expectedVersion the version the caller expects, if any
     * @return a future completed when the delete is done; a failed future with
     *         {@code AlreadyClosedException} if the store is closed, or with
     *         {@code InvalidPathException} if the path is not valid
     */
    @Override
    public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
        if (isClosed()) {
            return FutureUtil.failedFuture(new MetadataStoreException.AlreadyClosedException());
        }
        final long startedAt = System.currentTimeMillis();
        if (!isValidPath(path)) {
            metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - startedAt);
            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
        }

        CompletableFuture<Void> operation;
        if (getMetadataEventSynchronizer().isPresent()) {
            MetadataEvent event = new MetadataEvent(path, null, new HashSet<CreateOption>(),
                    expectedVersion.orElse(null), Instant.now().toEpochMilli(),
                    getMetadataEventSynchronizer().get().getClusterName(), NotificationType.Deleted);
            operation = getMetadataEventSynchronizer().get().notify(event)
                    .thenCompose(ignored -> deleteInternal(path, expectedVersion));
        } else {
            operation = deleteInternal(path, expectedVersion);
        }

        return operation.whenComplete((ignored, failure) -> {
            long elapsedMillis = System.currentTimeMillis() - startedAt;
            if (failure == null) {
                metadataStoreStats.recordDelOpsSucceeded(elapsedMillis);
            } else {
                metadataStoreStats.recordDelOpsFailed(elapsedMillis);
            }
        });
    }

    /**
     * Deletes the path in the backing store and, once that succeeds, invalidates everything that
     * is cached locally for it: its exists and children entries, its parent's children entry
     * (if it has a parent) and the path's entry in every metadata cache.
     *
     * @param path            the path to delete
     * @param expectedVersion the version the caller expects, if any
     * @return a future completed after the store delete and the invalidations
     */
    private CompletableFuture<Void> deleteInternal(String path, Optional<Long> expectedVersion) {
        Runnable invalidateLocalState = () -> {
            existsCache.synchronous().invalidate(path);
            childrenCache.synchronous().invalidate(path);
            String parentPath = parent(path);
            if (parentPath != null) {
                childrenCache.synchronous().invalidate(parentPath);
            }
            for (MetadataCacheImpl<?> cache : metadataCaches) {
                cache.invalidate(path);
            }
        };
        return storeDelete(path, expectedVersion).thenRun(invalidateLocalState);
    }

    /**
     * Deletes {@code path} and everything below it, depth first.
     *
     * <p>All children are deleted recursively first. Afterwards the path itself is deleted,
     * without a version check, but only if it still exists.
     *
     * @param path root of the subtree to delete
     * @return a future completed when the subtree is gone; a failed future with
     *         {@code AlreadyClosedException} if the store is closed
     */
    @Override
    public CompletableFuture<Void> deleteRecursive(String path) {
        if (isClosed()) {
            return FutureUtil.failedFuture(new MetadataStoreException.AlreadyClosedException());
        }
        return getChildren(path)
                .thenCompose(children -> {
                    List<CompletableFuture<Void>> pendingDeletes = children.stream()
                            .map(child -> deleteRecursive(path + "/" + child))
                            .collect(Collectors.toList());
                    return FutureUtil.waitForAll(pendingDeletes);
                })
                .thenCompose(ignored -> exists(path))
                .thenCompose(stillExists -> {
                    if (!stillExists) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    return delete(path, Optional.empty());
                });
    }

    /**
     * Store-specific write. {@link #putInternal} calls this with the path, the payload, the
     * optional expected version and a never-null {@code EnumSet} of create options (empty when
     * the caller supplied none).
     */
    protected abstract CompletableFuture<Stat> storePut(String var1, byte[] var2, Optional<Long> var3, EnumSet<CreateOption> var4);

    /**
     * Writes {@code data} at {@code path} and records the latency and payload size of the
     * operation as a succeeded or failed put.
     *
     * <p>When a metadata event synchronizer is configured, a {@code Modified} event is published
     * through it first and the store is only modified once that notification completes. A
     * negative expected version is sent as "no expected version" in that event.
     *
     * @param path               the path to write
     * @param data               the payload; {@code null} is recorded as length 0
     * @param optExpectedVersion the version the caller expects, if any
     * @param options            create options; {@code null} is treated as "no options"
     * @return the stat of the written value; a failed future with {@code AlreadyClosedException}
     *         if the store is closed, or with {@code InvalidPathException} if the path is not
     *         valid
     */
    @Override
    public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options) {
        if (this.isClosed()) {
            return FutureUtil.failedFuture(new MetadataStoreException.AlreadyClosedException());
        }
        long start = System.currentTimeMillis();
        if (!AbstractMetadataStore.isValidPath(path)) {
            this.metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
        }

        CompletableFuture<Stat> operation;
        if (this.getMetadataEventSynchronizer().isPresent()) {
            // The copy is only needed for the event, so build it here. It is null-safe because
            // putInternal() accepts null options and this method should not be stricter.
            HashSet<CreateOption> eventOptions = options != null
                    ? new HashSet<CreateOption>(options)
                    : new HashSet<CreateOption>();
            // Negative versions are not sent in the event.
            Long version = optExpectedVersion.filter(v -> v >= 0L).orElse(null);
            MetadataEvent event = new MetadataEvent(path, data, eventOptions, version,
                    Instant.now().toEpochMilli(),
                    this.getMetadataEventSynchronizer().get().getClusterName(), NotificationType.Modified);
            operation = this.getMetadataEventSynchronizer().get().notify(event)
                    .thenCompose(__ -> this.putInternal(path, data, optExpectedVersion, options));
        } else {
            operation = this.putInternal(path, data, optExpectedVersion, options);
        }

        return operation.whenComplete((v, t) -> {
            long elapsedMillis = System.currentTimeMillis() - start;
            if (t != null) {
                this.metadataStoreStats.recordPutOpsFailed(elapsedMillis);
            } else {
                int len = data == null ? 0 : data.length;
                this.metadataStoreStats.recordPutOpsSucceeded(elapsedMillis, len);
            }
        });
    }

    /**
     * Writes to the backing store and then brings the local caches up to date.
     *
     * <p>If the returned stat reports a first version (the path was just created), the exists
     * entry of the path and the children entry of its parent are invalidated. In every case each
     * metadata cache is asked to refresh the path.
     *
     * @param path               the path to write
     * @param data               the payload
     * @param optExpectedVersion the version the caller expects, if any
     * @param options            create options; {@code null} or empty means none
     * @return the stat returned by the store
     */
    public final CompletableFuture<Stat> putInternal(String path, byte[] data, Optional<Long> optExpectedVersion, Set<CreateOption> options) {
        EnumSet<CreateOption> storeOptions;
        if (options == null || options.isEmpty()) {
            storeOptions = EnumSet.noneOf(CreateOption.class);
        } else {
            storeOptions = EnumSet.copyOf(options);
        }
        return storePut(path, data, optExpectedVersion, storeOptions).thenApply(stat -> {
            if (stat.isFirstVersion()) {
                existsCache.synchronous().invalidate(path);
                String parentPath = parent(path);
                if (parentPath != null) {
                    childrenCache.synchronous().invalidate(parentPath);
                }
            }
            for (MetadataCacheImpl<?> cache : metadataCaches) {
                cache.refresh(path);
            }
            return stat;
        });
    }

    /**
     * Adds a listener that will be handed every subsequent session event.
     *
     * @param listener the listener to add
     */
    @Override
    public void registerSessionListener(Consumer<SessionEvent> listener) {
        sessionListeners.add(listener);
    }

    /**
     * Records the connection state carried by a session event and hands the event to every
     * session listener on the store's executor.
     *
     * <p>A listener that throws is logged and does not stop delivery to the remaining listeners.
     * If the executor rejects the task, the rejection is only logged.
     *
     * @param event the session event received from the store implementation
     */
    protected void receivedSessionEvent(SessionEvent event) {
        this.isConnected = event.isConnected();
        Runnable dispatch = () -> {
            for (Consumer<SessionEvent> listener : this.sessionListeners) {
                try {
                    listener.accept(event);
                } catch (Throwable t) {
                    log.warn("Error in processing session event " + event, t);
                }
            }
        };
        try {
            this.executor.execute(dispatch);
        } catch (RejectedExecutionException rejected) {
            log.warn("Error in processing session event " + event, (Throwable) rejected);
        }
    }

    /** Returns the current value of the {@code isClosed} flag. */
    private boolean isClosed() {
        final boolean closed = isClosed.get();
        return closed;
    }

    /**
     * Stops the executor, waits up to 10 seconds for it to terminate, then closes the stats.
     *
     * <p>NOTE(review): this method does not set {@code isClosed} itself; presumably subclasses
     * flip the flag before delegating here - confirm.
     *
     * @throws Exception if waiting for termination is interrupted or closing the stats fails
     */
    @Override
    public void close() throws Exception {
        final ScheduledExecutorService notifier = this.executor;
        notifier.shutdownNow();
        notifier.awaitTermination(10L, TimeUnit.SECONDS);
        metadataStoreStats.close();
    }

    /**
     * Empties the children cache and the exists cache. The metadata caches created through
     * {@code getMetadataCache} are not touched.
     */
    @VisibleForTesting
    public void invalidateAll() {
        childrenCache.synchronous().invalidateAll();
        existsCache.synchronous().invalidateAll();
    }

    /**
     * Drops the children-cache entry of each given path. Only the children cache is affected.
     *
     * @param paths the paths whose cached children are discarded
     */
    public void invalidateCaches(String ... paths) {
        LoadingCache<String, List<String>> children = childrenCache.synchronous();
        for (int i = 0; i < paths.length; i++) {
            children.invalidate(paths[i]);
        }
    }

    /**
     * Submits {@code task} to the store's executor. If the submission itself throws, the given
     * future is completed exceptionally with that error.
     *
     * @param task   the work to run on the executor
     * @param future the future to fail when the task cannot be submitted
     */
    @VisibleForTesting
    public void execute(Runnable task, CompletableFuture<?> future) {
        try {
            executor.execute(task);
        } catch (Throwable submitFailure) {
            future.completeExceptionally(submitFailure);
        }
    }

    /**
     * Submits {@code task} to the store's executor. If the submission itself throws, every
     * future produced by the supplier is completed exceptionally with that error. The supplier
     * is only consulted in that failure case.
     *
     * @param task    the work to run on the executor
     * @param futures supplies the futures to fail when the task cannot be submitted
     */
    @VisibleForTesting
    public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
        try {
            executor.execute(task);
        } catch (Throwable submitFailure) {
            for (CompletableFuture<?> pending : futures.get()) {
                pending.completeExceptionally(submitFailure);
            }
        }
    }

    /**
     * Returns the parent of a slash-separated path: everything before the last {@code '/'}.
     *
     * @param path the path to inspect
     * @return the parent path, or {@code null} when the last slash is at index 0 (direct child
     *         of the root, or the root itself) or the path contains no slash at all
     */
    protected static String parent(String path) {
        final int lastSlash = path.lastIndexOf('/');
        return lastSlash > 0 ? path.substring(0, lastSlash) : null;
    }

    /**
     * Checks the shape of a path: it must be exactly {@code "/"}, or start with {@code '/'} and
     * not end with {@code '/'}. {@code null}, empty and blank strings are rejected.
     *
     * @param path the candidate path, may be {@code null}
     * @return {@code true} if the path is well-formed
     */
    static boolean isValidPath(String path) {
        if (path == null) {
            return false;
        }
        if (path.equals("/")) {
            return true;
        }
        // A string that starts with '/' is never blank, so no separate blank check is needed.
        return path.startsWith("/") && !path.endsWith("/");
    }

    /**
     * Emits a {@code ChildrenChanged} notification for every ancestor of {@code path}, walking
     * upwards from its direct parent until {@link #parent(String)} returns {@code null}.
     *
     * @param path the path whose ancestors are notified
     */
    protected void notifyParentChildrenChanged(String path) {
        for (String ancestor = parent(path); ancestor != null; ancestor = parent(ancestor)) {
            receivedNotification(new Notification(NotificationType.ChildrenChanged, ancestor));
        }
    }

    /**
     * Returns the connection state taken from the most recent session event; {@code true} until
     * the first session event arrives.
     */
    public boolean isConnected() {
        final boolean connected = this.isConnected;
        return connected;
    }
}

