package org.apache.flink.runtime.operators.coordination;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventValve.class */
final class OperatorEventValve {
    private static final long NO_CHECKPOINT = Long.MIN_VALUE;

    @GuardedBy("lock")
    private final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender;

    @GuardedBy("lock")
    private boolean shut;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap();

    @GuardedBy("lock")
    private long currentCheckpointId = NO_CHECKPOINT;

    @GuardedBy("lock")
    private long lastCheckpointId = NO_CHECKPOINT;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventValve$BlockedEvent.class */
    public static final class BlockedEvent {
        final SerializedValue<OperatorEvent> event;
        final CompletableFuture<Acknowledge> future;
        final int subtask;

        BlockedEvent(SerializedValue<OperatorEvent> serializedValue, int i, CompletableFuture<Acknowledge> completableFuture) {
            this.event = serializedValue;
            this.future = completableFuture;
            this.subtask = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventValve$FuturePair.class */
    private static final class FuturePair {
        final CompletableFuture<Acknowledge> originalFuture;
        final CompletableFuture<Acknowledge> ackFuture;

        FuturePair(CompletableFuture<Acknowledge> completableFuture, CompletableFuture<Acknowledge> completableFuture2) {
            this.originalFuture = completableFuture;
            this.ackFuture = completableFuture2;
        }
    }

    public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> biFunction) {
        this.eventSender = biFunction;
    }

    public boolean isShut() {
        boolean z;
        synchronized (this.lock) {
            z = this.shut;
        }
        return z;
    }

    public CompletableFuture<Acknowledge> sendEvent(SerializedValue<OperatorEvent> serializedValue, int i) {
        synchronized (this.lock) {
            if (!this.shut) {
                return this.eventSender.apply(serializedValue, Integer.valueOf(i));
            }
            List<BlockedEvent> computeIfAbsent = this.blockedEvents.computeIfAbsent(Integer.valueOf(i), num -> {
                return new ArrayList();
            });
            CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
            computeIfAbsent.add(new BlockedEvent(serializedValue, i, completableFuture));
            return completableFuture;
        }
    }

    public void markForCheckpoint(long j) {
        synchronized (this.lock) {
            if (this.currentCheckpointId != NO_CHECKPOINT && this.currentCheckpointId != j) {
                throw new IllegalStateException(String.format("Cannot mark for checkpoint %d, already marked for checkpoint %d", Long.valueOf(j), Long.valueOf(this.currentCheckpointId)));
            }
            if (j <= this.lastCheckpointId) {
                throw new IllegalStateException(String.format("Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d", Long.valueOf(this.lastCheckpointId), Long.valueOf(j)));
            }
            this.currentCheckpointId = j;
            this.lastCheckpointId = j;
        }
    }

    public void shutValve(long j) {
        synchronized (this.lock) {
            if (j != this.currentCheckpointId) {
                Object[] objArr = new Object[2];
                objArr[0] = this.currentCheckpointId == NO_CHECKPOINT ? "(none)" : String.valueOf(this.currentCheckpointId);
                objArr[1] = Long.valueOf(j);
                throw new IllegalStateException(String.format("Cannot shut valve for non-prepared checkpoint. Prepared checkpoint = %s, attempting-to-close checkpoint = %d", objArr));
            }
            this.shut = true;
        }
    }

    public void openValveAndUnmarkCheckpoint() {
        synchronized (this.lock) {
            this.currentCheckpointId = NO_CHECKPOINT;
            if (this.shut) {
                ArrayList arrayList = new ArrayList(this.blockedEvents.size());
                Iterator<List<BlockedEvent>> it = this.blockedEvents.values().iterator();
                while (it.hasNext()) {
                    for (BlockedEvent blockedEvent : it.next()) {
                        arrayList.add(new FuturePair(blockedEvent.future, this.eventSender.apply(blockedEvent.event, Integer.valueOf(blockedEvent.subtask))));
                    }
                }
                this.blockedEvents.clear();
                this.shut = false;
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    FuturePair futurePair = (FuturePair) it2.next();
                    FutureUtils.forward(futurePair.ackFuture, futurePair.originalFuture);
                }
            }
        }
    }

    public void resetForTask(int i) {
        List<BlockedEvent> remove;
        synchronized (this.lock) {
            remove = this.blockedEvents.remove(Integer.valueOf(i));
        }
        failAllFutures(remove);
    }

    public void reset() {
        ArrayList arrayList = new ArrayList();
        synchronized (this.lock) {
            for (List<BlockedEvent> list : this.blockedEvents.values()) {
                if (list != null) {
                    arrayList.addAll(list);
                }
            }
            this.blockedEvents.clear();
            this.shut = false;
            this.currentCheckpointId = NO_CHECKPOINT;
        }
        failAllFutures(arrayList);
    }

    private static void failAllFutures(@Nullable List<BlockedEvent> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        FlinkException flinkException = new FlinkException("Event discarded due to failure of target task");
        Iterator<BlockedEvent> it = list.iterator();
        while (it.hasNext()) {
            it.next().future.completeExceptionally(flinkException);
        }
    }
}
