/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

@Internal
public class OneInputStreamTask<IN, OUT>
extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
    private StreamInputProcessor<IN> inputProcessor;
    private volatile boolean running = true;
    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

    public OneInputStreamTask(Environment env) {
        super(env);
    }

    @VisibleForTesting
    public OneInputStreamTask(Environment env, @Nullable ProcessingTimeService timeProvider) {
        super(env, timeProvider);
    }

    @Override
    public void init() throws Exception {
        StreamConfig configuration = this.getConfiguration();
        TypeSerializer inSerializer = configuration.getTypeSerializerIn1(this.getUserCodeClassLoader());
        int numberOfInputs = configuration.getNumberOfInputs();
        if (numberOfInputs > 0) {
            InputGate[] inputGates = this.getEnvironment().getAllInputGates();
            this.inputProcessor = new StreamInputProcessor(inputGates, inSerializer, this, configuration.getCheckpointMode(), this.getCheckpointLock(), this.getEnvironment().getIOManager(), this.getEnvironment().getTaskManagerInfo().getConfiguration(), this.getStreamStatusMaintainer(), (OneInputStreamOperator)this.headOperator, this.getEnvironment().getMetricGroup().getIOMetricGroup(), this.inputWatermarkGauge);
        }
        ((OneInputStreamOperator)this.headOperator).getMetricGroup().gauge("currentInputWatermark", (Gauge)this.inputWatermarkGauge);
        this.getEnvironment().getMetricGroup().gauge("currentInputWatermark", this.inputWatermarkGauge::getValue);
    }

    @Override
    protected void run() throws Exception {
        StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
        while (this.running && inputProcessor.processInput()) {
        }
    }

    @Override
    protected void cleanup() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.cleanup();
        }
    }

    @Override
    protected void cancelTask() {
        this.running = false;
    }
}

