package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.class */
public class TaskExecutorStateChangelogStoragesManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorStateChangelogStoragesManager.class);
    private final Map<JobID, Optional<StateChangelogStorage<?>>> changelogStoragesByJobId = new HashMap();
    private boolean closed = false;
    private final Thread shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);

    @Nullable
    public StateChangelogStorage<?> stateChangelogStorageForJob(@Nonnull JobID jobID, Configuration configuration) throws IOException {
        if (this.closed) {
            throw new IllegalStateException("TaskExecutorStateChangelogStoragesManager is already closed and cannot register a new StateChangelogStorage.");
        }
        Optional<StateChangelogStorage<?>> optional = this.changelogStoragesByJobId.get(jobID);
        if (optional == null) {
            StateChangelogStorage<?> load = StateChangelogStorageLoader.load(configuration);
            optional = Optional.ofNullable(load);
            this.changelogStoragesByJobId.put(jobID, optional);
            if (load != null) {
                LOG.debug("Registered new state changelog storage for job {} : {}.", jobID, load);
            } else {
                LOG.info("Try to registered new state changelog storage for job {}, but result is null.", jobID);
            }
        } else if (optional.isPresent()) {
            LOG.debug("Found existing state changelog storage for job {}: {}.", jobID, optional.get());
        } else {
            LOG.debug("Found a previously loaded NULL state changelog storage for job {}.", jobID);
        }
        return optional.orElse(null);
    }

    public void releaseStateChangelogStorageForJob(@Nonnull JobID jobID) {
        Optional<StateChangelogStorage<?>> remove;
        LOG.debug("Releasing state changelog storage under job id {}.", jobID);
        if (this.closed || (remove = this.changelogStoragesByJobId.remove(jobID)) == null) {
            return;
        }
        remove.ifPresent(this::doRelease);
    }

    public void shutdown() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        HashMap hashMap = new HashMap(this.changelogStoragesByJobId);
        this.changelogStoragesByJobId.clear();
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);
        LOG.info("Shutting down TaskExecutorStateChangelogStoragesManager.");
        Iterator it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            ((Optional) ((Map.Entry) it.next()).getValue()).ifPresent(this::doRelease);
        }
    }

    private void doRelease(StateChangelogStorage<?> stateChangelogStorage) {
        if (stateChangelogStorage != null) {
            try {
                stateChangelogStorage.close();
            } catch (Exception e) {
                LOG.warn("Exception while disposing state changelog storage {}.", stateChangelogStorage, e);
            }
        }
    }
}
