package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;

/**
 * A {@link StreamTask} whose head operator is a {@link TwoInputStreamOperator}.
 *
 * <p>During {@link #init()} the task's input gates are split into two groups according to the
 * type number of the corresponding physical in-edge, and both groups are handed to a
 * {@link StreamTwoInputProcessor}. {@link #run()} then keeps calling
 * {@code processInput()} on that processor until it returns {@code false} or the task is
 * cancelled via {@link #cancelTask()}.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type produced by the head operator
 */
@Internal
public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

    /** Processor that reads both inputs; created in {@link #init()}, so {@code null} before that. */
    private StreamTwoInputProcessor<IN1, IN2> inputProcessor;

    /**
     * Loop flag for {@link #run()}; cleared by {@link #cancelTask()}.
     * Declared volatile, presumably because cancellation happens on a different thread than
     * the processing loop (confirm against StreamTask).
     */
    private volatile boolean running;

    /** Watermark gauge passed to the input processor for the first input. */
    private final WatermarkGauge input1WatermarkGauge;

    /** Watermark gauge passed to the input processor for the second input. */
    private final WatermarkGauge input2WatermarkGauge;

    /** Gauge built from both per-input watermark gauges. */
    private final MinWatermarkGauge minInputWatermarkGauge;

    /**
     * Creates the task and its watermark gauges. The input processor itself is only created
     * later, in {@link #init()}.
     *
     * @param environment the environment handed through to {@link StreamTask}
     */
    public TwoInputStreamTask(Environment environment) {
        super(environment);
        this.running = true;
        this.input1WatermarkGauge = new WatermarkGauge();
        this.input2WatermarkGauge = new WatermarkGauge();
        this.minInputWatermarkGauge = new MinWatermarkGauge(this.input1WatermarkGauge, this.input2WatermarkGauge);
    }

    /**
     * Builds the {@link StreamTwoInputProcessor} and registers the watermark gauges.
     *
     * <p>Each input gate {@code i} is assigned to the first or second input depending on the
     * type number (1 or 2) of the physical in-edge at index {@code i}.
     *
     * @throws RuntimeException if an in-edge reports a type number other than 1 or 2
     * @throws Exception if any of the called setup routines fails
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();
        ClassLoader userCodeClassLoader = getUserCodeClassLoader();

        TypeSerializer<IN1> inputSerializer1 = configuration.getTypeSerializerIn1(userCodeClassLoader);
        TypeSerializer<IN2> inputSerializer2 = configuration.getTypeSerializerIn2(userCodeClassLoader);

        int numberOfInputs = configuration.getNumberOfInputs();
        List<InputGate> firstInputGates = new ArrayList<>();
        List<InputGate> secondInputGates = new ArrayList<>();
        List<StreamEdge> inPhysicalEdges = configuration.getInPhysicalEdges(userCodeClassLoader);

        for (int gateIndex = 0; gateIndex < numberOfInputs; gateIndex++) {
            int typeNumber = inPhysicalEdges.get(gateIndex).getTypeNumber();
            InputGate inputGate = getEnvironment().getInputGate(gateIndex);
            switch (typeNumber) {
                case 1:
                    firstInputGates.add(inputGate);
                    break;
                case 2:
                    secondInputGates.add(inputGate);
                    break;
                default:
                    throw new RuntimeException("Invalid input type number: " + typeNumber);
            }
        }

        // The declared type of the inherited headOperator field is not visible in this file,
        // so the cast is retained; it is done once here instead of at every use.
        TwoInputStreamOperator<IN1, IN2, OUT> operator = (TwoInputStreamOperator<IN1, IN2, OUT>) this.headOperator;

        this.inputProcessor = new StreamTwoInputProcessor<>(
                firstInputGates,
                secondInputGates,
                inputSerializer1,
                inputSerializer2,
                this,
                configuration.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                operator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                this.input1WatermarkGauge,
                this.input2WatermarkGauge);

        // Operator-level gauges: combined watermark plus one gauge per input.
        operator.getMetricGroup().gauge("currentInputWatermark", this.minInputWatermarkGauge);
        operator.getMetricGroup().gauge("currentInput1Watermark", this.input1WatermarkGauge);
        operator.getMetricGroup().gauge("currentInput2Watermark", this.input2WatermarkGauge);

        // Task-level gauge for the combined watermark.
        // NOTE(review): m63getValue looks like a decompiler-generated alias of getValue;
        // confirm against MinWatermarkGauge before renaming.
        TaskMetricGroup taskMetricGroup = getEnvironment().getMetricGroup();
        taskMetricGroup.gauge("currentInputWatermark", this.minInputWatermarkGauge::m63getValue);
    }

    /**
     * Calls {@code processInput()} on the input processor repeatedly, stopping as soon as the
     * task has been cancelled or {@code processInput()} returns {@code false}.
     *
     * @throws Exception if processing the input fails
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void run() throws Exception {
        // Read the field once; the loop works on this local reference.
        StreamTwoInputProcessor<IN1, IN2> processor = this.inputProcessor;
        while (this.running && processor.processInput()) {
            // Intentionally empty: all work happens inside processInput().
        }
    }

    /**
     * Cleans up the input processor, if {@link #init()} got far enough to create one.
     *
     * @throws Exception if the processor's cleanup fails
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanup() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.cleanup();
        }
    }

    /** Clears the {@code running} flag so that the loop in {@link #run()} stops iterating. */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cancelTask() {
        this.running = false;
    }
}
