package org.apache.flink.runtime.state.filesystem;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.shaded.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsStateBackend.class */
/**
 * State backend that hands out file-system based checkpoint stream factories rooted at a base
 * path, and that creates heap-based keyed state backends and default operator state backends.
 *
 * <p>The base path is validated and normalized once, at construction time, by
 * {@link #validateAndNormalizeUri(URI)}.
 */
public class FsStateBackend extends AbstractStateBackend {
    private static final long serialVersionUID = -8191916350224044011L;
    private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);

    /** Threshold (1024) used by the constructors that do not take an explicit threshold. */
    public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;

    /** Largest threshold (1048576) accepted by the constructors. */
    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    /** Log template used when the checkpoint path could not be verified; takes the reason. */
    private static final String UNVERIFIED_PATH_MESSAGE =
            "Could not verify checkpoint path. This might be caused by a genuine problem or by the fact that the file system is not accessible from the client. Reason: {}";

    /** Validated and normalized directory handed to the checkpoint stream factory. */
    private final Path basePath;

    /**
     * File state size threshold passed on to the stream factories.
     * NOTE(review): presumably state below this size is not written to a separate file —
     * confirm against FsCheckpointStreamFactory.
     */
    private final int fileStateThreshold;

    /** Flag forwarded to the keyed and operator state backends created by this class. */
    private final boolean asynchronousSnapshots;

    /**
     * Creates a backend for the given checkpoint directory with the default threshold and
     * asynchronous snapshots disabled.
     *
     * @param checkpointDataUri checkpoint directory, parsed into a {@link Path}
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri));
    }

    /**
     * Creates a backend for the given checkpoint directory with the default threshold.
     *
     * @param checkpointDataUri checkpoint directory, parsed into a {@link Path}
     * @param asynchronousSnapshots flag forwarded to the created state backends
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
        this(new Path(checkpointDataUri), asynchronousSnapshots);
    }

    /**
     * Creates a backend for the given checkpoint directory with the default threshold and
     * asynchronous snapshots disabled.
     *
     * @param checkpointDataUri checkpoint directory
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(Path checkpointDataUri) throws IOException {
        this(checkpointDataUri.toUri());
    }

    /**
     * Creates a backend for the given checkpoint directory with the default threshold.
     *
     * @param checkpointDataUri checkpoint directory
     * @param asynchronousSnapshots flag forwarded to the created state backends
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
        this(checkpointDataUri.toUri(), asynchronousSnapshots);
    }

    /**
     * Creates a backend for the given checkpoint directory with the default threshold and
     * asynchronous snapshots disabled.
     *
     * @param checkpointDataUri checkpoint directory
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(URI checkpointDataUri) throws IOException {
        this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false);
    }

    /**
     * Creates a backend for the given checkpoint directory with the default threshold.
     *
     * @param checkpointDataUri checkpoint directory
     * @param asynchronousSnapshots flag forwarded to the created state backends
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
        this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots);
    }

    /**
     * Creates a backend for the given checkpoint directory with asynchronous snapshots disabled.
     *
     * @param checkpointDataUri checkpoint directory
     * @param fileStateSizeThreshold file state size threshold, between 0 and 1048576 inclusive
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
        this(checkpointDataUri, fileStateSizeThreshold, false);
    }

    /**
     * Creates a backend for the given checkpoint directory. All other constructors delegate here.
     *
     * @param checkpointDataUri checkpoint directory; must carry a scheme and a non-root path
     * @param fileStateSizeThreshold file state size threshold, between 0 and 1048576 inclusive
     * @param asynchronousSnapshots flag forwarded to the created state backends
     * @throws IllegalArgumentException if the threshold is out of range or the URI is rejected by
     *     {@link #validateAndNormalizeUri(URI)}
     * @throws IOException declared by {@link #validateAndNormalizeUri(URI)}
     */
    public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots) throws IOException {
        Preconditions.checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
        Preconditions.checkArgument(
                fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
                "The threshold for file state size cannot be larger than %s",
                MAX_FILE_STATE_THRESHOLD);
        this.fileStateThreshold = fileStateSizeThreshold;
        this.basePath = validateAndNormalizeUri(checkpointDataUri);
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Returns the validated and normalized base path.
     *
     * @return the base path
     */
    public Path getBasePath() {
        return this.basePath;
    }

    /**
     * Returns the file state size threshold this backend was constructed with.
     *
     * @return the threshold
     */
    public int getMinFileSizeThreshold() {
        return this.fileStateThreshold;
    }

    /**
     * Creates a checkpoint stream factory rooted at the base path.
     *
     * @param jobId job the factory is created for
     * @param operatorIdentifier not used by this implementation
     * @return a new {@code FsCheckpointStreamFactory}
     * @throws IOException if the factory cannot be created
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
        return new FsCheckpointStreamFactory(this.basePath, jobId, this.fileStateThreshold);
    }

    /**
     * Creates a savepoint stream factory rooted at the given target location rather than at the
     * base path.
     *
     * @param jobId job the factory is created for
     * @param operatorIdentifier not used by this implementation
     * @param targetLocation directory for the savepoint, parsed into a {@link Path}
     * @return a new {@code FsSavepointStreamFactory}
     * @throws IOException if the factory cannot be created
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
        return new FsSavepointStreamFactory(new Path(targetLocation), jobId, this.fileStateThreshold);
    }

    /**
     * Creates a heap-based keyed state backend.
     *
     * @param env supplies the user class loader and the execution config
     * @param jobId not used by this implementation
     * @param operatorIdentifier not used by this implementation
     * @param keySerializer serializer for the keys
     * @param numberOfKeyGroups number of key groups
     * @param keyGroupRange key group range handled by the created backend
     * @param kvStateRegistry registry passed to the created backend
     * @param <K> key type
     * @return a new {@code HeapKeyedStateBackend}
     * @throws IOException if the backend cannot be created
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobId, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws IOException {
        return new HeapKeyedStateBackend<K>(
                kvStateRegistry,
                keySerializer,
                env.getUserClassLoader(),
                numberOfKeyGroups,
                keyGroupRange,
                this.asynchronousSnapshots,
                env.getExecutionConfig());
    }

    /**
     * Creates the default operator state backend.
     *
     * @param env supplies the user class loader and the execution config
     * @param operatorIdentifier not used by this implementation
     * @return a new {@code DefaultOperatorStateBackend}
     * @throws Exception if the backend cannot be created
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
        return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig(), this.asynchronousSnapshots);
    }

    @Override
    public String toString() {
        return "File State Backend @ " + this.basePath;
    }

    /**
     * Checks that the URI is usable as a checkpoint directory and converts it to a {@link Path}.
     *
     * <p>For schemes reported as supported by {@code FileSystem.isFlinkSupportedScheme}, the
     * returned path is rebuilt from the scheme and authority of the resolved file system plus the
     * path of the given URI. In every other case (unsupported scheme, no file system found, or
     * the rebuilt URI being syntactically invalid) the given URI is wrapped unchanged; the latter
     * two cases are logged as warnings.
     *
     * @param checkpointDataUri URI of the checkpoint directory
     * @return the path to use as checkpoint directory
     * @throws NullPointerException if the URI is null
     * @throws IllegalArgumentException if the URI has no scheme, no path, or points to the root
     *     directory
     * @throws IOException if resolving the file system fails
     */
    public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
        Preconditions.checkNotNull(checkpointDataUri, "The checkpoint data URI must not be null.");

        final String scheme = checkpointDataUri.getScheme();
        final String path = checkpointDataUri.getPath();

        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. Please specify the file system scheme explicitly in the URI.");
        }
        if (path == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.");
        }
        if (path.isEmpty() || "/".equals(path)) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }

        // Schemes not reported as supported are returned without any file system lookup.
        if (!FileSystem.isFlinkSupportedScheme(scheme)) {
            return new Path(checkpointDataUri);
        }

        final FileSystem fileSystem = FileSystem.get(checkpointDataUri);
        if (fileSystem == null) {
            LOG.warn(UNVERIFIED_PATH_MESSAGE, "Could not find a file system for the given scheme in the available configurations.");
            return new Path(checkpointDataUri);
        }

        // Take scheme and authority from the resolved file system and keep only the path of the
        // given URI; query and fragment are dropped.
        final URI fsUri = fileSystem.getUri();
        try {
            return new Path(new URI(fsUri.getScheme(), fsUri.getAuthority(), path, null, null));
        } catch (URISyntaxException e) {
            // Plain concatenation on purpose: the exception text may contain '%' characters,
            // which must not be interpreted as format specifiers.
            final String reason = "Cannot create file system URI for checkpointDataUri " + checkpointDataUri
                    + " and filesystem URI " + fsUri + ": " + e;
            LOG.warn(UNVERIFIED_PATH_MESSAGE, reason);
            return new Path(checkpointDataUri);
        }
    }
}
