/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

final class OperatorCoordinatorCheckpoints {
    OperatorCoordinatorCheckpoints() {
    }

    public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(OperatorCoordinatorCheckpointContext coordinatorContext, long checkpointId) throws Exception {
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<byte[]>();
        coordinatorContext.checkpointCoordinator(checkpointId, checkpointFuture);
        return checkpointFuture.thenApply(state -> new CoordinatorSnapshot(coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), (byte[])state)));
    }

    public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(Collection<OperatorCoordinatorCheckpointContext> coordinators, long checkpointId) throws Exception {
        ArrayList<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<CompletableFuture<CoordinatorSnapshot>>(coordinators.size());
        for (OperatorCoordinatorCheckpointContext coordinator : coordinators) {
            CompletableFuture<CoordinatorSnapshot> checkpointFuture = OperatorCoordinatorCheckpoints.triggerCoordinatorCheckpoint(coordinator, checkpointId);
            individualSnapshots.add(checkpointFuture);
        }
        return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
    }

    public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(Collection<OperatorCoordinatorCheckpointContext> coordinators, PendingCheckpoint checkpoint, Executor acknowledgeExecutor) throws Exception {
        CompletableFuture<AllCoordinatorSnapshots> snapshots = OperatorCoordinatorCheckpoints.triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId());
        return snapshots.thenAcceptAsync(allSnapshots -> {
            try {
                OperatorCoordinatorCheckpoints.acknowledgeAllCoordinators(checkpoint, ((AllCoordinatorSnapshots)allSnapshots).snapshots);
            }
            catch (Exception e) {
                throw new CompletionException(e);
            }
        }, acknowledgeExecutor);
    }

    public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(Collection<OperatorCoordinatorCheckpointContext> coordinators, PendingCheckpoint checkpoint, Executor acknowledgeExecutor) throws CompletionException {
        try {
            return OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpoints(coordinators, checkpoint, acknowledgeExecutor);
        }
        catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    private static void acknowledgeAllCoordinators(PendingCheckpoint checkpoint, Collection<CoordinatorSnapshot> snapshots) throws CheckpointException {
        for (CoordinatorSnapshot snapshot : snapshots) {
            CheckpointException error;
            PendingCheckpoint.TaskAcknowledgeResult result = checkpoint.acknowledgeCoordinatorState(snapshot.coordinator, snapshot.state);
            if (result == PendingCheckpoint.TaskAcknowledgeResult.SUCCESS) continue;
            String errorMessage = "Coordinator state not acknowledged successfully: " + (Object)((Object)result);
            CheckpointException checkpointException = error = checkpoint.isDisposed() ? checkpoint.getFailureCause() : null;
            if (error != null) {
                throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, error);
            }
            throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
        }
    }

    static final class CoordinatorSnapshot {
        final OperatorInfo coordinator;
        final ByteStreamStateHandle state;

        CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) {
            this.coordinator = coordinator;
            this.state = state;
        }
    }

    static final class AllCoordinatorSnapshots {
        private final Collection<CoordinatorSnapshot> snapshots;

        AllCoordinatorSnapshots(Collection<CoordinatorSnapshot> snapshots) {
            this.snapshots = snapshots;
        }

        public Iterable<CoordinatorSnapshot> snapshots() {
            return this.snapshots;
        }
    }
}

