package org.apache.flink.runtime.zookeeper;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.BackgroundCallback;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.CuratorEvent;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.CuratorEventType;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.class */
public class ZooKeeperStateHandleStore<T extends Serializable> {
    public static Logger LOG = LoggerFactory.getLogger((Class<?>) ZooKeeperStateHandleStore.class);
    private final CuratorFramework client;
    private final RetrievableStateStorageHelper<T> storage;
    private final Executor executor;
    private final String lockNode = UUID.randomUUID().toString();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore$RemoveBackgroundCallback.class */
    public static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {

        @Nullable
        private final RetrievableStateHandle<T> stateHandle;

        @Nullable
        private final RemoveCallback<T> callback;
        private final String pathInZooKeeper;

        private RemoveBackgroundCallback(@Nullable RetrievableStateHandle<T> retrievableStateHandle, @Nullable RemoveCallback<T> removeCallback, String str) {
            this.stateHandle = retrievableStateHandle;
            this.callback = removeCallback;
            this.pathInZooKeeper = (String) Preconditions.checkNotNull(str);
        }

        @Override // org.apache.flink.shaded.curator.org.apache.curator.framework.api.BackgroundCallback
        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            try {
                if (curatorEvent.getType() != CuratorEventType.DELETE) {
                    throw new IllegalStateException("Unexpected event type " + curatorEvent.getType() + " in '" + curatorEvent + "' callback.");
                }
                KeeperException.Code code = KeeperException.Code.get(curatorEvent.getResultCode());
                if (code == KeeperException.Code.OK) {
                    Exception exc = null;
                    if (null != this.callback) {
                        try {
                            this.callback.apply(this.stateHandle);
                        } catch (Throwable th) {
                            exc = new Exception("Could not execute delete action for node " + this.pathInZooKeeper + '.', th);
                        }
                    }
                    if (this.stateHandle != null) {
                        try {
                            this.stateHandle.discardState();
                        } catch (Throwable th2) {
                            Exception exc2 = new Exception("Could not discard state handle of node " + this.pathInZooKeeper + '.', th2);
                            if (exc == null) {
                                exc = exc2;
                            } else {
                                exc.addSuppressed(exc2);
                            }
                        }
                    }
                    if (exc != null) {
                        throw exc;
                    }
                } else {
                    if (code != KeeperException.Code.NOTEMPTY) {
                        throw new IllegalStateException("Unexpected result code " + code.name() + " in '" + curatorEvent + "' callback.");
                    }
                    ZooKeeperStateHandleStore.LOG.debug("Could not delete node " + this.pathInZooKeeper + " because it is still locked.");
                }
            } catch (Exception e) {
                ZooKeeperStateHandleStore.LOG.warn("Failed to run callback for delete operation on node " + this.pathInZooKeeper + '.', (Throwable) e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore$RemoveCallback.class */
    public interface RemoveCallback<T extends Serializable> {
        void apply(@Nullable RetrievableStateHandle<T> retrievableStateHandle) throws FlinkException;
    }

    public ZooKeeperStateHandleStore(CuratorFramework curatorFramework, RetrievableStateStorageHelper<T> retrievableStateStorageHelper, Executor executor) {
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "Curator client");
        this.storage = (RetrievableStateStorageHelper) Preconditions.checkNotNull(retrievableStateStorageHelper, "State storage");
        this.executor = (Executor) Preconditions.checkNotNull(executor);
    }

    public RetrievableStateHandle<T> addAndLock(String str, T t) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        Preconditions.checkNotNull(t, "State");
        String normalizePath = normalizePath(str);
        RetrievableStateHandle<T> store = this.storage.store(t);
        boolean z = false;
        try {
            try {
                ((CuratorTransactionBridge) ((CuratorTransactionBridge) this.client.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(normalizePath, InstantiationUtil.serializeObject(store))).and().create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(normalizePath))).and().commit();
                z = true;
                if (1 == 0 && store != null) {
                    store.discardState();
                }
                return store;
            } catch (KeeperException.NodeExistsException e) {
                throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
            }
        } catch (Throwable th) {
            if (!z && store != null) {
                store.discardState();
            }
            throw th;
        }
    }

    public void replace(String str, int i, T t) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        Preconditions.checkNotNull(t, "State");
        String normalizePath = normalizePath(str);
        RetrievableStateHandle<T> retrievableStateHandle = get(normalizePath, false);
        RetrievableStateHandle<T> store = this.storage.store(t);
        boolean z = false;
        try {
            try {
                this.client.setData().withVersion(i).forPath(normalizePath, InstantiationUtil.serializeObject(store));
                z = true;
                if (1 != 0) {
                    retrievableStateHandle.discardState();
                } else {
                    store.discardState();
                }
            } catch (KeeperException.NoNodeException e) {
                throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
            }
        } catch (Throwable th) {
            if (z) {
                retrievableStateHandle.discardState();
            } else {
                store.discardState();
            }
            throw th;
        }
    }

    public int exists(String str) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        Stat forPath = this.client.checkExists().forPath(normalizePath(str));
        if (forPath != null) {
            return forPath.getVersion();
        }
        return -1;
    }

    public RetrievableStateHandle<T> getAndLock(String str) throws Exception {
        return get(str, true);
    }

    public Collection<String> getAllPaths() throws Exception {
        while (this.client.checkExists().forPath("/") != null) {
            try {
                return this.client.getChildren().forPath("/");
            } catch (KeeperException.NoNodeException e) {
            }
        }
        return Collections.emptyList();
    }

    public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
        ArrayList arrayList = new ArrayList();
        boolean z = false;
        while (!z) {
            arrayList.clear();
            Stat forPath = this.client.checkExists().forPath("/");
            if (forPath == null) {
                break;
            }
            int cversion = forPath.getCversion();
            Iterator<String> it = this.client.getChildren().forPath("/").iterator();
            while (it.hasNext()) {
                String str = "/" + it.next();
                try {
                    arrayList.add(new Tuple2(getAndLock(str), str));
                } catch (IOException e) {
                    LOG.warn("Could not get all ZooKeeper children. Node {} contained corrupted data. Ignoring this node.", str, e);
                } catch (KeeperException.NoNodeException e2) {
                }
            }
            z = cversion == this.client.checkExists().forPath("/").getCversion();
        }
        return arrayList;
    }

    public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByNameAndLock() throws Exception {
        ArrayList arrayList = new ArrayList();
        boolean z = false;
        while (!z) {
            arrayList.clear();
            Stat forPath = this.client.checkExists().forPath("/");
            if (forPath == null) {
                break;
            }
            int cversion = forPath.getCversion();
            Iterator<String> it = ZKPaths.getSortedChildren(this.client.getZookeeperClient().getZooKeeper(), ZKPaths.fixForNamespace(this.client.getNamespace(), "/")).iterator();
            while (it.hasNext()) {
                String str = "/" + it.next();
                try {
                    arrayList.add(new Tuple2(getAndLock(str), str));
                } catch (IOException e) {
                    LOG.warn("Could not get all ZooKeeper children. Node {} contained corrupted data. Releasing and trying to remove this node.", str, e);
                    releaseAndTryRemove(str);
                } catch (KeeperException.NoNodeException e2) {
                }
            }
            z = cversion == this.client.checkExists().forPath("/").getCversion();
        }
        return arrayList;
    }

    public void releaseAndTryRemove(String str) throws Exception {
        releaseAndTryRemove(str, null);
    }

    public void releaseAndTryRemove(String str, @Nullable RemoveCallback<T> removeCallback) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        String normalizePath = normalizePath(str);
        RetrievableStateHandle<T> retrievableStateHandle = null;
        try {
            retrievableStateHandle = get(normalizePath, false);
        } catch (Exception e) {
            LOG.warn("Could not retrieve the state handle from node " + normalizePath + '.', (Throwable) e);
        }
        release(str);
        this.client.delete().inBackground((BackgroundCallback) new RemoveBackgroundCallback(retrievableStateHandle, removeCallback, normalizePath), this.executor).forPath(normalizePath);
    }

    public void releaseAndTryRemoveAll() throws Exception {
        Exception exc = null;
        Iterator<String> it = getAllPaths().iterator();
        while (it.hasNext()) {
            try {
                releaseAndTryRemove('/' + it.next());
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        if (exc != null) {
            throw new Exception("Could not properly release and try removing all state nodes.", exc);
        }
    }

    public void release(String str) throws Exception {
        try {
            this.client.delete().forPath(getLockPath(normalizePath(str)));
        } catch (KeeperException.NoNodeException e) {
        } catch (Exception e2) {
            throw new Exception("Could not release the lock: " + getLockPath(str) + '.', e2);
        }
    }

    public void releaseAll() throws Exception {
        Exception exc = null;
        Iterator<String> it = getAllPaths().iterator();
        while (it.hasNext()) {
            try {
                release(it.next());
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        if (exc != null) {
            throw new Exception("Could not properly release all state nodes.", exc);
        }
    }

    protected String getLockPath(String str) {
        return str + '/' + this.lockNode;
    }

    private RetrievableStateHandle<T> get(String str, boolean z) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        String normalizePath = normalizePath(str);
        if (z) {
            try {
                this.client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(normalizePath));
            } catch (KeeperException.NoNodeException e) {
                throw new Exception("Cannot lock the node " + normalizePath + " since it does not exist.", e);
            } catch (KeeperException.NodeExistsException e2) {
            }
        }
        boolean z2 = false;
        try {
            try {
                try {
                    RetrievableStateHandle<T> retrievableStateHandle = (RetrievableStateHandle) InstantiationUtil.deserializeObject(this.client.getData().forPath(normalizePath), Thread.currentThread().getContextClassLoader());
                    z2 = true;
                    if (1 == 0 && z) {
                        release(normalizePath);
                    }
                    return retrievableStateHandle;
                } catch (IOException | ClassNotFoundException e3) {
                    throw new IOException("Failed to deserialize state handle from ZooKeeper data from " + normalizePath + '.', e3);
                }
            } catch (Exception e4) {
                throw new Exception("Failed to retrieve state handle data under " + normalizePath + " from ZooKeeper.", e4);
            }
        } catch (Throwable th) {
            if (!z2 && z) {
                release(normalizePath);
            }
            throw th;
        }
    }

    private static String normalizePath(String str) {
        return str.startsWith("/") ? str : '/' + str;
    }
}
