package org.apache.flink.state.api;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.output.FileCopyFunction;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
import org.apache.flink.state.api.output.StatePathExtractor;
import org.apache.flink.state.api.output.operators.GroupReduceOperator;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.api.runtime.StateBootstrapTransformationWithID;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.util.Preconditions;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/state/api/SavepointWriter.class */
public class SavepointWriter {
    protected final SavepointMetadataV2 metadata;

    @Nullable
    protected final StateBackend stateBackend;
    private final Configuration configuration;

    public static SavepointWriter fromExistingSavepoint(String str) throws IOException {
        CheckpointMetadata loadSavepointMetadata = SavepointLoader.loadSavepointMetadata(str);
        return new SavepointWriter(new SavepointMetadataV2(((Integer) loadSavepointMetadata.getOperatorStates().stream().map((v0) -> {
            return v0.getMaxParallelism();
        }).max(Comparator.naturalOrder()).orElseThrow(() -> {
            return new RuntimeException("Savepoint must contain at least one operator state.");
        })).intValue(), loadSavepointMetadata.getMasterStates(), loadSavepointMetadata.getOperatorStates()), null);
    }

    public static SavepointWriter fromExistingSavepoint(String str, StateBackend stateBackend) throws IOException {
        CheckpointMetadata loadSavepointMetadata = SavepointLoader.loadSavepointMetadata(str);
        return new SavepointWriter(new SavepointMetadataV2(((Integer) loadSavepointMetadata.getOperatorStates().stream().map((v0) -> {
            return v0.getMaxParallelism();
        }).max(Comparator.naturalOrder()).orElseThrow(() -> {
            return new RuntimeException("Savepoint must contain at least one operator state.");
        })).intValue(), loadSavepointMetadata.getMasterStates(), loadSavepointMetadata.getOperatorStates()), stateBackend);
    }

    public static SavepointWriter newSavepoint(int i) {
        Preconditions.checkArgument(i > 0 && i <= 32768, "Maximum parallelism must be between 1 and 32768. Found: " + i);
        return new SavepointWriter(new SavepointMetadataV2(i, Collections.emptyList(), Collections.emptyList()), null);
    }

    public static SavepointWriter newSavepoint(StateBackend stateBackend, int i) {
        Preconditions.checkArgument(i > 0 && i <= 32768, "Maximum parallelism must be between 1 and 32768. Found: " + i);
        return new SavepointWriter(new SavepointMetadataV2(i, Collections.emptyList(), Collections.emptyList()), stateBackend);
    }

    private SavepointWriter(SavepointMetadataV2 savepointMetadataV2, @Nullable StateBackend stateBackend) {
        Preconditions.checkNotNull(savepointMetadataV2, "The savepoint metadata must not be null");
        this.metadata = savepointMetadataV2;
        this.stateBackend = stateBackend;
        this.configuration = new Configuration();
    }

    public SavepointWriter removeOperator(String str) {
        this.metadata.removeOperator(str);
        return this;
    }

    public <T> SavepointWriter withOperator(String str, StateBootstrapTransformation<T> stateBootstrapTransformation) {
        this.metadata.addOperator(str, stateBootstrapTransformation);
        return this;
    }

    public <T> SavepointWriter withConfiguration(ConfigOption<T> configOption, T t) {
        this.configuration.set(configOption, t);
        return this;
    }

    public final void write(String str) {
        DataStream<OperatorState> union;
        Path path = new Path(str);
        DataStream<OperatorState> writeOperatorStates = writeOperatorStates(this.metadata.getNewOperators(), this.configuration, path);
        List<OperatorState> existingOperators = this.metadata.getExistingOperators();
        if (existingOperators.isEmpty()) {
            union = writeOperatorStates;
        } else {
            DataStream name = writeOperatorStates.getExecutionEnvironment().fromCollection(existingOperators).name("existingOperatorStates");
            name.flatMap(new StatePathExtractor()).setParallelism(1).addSink(new OutputFormatSinkFunction(new FileCopyFunction(str)));
            union = writeOperatorStates.union(new DataStream[]{name});
        }
        union.transform("reduce(OperatorState)", TypeInformation.of(CheckpointMetadata.class), new GroupReduceOperator(new MergeOperatorStates(this.metadata.getMasterStates()))).forceNonParallel().addSink(new OutputFormatSinkFunction(new SavepointOutputFormat(path))).setParallelism(1).name(str);
    }

    private DataStream<OperatorState> writeOperatorStates(List<StateBootstrapTransformationWithID<?>> list, Configuration configuration, Path path) {
        return (DataStream) list.stream().map(stateBootstrapTransformationWithID -> {
            return stateBootstrapTransformationWithID.getBootstrapTransformation().writeOperatorState(stateBootstrapTransformationWithID.getOperatorID(), this.stateBackend, configuration, this.metadata.getMaxParallelism(), path);
        }).reduce((obj, dataStream) -> {
            return ((DataStream) obj).union(new DataStream[]{dataStream});
        }).orElseThrow(() -> {
            return new IllegalStateException("Savepoint must contain at least one operator");
        });
    }
}
