package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.class */
public class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
    /** Delay in milliseconds (30 s) between receiving and executing a checkpoint at which a warning is logged. */
    private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30000;
    /** When true, {@code waitForPendingCheckpoints()} blocks on the registered async runnables. */
    private final boolean enableCheckpointAfterTasksFinished;
    /** Caching wrapper around the checkpoint storage view handed to the constructor. */
    private final CachingCheckpointStorageWorkerView checkpointStorage;
    /** Task name, used in log messages and passed to the async checkpoint runnable. */
    private final String taskName;
    /** Executor that runs async checkpoint runnables and closes cancelled ones. */
    private final ExecutorService asyncOperationsThreadPool;
    private final Environment env;
    private final AsyncExceptionHandler asyncExceptionHandler;
    private final ChannelStateWriter channelStateWriter;
    /** Executor through which operator-chain actions of an aborted checkpoint are run. */
    private final StreamTaskActionExecutor actionExecutor;
    /** Callback that prepares the input snapshot for a checkpoint id; its future completes (or fails) when done. */
    private final BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot;
    /** Ids of checkpoints whose abort notification arrived before they were triggered here; size-bounded. */
    private final Set<Long> abortedCheckpointIds;
    /** Capacity limit of {@link #abortedCheckpointIds}. */
    private final int maxRecordAbortedCheckpoints;
    /** Largest checkpoint id ever added to {@link #abortedCheckpointIds}; starts at 0. */
    private long maxAbortedCheckpointId;
    /** Highest checkpoint id seen by checkpointState/abortCheckpointOnBarrier; starts at -1. */
    private long lastCheckpointId;
    /** Monitor guarding {@link #checkpoints} and {@link #closed}. */
    private final Object lock;

    /** Async runnables of checkpoints that are currently registered, keyed by checkpoint id. */
    @GuardedBy("lock")
    private final Map<Long, AsyncCheckpointRunnable> checkpoints;

    /** Set once by {@code cancel()}; later registrations are closed instead of stored. */
    @GuardedBy("lock")
    private boolean closed;
    /** Timer service used to schedule the aligned-barrier timeout. */
    private final BarrierAlignmentUtil.DelayableTimer registerTimer;
    /** Clock used to compute the alignment timer delay. */
    private final Clock clock;
    /** Handle of the currently scheduled alignment timeout, or null if none is pending. */
    private BarrierAlignmentUtil.Cancellable alignmentTimer;
    /** Checkpoint id for which {@link #alignmentTimer} was last registered. */
    private long alignmentCheckpointId;

    /** Optional file-merging manager; null when none was supplied to the constructor. */
    @Nullable
    private final FileMergingSnapshotManager fileMergingSnapshotManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl$2.class */
    /**
     * Compiler-generated lookup table for switching on {@link Task.NotifyCheckpointOperation}.
     *
     * <p>The array is indexed by enum ordinal and maps ABORT to 1, COMPLETE to 2 and SUBSUME to 3;
     * any other ordinal keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$taskmanager$Task$NotifyCheckpointOperation = new int[Task.NotifyCheckpointOperation.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) leaves only its own slot unmapped.
            try {
                $SwitchMap$org$apache$flink$runtime$taskmanager$Task$NotifyCheckpointOperation[Task.NotifyCheckpointOperation.ABORT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // deliberately ignored: slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$runtime$taskmanager$Task$NotifyCheckpointOperation[Task.NotifyCheckpointOperation.COMPLETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // deliberately ignored: slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$runtime$taskmanager$Task$NotifyCheckpointOperation[Task.NotifyCheckpointOperation.SUBSUME.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // deliberately ignored: slot stays 0
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl$CachingCheckpointStorageWorkerView.class */
    /**
     * {@link CheckpointStorageWorkerView} decorator that remembers the stream factory resolved
     * for each checkpoint id until {@link #clearCacheFor(long)} is called for that id. All other
     * calls are forwarded to the wrapped view unchanged.
     */
    public static class CachingCheckpointStorageWorkerView implements CheckpointStorageWorkerView {
        /** Stream factories resolved so far, keyed by checkpoint id. */
        private final Map<Long, CheckpointStreamFactory> cache = new ConcurrentHashMap<>();
        /** The view that performs the actual work. */
        private final CheckpointStorageWorkerView delegate;

        private CachingCheckpointStorageWorkerView(CheckpointStorageWorkerView checkpointStorageWorkerView) {
            delegate = checkpointStorageWorkerView;
        }

        /** Drops the cached stream factory of the given checkpoint, if there is one. */
        void clearCacheFor(long j) {
            final Long checkpointId = j;
            cache.remove(checkpointId);
        }

        /**
         * Returns the cached stream factory for the checkpoint, resolving it through the
         * delegate on first use.
         *
         * @throws FlinkRuntimeException wrapping an {@link IOException} raised by the delegate
         */
        public CheckpointStreamFactory resolveCheckpointStorageLocation(long j, CheckpointStorageLocationReference checkpointStorageLocationReference) {
            return cache.computeIfAbsent(Long.valueOf(j), unusedKey -> resolveUnchecked(j, checkpointStorageLocationReference));
        }

        /** Resolves through the delegate, converting the checked I/O failure to an unchecked one. */
        private CheckpointStreamFactory resolveUnchecked(long checkpointId, CheckpointStorageLocationReference reference) {
            try {
                return delegate.resolveCheckpointStorageLocation(checkpointId, reference);
            } catch (IOException e) {
                throw new FlinkRuntimeException(e);
            }
        }

        /** Forwards to the delegate. */
        public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
            return delegate.createTaskOwnedStateStream();
        }

        /** Forwards to the delegate. */
        public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
            return delegate.createTaskOwnedCheckpointStateToolset();
        }
    }

    /**
     * Test-only constructor: identical to the full constructor but without a
     * {@link FileMergingSnapshotManager} (passes {@code null}).
     */
    @VisibleForTesting
    SubtaskCheckpointCoordinatorImpl(
            CheckpointStorageWorkerView checkpointStorageWorkerView,
            String str,
            StreamTaskActionExecutor streamTaskActionExecutor,
            ExecutorService executorService,
            Environment environment,
            AsyncExceptionHandler asyncExceptionHandler,
            BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> biFunctionWithException,
            int i,
            ChannelStateWriter channelStateWriter,
            boolean z,
            BarrierAlignmentUtil.DelayableTimer delayableTimer) {
        this(
                checkpointStorageWorkerView,
                str,
                streamTaskActionExecutor,
                executorService,
                environment,
                asyncExceptionHandler,
                biFunctionWithException,
                i,
                channelStateWriter,
                z,
                delayableTimer,
                null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the coordinator.
     *
     * @param checkpointStorageWorkerView storage view, wrapped in a per-checkpoint cache; must not be null
     * @param str task name used in log output; must not be null
     * @param streamTaskActionExecutor executor for operator-chain actions; must not be null
     * @param executorService pool for async checkpoint runnables; must not be null
     * @param environment task environment; must not be null
     * @param asyncExceptionHandler handler passed to async checkpoint runnables; must not be null
     * @param biFunctionWithException callback preparing the input snapshot (not null-checked here)
     * @param i maximum number of aborted checkpoint ids to remember
     * @param channelStateWriter channel state writer; must not be null
     * @param z whether {@code waitForPendingCheckpoints()} should actually wait
     * @param delayableTimer timer used for the aligned-barrier timeout
     * @param fileMergingSnapshotManager optional file-merging manager, may be null
     */
    public SubtaskCheckpointCoordinatorImpl(CheckpointStorageWorkerView checkpointStorageWorkerView, String str, StreamTaskActionExecutor streamTaskActionExecutor, ExecutorService executorService, Environment environment, AsyncExceptionHandler asyncExceptionHandler, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> biFunctionWithException, int i, ChannelStateWriter channelStateWriter, boolean z, BarrierAlignmentUtil.DelayableTimer delayableTimer, FileMergingSnapshotManager fileMergingSnapshotManager) {
        this.maxAbortedCheckpointId = 0L;

        // Mandatory collaborators are validated in the order they are assigned.
        this.checkpointStorage = new CachingCheckpointStorageWorkerView(Preconditions.checkNotNull(checkpointStorageWorkerView));
        this.taskName = Preconditions.checkNotNull(str);
        this.checkpoints = new HashMap<>();
        this.lock = new Object();
        this.asyncOperationsThreadPool = Preconditions.checkNotNull(executorService);
        this.env = Preconditions.checkNotNull(environment);
        this.asyncExceptionHandler = Preconditions.checkNotNull(asyncExceptionHandler);
        this.actionExecutor = Preconditions.checkNotNull(streamTaskActionExecutor);
        this.channelStateWriter = Preconditions.checkNotNull(channelStateWriter);

        this.prepareInputSnapshot = biFunctionWithException;

        // Bookkeeping for abort notifications and checkpoint ordering.
        this.abortedCheckpointIds = createAbortedCheckpointSetWithLimitSize(i);
        this.maxRecordAbortedCheckpoints = i;
        this.lastCheckpointId = -1L;
        this.closed = false;

        this.enableCheckpointAfterTasksFinished = z;
        this.registerTimer = delayableTimer;
        this.clock = SystemClock.getInstance();
        this.fileMergingSnapshotManager = fileMergingSnapshotManager;
    }

    /**
     * Builds a {@link ChannelStateWriterImpl} for the subtask described by the environment.
     *
     * @param str task name handed to the writer
     * @param supplierWithException supplier of the checkpoint storage view
     * @param environment source of job vertex id, subtask index and channel-state executor factory
     * @param i passed through as the last constructor argument of the writer
     * @return the newly created writer
     */
    public static ChannelStateWriter openChannelStateWriter(String str, SupplierWithException<CheckpointStorageWorkerView, ? extends IOException> supplierWithException, Environment environment, int i) {
        return new ChannelStateWriterImpl(
                environment.getJobVertexId(),
                str,
                environment.getTaskInfo().getIndexOfThisSubtask(),
                supplierWithException,
                environment.getChannelStateExecutorFactory(),
                i);
    }

    /**
     * Aborts a checkpoint in response to a cancel barrier: advances the last-seen checkpoint id,
     * prunes older remembered abort notifications, aborts channel state writing, declines the
     * checkpoint through the environment and propagates the cancellation down the operator chain.
     *
     * @param j id of the checkpoint to abort
     * @param checkpointException reason for the abort
     * @param operatorChain chain that is notified and used to broadcast the cancel marker
     */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void abortCheckpointOnBarrier(long j, CheckpointException checkpointException, OperatorChain<?, ?> operatorChain) throws IOException {
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", j, taskName);
        if (j > lastCheckpointId) {
            lastCheckpointId = j;
        }

        // Drop leading remembered ids that are older than the newest checkpoint seen;
        // stop at the first id that is not older.
        for (Iterator<Long> ids = abortedCheckpointIds.iterator(); ids.hasNext(); ) {
            if (ids.next() >= lastCheckpointId) {
                break;
            }
            ids.remove();
        }

        checkpointStorage.clearCacheFor(j);
        channelStateWriter.abort(j, checkpointException, true);
        env.declineCheckpoint(j, checkpointException);

        actionExecutor.runThrowing(() -> {
            // Only cancel the timer if it belongs to the checkpoint being aborted.
            if (alignmentCheckpointId == j) {
                cancelAlignmentTimer();
            }
            operatorChain.abortCheckpoint(j, checkpointException);
            operatorChain.broadcastEvent(new CancelCheckpointMarker(j));
        });
    }

    /** Cancels the pending alignment timeout, if any, and forgets its handle. */
    private void cancelAlignmentTimer() {
        if (alignmentTimer != null) {
            alignmentTimer.cancel();
            alignmentTimer = null;
        }
    }

    /** Returns the caching checkpoint storage view owned by this coordinator. */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public CheckpointStorageWorkerView getCheckpointStorage() {
        return checkpointStorage;
    }

    /** Returns the channel state writer supplied at construction time. */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public ChannelStateWriter getChannelStateWriter() {
        return channelStateWriter;
    }

    /**
     * Runs the synchronous part of a checkpoint and hands the asynchronous part to the thread pool.
     *
     * <p>The call returns early, without taking a snapshot, when the checkpoint id is not newer
     * than the last one seen or when an abort notification for this id was recorded earlier.
     *
     * @param checkpointMetaData id and timestamps of the checkpoint
     * @param checkpointOptions checkpoint options; must not be null
     * @param checkpointMetricsBuilder metrics collector; must not be null
     * @param operatorChain chain that is snapshotted and used to broadcast events
     * @param z forwarded to the async runnable (presumably "task finished" — TODO confirm)
     * @param supplier forwarded to the snapshot and the async runnable (presumably reports
     *     whether the task is still running — TODO confirm)
     * @throws Exception any failure of the synchronous snapshot, rethrown after cleanup
     */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetricsBuilder, OperatorChain<?, ?> operatorChain, boolean z, Supplier<Boolean> supplier) throws Exception {
        Preconditions.checkNotNull(checkpointOptions);
        Preconditions.checkNotNull(checkpointMetricsBuilder);
        // A barrier whose id is not newer than the last one seen is ignored: abort channel
        // state writing for it and discard any remembered abort notification.
        if (this.lastCheckpointId >= checkpointMetaData.getCheckpointId()) {
            LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", Long.valueOf(this.lastCheckpointId), Long.valueOf(checkpointMetaData.getCheckpointId()));
            this.channelStateWriter.abort(checkpointMetaData.getCheckpointId(), new CancellationException(), true);
            checkAndClearAbortedStatus(checkpointMetaData.getCheckpointId());
            return;
        }
        logCheckpointProcessingDelay(checkpointMetaData);
        // Record the id, then bail out if this checkpoint was already notified as aborted.
        this.lastCheckpointId = checkpointMetaData.getCheckpointId();
        if (checkAndClearAbortedStatus(checkpointMetaData.getCheckpointId())) {
            // Tell downstream tasks that no barrier will follow for this checkpoint.
            operatorChain.broadcastEvent(new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()));
            this.channelStateWriter.abort(checkpointMetaData.getCheckpointId(), new CancellationException("checkpoint aborted via notification"), true);
            LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", Long.valueOf(checkpointMetaData.getCheckpointId()));
            return;
        }
        if (this.fileMergingSnapshotManager != null) {
            this.fileMergingSnapshotManager.notifyCheckpointStart(FileMergingSnapshotManager.SubtaskKey.of(this.env), checkpointMetaData.getCheckpointId());
        }
        // A forced-aligned checkpoint is switched to options with unaligned support and its
        // input channel state is initialized here.
        if (checkpointOptions.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
            checkpointOptions = checkpointOptions.withUnalignedSupported();
            initInputsCheckpoint(checkpointMetaData.getCheckpointId(), checkpointOptions);
        }
        // Let operators do their pre-barrier work before the barrier is emitted.
        operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
        LOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}", new Object[]{this.taskName, Long.valueOf(System.currentTimeMillis()), Long.valueOf(checkpointMetaData.getTimestamp()), Long.valueOf(System.currentTimeMillis() - checkpointMetaData.getTimestamp())});
        // Send the checkpoint barrier downstream.
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions);
        operatorChain.broadcastEvent(checkpointBarrier, checkpointOptions.isUnalignedCheckpoint());
        // Schedule the aligned-barrier timeout (only takes effect for timeoutable options).
        registerAlignmentTimer(checkpointMetaData.getCheckpointId(), operatorChain, checkpointBarrier);
        if (checkpointOptions.needsChannelState()) {
            this.channelStateWriter.finishOutput(checkpointMetaData.getCheckpointId());
        }
        // Take the synchronous snapshot; on success schedule the async part, otherwise
        // cancel whatever snapshot futures were already collected.
        HashMap newHashMapWithExpectedSize = CollectionUtil.newHashMapWithExpectedSize(operatorChain.getNumberOfOperators());
        try {
            if (takeSnapshotSync(newHashMapWithExpectedSize, checkpointMetaData, checkpointMetricsBuilder, checkpointOptions, operatorChain, supplier)) {
                finishAndReportAsync(newHashMapWithExpectedSize, checkpointMetaData, checkpointMetricsBuilder, operatorChain.isTaskDeployedAsFinished(), z, supplier);
            } else {
                cleanup(newHashMapWithExpectedSize, checkpointMetaData, checkpointMetricsBuilder, new Exception("Checkpoint declined"));
            }
        } catch (Exception e) {
            cleanup(newHashMapWithExpectedSize, checkpointMetaData, checkpointMetricsBuilder, e);
            throw e;
        }
    }

    /**
     * Replaces any pending alignment timeout with one for the given checkpoint. Nothing is
     * scheduled when the barrier's options are not timeoutable.
     *
     * @param j checkpoint id the timeout belongs to
     * @param operatorChain chain whose {@code alignedBarrierTimeout} is invoked when the timer fires
     * @param checkpointBarrier barrier used to decide whether to schedule and to compute the delay
     */
    private void registerAlignmentTimer(long j, OperatorChain<?, ?> operatorChain, CheckpointBarrier checkpointBarrier) {
        cancelAlignmentTimer();
        if (!checkpointBarrier.getCheckpointOptions().isTimeoutable()) {
            return;
        }
        alignmentTimer = registerTimer.registerTask(
                () -> {
                    try {
                        operatorChain.alignedBarrierTimeout(j);
                    } catch (Exception e) {
                        ExceptionUtils.rethrowIOException(e);
                    }
                    // The timer has fired; there is nothing left to cancel.
                    alignmentTimer = null;
                    return null;
                },
                Duration.ofMillis(BarrierAlignmentUtil.getTimerDelay(clock, checkpointBarrier)));
        alignmentCheckpointId = j;
    }

    /** Dispatches a COMPLETE notification for the given checkpoint id. */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void notifyCheckpointComplete(long j, OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier) throws Exception {
        final Task.NotifyCheckpointOperation operation = Task.NotifyCheckpointOperation.COMPLETE;
        notifyCheckpoint(j, operatorChain, supplier, operation);
    }

    /** Dispatches an ABORT notification for the given checkpoint id. */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void notifyCheckpointAborted(long j, OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier) throws Exception {
        final Task.NotifyCheckpointOperation operation = Task.NotifyCheckpointOperation.ABORT;
        notifyCheckpoint(j, operatorChain, supplier, operation);
    }

    /** Dispatches a SUBSUME notification for the given checkpoint id. */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void notifyCheckpointSubsumed(long j, OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier) throws Exception {
        final Task.NotifyCheckpointOperation operation = Task.NotifyCheckpointOperation.SUBSUME;
        notifyCheckpoint(j, operatorChain, supplier, operation);
    }

    /**
     * Delivers a checkpoint notification (complete / aborted / subsumed).
     *
     * <p>If {@code supplier} reports {@code true}, the operator chain is notified; for ABORT the
     * matching async runnable is cancelled first, or, when none is registered and the checkpoint
     * has not been seen yet, the id is remembered so that a later trigger can be skipped.
     * Regardless of the supplier's answer, and even if the first phase throws, the task state
     * manager (ABORT and COMPLETE only) and the file-merging snapshot manager are notified
     * exactly once.
     *
     * @param j checkpoint id the notification refers to
     * @param operatorChain chain whose operators receive the notification
     * @param supplier returns {@code false} when the task is not running, in which case operators
     *     are not notified
     * @param notifyCheckpointOperation kind of notification
     * @throws Exception the first failure raised while notifying; later failures are attached
     *     to it as suppressed exceptions
     */
    private void notifyCheckpoint(long j, OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier, Task.NotifyCheckpointOperation notifyCheckpointOperation) throws Exception {
        Exception firstFailure = null;
        try {
            if (!supplier.get().booleanValue()) {
                LOG.debug("Ignoring notification of checkpoint {} {} for not-running task {}", notifyCheckpointOperation, j, taskName);
            } else {
                LOG.debug("Notification of checkpoint {} {} for task {}", notifyCheckpointOperation, j, taskName);
                if (notifyCheckpointOperation == Task.NotifyCheckpointOperation.ABORT) {
                    // Only remember aborts of checkpoints that have not been triggered here yet.
                    if (!cancelAsyncCheckpointRunnable(j) && j > lastCheckpointId) {
                        abortedCheckpointIds.add(j);
                        maxAbortedCheckpointId = Math.max(maxAbortedCheckpointId, j);
                    }
                    channelStateWriter.abort(j, new CancellationException("checkpoint aborted via notification"), false);
                }
                try {
                    switch (notifyCheckpointOperation) {
                        case ABORT:
                            operatorChain.notifyCheckpointAborted(j);
                            break;
                        case COMPLETE:
                            operatorChain.notifyCheckpointComplete(j);
                            break;
                        case SUBSUME:
                            operatorChain.notifyCheckpointSubsumed(j);
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    // Keep going: the state backends below must still be notified.
                    firstFailure = e;
                }
            }
        } finally {
            // Runs exactly once on both the normal and the exceptional path.
            try {
                switch (notifyCheckpointOperation) {
                    case ABORT:
                        env.getTaskStateManager().notifyCheckpointAborted(j);
                        break;
                    case COMPLETE:
                        env.getTaskStateManager().notifyCheckpointComplete(j);
                        break;
                    default:
                        // The task state manager is not told about subsumed checkpoints.
                        break;
                }
                notifyFileMergingSnapshotManagerCheckpoint(j, notifyCheckpointOperation);
            } catch (Exception e) {
                firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
            }
            ExceptionUtils.tryRethrowException(firstFailure);
        }
    }

    /**
     * Forwards a checkpoint notification to the file-merging snapshot manager, if one is
     * configured; otherwise does nothing.
     *
     * @param j checkpoint id the notification refers to
     * @param notifyCheckpointOperation kind of notification
     * @throws Exception if the manager fails to process the notification
     */
    private void notifyFileMergingSnapshotManagerCheckpoint(long j, Task.NotifyCheckpointOperation notifyCheckpointOperation) throws Exception {
        if (fileMergingSnapshotManager == null) {
            return;
        }
        // Switch on the enum itself so the dispatch does not depend on unrelated int constants.
        switch (notifyCheckpointOperation) {
            case ABORT:
                fileMergingSnapshotManager.notifyCheckpointAborted(FileMergingSnapshotManager.SubtaskKey.of(env), j);
                break;
            case COMPLETE:
                fileMergingSnapshotManager.notifyCheckpointComplete(FileMergingSnapshotManager.SubtaskKey.of(env), j);
                break;
            case SUBSUME:
                fileMergingSnapshotManager.notifyCheckpointSubsumed(FileMergingSnapshotManager.SubtaskKey.of(env), j);
                break;
            default:
                break;
        }
    }

    /**
     * Starts channel state writing for the inputs of a checkpoint when the options require it.
     *
     * <p>Unaligned checkpoints start the writer and prepare the in-flight data snapshot;
     * timeoutable (but not unaligned) checkpoints start the writer and immediately finish the
     * input side. All other checkpoints are left untouched.
     *
     * @param j checkpoint id
     * @param checkpointOptions options deciding which of the paths is taken
     */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void initInputsCheckpoint(long j, CheckpointOptions checkpointOptions) throws CheckpointException {
        final boolean unaligned = checkpointOptions.isUnalignedCheckpoint();
        if (!unaligned && !checkpointOptions.isTimeoutable()) {
            return;
        }
        channelStateWriter.start(j, checkpointOptions);
        if (unaligned) {
            prepareInflightDataSnapshot(j);
        } else {
            channelStateWriter.finishInput(j);
        }
    }

    /**
     * Blocks until every async checkpoint runnable registered at the time of the call has
     * finished. Does nothing unless checkpoints after finished tasks are enabled.
     *
     * <p>Failures of individual runnables are logged at debug level and otherwise ignored.
     */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void waitForPendingCheckpoints() throws Exception {
        if (!enableCheckpointAfterTasksFinished) {
            return;
        }
        // Snapshot under the lock, wait outside of it so finishing runnables can unregister.
        final ArrayList<AsyncCheckpointRunnable> pending;
        synchronized (lock) {
            pending = new ArrayList<>(checkpoints.values());
        }
        for (AsyncCheckpointRunnable asyncCheckpointRunnable : pending) {
            try {
                asyncCheckpointRunnable.getFinishedFuture().get();
            } catch (Exception e) {
                // NOTE(review): this also absorbs InterruptedException without restoring the
                // interrupt flag; kept as-is to preserve existing behavior — confirm intent.
                LOG.debug("Async runnable for checkpoint " + asyncCheckpointRunnable.getCheckpointId() + " throws exception and exit", e);
            }
        }
    }

    /**
     * Closes this coordinator: cancels a pending alignment timeout first, then performs the
     * same shutdown as {@link #cancel()}.
     *
     * @throws IOException if {@link #cancel()} fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        cancelAlignmentTimer();
        cancel();
    }

    /**
     * Marks the coordinator as closed, quietly closes every registered async checkpoint
     * runnable and closes the channel state writer. Only the first call closes runnables;
     * the channel state writer is closed on every call.
     */
    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator
    public void cancel() throws IOException {
        final ArrayList<AsyncCheckpointRunnable> toClose;
        synchronized (lock) {
            if (closed) {
                toClose = null;
            } else {
                closed = true;
                toClose = new ArrayList<>(checkpoints.values());
                checkpoints.clear();
            }
        }
        // Closing happens outside the lock.
        IOUtils.closeAllQuietly(toClose);
        channelStateWriter.close();
    }

    /** Returns the number of currently registered async checkpoint runnables. */
    @VisibleForTesting
    int getAsyncCheckpointRunnableSize() {
        synchronized (lock) {
            return checkpoints.size();
        }
    }

    /** Returns how many aborted checkpoint ids are currently remembered. */
    @VisibleForTesting
    int getAbortedCheckpointSize() {
        final int remembered = abortedCheckpointIds.size();
        return remembered;
    }

    /**
     * Reports whether the checkpoint counts as aborted, removing its id from the remembered set.
     *
     * <p>A checkpoint also counts as aborted when its id lies more than
     * {@code maxRecordAbortedCheckpoints} below the largest aborted id ever recorded, i.e. when
     * its entry may already have been evicted from the bounded set.
     *
     * @param j checkpoint id to check
     * @return {@code true} if the checkpoint is considered aborted
     */
    private boolean checkAndClearAbortedStatus(long j) {
        if (abortedCheckpointIds.remove(Long.valueOf(j))) {
            return true;
        }
        final long newestIdPossiblyEvicted = j + (long) maxRecordAbortedCheckpoints;
        return newestIdPossiblyEvicted < maxAbortedCheckpointId;
    }

    /**
     * Registers the async runnable of a checkpoint so it can later be cancelled or awaited.
     *
     * <p>If the coordinator is already closed the runnable is closed quietly instead of being
     * stored. A second registration for the same id closes the new runnable and fails.
     *
     * @param j checkpoint id
     * @param asyncCheckpointRunnable runnable to register
     * @throws IOException if a runnable is already registered for this checkpoint id
     */
    private void registerAsyncCheckpointRunnable(long j, AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException {
        final Long checkpointId = j;
        synchronized (lock) {
            if (!closed) {
                if (checkpoints.containsKey(checkpointId)) {
                    IOUtils.closeQuietly(asyncCheckpointRunnable);
                    throw new IOException(String.format("Cannot register Closeable, async checkpoint %d runnable has been register. Closing argument.", checkpointId));
                }
                checkpoints.put(checkpointId, asyncCheckpointRunnable);
                return;
            }
            LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
            IOUtils.closeQuietly(asyncCheckpointRunnable);
            // cancel() clears the map, so nothing may be left for this id once closed.
            Preconditions.checkState(!checkpoints.containsKey(checkpointId), "SubtaskCheckpointCoordinator was closed without releasing asyncCheckpointRunnable for checkpoint %s", checkpointId);
        }
    }

    /**
     * Removes the runnable registered for the given checkpoint id.
     *
     * @param j checkpoint id
     * @return {@code true} if a runnable was registered and has been removed
     */
    private boolean unregisterAsyncCheckpointRunnable(long j) {
        synchronized (lock) {
            return checkpoints.remove(Long.valueOf(j)) != null;
        }
    }

    /**
     * Unregisters the runnable of the given checkpoint and, if there was one, schedules its
     * quiet close on the async operations thread pool.
     *
     * @param j checkpoint id
     * @return {@code true} if a runnable was registered for the checkpoint
     */
    private boolean cancelAsyncCheckpointRunnable(long j) {
        final AsyncCheckpointRunnable cancelled;
        synchronized (lock) {
            cancelled = checkpoints.remove(Long.valueOf(j));
        }
        if (cancelled == null) {
            return false;
        }
        // Close off-thread so the caller is not blocked by the shutdown of the runnable.
        asyncOperationsThreadPool.execute(() -> IOUtils.closeQuietly(cancelled));
        return true;
    }

    /**
     * Releases the resources of a checkpoint whose synchronous part did not complete: aborts
     * channel state writing and cancels every operator snapshot collected so far.
     *
     * @param map snapshot futures per operator; {@code null} values are skipped
     * @param checkpointMetaData metadata of the failed checkpoint
     * @param checkpointMetricsBuilder metrics, only read for the debug message
     * @param exc cause passed to the channel state writer
     */
    private void cleanup(Map<OperatorID, OperatorSnapshotFutures> map, CheckpointMetaData checkpointMetaData, CheckpointMetricsBuilder checkpointMetricsBuilder, Exception exc) {
        channelStateWriter.abort(checkpointMetaData.getCheckpointId(), exc, true);

        for (OperatorSnapshotFutures snapshot : map.values()) {
            if (snapshot == null) {
                continue;
            }
            try {
                snapshot.cancel();
            } catch (Exception e) {
                // Best effort: keep cancelling the remaining snapshots.
                LOG.warn("Could not properly cancel an operator snapshot result.", e);
            }
        }

        if (!LOG.isDebugEnabled()) {
            return;
        }
        LOG.debug(
                "{} - did NOT finish synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms",
                taskName,
                checkpointMetaData.getCheckpointId(),
                checkpointMetricsBuilder.getAlignmentDurationNanosOrDefault() / 1000000,
                checkpointMetricsBuilder.getSyncDurationMillis());
    }

    /**
     * Triggers preparation of the input snapshot for a checkpoint and, once the returned future
     * completes, either finishes the input side of channel state writing or aborts it with the
     * failure.
     *
     * @param j checkpoint id
     * @throws CheckpointException if the prepare callback itself fails
     */
    private void prepareInflightDataSnapshot(long j) throws CheckpointException {
        // No raw cast: the callback is already typed to return CompletableFuture<Void>, which
        // keeps the failure parameter of the completion handler typed as Throwable.
        prepareInputSnapshot.apply(channelStateWriter, j).whenComplete((ignored, failure) -> {
            if (failure != null) {
                channelStateWriter.abort(j, failure, false);
            } else {
                channelStateWriter.finishInput(j);
            }
        });
    }

    /**
     * Wraps the collected snapshot futures in an {@link AsyncCheckpointRunnable}, registers it
     * under its checkpoint id and submits it to the async operations thread pool.
     *
     * @throws IOException if a runnable is already registered for the checkpoint
     */
    private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> map, CheckpointMetaData checkpointMetaData, CheckpointMetricsBuilder checkpointMetricsBuilder, boolean z, boolean z2, Supplier<Boolean> supplier) throws IOException {
        final AsyncCheckpointRunnable asyncPart =
                new AsyncCheckpointRunnable(
                        map,
                        checkpointMetaData,
                        checkpointMetricsBuilder,
                        System.nanoTime(),
                        taskName,
                        unregisterConsumer(),
                        env,
                        asyncExceptionHandler,
                        z,
                        z2,
                        supplier);
        // Register before executing so the runnable can be found for cancellation.
        registerAsyncCheckpointRunnable(asyncPart.getCheckpointId(), asyncPart);
        asyncOperationsThreadPool.execute(asyncPart);
    }

    /** Returns a callback that unregisters the runnable it is given, by checkpoint id. */
    private Consumer<AsyncCheckpointRunnable> unregisterConsumer() {
        return finished -> unregisterAsyncCheckpointRunnable(finished.getCheckpointId());
    }

    /**
     * Takes the synchronous snapshot of the operator chain and records how long it took.
     *
     * <p>The cached storage location of the checkpoint is cleared whether or not the snapshot
     * succeeds.
     *
     * @param map receives the snapshot futures of each operator
     * @param checkpointMetaData metadata of the checkpoint being taken
     * @param checkpointMetricsBuilder metrics; the sync duration is stored here
     * @param checkpointOptions options; decide whether channel state is included
     * @param operatorChain chain to snapshot; must not be closed
     * @param supplier forwarded to the operator chain
     * @return always {@code true}; failures are reported by exception
     * @throws Exception if the snapshot fails
     */
    private boolean takeSnapshotSync(Map<OperatorID, OperatorSnapshotFutures> map, CheckpointMetaData checkpointMetaData, CheckpointMetricsBuilder checkpointMetricsBuilder, CheckpointOptions checkpointOptions, OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier) throws Exception {
        Preconditions.checkState(!operatorChain.isClosed(), "OperatorChain and Task should never be closed at this point");
        final long checkpointId = checkpointMetaData.getCheckpointId();
        final long startNanos = System.nanoTime();
        try {
            final ChannelStateWriter.ChannelStateWriteResult channelStateResult =
                    checkpointOptions.needsChannelState()
                            ? channelStateWriter.getAndRemoveWriteResult(checkpointId)
                            : ChannelStateWriter.ChannelStateWriteResult.EMPTY;
            final CheckpointStreamFactory streamFactory =
                    applyFileMergingCheckpoint(
                            checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation()),
                            checkpointOptions);
            operatorChain.snapshotState(map, checkpointMetaData, checkpointOptions, supplier, channelStateResult, streamFactory);
        } finally {
            // The resolved location is only needed for the duration of the sync snapshot.
            checkpointStorage.clearCacheFor(checkpointId);
        }

        // Record the duration before logging so the message reports this snapshot's value.
        checkpointMetricsBuilder.setSyncDurationMillis((System.nanoTime() - startNanos) / 1000000);
        LOG.debug(
                "{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms, is unaligned checkpoint : {}",
                taskName,
                checkpointId,
                checkpointMetricsBuilder.getAlignmentDurationNanosOrDefault() / 1000000,
                checkpointMetricsBuilder.getSyncDurationMillis(),
                checkpointOptions.isUnalignedCheckpoint());
        return true;
    }

    /**
     * Chooses the stream factory to snapshot into: for savepoints a file-merging storage
     * location is converted to its non-file-merging form; in every other case the given
     * factory is returned unchanged.
     */
    private CheckpointStreamFactory applyFileMergingCheckpoint(CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
        if (!(checkpointStreamFactory instanceof FsMergingCheckpointStorageLocation)) {
            return checkpointStreamFactory;
        }
        if (!checkpointOptions.getCheckpointType().isSavepoint()) {
            return checkpointStreamFactory;
        }
        return ((FsMergingCheckpointStorageLocation) checkpointStreamFactory).toNonFileMerging();
    }

    /**
     * Creates an insertion-ordered set that holds at most {@code i} elements; adding beyond
     * that limit evicts the eldest element.
     *
     * @param i maximum number of elements retained
     * @return the bounded set
     */
    private Set<Long> createAbortedCheckpointSetWithLimitSize(final int i) {
        final LinkedHashMap<Long, Boolean> boundedBacking = new LinkedHashMap<Long, Boolean>() { // from class: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.1
            private static final long serialVersionUID = 1;

            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> entry) {
                // Evict as soon as the insertion pushed the map over its limit.
                return i < size();
            }
        };
        return Collections.newSetFromMap(boundedBacking);
    }

    /**
     * Logs a warning when the time between receiving a checkpoint (its receive timestamp) and
     * executing it reaches {@link #CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS}.
     *
     * @param checkpointMetaData metadata providing the receive timestamp and checkpoint id
     */
    private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
        final long delayMillis = System.currentTimeMillis() - checkpointMetaData.getReceiveTimestamp();
        if (delayMillis >= CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) {
            LOG.warn("Time from receiving all checkpoint barriers/RPC for checkpoint {} to executing it exceeded threshold: {}ms", checkpointMetaData.getCheckpointId(), delayMillis);
        }
    }
}
