package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncStateException;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

/**
 * Base class for stream operators that access keyed state through an
 * {@link AsyncExecutionController}.
 *
 * <p>On top of {@link AbstractStreamOperator} this class:
 *
 * <ul>
 *   <li>creates the controller in {@link #initializeState} when the state handler exposes an
 *       {@link AsyncKeyedStateBackend};
 *   <li>builds a {@link RecordContext} per incoming record and installs it as the controller's
 *       current context;
 *   <li>routes watermarks, watermark statuses and snapshots through the controller while
 *       {@link #isAsyncStateProcessingEnabled()} returns {@code true}.
 * </ul>
 *
 * @param <OUT> type of the records emitted by the operator
 */
@Internal
public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStreamOperator<OUT>
        implements AsyncStateProcessingOperator {

    /** Controller for asynchronous state access; stays {@code null} without an async backend. */
    private AsyncExecutionController asyncExecutionController;

    /** Context of the record most recently passed to {@link #setAsyncKeyedContextElement}. */
    private RecordContext currentProcessingContext;

    /** Environment of the containing task; used to fail the task on async state errors. */
    private Environment environment;

    /**
     * Initializes the operator state and, if the state handler provides an
     * {@link AsyncKeyedStateBackend}, creates the {@link AsyncExecutionController} and registers
     * it with that backend.
     *
     * @throws UnsupportedOperationException if a keyed state backend exists but no async keyed
     *     state backend is available
     */
    @Override
    public void initializeState(StreamTaskStateInitializer streamTaskStateInitializer)
            throws Exception {
        super.initializeState(streamTaskStateInitializer);
        getRuntimeContext()
                .setKeyedStateStoreV2(this.stateHandler.getKeyedStateStoreV2().orElse(null));

        StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
        this.environment = containingTask.getEnvironment();

        MailboxExecutor mailboxExecutor = this.environment.getMainMailboxExecutor();
        int maxParallelism = this.environment.getTaskInfo().getMaxNumberOfParallelSubtasks();
        int inFlightRecordsLimit =
                this.environment.getExecutionConfig().getAsyncInflightRecordsLimit();
        int bufferSize = this.environment.getExecutionConfig().getAsyncStateBufferSize();
        long bufferTimeout = this.environment.getExecutionConfig().getAsyncStateBufferTimeout();

        AsyncKeyedStateBackend asyncBackend = this.stateHandler.getAsyncKeyedStateBackend();
        if (asyncBackend != null) {
            this.asyncExecutionController =
                    new AsyncExecutionController(
                            mailboxExecutor,
                            this::handleAsyncStateException,
                            asyncBackend.createStateExecutor(),
                            maxParallelism,
                            bufferSize,
                            bufferTimeout,
                            inFlightRecordsLimit);
            asyncBackend.setup(this.asyncExecutionController);
        } else if (this.stateHandler.getKeyedStateBackend() != null) {
            // Keyed operator whose backend offers no async access: fail fast.
            throw new UnsupportedOperationException(
                    "Current State Backend doesn't support async access, AsyncExecutionController could not work");
        }
        // Neither backend present: the controller is intentionally left null.
    }

    /**
     * Exception handler handed to the controller: wraps the failure in an
     * {@link AsyncStateException} and fails the task through its environment.
     */
    private void handleAsyncStateException(String message, Throwable cause) {
        this.environment.failExternally(new AsyncStateException(message, cause));
    }

    /** Always {@code true} for this base class. */
    @Override
    public boolean isAsyncStateProcessingEnabled() {
        return true;
    }

    /** Returns {@link ElementOrder#RECORD_ORDER}. */
    @Override
    public ElementOrder getElementOrder() {
        return ElementOrder.RECORD_ORDER;
    }

    /**
     * Builds a {@link RecordContext} for the given record and its extracted key, retains it, and
     * installs it as the controller's current context.
     *
     * @param streamRecord record about to be processed
     * @param keySelector extracts the key from the record value
     */
    @Override
    public final <T> void setAsyncKeyedContextElement(
            StreamRecord<T> streamRecord, KeySelector<T, ?> keySelector) throws Exception {
        T value = streamRecord.getValue();
        this.currentProcessingContext =
                this.asyncExecutionController.buildContext(value, keySelector.getKey(value));
        // Paired with the release() in postProcessElement().
        this.currentProcessingContext.retain();
        this.asyncExecutionController.setCurrentContext(this.currentProcessingContext);
    }

    /** Releases the context retained by {@link #setAsyncKeyedContextElement}. */
    @Override
    public final void postProcessElement() {
        this.currentProcessingContext.release();
    }

    /** Hands the given processing action to the controller as a sync-point callback. */
    @Override
    public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> throwingRunnable) {
        this.asyncExecutionController.syncPointRequestWithCallback(throwingRunnable);
    }

    /**
     * Returns the record processor for the given input.
     *
     * <p>For a {@link TwoInputStreamOperator}, input 1 maps to {@code processElement1} with
     * {@code stateKeySelector1} and input 2 maps to {@code processElement2} with
     * {@code stateKeySelector2}. Otherwise, for an {@link Input}, input 1 maps to
     * {@code processElement} with {@code stateKeySelector1}.
     *
     * @param inputId 1-based input index
     * @throws IllegalArgumentException if the operator type / input id combination is unsupported
     */
    // The operator interfaces and key selectors are only available raw/wildcard-typed here, so
    // the casts below cannot be checked by the compiler.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(int inputId) {
        if (this instanceof TwoInputStreamOperator) {
            switch (inputId) {
                case 1:
                    return AsyncStateProcessing.<T>makeRecordProcessor(
                            this,
                            (KeySelector) this.stateKeySelector1,
                            ((TwoInputStreamOperator) this)::processElement1);
                case 2:
                    // Plain input id; deliberately not tied to any unrelated constant.
                    return AsyncStateProcessing.<T>makeRecordProcessor(
                            this,
                            (KeySelector) this.stateKeySelector2,
                            ((TwoInputStreamOperator) this)::processElement2);
                default:
                    // Fall through to the single-input check below.
                    break;
            }
        }
        if (this instanceof Input && inputId == 1) {
            return AsyncStateProcessing.<T>makeRecordProcessor(
                    this, (KeySelector) this.stateKeySelector1, ((Input) this)::processElement);
        }
        throw new IllegalArgumentException(
                String.format(
                        "Unsupported operator type %s with input id %d",
                        getClass().getName(), inputId));
    }

    /**
     * Asks the controller to drain in-flight records (target {@code 0}) when async processing is
     * enabled, then delegates to the superclass snapshot. All arguments are forwarded unchanged.
     */
    @Override
    public final OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory checkpointStreamFactory)
            throws Exception {
        if (isAsyncStateProcessingEnabled()) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
        return super.snapshotState(
                checkpointId, timestamp, checkpointOptions, checkpointStreamFactory);
    }

    /**
     * Returns a timer service. With async processing enabled the service is obtained via
     * {@code getAsyncInternalTimerService} and bound to this operator's controller; otherwise the
     * superclass implementation is used.
     *
     * @throws RuntimeException if the time service manager has not been initialized
     * @throws IllegalStateException if async processing is enabled but there is no keyed state
     *     backend
     */
    @Override
    public <K, N> InternalTimerService<N> getInternalTimerService(
            String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        if (!isAsyncStateProcessingEnabled()) {
            return super.getInternalTimerService(name, namespaceSerializer, triggerable);
        }
        InternalTimeServiceManager<?> manager = this.timeServiceManager;
        KeyedStateBackend<K> keyedBackend = getKeyedStateBackend();
        Preconditions.checkState(
                keyedBackend != null, "Timers can only be used on keyed operators.");
        return manager.getAsyncInternalTimerService(
                name,
                keyedBackend.getKeySerializer(),
                namespaceSerializer,
                triggerable,
                this.asyncExecutionController);
    }

    /**
     * Sets the key context for a record of the first input; additionally installs the async
     * record context when a key selector for that input is configured.
     */
    @Override
    public void setKeyContextElement1(StreamRecord streamRecord) throws Exception {
        super.setKeyContextElement1(streamRecord);
        if (this.stateKeySelector1 != null) {
            setAsyncKeyedContextElement(streamRecord, this.stateKeySelector1);
        }
    }

    /**
     * Sets the key context for a record of the second input; additionally installs the async
     * record context when a key selector for that input is configured.
     */
    @Override
    public void setKeyContextElement2(StreamRecord streamRecord) throws Exception {
        super.setKeyContextElement2(streamRecord);
        if (this.stateKeySelector2 != null) {
            setAsyncKeyedContextElement(streamRecord, this.stateKeySelector2);
        }
    }

    /** Returns the key held by the current record context. */
    @Override
    public Object getCurrentKey() {
        return this.currentProcessingContext.getKey();
    }

    /**
     * Processes a watermark. With async processing enabled, the superclass handling is submitted
     * to the controller as a non-record action instead of being run directly.
     */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        if (!isAsyncStateProcessingEnabled()) {
            super.processWatermark(watermark);
            return;
        }
        this.asyncExecutionController.processNonRecord(() -> super.processWatermark(watermark));
    }

    /**
     * Processes a watermark status. With async processing enabled, the superclass handling is
     * submitted to the controller as a non-record action instead of being run directly.
     */
    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        if (!isAsyncStateProcessingEnabled()) {
            super.processWatermarkStatus(watermarkStatus);
            return;
        }
        this.asyncExecutionController.processNonRecord(
                () -> super.processWatermarkStatus(watermarkStatus));
    }

    /** Exposes the controller for tests; may be {@code null} if none was created. */
    @VisibleForTesting
    AsyncExecutionController<?> getAsyncExecutionController() {
        return this.asyncExecutionController;
    }

    /** Exposes the current record context for tests. */
    @VisibleForTesting
    RecordContext getCurrentProcessingContext() {
        return this.currentProcessingContext;
    }
}
