package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Objects;
import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.class */
public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> extends AbstractUdfStreamOperator<OUT, F> implements OneInputStreamOperator<IN, OUT>, Triggerable {
    private static final long serialVersionUID = 3245500864882459867L;
    private static final long MIN_SLIDE_TIME = 50;
    private final Function function;
    private final KeySelector<IN, KEY> keySelector;
    private final TypeSerializer<KEY> keySerializer;
    private final TypeSerializer<STATE> stateTypeSerializer;
    private final long windowSize;
    private final long windowSlide;
    private final long paneSize;
    private final int numPanesPerWindow;
    private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
    private transient TimestampedCollector<OUT> out;
    private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
    private transient long nextEvaluationTime;
    private transient long nextSlideTime;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator$RestoredState.class */
    private static final class RestoredState<IN, KEY, STATE, OUT> {
        final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
        final long nextEvaluationTime;
        final long nextSlideTime;

        RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> abstractKeyedTimePanes, long j, long j2) {
            this.panes = abstractKeyedTimePanes;
            this.nextEvaluationTime = j;
            this.nextSlideTime = j2;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractAlignedProcessingTimeWindowOperator(F f, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> typeSerializer, TypeSerializer<STATE> typeSerializer2, long j, long j2) {
        super(f);
        if (j < MIN_SLIDE_TIME) {
            throw new IllegalArgumentException("Window length must be at least 50 msecs");
        }
        if (j2 < MIN_SLIDE_TIME) {
            throw new IllegalArgumentException("Window slide must be at least 50 msecs");
        }
        if (j < j2) {
            throw new IllegalArgumentException("The window size must be larger than the window slide");
        }
        long gcd = ArithmeticUtils.gcd(j, j2);
        if (gcd < MIN_SLIDE_TIME) {
            throw new IllegalArgumentException(String.format("Cannot compute window of size %d msecs sliding by %d msecs. The unit of grouping is too small: %d msecs", Long.valueOf(j), Long.valueOf(j2), Long.valueOf(gcd)));
        }
        this.function = (Function) Objects.requireNonNull(f);
        this.keySelector = (KeySelector) Objects.requireNonNull(keySelector);
        this.keySerializer = (TypeSerializer) Objects.requireNonNull(typeSerializer);
        this.stateTypeSerializer = (TypeSerializer) Objects.requireNonNull(typeSerializer2);
        this.windowSize = j;
        this.windowSlide = j2;
        this.paneSize = gcd;
        this.numPanesPerWindow = MathUtils.checkedDownCast(j / gcd);
    }

    protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function);

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.out = new TimestampedCollector<>(this.output);
        long currentTimeMillis = System.currentTimeMillis();
        this.nextEvaluationTime = (currentTimeMillis + this.windowSlide) - (currentTimeMillis % this.windowSlide);
        this.nextSlideTime = (currentTimeMillis + this.paneSize) - (currentTimeMillis % this.paneSize);
        long min = Math.min(this.nextEvaluationTime, this.nextSlideTime);
        if (this.restoredState == null) {
            this.panes = createPanes(this.keySelector, this.function);
        } else {
            this.panes = this.restoredState.panes;
            long j = this.restoredState.nextEvaluationTime;
            long j2 = this.restoredState.nextSlideTime;
            int numPanes = this.panes.getNumPanes();
            for (long min2 = Math.min(j, j2); numPanes > 0 && min2 < min; min2 = Math.min(j, j2)) {
                if (min2 == j) {
                    computeWindow(min2);
                    j += this.windowSlide;
                }
                if (min2 == j2) {
                    this.panes.slidePanes(this.numPanesPerWindow);
                    numPanes--;
                    j2 += this.paneSize;
                }
            }
        }
        registerTimer(min, this);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        stopTriggers();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void dispose() {
        super.dispose();
        stopTriggers();
        if (this.panes != null) {
            this.panes.dispose();
        }
    }

    private void stopTriggers() {
        this.nextEvaluationTime = -1L;
        this.nextSlideTime = -1L;
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        this.panes.addElementToLatestPane(streamRecord.getValue());
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) {
    }

    @Override // org.apache.flink.streaming.runtime.operators.Triggerable
    public void trigger(long j) throws Exception {
        if (j == this.nextEvaluationTime) {
            computeWindow(j);
            this.nextEvaluationTime += this.windowSlide;
        }
        if (j == this.nextSlideTime) {
            this.panes.slidePanes(this.numPanesPerWindow);
            this.nextSlideTime += this.paneSize;
        }
        registerTimer(Math.min(this.nextEvaluationTime, this.nextSlideTime), this);
    }

    private void computeWindow(long j) throws Exception {
        this.out.setAbsoluteTimestamp(j);
        this.panes.truncatePanes(this.numPanesPerWindow);
        this.panes.evaluateWindow(this.out, new TimeWindow(j, j + this.windowSize), this);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        StreamTaskState snapshotOperatorState = super.snapshotOperatorState(j, j2);
        AbstractStateBackend.CheckpointStateOutputView createCheckpointStateOutputView = getStateBackend().createCheckpointStateOutputView(j, j2);
        createCheckpointStateOutputView.writeLong(this.nextEvaluationTime);
        createCheckpointStateOutputView.writeLong(this.nextSlideTime);
        this.panes.writeToOutput(createCheckpointStateOutputView, this.keySerializer, this.stateTypeSerializer);
        snapshotOperatorState.setOperatorState(createCheckpointStateOutputView.closeAndGetHandle());
        return snapshotOperatorState;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void restoreState(StreamTaskState streamTaskState, long j) throws Exception {
        super.restoreState(streamTaskState, j);
        DataInputView dataInputView = (DataInputView) streamTaskState.getOperatorState().getState(getUserCodeClassloader());
        long readLong = dataInputView.readLong();
        long readLong2 = dataInputView.readLong();
        AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes = createPanes(this.keySelector, this.function);
        createPanes.readFromInput(dataInputView, this.keySerializer, this.stateTypeSerializer);
        this.restoredState = new RestoredState<>(createPanes, readLong, readLong2);
    }

    public long getWindowSize() {
        return this.windowSize;
    }

    public long getWindowSlide() {
        return this.windowSlide;
    }

    public long getPaneSize() {
        return this.paneSize;
    }

    public int getNumPanesPerWindow() {
        return this.numPanesPerWindow;
    }

    public long getNextEvaluationTime() {
        return this.nextEvaluationTime;
    }

    public long getNextSlideTime() {
        return this.nextSlideTime;
    }

    public String toString() {
        return "Window (processing time) (length=" + this.windowSize + ", slide=" + this.windowSlide + ')';
    }
}
