/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.coordination.impl;

import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ResourceLock} implementation that represents the lock as an ephemeral node
 * (created with {@link CreateOption#Ephemeral}) at {@code path} in a {@link MetadataStoreExtended}.
 *
 * <p>The lock moves through the {@link State} values {@code Init -> Valid -> Releasing -> Released}.
 * Mutable fields ({@code state}, {@code version}, {@code value}, {@code revalidateTask},
 * {@code revalidateAfterReconnection}) are read and written while holding this object's monitor.
 *
 * <p>Value updates and background revalidations are submitted through a
 * {@code FutureUtil.Sequencer} (presumably so that they run one after the other; see that class
 * for the exact guarantee). Failed background revalidations are retried on {@code executor}
 * with a backoff that starts at 100 ms and is capped at 60 s.
 *
 * <p>{@link #hashCode()} is derived from the path; {@code equals} is not overridden, so equality
 * remains identity-based.
 *
 * @param <T> type of the value stored in the lock node
 */
public class ResourceLockImpl<T> implements ResourceLock<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ResourceLockImpl.class);

    private final MetadataStoreExtended store;
    private final MetadataSerde<T> serde;
    private final String path;
    private volatile T value;
    /** Version expected on the next put/delete; {@code -1} means "node must not exist yet". */
    private long version;
    private final CompletableFuture<Void> expiredFuture;
    private boolean revalidateAfterReconnection = false;
    private final Backoff backoff;
    private final FutureUtil.Sequencer<Void> sequencer;
    private final ScheduledExecutorService executor;
    private ScheduledFuture<?> revalidateTask;
    private State state;

    /**
     * Creates a lock handle in the {@link State#Init} state. Nothing is written to the store
     * until {@link #acquire(Object)} is invoked.
     *
     * @param store    metadata store holding the lock node
     * @param serde    serializer/deserializer for the lock value
     * @param path     path of the lock node
     * @param executor scheduler used to retry failed revalidations
     */
    ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path,
                     ScheduledExecutorService executor) {
        this.store = store;
        this.serde = serde;
        this.path = path;
        this.version = -1L;
        this.expiredFuture = new CompletableFuture<>();
        this.sequencer = FutureUtil.Sequencer.create();
        this.state = State.Init;
        this.executor = executor;
        this.backoff = new BackoffBuilder()
                .setInitialTime(100L, TimeUnit.MILLISECONDS)
                .setMax(60L, TimeUnit.SECONDS)
                .create();
    }

    /**
     * @return the value most recently recorded for this lock, or {@code null} if none was recorded yet
     */
    @Override
    public synchronized T getValue() {
        return value;
    }

    /**
     * Replaces the value stored in the lock node. The update is submitted through the sequencer.
     *
     * @param newValue the value to store
     * @return a future completed when the new value is stored; it fails with
     *         {@link IllegalStateException} if the lock is not in the {@code Valid} state when
     *         the update runs
     */
    @Override
    public synchronized CompletableFuture<Void> updateValue(T newValue) {
        return sequencer.sequential(() -> {
            synchronized (this) {
                if (state != State.Valid) {
                    return CompletableFuture.failedFuture(
                            new IllegalStateException("Lock was not in valid state: " + state));
                }
                return acquire(newValue);
            }
        });
    }

    /**
     * Releases the lock by deleting its node, cancelling any pending revalidation retry.
     *
     * <p>A node that is already missing is treated as a successful release. On success the
     * expiration future is completed as well.
     *
     * @return a future completed once the node is gone, or failed with the store error as received
     */
    @Override
    public synchronized CompletableFuture<Void> release() {
        if (state == State.Released) {
            return CompletableFuture.completedFuture(null);
        }
        state = State.Releasing;
        if (revalidateTask != null) {
            revalidateTask.cancel(true);
        }

        CompletableFuture<Void> result = new CompletableFuture<>();
        store.delete(path, Optional.of(version))
                .thenRun(() -> {
                    markReleased();
                    result.complete(null);
                })
                .exceptionally(ex -> {
                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
                    if (cause instanceof MetadataStoreException.NotFoundException) {
                        // The node is not there anymore: nothing left to delete.
                        markReleased();
                        result.complete(null);
                    } else {
                        // Forwarded exactly as received to keep the existing caller-visible behavior.
                        result.completeExceptionally(ex);
                    }
                    return null;
                });
        return result;
    }

    /**
     * @return the future that is completed when the lock is released or found to be lost
     */
    @Override
    public CompletableFuture<Void> getLockExpiredFuture() {
        return expiredFuture;
    }

    /**
     * @return the path of the lock node
     */
    @Override
    public String getPath() {
        return path;
    }

    /**
     * @return the hash code of the lock path
     */
    @Override
    public int hashCode() {
        return path.hashCode();
    }

    /**
     * Tries to write {@code newValue} to the lock node; if the write reports the lock as busy,
     * falls back to {@link #revalidate(Object)} to check whether the existing node can be taken over.
     *
     * @param newValue the value to store in the lock node
     * @return a future completed when the lock is held with {@code newValue}
     */
    synchronized CompletableFuture<Void> acquire(T newValue) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        acquireWithNoRevalidation(newValue)
                .thenRun(() -> result.complete(null))
                .exceptionally(ex -> {
                    // Unwrap defensively: a null getCause() would make completeExceptionally throw
                    // and leave 'result' incomplete forever.
                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
                    if (cause instanceof MetadataStoreException.LockBusyException) {
                        revalidate(newValue)
                                .thenAccept(ignored -> result.complete(null))
                                .exceptionally(revalidateEx -> {
                                    // NOTE(review): forwarded as received (may still be wrapped in a
                                    // CompletionException), unlike the branch below; kept for
                                    // backward compatibility.
                                    result.completeExceptionally(revalidateEx);
                                    return null;
                                });
                    } else {
                        result.completeExceptionally(cause);
                    }
                    return null;
                });
        return result;
    }

    /**
     * Serializes {@code newValue} and writes it as an ephemeral node, expecting the currently
     * tracked {@code version}. On success the lock becomes {@code Valid} and the new version
     * and value are recorded.
     *
     * <p>Synchronized so that {@code version} is read under the same monitor that guards its
     * writes; this method is also invoked from store callbacks that do not hold the monitor.
     *
     * @param newValue the value to store
     * @return a future failed with {@link MetadataStoreException.LockBusyException} when the
     *         store reports a version mismatch, or with the underlying cause otherwise
     */
    private synchronized CompletableFuture<Void> acquireWithNoRevalidation(T newValue) {
        if (log.isDebugEnabled()) {
            log.debug("acquireWithNoRevalidation,newValue={},version={}", newValue, version);
        }

        byte[] payload;
        try {
            payload = serde.serialize(path, newValue);
        } catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }

        CompletableFuture<Void> result = new CompletableFuture<>();
        store.put(path, payload, Optional.of(version), EnumSet.of(CreateOption.Ephemeral))
                .thenAccept(stat -> {
                    synchronized (this) {
                        state = State.Valid;
                        version = stat.getVersion();
                        value = newValue;
                    }
                    log.info("Acquired resource lock on {}", path);
                    result.complete(null);
                })
                .exceptionally(ex -> {
                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
                    if (cause instanceof MetadataStoreException.BadVersionException) {
                        result.completeExceptionally(new MetadataStoreException.LockBusyException(
                                "Resource at " + path + " is already locked"));
                    } else {
                        result.completeExceptionally(cause);
                    }
                    return null;
                });
        return result;
    }

    /**
     * Notification that the lock node may no longer be valid; triggers a single revalidation.
     */
    synchronized void lockWasInvalidated() {
        log.info("Lock on resource {} was invalidated. state {}", path, state);
        silentRevalidateOnce();
    }

    /**
     * Runs a revalidation if a previous one failed for a reason other than the lock being lost.
     *
     * @return the revalidation future, or an already-completed future if nothing is pending
     */
    synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
        if (revalidateAfterReconnection) {
            revalidateAfterReconnection = false;
            log.warn("Revalidate lock at {} after reconnection", path);
            return silentRevalidateOnce();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Revalidates the lock with its current value without surfacing failures to the caller.
     *
     * <p>If revalidation fails with a bad-version or lock-busy error the lock is marked
     * {@code Released} and the expiration future is completed. Any other failure schedules a
     * retry after the next backoff delay and flags the lock for revalidation after reconnection.
     *
     * @return a future that always completes normally; a no-op when the lock is not {@code Valid}
     */
    synchronized CompletableFuture<Void> silentRevalidateOnce() {
        if (state != State.Valid) {
            return CompletableFuture.completedFuture(null);
        }

        return sequencer.sequential(() -> revalidate(value))
                .thenRun(() -> {
                    log.info("Successfully revalidated the lock on {}", path);
                    backoff.reset();
                })
                .exceptionally(ex -> {
                    synchronized (this) {
                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                        if (realCause instanceof MetadataStoreException.BadVersionException
                                || realCause instanceof MetadataStoreException.LockBusyException) {
                            log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
                                    path, realCause.getMessage());
                            markReleased();
                        } else {
                            revalidateAfterReconnection = true;
                            long delayMillis = backoff.next();
                            log.warn("Failed to revalidate the lock at {}: {} - Retrying in {} seconds",
                                    path, realCause.getMessage(), delayMillis / 1000.0);
                            revalidateTask = executor.schedule(
                                    this::silentRevalidateOnce, delayMillis, TimeUnit.MILLISECONDS);
                        }
                    }
                    return null;
                });
    }

    /**
     * Reads the lock node and reconciles it with {@code newValue}:
     * <ul>
     *   <li>node missing: reset the expected version and acquire again;</li>
     *   <li>node not ephemeral: fail with lock-busy;</li>
     *   <li>same value, created by this session: adopt the node's version;</li>
     *   <li>same value, created by another session: delete the stale node and acquire again;</li>
     *   <li>different value, created by another session: fail with lock-busy;</li>
     *   <li>different value, created by this session: delete the node and acquire again.</li>
     * </ul>
     *
     * @param newValue the value this lock should hold
     * @return a future completed when the lock is confirmed or re-acquired; fails with
     *         {@link IllegalStateException} unless the state is {@code Valid} or {@code Init}
     */
    private synchronized CompletableFuture<Void> revalidate(T newValue) {
        if (state != State.Valid && state != State.Init) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Lock was not in valid state: " + state));
        }
        if (log.isDebugEnabled()) {
            log.debug("doRevalidate with newValue={}, version={}", newValue, version);
        }

        return store.get(path).thenCompose(optGetResult -> {
            if (!optGetResult.isPresent()) {
                // The node disappeared: expect a fresh node on the next put.
                setVersion(-1L);
                return acquireWithNoRevalidation(newValue)
                        .thenRun(() -> log.info("Successfully re-acquired missing lock at {}", path));
            }

            GetResult res = optGetResult.get();
            if (!res.getStat().isEphemeral()) {
                return CompletableFuture.failedFuture(new MetadataStoreException.LockBusyException(
                        "Path " + path + " is already created as non-ephemeral"));
            }

            T existingValue;
            try {
                existingValue = serde.deserialize(path, res.getValue(), res.getStat());
            } catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }

            synchronized (this) {
                boolean createdBySelf = res.getStat().isCreatedBySelf();
                long existingVersion = res.getStat().getVersion();

                if (newValue.equals(existingValue)) {
                    if (createdBySelf) {
                        // Our own node with our own value: just adopt its current version.
                        version = existingVersion;
                        value = newValue;
                        return CompletableFuture.completedFuture(null);
                    }
                    log.info("Deleting stale lock at {}", path);
                    return deleteAndReacquire(newValue, existingVersion,
                            "Successfully re-acquired stale lock at {}");
                }

                if (!createdBySelf) {
                    return CompletableFuture.failedFuture(new MetadataStoreException.LockBusyException(
                            "Resource at " + path + " is already locked"));
                }
                return deleteAndReacquire(newValue, existingVersion,
                        "Successfully re-acquired lock at {}");
            }
        });
    }

    /**
     * Deletes the existing node at {@code existingVersion}, resets the expected version and
     * acquires the lock again with {@code newValue}, logging {@code successMessage} on success.
     */
    private CompletableFuture<Void> deleteAndReacquire(T newValue, long existingVersion,
                                                       String successMessage) {
        return store.delete(path, Optional.of(existingVersion))
                .thenRun(() -> setVersion(-1L))
                .thenCompose(ignored -> acquireWithNoRevalidation(newValue))
                .thenRun(() -> log.info(successMessage, path));
    }

    /** Marks the lock as {@code Released} and completes the expiration future. */
    private void markReleased() {
        synchronized (this) {
            state = State.Released;
        }
        expiredFuture.complete(null);
    }

    /** Sets the version expected by the next store operation. */
    private synchronized void setVersion(long version) {
        this.version = version;
    }

    /** Lifecycle of the lock handle. */
    private enum State {
        /** Created, nothing written to the store yet. */
        Init,
        /** The lock node was written successfully. */
        Valid,
        /** {@link #release()} was called and the delete is in flight. */
        Releasing,
        /** The node was deleted, or revalidation found the lock to be lost. */
        Released
    }
}

