package org.apache.flink.runtime.jobmanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStore.class */
/**
 * {@link JobGraphStore} that keeps submitted {@link JobGraph} instances in a
 * {@link ZooKeeperStateHandleStore}.
 *
 * <p>A {@link PathChildrenCache} is observed for child events. Whenever a child node is added
 * that this instance has not registered itself, or a child node is removed that this instance
 * has registered, the {@link JobGraphStore.JobGraphListener} passed to {@link #start} is
 * notified.
 *
 * <p>All mutable state ({@code addedJobGraphs}, {@code jobGraphListener}, {@code isRunning}) is
 * read and written while holding {@code cacheLock}.
 */
public class ZooKeeperJobGraphStore implements JobGraphStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperJobGraphStore.class);

    /** Lock shared between the public methods and the path cache listener. */
    private final Object cacheLock = new Object();

    /** IDs of the job graphs that were put or recovered through this instance. */
    private final Set<JobID> addedJobGraphs = new HashSet<>();

    /** Backing store holding the job graphs. */
    private final ZooKeeperStateHandleStore<JobGraph> jobGraphsInZooKeeper;

    /** Cache whose child events are forwarded to {@link JobGraphsPathCacheListener}. */
    private final PathChildrenCache pathCache;

    /** Base path; only used as a prefix in log messages within this class. */
    private final String zooKeeperFullBasePath;

    /** Listener notified about foreign additions/removals; {@code null} while not running. */
    private JobGraphStore.JobGraphListener jobGraphListener;

    /** Whether {@link #start} has been called without a subsequent {@link #stop}. */
    private boolean isRunning;

    /**
     * Translates child events of the path cache into {@link JobGraphStore.JobGraphListener}
     * callbacks and logs connection state changes.
     */
    private final class JobGraphsPathCacheListener implements PathChildrenCacheListener {

        /**
         * Handles a single cache event.
         *
         * @param curatorFramework the client that produced the event (unused)
         * @param pathChildrenCacheEvent the event to handle
         */
        @Override
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            if (LOG.isDebugEnabled()) {
                if (pathChildrenCacheEvent.getData() != null) {
                    LOG.debug("Received {} event (path: {})", pathChildrenCacheEvent.getType(), pathChildrenCacheEvent.getData().getPath());
                } else {
                    LOG.debug("Received {} event", pathChildrenCacheEvent.getType());
                }
            }

            switch (pathChildrenCacheEvent.getType()) {
                case CHILD_ADDED: {
                    final JobID jobId = fromEvent(pathChildrenCacheEvent);
                    LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
                    notifyAdded(jobId);
                    break;
                }
                case CHILD_REMOVED: {
                    final JobID jobId = fromEvent(pathChildrenCacheEvent);
                    LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
                    notifyRemoved(jobId);
                    break;
                }
                case CONNECTION_SUSPENDED:
                    LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job graphs are not monitored (temporarily).");
                    break;
                case CONNECTION_LOST:
                    LOG.warn("ZooKeeper connection LOST. Changes to the submitted job graphs are not monitored (permanently).");
                    break;
                case CONNECTION_RECONNECTED:
                    LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.");
                    break;
                case INITIALIZED:
                    LOG.info("JobGraphsPathCacheListener initialized");
                    break;
                case CHILD_UPDATED:
                default:
                    // Nothing to do.
                    break;
            }
        }

        /** Notifies the listener about a job graph that this instance did not add itself. */
        private void notifyAdded(JobID jobId) {
            synchronized (cacheLock) {
                if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
                    try {
                        jobGraphListener.onAddedJobGraph(jobId);
                    } catch (Throwable t) {
                        // A failing callback must not break event processing.
                        LOG.error("Error in callback", t);
                    }
                }
            }
        }

        /** Notifies the listener about the removal of a job graph this instance had added. */
        private void notifyRemoved(JobID jobId) {
            synchronized (cacheLock) {
                if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
                    try {
                        jobGraphListener.onRemovedJobGraph(jobId);
                    } catch (Throwable t) {
                        // A failing callback must not break event processing.
                        LOG.error("Error in callback", t);
                    }
                }
            }
        }

        /** Parses the job ID from the last path segment of the event's node. */
        private JobID fromEvent(PathChildrenCacheEvent pathChildrenCacheEvent) {
            return JobID.fromHexString(ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath()));
        }
    }

    /**
     * Creates the store and registers its listener on the given path cache. The cache itself is
     * only started in {@link #start}.
     *
     * @param str base path, used as a prefix in log messages; must not be {@code null}
     * @param zooKeeperStateHandleStore store holding the job graphs; must not be {@code null}
     * @param pathChildrenCache cache to observe for child events; must not be {@code null}
     */
    public ZooKeeperJobGraphStore(String str, ZooKeeperStateHandleStore<JobGraph> zooKeeperStateHandleStore, PathChildrenCache pathChildrenCache) {
        Preconditions.checkNotNull(str, "Current jobs path");
        this.zooKeeperFullBasePath = str;
        this.jobGraphsInZooKeeper = Preconditions.checkNotNull(zooKeeperStateHandleStore);
        this.pathCache = Preconditions.checkNotNull(pathChildrenCache);
        pathChildrenCache.getListenable().addListener(new JobGraphsPathCacheListener());
    }

    /**
     * Starts the path cache and remembers the listener. Calling this while already running is a
     * no-op (the previously registered listener is kept).
     *
     * @param jobGraphListener listener to notify about foreign job graph changes
     * @throws Exception if starting the path cache fails
     */
    @Override
    public void start(JobGraphStore.JobGraphListener jobGraphListener) throws Exception {
        synchronized (cacheLock) {
            if (!isRunning) {
                this.jobGraphListener = jobGraphListener;
                pathCache.start();
                isRunning = true;
            }
        }
    }

    /**
     * Releases everything held in the state handle store and closes the path cache. Both steps
     * are always attempted; the instance is marked as not running even if one of them fails.
     * Calling this while not running is a no-op.
     *
     * @throws FlinkException if releasing or closing failed
     */
    @Override
    public void stop() throws Exception {
        synchronized (cacheLock) {
            if (!isRunning) {
                return;
            }

            jobGraphListener = null;

            try {
                Exception exception = null;

                try {
                    jobGraphsInZooKeeper.releaseAll();
                } catch (Exception e) {
                    exception = e;
                }

                try {
                    pathCache.close();
                } catch (Exception e) {
                    // Merge with a possible earlier failure so neither one is lost.
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }

                if (exception != null) {
                    throw new FlinkException("Could not properly stop the ZooKeeperJobGraphStore.", exception);
                }
            } finally {
                isRunning = false;
            }
        }
    }

    /**
     * Retrieves the job graph stored for the given job ID and registers it as added by this
     * instance.
     *
     * @param jobID ID of the job graph to recover; must not be {@code null}
     * @return the recovered job graph, or {@code null} if no node exists for the job
     * @throws IllegalStateException if the store is not running
     * @throws FlinkException if the state handle or the job graph could not be retrieved
     */
    @Override
    @Nullable
    public JobGraph recoverJobGraph(JobID jobID) throws Exception {
        Preconditions.checkNotNull(jobID, "Job ID");
        final String path = getPathForJob(jobID);

        LOG.debug("Recovering job graph {} from {}{}.", jobID, zooKeeperFullBasePath, path);

        synchronized (cacheLock) {
            verifyIsRunning();

            // Tracks whether the lock must be given back in the finally block. A missing node
            // counts as success: there is nothing to release in that case.
            boolean success = false;

            try {
                JobGraph jobGraph;

                try {
                    jobGraph = jobGraphsInZooKeeper.getAndLock(path).retrieveState();
                } catch (KeeperException.NoNodeException ignored) {
                    success = true;
                    return null;
                } catch (IOException e) {
                    throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", e);
                } catch (ClassNotFoundException e) {
                    throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path + ". This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", e);
                } catch (Exception e) {
                    throw new FlinkException("Could not retrieve the submitted job graph state handle for " + path + " from the submitted job graph store.", e);
                }

                addedJobGraphs.add(jobGraph.getJobID());

                LOG.info("Recovered {}.", jobGraph);

                success = true;
                return jobGraph;
            } finally {
                if (!success) {
                    jobGraphsInZooKeeper.release(path);
                }
            }
        }
    }

    /**
     * Adds the job graph to the store, or replaces the stored one if this instance added or
     * recovered it before. The operation is retried when the node appears or disappears between
     * the existence check and the write.
     *
     * @param jobGraph job graph to store; must not be {@code null}
     * @throws IllegalStateException if the store is not running, or if a node already exists for
     *     the job but was not added or recovered through this instance
     * @throws Exception if the underlying store fails
     */
    @Override
    public void putJobGraph(JobGraph jobGraph) throws Exception {
        Preconditions.checkNotNull(jobGraph, "Job graph");
        final JobID jobId = jobGraph.getJobID();
        final String path = getPathForJob(jobId);

        LOG.debug("Adding job graph {} to {}{}.", jobId, zooKeeperFullBasePath, path);

        boolean success = false;

        while (!success) {
            synchronized (cacheLock) {
                verifyIsRunning();

                // -1 is treated as "no node present"; any other value is handed to replace()
                // (presumably the node version to replace - confirm against the store).
                final int currentVersion = jobGraphsInZooKeeper.exists(path);

                if (currentVersion == -1) {
                    try {
                        jobGraphsInZooKeeper.addAndLock(path, jobGraph);
                        addedJobGraphs.add(jobId);
                        success = true;
                    } catch (KeeperException.NodeExistsException ignored) {
                        // The node was created concurrently; loop and re-check.
                    }
                } else if (addedJobGraphs.contains(jobId)) {
                    try {
                        jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
                        LOG.info("Updated {} in ZooKeeper.", jobGraph);
                        success = true;
                    } catch (KeeperException.NoNodeException ignored) {
                        // The node was removed concurrently; loop and re-check.
                    }
                } else {
                    throw new IllegalStateException("Oh, no. Trying to update a graph you didn't #getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
                }
            }
        }

        // Logged after both the add and the replace path.
        LOG.info("Added {} to ZooKeeper.", jobGraph);
    }

    /**
     * Releases and removes the job graph from the store if it was added or recovered through
     * this instance; otherwise nothing is removed.
     *
     * @param jobID ID of the job graph to remove; must not be {@code null}
     * @throws FlinkException if the store reports that the entry could not be removed
     */
    @Override
    public void removeJobGraph(JobID jobID) throws Exception {
        Preconditions.checkNotNull(jobID, "Job ID");
        final String path = getPathForJob(jobID);

        LOG.debug("Removing job graph {} from {}{}.", jobID, zooKeeperFullBasePath, path);

        synchronized (cacheLock) {
            if (addedJobGraphs.contains(jobID)) {
                if (!jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
                    throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobID));
                }
                // Only forget the job once the removal succeeded, so it can be retried.
                addedJobGraphs.remove(jobID);
            }
        }

        LOG.info("Removed job graph {} from ZooKeeper.", jobID);
    }

    /**
     * Releases the job graph in the store, without removing it, if it was added or recovered
     * through this instance, and forgets it locally.
     *
     * @param jobID ID of the job graph to release; must not be {@code null}
     * @throws Exception if the underlying store fails to release the entry
     */
    @Override
    public void releaseJobGraph(JobID jobID) throws Exception {
        Preconditions.checkNotNull(jobID, "Job ID");
        final String path = getPathForJob(jobID);

        LOG.debug("Releasing locks of job graph {} from {}{}.", jobID, zooKeeperFullBasePath, path);

        synchronized (cacheLock) {
            if (addedJobGraphs.contains(jobID)) {
                jobGraphsInZooKeeper.release(path);
                addedJobGraphs.remove(jobID);
            }
        }

        LOG.info("Released locks of job graph {} from ZooKeeper.", jobID);
    }

    /**
     * Lists the IDs of all job graphs found in the store. Entries whose path cannot be parsed
     * as a job ID are logged and skipped.
     *
     * @return the parsed job IDs
     * @throws FlinkException if the paths could not be retrieved from the store
     */
    @Override
    public Collection<JobID> getJobIds() throws Exception {
        LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", zooKeeperFullBasePath);

        Collection<String> paths;
        try {
            paths = jobGraphsInZooKeeper.getAllPaths();
        } catch (Exception e) {
            throw new FlinkException("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
        }

        final Collection<JobID> jobIds = new ArrayList<>(paths.size());
        for (String path : paths) {
            try {
                jobIds.add(jobIdfromPath(path));
            } catch (Exception e) {
                LOG.warn("Could not parse job id from {}. This indicates a malformed path.", path, e);
            }
        }

        return jobIds;
    }

    /** Fails with an {@link IllegalStateException} unless {@link #start} has been called. */
    private void verifyIsRunning() {
        Preconditions.checkState(isRunning, "Not running. Forgot to call start()?");
    }

    /**
     * Returns the store-relative path for the given job: a slash followed by the job ID's string
     * form.
     *
     * @param jobID ID of the job; must not be {@code null}
     * @return the path of the form {@code /<jobID>}
     */
    public static String getPathForJob(JobID jobID) {
        Preconditions.checkNotNull(jobID, "Job ID");
        return String.format("/%s", jobID);
    }

    /**
     * Parses a job ID from its hexadecimal string form.
     *
     * @param str hex string to parse
     * @return the parsed job ID
     */
    public static JobID jobIdfromPath(String str) {
        return JobID.fromHexString(str);
    }
}
