/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.streamstatus;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.util.Preconditions;

@Internal
public class StatusWatermarkValve {
    private final ValveOutputHandler outputHandler;
    private final InputChannelStatus[] channelStatuses;
    private long lastOutputWatermark;
    private StreamStatus lastOutputStreamStatus;

    public StatusWatermarkValve(int numInputChannels, ValveOutputHandler outputHandler) {
        Preconditions.checkArgument((numInputChannels > 0 ? 1 : 0) != 0);
        this.channelStatuses = new InputChannelStatus[numInputChannels];
        for (int i = 0; i < numInputChannels; ++i) {
            this.channelStatuses[i] = new InputChannelStatus();
            this.channelStatuses[i].watermark = Long.MIN_VALUE;
            this.channelStatuses[i].streamStatus = StreamStatus.ACTIVE;
            this.channelStatuses[i].isWatermarkAligned = true;
        }
        this.outputHandler = (ValveOutputHandler)Preconditions.checkNotNull((Object)outputHandler);
        this.lastOutputWatermark = Long.MIN_VALUE;
        this.lastOutputStreamStatus = StreamStatus.ACTIVE;
    }

    public void inputWatermark(Watermark watermark, int channelIndex) {
        long watermarkMillis;
        if (this.lastOutputStreamStatus.isActive() && this.channelStatuses[channelIndex].streamStatus.isActive() && (watermarkMillis = watermark.getTimestamp()) > this.channelStatuses[channelIndex].watermark) {
            this.channelStatuses[channelIndex].watermark = watermarkMillis;
            if (!this.channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= this.lastOutputWatermark) {
                this.channelStatuses[channelIndex].isWatermarkAligned = true;
            }
            this.findAndOutputNewMinWatermarkAcrossAlignedChannels();
        }
    }

    public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) {
        if (streamStatus.isIdle() && this.channelStatuses[channelIndex].streamStatus.isActive()) {
            this.channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE;
            this.channelStatuses[channelIndex].isWatermarkAligned = false;
            if (!InputChannelStatus.hasActiveChannels(this.channelStatuses)) {
                if (this.channelStatuses[channelIndex].watermark == this.lastOutputWatermark) {
                    this.findAndOutputMaxWatermarkAcrossAllChannels();
                }
                this.lastOutputStreamStatus = StreamStatus.IDLE;
                this.outputHandler.handleStreamStatus(this.lastOutputStreamStatus);
            } else if (this.channelStatuses[channelIndex].watermark == this.lastOutputWatermark) {
                this.findAndOutputNewMinWatermarkAcrossAlignedChannels();
            }
        } else if (streamStatus.isActive() && this.channelStatuses[channelIndex].streamStatus.isIdle()) {
            this.channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE;
            if (this.channelStatuses[channelIndex].watermark >= this.lastOutputWatermark) {
                this.channelStatuses[channelIndex].isWatermarkAligned = true;
            }
            if (this.lastOutputStreamStatus.isIdle()) {
                this.lastOutputStreamStatus = StreamStatus.ACTIVE;
                this.outputHandler.handleStreamStatus(this.lastOutputStreamStatus);
            }
        }
    }

    private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (InputChannelStatus channelStatus : this.channelStatuses) {
            if (!channelStatus.isWatermarkAligned) continue;
            hasAlignedChannels = true;
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
        if (hasAlignedChannels && newMinWatermark > this.lastOutputWatermark) {
            this.lastOutputWatermark = newMinWatermark;
            this.outputHandler.handleWatermark(new Watermark(this.lastOutputWatermark));
        }
    }

    private void findAndOutputMaxWatermarkAcrossAllChannels() {
        long maxWatermark = Long.MIN_VALUE;
        for (InputChannelStatus channelStatus : this.channelStatuses) {
            maxWatermark = Math.max(channelStatus.watermark, maxWatermark);
        }
        if (maxWatermark > this.lastOutputWatermark) {
            this.lastOutputWatermark = maxWatermark;
            this.outputHandler.handleWatermark(new Watermark(this.lastOutputWatermark));
        }
    }

    @VisibleForTesting
    protected InputChannelStatus getInputChannelStatus(int channelIndex) {
        Preconditions.checkArgument((channelIndex >= 0 && channelIndex < this.channelStatuses.length ? 1 : 0) != 0, (Object)("Invalid channel index. Number of input channels: " + this.channelStatuses.length));
        return this.channelStatuses[channelIndex];
    }

    @VisibleForTesting
    protected static class InputChannelStatus {
        protected long watermark;
        protected StreamStatus streamStatus;
        protected boolean isWatermarkAligned;

        protected InputChannelStatus() {
        }

        private static boolean hasActiveChannels(InputChannelStatus[] channelStatuses) {
            for (InputChannelStatus status : channelStatuses) {
                if (!status.streamStatus.isActive()) continue;
                return true;
            }
            return false;
        }
    }

    public static interface ValveOutputHandler {
        public void handleWatermark(Watermark var1);

        public void handleStreamStatus(StreamStatus var1);
    }
}

