package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.class */
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
    // Supplies the current processing time and schedules the processing-time callbacks.
    protected final ProcessingTimeService processingTimeService;
    // Metric group whose fired-timer counter is incremented for every timer fired by this service.
    protected final TaskIOMetricGroup taskIOMetricGroup;
    // Key context: the current key is read when timers are created and set before a timer is fired or visited.
    protected final KeyContext keyContext;
    // Pending processing-time timers.
    protected final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
    // Pending event-time timers.
    protected final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
    // Consulted before each timer firing; the firing loops stop once it reports cancellation.
    protected final StreamTaskCancellationContext cancellationContext;
    // Key-group range of this service; restoreTimersForKeyGroup rejects key groups outside of it.
    private final KeyGroupRange localKeyGroupRange;
    // Smallest key-group index obtained by iterating localKeyGroupRange (computed in the constructor).
    private final int localKeyGroupRangeStartIdx;
    // Last watermark passed to tryAdvanceWatermark; stays at Long.MIN_VALUE until the first call.
    protected long currentWatermark = Long.MIN_VALUE;
    // Handle of the processing-time callback scheduled most recently by this service; null when none is tracked.
    protected ScheduledFuture<?> nextTimer;
    // Key serializer in use; assigned by startTimerService.
    private TypeSerializer<K> keySerializer;
    // Namespace serializer in use; assigned by startTimerService.
    private TypeSerializer<N> namespaceSerializer;
    // Receiver of timer firings; assigned by startTimerService.
    protected Triggerable<K, N> triggerTarget;
    // Set to true at the very end of startTimerService.
    private volatile boolean isInitialized;
    // Key serializer restored from a snapshot in restoreTimersForKeyGroup; reset to null by startTimerService.
    private TypeSerializer<K> keyDeserializer;
    // Namespace serializer restored from a snapshot in restoreTimersForKeyGroup; reset to null by startTimerService.
    private TypeSerializer<N> namespaceDeserializer;
    // Most recently restored snapshot; startTimerService uses it for the compatibility checks and then clears it.
    private InternalTimersSnapshot<K, N> restoredTimersSnapshot;

    /**
     * Creates a timer service on top of the given timer queues.
     *
     * <p>The key context, processing-time service, key-group range and both queues must be
     * non-null; the metric group and the cancellation context are stored without a null check.
     * The constructor additionally records the smallest key-group index yielded by
     * {@code keyGroupRange} ({@link Integer#MAX_VALUE} if the range yields nothing).
     *
     * <p>Note: the decompiler reported that this constructor was package-private in the original
     * class file and that its access modifier was changed.
     *
     * @param taskIOMetricGroup metric group providing the fired-timer counter
     * @param keyGroupRange key groups this service is responsible for
     * @param keyContext context used to read and set the current key
     * @param processingTimeService service used for processing-time lookups and callbacks
     * @param keyGroupedInternalPriorityQueue queue holding the processing-time timers
     * @param keyGroupedInternalPriorityQueue2 queue holding the event-time timers
     * @param streamTaskCancellationContext context polled for cancellation while firing timers
     */
    public InternalTimerServiceImpl(TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange keyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> keyGroupedInternalPriorityQueue, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> keyGroupedInternalPriorityQueue2, StreamTaskCancellationContext streamTaskCancellationContext) {
        this.taskIOMetricGroup = taskIOMetricGroup;
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.localKeyGroupRange = Preconditions.checkNotNull(keyGroupRange);
        this.processingTimeTimersQueue = Preconditions.checkNotNull(keyGroupedInternalPriorityQueue);
        this.eventTimeTimersQueue = Preconditions.checkNotNull(keyGroupedInternalPriorityQueue2);
        this.cancellationContext = streamTaskCancellationContext;

        // Determine the lowest key-group index contained in the local range.
        int lowestKeyGroupIdx = Integer.MAX_VALUE;
        for (int keyGroupIdx : keyGroupRange) {
            lowestKeyGroupIdx = Math.min(keyGroupIdx, lowestKeyGroupIdx);
        }
        this.localKeyGroupRangeStartIdx = lowestKeyGroupIdx;
    }

    /**
     * Starts the timer service with the given serializers and trigger target.
     *
     * <p>If the service is already initialized, the call only verifies that the given serializers
     * equal the ones in use and returns. Otherwise, when timers were restored beforehand, the new
     * serializers are checked for schema compatibility against the restored snapshot; a serializer
     * that is incompatible or requires migration is rejected. Finally, if a processing-time timer
     * is already queued, a callback is registered for the head timer's timestamp.
     *
     * @param typeSerializer serializer for the keys
     * @param typeSerializer2 serializer for the namespaces
     * @param triggerable target that is invoked when timers fire; must not be null
     * @throws IllegalArgumentException if a serializer is null, or if an initialized service is
     *     started again with different serializers
     * @throws IllegalStateException if the service was partially initialized before, or if a new
     *     serializer is incompatible with (or requires migration from) the restored one
     */
    public void startTimerService(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, Triggerable<K, N> triggerable) {
        if (isInitialized) {
            // A repeated start is tolerated only with equal serializers.
            final boolean sameSerializers = keySerializer.equals(typeSerializer) && namespaceSerializer.equals(typeSerializer2);
            if (!sameSerializers) {
                throw new IllegalArgumentException("Already initialized Timer Service tried to be initialized with different key and namespace serializers.");
            }
            return;
        }

        if (typeSerializer == null || typeSerializer2 == null) {
            throw new IllegalArgumentException("The TimersService serializers cannot be null.");
        }
        if (keySerializer != null || namespaceSerializer != null || triggerTarget != null) {
            throw new IllegalStateException("The TimerService has already been initialized.");
        }

        if (restoredTimersSnapshot == null) {
            // Fresh start: take the serializers as they are.
            keySerializer = typeSerializer;
            namespaceSerializer = typeSerializer2;
        } else {
            // Restore case: the new serializers must be usable with the restored timers.
            final TypeSerializerSchemaCompatibility<K> keyCompatibility = typeSerializer.snapshotConfiguration().resolveSchemaCompatibility(restoredTimersSnapshot.getKeySerializerSnapshot());
            if (keyCompatibility.isIncompatible() || keyCompatibility.isCompatibleAfterMigration()) {
                throw new IllegalStateException("Tried to initialize restored TimerService with new key serializer that requires migration or is incompatible.");
            }

            final TypeSerializerSchemaCompatibility<N> namespaceCompatibility = typeSerializer2.snapshotConfiguration().resolveSchemaCompatibility(restoredTimersSnapshot.getNamespaceSerializerSnapshot());

            // The snapshot is released before the namespace verdict is evaluated.
            restoredTimersSnapshot = null;

            if (namespaceCompatibility.isIncompatible() || namespaceCompatibility.isCompatibleAfterMigration()) {
                throw new IllegalStateException("Tried to initialize restored TimerService with new namespace serializer that requires migration or is incompatible.");
            }

            if (keyCompatibility.isCompatibleAsIs()) {
                keySerializer = typeSerializer;
            } else {
                keySerializer = keyCompatibility.getReconfiguredSerializer();
            }
            if (namespaceCompatibility.isCompatibleAsIs()) {
                namespaceSerializer = typeSerializer2;
            } else {
                namespaceSerializer = namespaceCompatibility.getReconfiguredSerializer();
            }
        }

        keyDeserializer = null;
        namespaceDeserializer = null;
        triggerTarget = Preconditions.checkNotNull(triggerable);

        // Schedule a callback for the earliest processing-time timer that is already queued.
        final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
        if (headTimer != null) {
            nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime);
        }
        isInitialized = true;
    }

    /** Returns the current processing time as reported by the processing-time service. */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public long currentProcessingTime() {
        final long now = processingTimeService.getCurrentProcessingTime();
        return now;
    }

    /**
     * Returns the watermark most recently passed to {@code tryAdvanceWatermark}, or
     * {@link Long#MIN_VALUE} if the watermark has not been advanced yet.
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public long currentWatermark() {
        return currentWatermark;
    }

    /**
     * Registers a processing-time timer for the current key and the given namespace.
     *
     * <p>When the queue's {@code add} reports {@code true} and the new timestamp is earlier than
     * the timestamp of the previous head (or there was no head), the currently tracked callback is
     * cancelled and a new one is scheduled for the new timestamp.
     *
     * @param n namespace of the timer
     * @param j timestamp at which the timer should fire
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void registerProcessingTimeTimer(N n, long j) {
        final InternalTimer<K, N> previousHead = processingTimeTimersQueue.peek();

        @SuppressWarnings("unchecked")
        final K currentKey = (K) keyContext.getCurrentKey();
        if (!processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(j, currentKey, n))) {
            return;
        }

        final long previousHeadTime = previousHead == null ? Long.MAX_VALUE : previousHead.getTimestamp();
        if (j >= previousHeadTime) {
            // The existing callback already fires early enough.
            return;
        }

        // Re-schedule the callback to the earlier timestamp.
        if (nextTimer != null) {
            nextTimer.cancel(false);
        }
        nextTimer = processingTimeService.registerTimer(j, this::onProcessingTime);
    }

    /**
     * Adds an event-time timer for the current key and the given namespace to the event-time queue.
     *
     * @param n namespace of the timer
     * @param j timestamp of the timer
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void registerEventTimeTimer(N n, long j) {
        @SuppressWarnings("unchecked")
        final K currentKey = (K) keyContext.getCurrentKey();
        final TimerHeapInternalTimer<K, N> timer = new TimerHeapInternalTimer<>(j, currentKey, n);
        eventTimeTimersQueue.add(timer);
    }

    /**
     * Removes the processing-time timer for the current key, the given namespace and timestamp
     * from the processing-time queue. An already scheduled callback is left untouched.
     *
     * @param n namespace of the timer
     * @param j timestamp of the timer
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void deleteProcessingTimeTimer(N n, long j) {
        @SuppressWarnings("unchecked")
        final K currentKey = (K) keyContext.getCurrentKey();
        final TimerHeapInternalTimer<K, N> timer = new TimerHeapInternalTimer<>(j, currentKey, n);
        processingTimeTimersQueue.remove(timer);
    }

    /**
     * Removes the event-time timer for the current key, the given namespace and timestamp from
     * the event-time queue.
     *
     * @param n namespace of the timer
     * @param j timestamp of the timer
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void deleteEventTimeTimer(N n, long j) {
        @SuppressWarnings("unchecked")
        final K currentKey = (K) keyContext.getCurrentKey();
        final TimerHeapInternalTimer<K, N> timer = new TimerHeapInternalTimer<>(j, currentKey, n);
        eventTimeTimersQueue.remove(timer);
    }

    /**
     * Invokes the consumer with namespace and timestamp of every queued event-time timer.
     *
     * @param biConsumerWithException callback receiving (namespace, timestamp) per timer
     * @throws Exception if the callback or the iteration fails
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> biConsumerWithException) throws Exception {
        final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue = eventTimeTimersQueue;
        foreachTimer(biConsumerWithException, queue);
    }

    /**
     * Invokes the consumer with namespace and timestamp of every queued processing-time timer.
     *
     * @param biConsumerWithException callback receiving (namespace, timestamp) per timer
     * @throws Exception if the callback or the iteration fails
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> biConsumerWithException) throws Exception {
        final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue = processingTimeTimersQueue;
        foreachTimer(biConsumerWithException, queue);
    }

    /**
     * Visits every timer of the given queue. For each timer the current key of the key context is
     * set to the timer's key, then the consumer is called with the timer's namespace and timestamp.
     *
     * <p>The queue iterator is always closed when this method exits. If the visit fails and
     * closing the iterator fails as well, the close failure is attached to the primary exception
     * as a suppressed exception instead of replacing it.
     *
     * @param biConsumerWithException callback receiving (namespace, timestamp) per timer
     * @param keyGroupedInternalPriorityQueue queue whose timers are visited
     * @throws Exception if the callback, the iteration or closing the iterator fails
     */
    protected void foreachTimer(BiConsumerWithException<N, Long, Exception> biConsumerWithException, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> keyGroupedInternalPriorityQueue) throws Exception {
        try (CloseableIterator<TimerHeapInternalTimer<K, N>> timers = keyGroupedInternalPriorityQueue.iterator()) {
            while (timers.hasNext()) {
                final TimerHeapInternalTimer<K, N> timer = timers.next();
                // The consumer may rely on the key context pointing at the timer's key.
                keyContext.setCurrentKey(timer.getKey());
                biConsumerWithException.accept(timer.getNamespace(), timer.getTimestamp());
            }
        }
    }

    /**
     * Callback for the processing-time service: fires all queued processing-time timers whose
     * timestamp is not after {@code j}, stopping early if the cancellation context reports
     * cancellation.
     *
     * <p>For each fired timer the current key is set, the timer is removed from the queue, the
     * trigger target is invoked and the fired-timer counter is incremented. Afterwards, if a timer
     * remains at the head of the queue and no callback is tracked in {@code nextTimer}, a new
     * callback is registered for that timer's timestamp.
     *
     * @param j processing time up to which timers are fired
     * @throws Exception if the trigger target fails
     */
    void onProcessingTime(long j) throws Exception {
        // Forget the handle of the callback that is running right now. Code invoked from the loop
        // (e.g. registerProcessingTimeTimer) may store a new handle, which is respected below.
        nextTimer = null;

        InternalTimer<K, N> timer;
        while ((timer = processingTimeTimersQueue.peek()) != null
                && timer.getTimestamp() <= j
                && !cancellationContext.isCancelled()) {
            keyContext.setCurrentKey(timer.getKey());
            processingTimeTimersQueue.poll();
            triggerTarget.onProcessingTime(timer);
            taskIOMetricGroup.getNumFiredTimers().inc();
        }

        // Schedule the next callback unless one has been registered in the meantime.
        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
        }
    }

    /**
     * Advances the watermark to {@code j} without ever asking to stop early, and checks that
     * {@link #tryAdvanceWatermark} reported completion.
     *
     * @param j the new watermark
     * @throws Exception if firing a timer fails
     */
    public void advanceWatermark(long j) throws Exception {
        final boolean completed = tryAdvanceWatermark(j, () -> false);
        Preconditions.checkState(completed);
    }

    /**
     * Sets the current watermark to {@code j} and fires queued event-time timers whose timestamp
     * is not after {@code j}.
     *
     * <p>Firing stops when the queue has no more eligible timers, when the cancellation context
     * reports cancellation, or when {@code shouldStopAdvancingFn} returned {@code true} after a
     * timer was fired. The stop function is only consulted after a timer has fired, so an eligible
     * timer is always fired before the stop function can interrupt.
     *
     * @param j the new watermark
     * @param shouldStopAdvancingFn queried after each fired timer; {@code true} interrupts firing
     * @return {@code false} if firing was interrupted by {@code shouldStopAdvancingFn},
     *     {@code true} otherwise
     * @throws Exception if the trigger target fails
     */
    public boolean tryAdvanceWatermark(long j, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) throws Exception {
        currentWatermark = j;

        boolean interrupted = false;
        InternalTimer<K, N> timer;
        while ((timer = eventTimeTimersQueue.peek()) != null
                && timer.getTimestamp() <= j
                && !cancellationContext.isCancelled()
                && !interrupted) {
            keyContext.setCurrentKey(timer.getKey());
            eventTimeTimersQueue.poll();
            triggerTarget.onEventTime(timer);
            taskIOMetricGroup.getNumFiredTimers().inc();
            // Evaluated only after a firing, never before the first one.
            interrupted = shouldStopAdvancingFn.test();
        }
        return !interrupted;
    }

    /**
     * Builds a snapshot holding the current serializers together with the event-time and
     * processing-time timers that the queues report for the given key group.
     *
     * @param i index of the key group to snapshot
     * @return snapshot of the timers of that key group
     */
    public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int i) {
        final TypeSerializer<K> keySer = keySerializer;
        final TypeSerializer<N> namespaceSer = namespaceSerializer;
        final Set<TimerHeapInternalTimer<K, N>> eventTimers = eventTimeTimersQueue.getSubsetForKeyGroup(i);
        final Set<TimerHeapInternalTimer<K, N>> processingTimers = processingTimeTimersQueue.getSubsetForKeyGroup(i);
        return new InternalTimersSnapshot<>(keySer, namespaceSer, eventTimers, processingTimers);
    }

    /** Returns the key serializer in use, or {@code null} if none has been assigned yet. */
    public TypeSerializer<K> getKeySerializer() {
        return keySerializer;
    }

    /** Returns the namespace serializer in use, or {@code null} if none has been assigned yet. */
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    /**
     * Restores the timers contained in the given snapshot into the timer queues.
     *
     * <p>The serializers restored from the snapshot are remembered; a later call whose snapshot
     * restores to different serializers is rejected. The snapshot itself is kept so that
     * {@code startTimerService} can run its compatibility checks against it.
     *
     * @param internalTimersSnapshot snapshot holding the timers and serializer snapshots to restore
     * @param i index of the key group the snapshot belongs to; must lie in the local key-group range
     * @throws IllegalArgumentException if the restored key or namespace serializer differs from the
     *     one seen in a previous restore call, or if the key group is not in the local range
     */
    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> internalTimersSnapshot, int i) {
        // The wildcard snapshot has to be narrowed explicitly; without the cast the assignment does
        // not compile. Callers are expected to pass a snapshot of matching key/namespace types.
        @SuppressWarnings("unchecked")
        final InternalTimersSnapshot<K, N> typedSnapshot = (InternalTimersSnapshot<K, N>) internalTimersSnapshot;
        this.restoredTimersSnapshot = typedSnapshot;

        TypeSerializer<K> restoredKeySerializer = this.restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer();
        if (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredKeySerializer)) {
            throw new IllegalArgumentException("Tried to restore timers for the same service with different key serializers.");
        }
        this.keyDeserializer = restoredKeySerializer;

        TypeSerializer<N> restoredNamespaceSerializer = this.restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer();
        if (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredNamespaceSerializer)) {
            throw new IllegalArgumentException("Tried to restore timers for the same service with different namespace serializers.");
        }
        this.namespaceDeserializer = restoredNamespaceSerializer;

        Preconditions.checkArgument(this.localKeyGroupRange.contains(i), "Key Group " + i + " does not belong to the local range.");

        // Re-populate both queues with the restored timers.
        this.eventTimeTimersQueue.addAll(this.restoredTimersSnapshot.getEventTimeTimers());
        this.processingTimeTimersQueue.addAll(this.restoredTimersSnapshot.getProcessingTimeTimers());
    }

    /** Returns the size of the processing-time timer queue. */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        final int queued = processingTimeTimersQueue.size();
        return queued;
    }

    /** Returns the size of the event-time timer queue. */
    @VisibleForTesting
    public int numEventTimeTimers() {
        final int queued = eventTimeTimersQueue.size();
        return queued;
    }

    /**
     * Counts the queued processing-time timers whose namespace equals the given one.
     *
     * @param n namespace to count timers for
     * @return number of matching processing-time timers
     */
    @VisibleForTesting
    public int numProcessingTimeTimers(N n) {
        final InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue = processingTimeTimersQueue;
        return countTimersInNamespaceInternal(n, queue);
    }

    /**
     * Counts the queued event-time timers whose namespace equals the given one.
     *
     * @param n namespace to count timers for
     * @return number of matching event-time timers
     */
    @VisibleForTesting
    public int numEventTimeTimers(N n) {
        final InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue = eventTimeTimersQueue;
        return countTimersInNamespaceInternal(n, queue);
    }

    /**
     * Counts the timers of the given queue whose namespace equals {@code n}.
     *
     * <p>The queue iterator is closed on every exit path, including when the iteration itself
     * fails.
     *
     * @param n namespace to match against each timer's namespace
     * @param internalPriorityQueue queue whose timers are inspected
     * @return number of timers with a matching namespace
     * @throws FlinkRuntimeException wrapping any exception raised while iterating or while closing
     *     the iterator
     */
    private int countTimersInNamespaceInternal(N n, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> internalPriorityQueue) {
        int count = 0;
        try (CloseableIterator<TimerHeapInternalTimer<K, N>> timers = internalPriorityQueue.iterator()) {
            while (timers.hasNext()) {
                final TimerHeapInternalTimer<K, N> timer = timers.next();
                if (timer.getNamespace().equals(n)) {
                    count++;
                }
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Exception when closing iterator.", e);
        }
        return count;
    }

    /** Returns the smallest key-group index of the local key-group range, as computed in the constructor. */
    @VisibleForTesting
    int getLocalKeyGroupRangeStartIdx() {
        return localKeyGroupRangeStartIdx;
    }

    /**
     * Returns the event-time timers grouped by key group: one unmodifiable set per key group of
     * the local range, in the iteration order of that range.
     */
    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
        final List<Set<TimerHeapInternalTimer<K, N>>> perKeyGroup = partitionElementsByKeyGroup(eventTimeTimersQueue);
        return perKeyGroup;
    }

    /**
     * Returns the processing-time timers grouped by key group: one unmodifiable set per key group
     * of the local range, in the iteration order of that range.
     */
    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
        final List<Set<TimerHeapInternalTimer<K, N>>> perKeyGroup = partitionElementsByKeyGroup(processingTimeTimersQueue);
        return perKeyGroup;
    }

    /**
     * Splits the elements of the given queue by key group.
     *
     * @param keyGroupedInternalPriorityQueue queue to partition
     * @param <T> element type of the queue
     * @return list with one unmodifiable set per key group of the local range, in the iteration
     *     order of that range
     */
    private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedInternalPriorityQueue) {
        final List<Set<T>> partitions = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
        for (int keyGroup : localKeyGroupRange) {
            final Set<T> subset = keyGroupedInternalPriorityQueue.getSubsetForKeyGroup(keyGroup);
            partitions.add(Collections.unmodifiableSet(subset));
        }
        return partitions;
    }
}
