/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.zookeeper;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.ACLPathAndBytesable;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperStateHandleStore<T extends Serializable> {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
    private final CuratorFramework client;
    private final RetrievableStateStorageHelper<T> storage;
    private final String lockNode;

    public ZooKeeperStateHandleStore(CuratorFramework client, RetrievableStateStorageHelper<T> storage) {
        this.client = (CuratorFramework)Preconditions.checkNotNull((Object)client, (String)"Curator client");
        this.storage = (RetrievableStateStorageHelper)Preconditions.checkNotNull(storage, (String)"State storage");
        this.lockNode = UUID.randomUUID().toString();
    }

    public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws Exception {
        Preconditions.checkNotNull((Object)pathInZooKeeper, (String)"Path in ZooKeeper");
        Preconditions.checkNotNull(state, (String)"State");
        String path = ZooKeeperStateHandleStore.normalizePath(pathInZooKeeper);
        RetrievableStateHandle<T> storeHandle = this.storage.store(state);
        boolean success = false;
        try {
            byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
            ((CuratorTransactionBridge)((ACLPathAndBytesable)((CuratorTransactionBridge)((ACLPathAndBytesable)this.client.inTransaction().create().withMode(CreateMode.PERSISTENT)).forPath(path, serializedStoreHandle)).and().create().withMode(CreateMode.EPHEMERAL)).forPath(this.getLockPath(path))).and().commit();
            success = true;
            RetrievableStateHandle<T> retrievableStateHandle = storeHandle;
            return retrievableStateHandle;
        }
        catch (KeeperException.NodeExistsException e) {
            throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
        }
        finally {
            if (!success && storeHandle != null) {
                storeHandle.discardState();
            }
        }
    }

    public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
        Preconditions.checkNotNull((Object)pathInZooKeeper, (String)"Path in ZooKeeper");
        Preconditions.checkNotNull(state, (String)"State");
        String path = ZooKeeperStateHandleStore.normalizePath(pathInZooKeeper);
        RetrievableStateHandle<T> oldStateHandle = this.get(path, false);
        RetrievableStateHandle<T> newStateHandle = this.storage.store(state);
        boolean success = false;
        try {
            byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle);
            ((BackgroundPathAndBytesable)this.client.setData().withVersion(expectedVersion)).forPath(path, serializedStateHandle);
            success = true;
        }
        catch (KeeperException.NoNodeException e) {
            throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
        }
        finally {
            if (success) {
                oldStateHandle.discardState();
            } else {
                newStateHandle.discardState();
            }
        }
    }

    public int exists(String pathInZooKeeper) throws Exception {
        Preconditions.checkNotNull((Object)pathInZooKeeper, (String)"Path in ZooKeeper");
        String path = ZooKeeperStateHandleStore.normalizePath(pathInZooKeeper);
        Stat stat = (Stat)this.client.checkExists().forPath(path);
        if (stat != null) {
            return stat.getVersion();
        }
        return -1;
    }

    public RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception {
        return this.get(pathInZooKeeper, true);
    }

    public Collection<String> getAllPaths() throws Exception {
        String path = "/";
        Stat stat;
        while ((stat = (Stat)this.client.checkExists().forPath("/")) != null) {
            try {
                return (Collection)this.client.getChildren().forPath("/");
            }
            catch (KeeperException.NoNodeException noNodeException) {
                continue;
            }
            break;
        }
        return Collections.emptyList();
    }

    public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
        ArrayList<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<Tuple2<RetrievableStateHandle<T>, String>>();
        boolean success = false;
        block3: while (!success) {
            stateHandles.clear();
            Stat stat = (Stat)this.client.checkExists().forPath("/");
            if (stat == null) break;
            int initialCVersion = stat.getCversion();
            List children = (List)this.client.getChildren().forPath("/");
            for (String path : children) {
                path = "/" + path;
                try {
                    RetrievableStateHandle<T> stateHandle = this.getAndLock(path);
                    stateHandles.add(new Tuple2(stateHandle, (Object)path));
                }
                catch (KeeperException.NoNodeException ignored) {
                    continue block3;
                }
                catch (IOException ioException) {
                    LOG.warn("Could not get all ZooKeeper children. Node {} contained corrupted data. Ignoring this node.", (Object)path, (Object)ioException);
                }
            }
            int finalCVersion = ((Stat)this.client.checkExists().forPath("/")).getCversion();
            success = initialCVersion == finalCVersion;
        }
        return stateHandles;
    }

    @Nullable
    public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception {
        Preconditions.checkNotNull((Object)pathInZooKeeper, (String)"Path in ZooKeeper");
        String path = ZooKeeperStateHandleStore.normalizePath(pathInZooKeeper);
        RetrievableStateHandle<T> stateHandle = null;
        try {
            stateHandle = this.get(path, false);
        }
        catch (Exception e) {
            LOG.warn("Could not retrieve the state handle from node {}.", (Object)path, (Object)e);
        }
        this.release(pathInZooKeeper);
        try {
            this.client.delete().forPath(path);
        }
        catch (KeeperException.NotEmptyException ignored) {
            LOG.debug("Could not delete znode {} because it is still locked.", (Object)path);
            return false;
        }
        if (stateHandle != null) {
            stateHandle.discardState();
        }
        return true;
    }

    public void releaseAndTryRemoveAll() throws Exception {
        Collection<String> children = this.getAllPaths();
        Exception exception = null;
        for (String child : children) {
            try {
                this.releaseAndTryRemove('/' + child);
            }
            catch (Exception e) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
            }
        }
        if (exception != null) {
            throw new Exception("Could not properly release and try removing all state nodes.", exception);
        }
    }

    public void release(String pathInZooKeeper) throws Exception {
        String path = ZooKeeperStateHandleStore.normalizePath(pathInZooKeeper);
        try {
            this.client.delete().forPath(this.getLockPath(path));
        }
        catch (KeeperException.NoNodeException noNodeException) {
        }
        catch (Exception e) {
            throw new Exception("Could not release the lock: " + this.getLockPath(pathInZooKeeper) + '.', e);
        }
    }

    public void releaseAll() throws Exception {
        Collection<String> children = this.getAllPaths();
        Exception exception = null;
        for (String child : children) {
            try {
                this.release(child);
            }
            catch (Exception e) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
            }
        }
        if (exception != null) {
            throw new Exception("Could not properly release all state nodes.", exception);
        }
    }

    public void deleteChildren() throws Exception {
        String path = "/" + this.client.getNamespace();
        LOG.info("Removing {} from ZooKeeper", (Object)path);
        ZKPaths.deleteChildren(this.client.getZookeeperClient().getZooKeeper(), path, true);
    }

    protected String getLockPath(String rootPath) {
        return rootPath + '/' + this.lockNode;
    }

    private RetrievableStateHandle<T> get(String pathInZooKeeper, boolean lock) throws Exception {
        Preconditions.checkNotNull((Object)pathInZooKeeper, (String)"Path in ZooKeeper");
        String path = ZooKeeperStateHandleStore.normalizePath(pathInZooKeeper);
        if (lock) {
            try {
                ((ACLBackgroundPathAndBytesable)this.client.create().withMode(CreateMode.EPHEMERAL)).forPath(this.getLockPath(path));
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
        boolean success = false;
        try {
            byte[] data = (byte[])this.client.getData().forPath(path);
            try {
                RetrievableStateHandle retrievableStateHandle = (RetrievableStateHandle)InstantiationUtil.deserializeObject((byte[])data, (ClassLoader)Thread.currentThread().getContextClassLoader());
                success = true;
                RetrievableStateHandle retrievableStateHandle2 = retrievableStateHandle;
                return retrievableStateHandle2;
            }
            catch (IOException | ClassNotFoundException e) {
                throw new IOException("Failed to deserialize state handle from ZooKeeper data from " + path + '.', e);
            }
        }
        finally {
            if (!success && lock) {
                this.release(path);
            }
        }
    }

    private static String normalizePath(String path) {
        if (path.startsWith("/")) {
            return path;
        }
        return '/' + path;
    }
}

