/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Public
public class WindowedStream<T, K, W extends Window> {
    // Keyed input stream that every operator built by this class is attached to via transform(...).
    private final KeyedStream<T, K> input;
    // Assigner handed to each window operator; also supplies the default trigger in the constructor.
    private final WindowAssigner<? super T, W> windowAssigner;
    // Trigger handed to the window operators; initialised to the assigner's default trigger.
    private Trigger<? super T, ? super W> trigger;
    // Optional evictor; when non-null the builder methods create an EvictingWindowOperator instead of a WindowOperator.
    private Evictor<? super T, ? super W> evictor;
    // Allowed lateness in milliseconds (written from Time.toMilliseconds() in allowedLateness(Time)); defaults to 0.
    private long allowedLateness = 0L;
    // Tag passed to the window operators as their late-data output tag; written by sideOutputLateData(OutputTag).
    private OutputTag<T> lateDataOutputTag;

    @PublicEvolving
    public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
        this.input = input;
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

    /**
     * Replaces the trigger used by the window operators built from this stream.
     *
     * @param trigger the trigger to use from now on
     * @return this stream, to allow call chaining
     * @throws UnsupportedOperationException if the assigner is a {@link MergingWindowAssigner} and
     *     {@code trigger.canMerge()} is {@code false}, or if the assigner is a
     *     {@link BaseAlignedWindowAssigner}
     */
    @PublicEvolving
    public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
        boolean mergingAssigner = this.windowAssigner instanceof MergingWindowAssigner;
        // canMerge() is only consulted for merging assigners, exactly as a short-circuit && does.
        if (mergingAssigner && !trigger.canMerge()) {
            throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
        }
        if (this.windowAssigner instanceof BaseAlignedWindowAssigner) {
            String assignerName = this.windowAssigner.getClass().getSimpleName();
            throw new UnsupportedOperationException("Cannot use a " + assignerName + " with a custom trigger.");
        }
        this.trigger = trigger;
        return this;
    }

    /**
     * Sets the allowed lateness, stored internally in milliseconds.
     *
     * @param lateness the lateness to allow; its millisecond value must not be negative
     *     (enforced through {@code Preconditions.checkArgument})
     * @return this stream, to allow call chaining
     */
    @PublicEvolving
    public WindowedStream<T, K, W> allowedLateness(Time lateness) {
        long latenessMillis = lateness.toMilliseconds();
        boolean nonNegative = latenessMillis >= 0L;
        Preconditions.checkArgument(nonNegative, (Object)"The allowed lateness cannot be negative.");
        this.allowedLateness = latenessMillis;
        return this;
    }

    /**
     * Registers the output tag that is handed to the window operators as their late-data tag.
     *
     * @param outputTag the side-output tag; must not be {@code null}
     * @return this stream, to allow call chaining
     */
    @PublicEvolving
    public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
        Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
        StreamExecutionEnvironment environment = this.input.getExecutionEnvironment();
        // The tag is stored after being passed through the environment's clean(...).
        this.lateDataOutputTag = environment.clean(outputTag);
        return this;
    }

    /**
     * Sets the evictor. A non-null evictor makes the builder methods of this class create an
     * {@link EvictingWindowOperator} rather than a plain {@link WindowOperator}.
     *
     * @param evictor the evictor to use
     * @return this stream, to allow call chaining
     * @throws UnsupportedOperationException if the assigner is a {@link BaseAlignedWindowAssigner}
     */
    @PublicEvolving
    public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
        boolean alignedAssigner = this.windowAssigner instanceof BaseAlignedWindowAssigner;
        if (alignedAssigner) {
            String assignerName = this.windowAssigner.getClass().getSimpleName();
            throw new UnsupportedOperationException("Cannot use a " + assignerName + " with an Evictor.");
        }
        this.evictor = evictor;
        return this;
    }

    /**
     * Applies a reduce function to each window and emits the reduced value unchanged.
     *
     * <p>Delegates to {@link #reduce(ReduceFunction, WindowFunction)} with a
     * {@link PassThroughWindowFunction}.
     *
     * @param function the reduce function; must not be a {@link RichFunction}
     * @return the stream produced by the delegated {@code reduce} call
     * @throws UnsupportedOperationException if {@code function} is a {@link RichFunction}
     */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. Please use reduce(ReduceFunction, WindowFunction) instead.");
        }
        // Pass the function through the environment's clean(...) before delegating.
        function = this.input.getExecutionEnvironment().clean(function);
        // Parameterised instead of raw so the delegated call is type-checked (R is inferred as T).
        return this.reduce(function, new PassThroughWindowFunction<K, W, T>());
    }

    /**
     * Reduces each window with {@code reduceFunction} and passes the result to {@code function}.
     *
     * <p>The result type is derived from {@code function} and the input type, then the call is
     * forwarded to the overload that takes an explicit result type.
     *
     * @param reduceFunction the reduce function
     * @param function the window function applied after reduction
     * @return the stream produced by the forwarded {@code reduce} call
     */
    public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
        TypeInformation<R> resultType = WindowedStream.getWindowFunctionReturnType(function, this.input.getType());
        return this.reduce(reduceFunction, function, resultType);
    }

    /**
     * Builds the window operator that combines {@code reduceFunction} with {@code function} and
     * attaches it to the input stream.
     *
     * <p>Without an evictor, the window state is a {@link ReducingStateDescriptor} built from
     * {@code reduceFunction}, and {@code function} is wrapped in an
     * {@link InternalSingleValueWindowFunction}. With an evictor, elements are kept in a
     * {@link ListStateDescriptor} and both functions are wrapped together in a
     * {@link ReduceApplyWindowFunction}.
     *
     * @param reduceFunction the reduce function; must not be a {@link RichFunction}
     * @param function the window function
     * @param resultType type information passed to {@code input.transform(...)} for the output
     * @return the stream returned by {@code input.transform(...)}
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        WindowOperator<K, ? super T, T, R, ? super W> operator;
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
        }
        // Both user functions go through the environment's clean(...) before being used.
        function = this.input.getExecutionEnvironment().clean(function);
        reduceFunction = this.input.getExecutionEnvironment().clean(reduceFunction);
        String opName = WindowedStream.generateOperatorName(this.windowAssigner, this.trigger, this.evictor, reduceFunction, function);
        KeySelector<T, K> keySel = this.input.getKeySelector();
        if (this.evictor != null) {
            // Evictor configured: keep the elements in list state and reduce them inside the wrapped window function.
            StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", streamRecordSerializer);
            operator = new EvictingWindowOperator<K, T, W, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalIterableWindowFunction<K, W, T, R>(new ReduceApplyWindowFunction<K, W, T, R>(reduceFunction, function)), this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
        } else {
            // No evictor: the state descriptor itself carries the reduce function.
            ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", reduceFunction, this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            operator = new WindowOperator<K, T, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueWindowFunction<T, R, K, W>(function), this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
        return this.input.transform(opName, resultType, operator);
    }

    /**
     * Reduces each window with {@code reduceFunction} and passes the result to a
     * {@link ProcessWindowFunction}.
     *
     * <p>The result type is derived from {@code function} and the input type, then the call is
     * forwarded to the overload that takes an explicit result type.
     *
     * @param reduceFunction the reduce function
     * @param function the process window function applied after reduction
     * @return the stream produced by the forwarded {@code reduce} call
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
        TypeInformation<T> inputType = this.input.getType();
        TypeInformation<R> resultType = WindowedStream.getProcessWindowFunctionReturnType(function, inputType, null);
        return this.reduce(reduceFunction, function, resultType);
    }

    /**
     * Builds the window operator that combines {@code reduceFunction} with a
     * {@link ProcessWindowFunction} and attaches it to the input stream.
     *
     * <p>Without an evictor, the window state is a {@link ReducingStateDescriptor} built from
     * {@code reduceFunction}, and {@code function} is wrapped in an
     * {@link InternalSingleValueProcessWindowFunction}. With an evictor, elements are kept in a
     * {@link ListStateDescriptor} and both functions are wrapped together in a
     * {@link ReduceApplyProcessWindowFunction}.
     *
     * @param reduceFunction the reduce function; must not be a {@link RichFunction}
     * @param function the process window function
     * @param resultType type information passed to {@code input.transform(...)} for the output
     * @return the stream returned by {@code input.transform(...)}
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    @Internal
    public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        WindowOperator<K, ? super T, T, R, ? super W> operator;
        if (reduceFunction instanceof RichFunction) {
            // This is reduce(...), so the message names reduce (it previously said "apply").
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
        }
        // Both user functions go through the environment's clean(...) before being used.
        function = this.input.getExecutionEnvironment().clean(function);
        reduceFunction = this.input.getExecutionEnvironment().clean(reduceFunction);
        String opName = WindowedStream.generateOperatorName(this.windowAssigner, this.trigger, this.evictor, reduceFunction, function);
        KeySelector<T, K> keySel = this.input.getKeySelector();
        if (this.evictor != null) {
            // Evictor configured: keep the elements in list state and reduce them inside the wrapped window function.
            StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", streamRecordSerializer);
            operator = new EvictingWindowOperator<K, T, W, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalIterableProcessWindowFunction<K, W, T, R>(new ReduceApplyProcessWindowFunction<K, W, T, R>(reduceFunction, function)), this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
        } else {
            // No evictor: the state descriptor itself carries the reduce function.
            ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", reduceFunction, this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            operator = new WindowOperator<K, T, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueProcessWindowFunction<T, R, K, W>(function), this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
        return this.input.transform(opName, resultType, operator);
    }

    /**
     * Folds each window starting from {@code initialValue}.
     *
     * <p>The result type is extracted from {@code function} and the input type, then the call is
     * forwarded to the overload that takes an explicit result type.
     *
     * @param initialValue the value the fold starts from
     * @param function the fold function; must not be a {@link RichFunction}
     * @return the stream produced by the forwarded {@code fold} call
     * @throws UnsupportedOperationException if {@code function} is a {@link RichFunction}
     */
    @Deprecated
    public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. Please use fold(FoldFunction, WindowFunction) instead.");
        }
        // Utils.getCallLocationName() is invoked directly from this method, as in the original.
        TypeInformation<R> foldedType = TypeExtractor.getFoldReturnTypes(function, this.input.getType(), Utils.getCallLocationName(), true);
        return this.fold(initialValue, function, foldedType);
    }

    /**
     * Folds each window starting from {@code initialValue} and emits the folded value unchanged.
     *
     * <p>Delegates to the five-argument {@code fold} overload with a
     * {@link PassThroughWindowFunction}, using {@code resultType} for both the accumulator and
     * the output.
     *
     * @param initialValue the value the fold starts from
     * @param function the fold function; must not be a {@link RichFunction}
     * @param resultType type information of the folded value
     * @return the stream produced by the delegated {@code fold} call
     * @throws UnsupportedOperationException if {@code function} is a {@link RichFunction}
     */
    @Deprecated
    public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. Please use fold(FoldFunction, WindowFunction) instead.");
        }
        // Parameterised instead of raw so the delegated call is type-checked (ACC and R are both R).
        return this.fold(initialValue, function, new PassThroughWindowFunction<K, W, R>(), resultType, resultType);
    }

    /**
     * Folds each window with {@code foldFunction} and passes the result to {@code function}.
     *
     * <p>The accumulator type is extracted from {@code foldFunction}, the result type from
     * {@code function}; the call is then forwarded to the overload taking explicit types.
     *
     * @param initialValue the value the fold starts from
     * @param foldFunction the fold function
     * @param function the window function applied to the folded value
     * @return the stream produced by the forwarded {@code fold} call
     */
    @Deprecated
    @PublicEvolving
    public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) {
        // Utils.getCallLocationName() is invoked directly from this method, as in the original.
        TypeInformation<ACC> accumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, this.input.getType(), Utils.getCallLocationName(), true);
        TypeInformation<R> outputType = WindowedStream.getWindowFunctionReturnType(function, accumulatorType);
        return this.fold(initialValue, foldFunction, function, accumulatorType, outputType);
    }

    /**
     * Builds the window operator that combines {@code foldFunction} with {@code function} and
     * attaches it to the input stream.
     *
     * <p>Without an evictor, the window state is a {@link FoldingStateDescriptor} built from
     * {@code initialValue} and {@code foldFunction}, and {@code function} is wrapped in an
     * {@link InternalSingleValueWindowFunction}. With an evictor, elements are kept in a
     * {@link ListStateDescriptor} and the functions are wrapped together in a
     * {@link FoldApplyWindowFunction}.
     *
     * @param initialValue the value the fold starts from
     * @param foldFunction the fold function; must not be a {@link RichFunction}
     * @param function the window function
     * @param foldAccumulatorType type information of the fold accumulator
     * @param resultType type information passed to {@code input.transform(...)} for the output
     * @return the stream returned by {@code input.transform(...)}
     * @throws UnsupportedOperationException if {@code foldFunction} is a {@link RichFunction}, or
     *     the assigner is a {@link MergingWindowAssigner} or a {@link BaseAlignedWindowAssigner}
     */
    @Deprecated
    @PublicEvolving
    public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function, TypeInformation<ACC> foldAccumulatorType, TypeInformation<R> resultType) {
        WindowOperator<K, ? super T, ACC, R, ? super W> operator;
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction.");
        }
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
        }
        if (this.windowAssigner instanceof BaseAlignedWindowAssigner) {
            throw new UnsupportedOperationException("Fold cannot be used with a " + this.windowAssigner.getClass().getSimpleName() + " assigner.");
        }
        // Both user functions go through the environment's clean(...) before being used.
        function = this.input.getExecutionEnvironment().clean(function);
        foldFunction = this.input.getExecutionEnvironment().clean(foldFunction);
        String opName = WindowedStream.generateOperatorName(this.windowAssigner, this.trigger, this.evictor, foldFunction, function);
        KeySelector<T, K> keySel = this.input.getKeySelector();
        if (this.evictor != null) {
            // Evictor configured: keep the elements in list state and fold them inside the wrapped window function.
            StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", streamRecordSerializer);
            operator = new EvictingWindowOperator<K, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalIterableWindowFunction(new FoldApplyWindowFunction<K, W, T, ACC, R>(initialValue, foldFunction, function, foldAccumulatorType)), this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
        } else {
            // No evictor: the state descriptor itself carries the initial value and the fold function.
            FoldingStateDescriptor stateDesc = new FoldingStateDescriptor("window-contents", initialValue, foldFunction, foldAccumulatorType.createSerializer(this.getExecutionEnvironment().getConfig()));
            operator = new WindowOperator<K, T, ACC, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueWindowFunction<ACC, R, K, W>(function), this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
        return this.input.transform(opName, resultType, operator);
    }

    /**
     * Folds each window with {@code foldFunction} and passes the result to a
     * {@link ProcessWindowFunction}.
     *
     * <p>The fold result type and the window result type are extracted first; the call is then
     * forwarded to the overload taking explicit types.
     *
     * @param initialValue the value the fold starts from
     * @param foldFunction the fold function; must not be a {@link RichFunction}
     * @param windowFunction the process window function applied to the folded value
     * @return the stream produced by the forwarded {@code fold} call
     * @throws UnsupportedOperationException if {@code foldFunction} is a {@link RichFunction}
     */
    @Deprecated
    @PublicEvolving
    public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) {
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
        }
        // Utils.getCallLocationName() is called once per extraction, directly from this method, as in the original.
        TypeInformation<ACC> accumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, this.input.getType(), Utils.getCallLocationName(), true);
        TypeInformation<R> outputType = WindowedStream.getProcessWindowFunctionReturnType(windowFunction, accumulatorType, Utils.getCallLocationName());
        return this.fold(initialValue, foldFunction, windowFunction, accumulatorType, outputType);
    }

    /**
     * Builds the window operator that combines {@code foldFunction} with a
     * {@link ProcessWindowFunction} and attaches it to the input stream.
     *
     * <p>Without an evictor, the window state is a {@link FoldingStateDescriptor} built from
     * {@code initialValue} and {@code foldFunction}, and {@code windowFunction} is wrapped in an
     * {@link InternalSingleValueProcessWindowFunction}. With an evictor, elements are kept in a
     * {@link ListStateDescriptor} and the functions are wrapped together in a
     * {@link FoldApplyProcessWindowFunction}.
     *
     * @param initialValue the value the fold starts from
     * @param foldFunction the fold function; must not be a {@link RichFunction}
     * @param windowFunction the process window function
     * @param foldResultType type information of the fold accumulator
     * @param windowResultType type information passed to {@code input.transform(...)} for the output
     * @return the stream returned by {@code input.transform(...)}
     * @throws UnsupportedOperationException if {@code foldFunction} is a {@link RichFunction} or
     *     the assigner is a {@link MergingWindowAssigner}
     */
    @Deprecated
    @Internal
    public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> foldResultType, TypeInformation<R> windowResultType) {
        WindowOperator<K, ? super T, ACC, R, ? super W> operator;
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
        }
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
        }
        // Both user functions go through the environment's clean(...) before being used.
        windowFunction = this.input.getExecutionEnvironment().clean(windowFunction);
        foldFunction = this.input.getExecutionEnvironment().clean(foldFunction);
        String opName = WindowedStream.generateOperatorName(this.windowAssigner, this.trigger, this.evictor, foldFunction, windowFunction);
        KeySelector<T, K> keySel = this.input.getKeySelector();
        if (this.evictor != null) {
            // Evictor configured: keep the elements in list state and fold them inside the wrapped window function.
            StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", streamRecordSerializer);
            operator = new EvictingWindowOperator<K, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalIterableProcessWindowFunction(new FoldApplyProcessWindowFunction<K, W, T, ACC, R>(initialValue, foldFunction, windowFunction, foldResultType)), this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
        } else {
            // No evictor: the state descriptor itself carries the initial value and the fold function.
            FoldingStateDescriptor stateDesc = new FoldingStateDescriptor("window-contents", initialValue, foldFunction, foldResultType.createSerializer(this.getExecutionEnvironment().getConfig()));
            operator = new WindowOperator<K, T, ACC, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueProcessWindowFunction<ACC, R, K, W>(windowFunction), this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
        return this.input.transform(opName, windowResultType, operator);
    }

    /**
     * Aggregates each window with {@code function}.
     *
     * <p>Accumulator and result types are extracted from {@code function} and the input type;
     * the call is then forwarded to the overload taking explicit types.
     *
     * @param function the aggregate function; must not be {@code null} or a {@link RichFunction}
     * @return the stream produced by the forwarded {@code aggregate} call
     * @throws UnsupportedOperationException if {@code function} is a {@link RichFunction}
     */
    @PublicEvolving
    public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
        Preconditions.checkNotNull(function, "function");
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
        }
        TypeInformation<ACC> accType = TypeExtractor.getAggregateFunctionAccumulatorType(function, this.input.getType(), null, false);
        TypeInformation<R> outputType = TypeExtractor.getAggregateFunctionReturnType(function, this.input.getType(), null, false);
        return this.aggregate(function, accType, outputType);
    }

    /**
     * Aggregates each window with {@code function} and emits the aggregate unchanged.
     *
     * <p>Delegates to the five-argument {@code aggregate} overload with a
     * {@link PassThroughWindowFunction}, using {@code resultType} for both the aggregate result
     * and the operator output.
     *
     * @param function the aggregate function; must not be {@code null} or a {@link RichFunction}
     * @param accumulatorType type information of the accumulator; must not be {@code null}
     * @param resultType type information of the result; must not be {@code null}
     * @return the stream produced by the delegated {@code aggregate} call
     * @throws UnsupportedOperationException if {@code function} is a {@link RichFunction}
     */
    @PublicEvolving
    public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function, TypeInformation<ACC> accumulatorType, TypeInformation<R> resultType) {
        Preconditions.checkNotNull(function, "function");
        Preconditions.checkNotNull(accumulatorType, "accumulatorType");
        Preconditions.checkNotNull(resultType, "resultType");
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
        }
        // Parameterised instead of raw so the delegated call is type-checked (V and R are both R).
        return this.aggregate(function, new PassThroughWindowFunction<K, W, R>(), accumulatorType, resultType, resultType);
    }

    /**
     * Aggregates each window with {@code aggFunction} and passes the result to
     * {@code windowFunction}.
     *
     * <p>Accumulator, aggregate-result and output types are extracted first; the call is then
     * forwarded to the overload taking explicit types.
     *
     * @param aggFunction the aggregate function; must not be {@code null}
     * @param windowFunction the window function; must not be {@code null}
     * @return the stream produced by the forwarded {@code aggregate} call
     */
    @PublicEvolving
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction) {
        Preconditions.checkNotNull(aggFunction, "aggFunction");
        Preconditions.checkNotNull(windowFunction, "windowFunction");
        TypeInformation<ACC> accType = TypeExtractor.getAggregateFunctionAccumulatorType(aggFunction, this.input.getType(), null, false);
        TypeInformation<V> aggregateType = TypeExtractor.getAggregateFunctionReturnType(aggFunction, this.input.getType(), null, false);
        TypeInformation<R> outputType = WindowedStream.getWindowFunctionReturnType(windowFunction, aggregateType);
        return this.aggregate(aggFunction, windowFunction, accType, aggregateType, outputType);
    }

    /**
     * Builds the window operator that combines {@code aggregateFunction} with
     * {@code windowFunction} and attaches it to the input stream.
     *
     * <p>Without an evictor, the window state is an {@link AggregatingStateDescriptor} built from
     * {@code aggregateFunction}, and {@code windowFunction} is wrapped in an
     * {@link InternalSingleValueWindowFunction}. With an evictor, elements are kept in a
     * {@link ListStateDescriptor} and the functions are wrapped together in an
     * {@link AggregateApplyWindowFunction}.
     *
     * @param aggregateFunction the aggregate function; must not be {@code null} or a {@link RichFunction}
     * @param windowFunction the window function; must not be {@code null}
     * @param accumulatorType type information of the accumulator; must not be {@code null}
     * @param aggregateResultType type information of the aggregate result; must not be {@code null}
     * @param resultType type information passed to {@code input.transform(...)}; must not be {@code null}
     * @return the stream returned by {@code input.transform(...)}
     * @throws UnsupportedOperationException if {@code aggregateFunction} is a {@link RichFunction}
     */
    @PublicEvolving
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType, TypeInformation<V> aggregateResultType, TypeInformation<R> resultType) {
        WindowOperator<K, ? super T, V, R, ? super W> operator;
        Preconditions.checkNotNull(aggregateFunction, (String)"aggregateFunction");
        Preconditions.checkNotNull(windowFunction, (String)"windowFunction");
        Preconditions.checkNotNull(accumulatorType, (String)"accumulatorType");
        Preconditions.checkNotNull(aggregateResultType, (String)"aggregateResultType");
        Preconditions.checkNotNull(resultType, (String)"resultType");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        // Both user functions go through the environment's clean(...) before being used.
        windowFunction = this.input.getExecutionEnvironment().clean(windowFunction);
        aggregateFunction = this.input.getExecutionEnvironment().clean(aggregateFunction);
        String opName = WindowedStream.generateOperatorName(this.windowAssigner, this.trigger, this.evictor, aggregateFunction, windowFunction);
        KeySelector<T, K> keySel = this.input.getKeySelector();
        if (this.evictor != null) {
            // Evictor configured: keep the elements in list state and aggregate them inside the wrapped window function.
            StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", streamRecordSerializer);
            operator = new EvictingWindowOperator<K, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalIterableWindowFunction(new AggregateApplyWindowFunction<K, W, T, ACC, V, R>(aggregateFunction, windowFunction)), this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
        } else {
            // No evictor: the state descriptor itself carries the aggregate function and the accumulator serializer.
            AggregatingStateDescriptor stateDesc = new AggregatingStateDescriptor("window-contents", aggregateFunction, accumulatorType.createSerializer(this.getExecutionEnvironment().getConfig()));
            operator = new WindowOperator<K, T, V, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueWindowFunction<V, R, K, W>(windowFunction), this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
        return this.input.transform(opName, resultType, operator);
    }

    /**
     * Aggregates each window with {@code aggFunction} and passes the result to a
     * {@link ProcessWindowFunction}.
     *
     * <p>Accumulator, aggregate-result and output types are extracted first; the call is then
     * forwarded to the overload taking explicit types.
     *
     * @param aggFunction the aggregate function; must not be {@code null}
     * @param windowFunction the process window function; must not be {@code null}
     * @return the stream produced by the forwarded {@code aggregate} call
     */
    @PublicEvolving
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction) {
        Preconditions.checkNotNull(aggFunction, "aggFunction");
        Preconditions.checkNotNull(windowFunction, "windowFunction");
        TypeInformation<ACC> accType = TypeExtractor.getAggregateFunctionAccumulatorType(aggFunction, this.input.getType(), null, false);
        TypeInformation<V> aggregateType = TypeExtractor.getAggregateFunctionReturnType(aggFunction, this.input.getType(), null, false);
        TypeInformation<R> outputType = WindowedStream.getProcessWindowFunctionReturnType(windowFunction, aggregateType, null);
        return this.aggregate(aggFunction, windowFunction, accType, aggregateType, outputType);
    }

    /**
     * Derives the output type information of a {@link WindowFunction} through
     * {@code TypeExtractor.getUnaryOperatorReturnType}.
     *
     * @param function the window function to inspect
     * @param inType type information of the function's input
     * @return the type information reported by the extractor
     */
    private static <IN, OUT, KEY> TypeInformation<OUT> getWindowFunctionReturnType(WindowFunction<IN, OUT, KEY, ?> function, TypeInformation<IN> inType) {
        // NOTE(review): these two index arrays are presumably the lambda input/output type-argument
        // paths expected by the extractor; confirm against the TypeExtractor version in use.
        int[] firstIndexPath = new int[]{2, 0};
        int[] secondIndexPath = new int[]{3, 0};
        return TypeExtractor.getUnaryOperatorReturnType(function, WindowFunction.class, 0, 1, firstIndexPath, secondIndexPath, inType, null, false);
    }

    /**
     * Derives the output type information of a {@link ProcessWindowFunction} through
     * {@code TypeExtractor.getUnaryOperatorReturnType}, passing {@code TypeExtractor.NO_INDEX}
     * for both index-array arguments.
     *
     * @param function the process window function to inspect
     * @param inType type information of the function's input
     * @param functionName name forwarded to the extractor; callers in this class pass either
     *     {@code null} or {@code Utils.getCallLocationName()}
     * @return the type information reported by the extractor
     */
    private static <IN, OUT, KEY> TypeInformation<OUT> getProcessWindowFunctionReturnType(ProcessWindowFunction<IN, OUT, KEY, ?> function, TypeInformation<IN> inType, String functionName) {
        int[] noIndex = TypeExtractor.NO_INDEX;
        return TypeExtractor.getUnaryOperatorReturnType(function, ProcessWindowFunction.class, 0, 1, noIndex, noIndex, inType, functionName, false);
    }

    /**
     * Builds the window operator that combines {@code aggregateFunction} with a
     * {@link ProcessWindowFunction} and attaches it to the input stream.
     *
     * <p>Without an evictor, the window state is an {@link AggregatingStateDescriptor} built from
     * {@code aggregateFunction}, and {@code windowFunction} is wrapped in an
     * {@link InternalSingleValueProcessWindowFunction}. With an evictor, elements are kept in a
     * {@link ListStateDescriptor} and the functions are wrapped together in an
     * {@link InternalAggregateProcessWindowFunction}.
     *
     * @param aggregateFunction the aggregate function; must not be {@code null} or a {@link RichFunction}
     * @param windowFunction the process window function; must not be {@code null}
     * @param accumulatorType type information of the accumulator; must not be {@code null}
     * @param aggregateResultType type information of the aggregate result; must not be {@code null}
     * @param resultType type information passed to {@code input.transform(...)}; must not be {@code null}
     * @return the stream returned by {@code input.transform(...)}
     * @throws UnsupportedOperationException if {@code aggregateFunction} is a {@link RichFunction}
     */
    @PublicEvolving
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, ProcessWindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType, TypeInformation<V> aggregateResultType, TypeInformation<R> resultType) {
        WindowOperator<K, ? super T, V, R, ? super W> operator;
        Preconditions.checkNotNull(aggregateFunction, (String)"aggregateFunction");
        Preconditions.checkNotNull(windowFunction, (String)"windowFunction");
        Preconditions.checkNotNull(accumulatorType, (String)"accumulatorType");
        Preconditions.checkNotNull(aggregateResultType, (String)"aggregateResultType");
        Preconditions.checkNotNull(resultType, (String)"resultType");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        // Both user functions go through the environment's clean(...) before being used.
        windowFunction = this.input.getExecutionEnvironment().clean(windowFunction);
        aggregateFunction = this.input.getExecutionEnvironment().clean(aggregateFunction);
        String opName = WindowedStream.generateOperatorName(this.windowAssigner, this.trigger, this.evictor, aggregateFunction, windowFunction);
        KeySelector<T, K> keySel = this.input.getKeySelector();
        if (this.evictor != null) {
            // Evictor configured: keep the elements in list state and aggregate them inside the wrapped window function.
            StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", streamRecordSerializer);
            operator = new EvictingWindowOperator<K, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W>(aggregateFunction, windowFunction), this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
        } else {
            // No evictor: the state descriptor itself carries the aggregate function and the accumulator serializer.
            AggregatingStateDescriptor stateDesc = new AggregatingStateDescriptor("window-contents", aggregateFunction, accumulatorType.createSerializer(this.getExecutionEnvironment().getConfig()));
            operator = new WindowOperator<K, T, V, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueProcessWindowFunction<V, R, K, W>(windowFunction), this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
        return this.input.transform(opName, resultType, operator);
    }

    /**
     * Applies {@code function} to each window.
     *
     * <p>The result type is derived from {@code function} and {@code getInputType()}, then the
     * call is forwarded to the overload that takes an explicit result type.
     *
     * @param function the window function
     * @return the stream produced by the forwarded {@code apply} call
     */
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
        return this.apply(function, WindowedStream.getWindowFunctionReturnType(function, this.getInputType()));
    }

    /**
     * Applies {@code function} to each window, using the supplied result type.
     *
     * <p>The function is passed through the environment's {@code clean(...)}, wrapped in an
     * {@link InternalIterableWindowFunction} and handed to the private operator-building
     * {@code apply}.
     *
     * @param function the window function
     * @param resultType type information of the output
     * @return the stream produced by the private {@code apply}
     */
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        WindowFunction<T, R, K, W> cleanedFunction = this.input.getExecutionEnvironment().clean(function);
        InternalWindowFunction<Iterable<T>, R, K, W> internalFunction = new InternalIterableWindowFunction<T, R, K, W>(cleanedFunction);
        return this.apply(internalFunction, resultType, cleanedFunction);
    }

    /**
     * Applies a {@link ProcessWindowFunction} to each window.
     *
     * <p>The result type is derived from {@code function} and {@code getInputType()}, then the
     * call is forwarded to the overload that takes an explicit result type.
     *
     * @param function the process window function
     * @return the stream produced by the forwarded {@code process} call
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
        return this.process(function, WindowedStream.getProcessWindowFunctionReturnType(function, this.getInputType(), null));
    }

    /**
     * Applies a {@link ProcessWindowFunction} to each window, using the supplied result type.
     *
     * <p>The function is passed through the environment's {@code clean(...)}, wrapped in an
     * {@link InternalIterableProcessWindowFunction} and handed to the private operator-building
     * {@code apply}.
     *
     * @param function the process window function
     * @param resultType type information of the output
     * @return the stream produced by the private {@code apply}
     */
    @Internal
    public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        ProcessWindowFunction<T, R, K, W> cleanedFunction = this.input.getExecutionEnvironment().clean(function);
        InternalWindowFunction<Iterable<T>, R, K, W> internalFunction = new InternalIterableProcessWindowFunction<T, R, K, W>(cleanedFunction);
        return this.apply(internalFunction, resultType, (Function)cleanedFunction);
    }

    /**
     * Builds the window operator for an already-wrapped window function and attaches it to the
     * input stream.
     *
     * <p>Both branches keep window contents in a {@link ListStateDescriptor}; with an evictor the
     * descriptor uses a {@link StreamElementSerializer} and an {@link EvictingWindowOperator} is
     * created, otherwise the input type's own serializer and a plain {@link WindowOperator}.
     *
     * @param function the internal window function handed to the operator
     * @param resultType type information passed to {@code input.transform(...)} for the output
     * @param originalFunction the user function, used only for generating the operator name
     * @return the stream returned by {@code input.transform(...)}
     */
    private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
        WindowOperator<K, ? super T, Iterable<T>, R, ? super W> operator;
        // Only one user function here, so the second function slot of the name generator is null.
        String opName = WindowedStream.generateOperatorName(this.windowAssigner, this.trigger, this.evictor, originalFunction, null);
        KeySelector<T, K> keySel = this.input.getKeySelector();
        if (this.evictor != null) {
            StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", streamRecordSerializer);
            operator = new EvictingWindowOperator<K, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, function, this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
        } else {
            ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", this.input.getType().createSerializer(this.getExecutionEnvironment().getConfig()));
            operator = new WindowOperator<K, T, Iterable<T>, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, function, this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
        return this.input.transform(opName, resultType, operator);
    }

    /**
     * Reduces each window with {@code reduceFunction} and passes the result to {@code function}.
     *
     * <p>The result type is derived from {@code function} and the input type, then the call is
     * forwarded to the overload that takes an explicit result type.
     *
     * @param reduceFunction the reduce function
     * @param function the window function applied after reduction
     * @return the stream produced by the forwarded {@code apply} call
     */
    @Deprecated
    public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
        TypeInformation<R> resultType = WindowedStream.getWindowFunctionReturnType(function, this.input.getType());
        return this.apply(reduceFunction, function, resultType);
    }

    /**
     * Deprecated overload that combines a {@link ReduceFunction} with a {@link WindowFunction}
     * and attaches the resulting window operator to the keyed input stream.
     *
     * <p>Both functions are passed through the execution environment's {@code clean} before use.
     * If an evictor is set, the two functions are wrapped in a {@link ReduceApplyWindowFunction}
     * (itself wrapped in an {@link InternalIterableWindowFunction}) and run by an
     * {@link EvictingWindowOperator} over {@code "window-contents"} list state. Otherwise the
     * reduce function is placed in a {@link ReducingStateDescriptor} named
     * {@code "window-contents"} and the window function is wrapped in an
     * {@link InternalSingleValueWindowFunction} run by a plain {@link WindowOperator}.
     *
     * @param reduceFunction reduce function; must not be a {@link RichFunction}
     * @param function window function applied by the operator
     * @param resultType type information passed to {@code input.transform}
     * @param <R> output type of the window function
     * @return the stream returned by {@code input.transform} for the created operator
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    @Deprecated
    public <R> SingleOutputStreamOperator<R> apply(
            ReduceFunction<T> reduceFunction,
            WindowFunction<T, R, K, W> function,
            TypeInformation<R> resultType) {
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }

        function = input.getExecutionEnvironment().clean(function);
        reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

        final String opName =
                generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
        final KeySelector<T, K> keySel = input.getKeySelector();

        // The accumulator type differs per branch (Iterable<T> when evicting, T otherwise),
        // so it is left as an unbounded wildcard; input, output and window types are concrete.
        final WindowOperator<K, T, ?, R, W> operator;

        if (evictor != null) {
            // Raw types are kept here on purpose: the element type of the evicting list state
            // is the serialized stream record, which this method never names explicitly.
            @SuppressWarnings({"unchecked", "rawtypes"})
            StreamElementSerializer streamRecordSerializer =
                    new StreamElementSerializer(
                            input.getType().createSerializer(getExecutionEnvironment().getConfig()));
            @SuppressWarnings({"unchecked", "rawtypes"})
            ListStateDescriptor stateDesc =
                    new ListStateDescriptor("window-contents", streamRecordSerializer);

            // Output type is R (the window function's result); the iterable wrapper takes its
            // type arguments in <IN, OUT, KEY, W> order.
            operator = new EvictingWindowOperator<K, T, R, W>(
                    windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalIterableWindowFunction<T, R, K, W>(
                            new ReduceApplyWindowFunction<K, W, T, R>(reduceFunction, function)),
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);
        } else {
            ReducingStateDescriptor<T> stateDesc =
                    new ReducingStateDescriptor<>(
                            "window-contents",
                            reduceFunction,
                            input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            operator = new WindowOperator<K, T, T, R, W>(
                    windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalSingleValueWindowFunction<T, R, K, W>(function),
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
    }

    /**
     * Deprecated convenience overload: extracts the fold result type with
     * {@code TypeExtractor.getFoldReturnTypes} and delegates to
     * {@code apply(initialValue, foldFunction, function, resultType)}.
     *
     * @param initialValue initial fold value, forwarded unchanged
     * @param foldFunction fold function, forwarded unchanged
     * @param function window function, forwarded unchanged
     * @param <R> fold accumulator and output type
     * @return whatever the four-argument overload returns
     */
    @Deprecated
    public <R> SingleOutputStreamOperator<R> apply(
            R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) {
        // The call-location lookup stays in this method so it sees the same caller stack.
        final TypeInformation<R> resultType =
                TypeExtractor.getFoldReturnTypes(
                        foldFunction, input.getType(), Utils.getCallLocationName(), true);
        return apply(initialValue, foldFunction, function, resultType);
    }

    /**
     * Deprecated overload that combines a {@link FoldFunction} with a {@link WindowFunction}
     * and attaches the resulting window operator to the keyed input stream.
     *
     * <p>Both functions are passed through the execution environment's {@code clean} before use.
     * If an evictor is set, the initial value and both functions are wrapped in a
     * {@link FoldApplyWindowFunction} (itself wrapped in an
     * {@link InternalIterableWindowFunction}) and run by an {@link EvictingWindowOperator} over
     * {@code "window-contents"} list state. Otherwise the fold is placed in a
     * {@link FoldingStateDescriptor} named {@code "window-contents"} and the window function is
     * wrapped in an {@link InternalSingleValueWindowFunction} run by a plain
     * {@link WindowOperator}.
     *
     * @param initialValue initial value of the fold
     * @param foldFunction fold function; must not be a {@link RichFunction}
     * @param function window function applied by the operator
     * @param resultType type information of the fold result, also passed to
     *     {@code input.transform}
     * @param <R> fold accumulator and output type
     * @return the stream returned by {@code input.transform} for the created operator
     * @throws UnsupportedOperationException if {@code foldFunction} is a {@link RichFunction} or
     *     the window assigner is a {@link MergingWindowAssigner}
     */
    @Deprecated
    public <R> SingleOutputStreamOperator<R> apply(
            R initialValue,
            FoldFunction<T, R> foldFunction,
            WindowFunction<R, R, K, W> function,
            TypeInformation<R> resultType) {
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
        }
        if (windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
        }

        function = input.getExecutionEnvironment().clean(function);
        foldFunction = input.getExecutionEnvironment().clean(foldFunction);

        final String opName =
                generateOperatorName(windowAssigner, trigger, evictor, foldFunction, function);
        final KeySelector<T, K> keySel = input.getKeySelector();

        // The accumulator type differs per branch (Iterable<T> when evicting, R otherwise),
        // so it is left as an unbounded wildcard; input, output and window types are concrete.
        final WindowOperator<K, T, ?, R, W> operator;

        if (evictor != null) {
            // Raw types are kept here on purpose: the element type of the evicting list state
            // is the serialized stream record, which this method never names explicitly.
            @SuppressWarnings({"unchecked", "rawtypes"})
            StreamElementSerializer streamRecordSerializer =
                    new StreamElementSerializer(
                            input.getType().createSerializer(getExecutionEnvironment().getConfig()));
            @SuppressWarnings({"unchecked", "rawtypes"})
            ListStateDescriptor stateDesc =
                    new ListStateDescriptor("window-contents", streamRecordSerializer);

            operator = new EvictingWindowOperator<K, T, R, W>(
                    windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalIterableWindowFunction<T, R, K, W>(
                            new FoldApplyWindowFunction<K, W, T, R, R>(
                                    initialValue, foldFunction, function, resultType)),
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);
        } else {
            FoldingStateDescriptor<T, R> stateDesc =
                    new FoldingStateDescriptor<>(
                            "window-contents",
                            initialValue,
                            foldFunction,
                            resultType.createSerializer(getExecutionEnvironment().getConfig()));

            operator = new WindowOperator<K, T, R, R, W>(
                    windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalSingleValueWindowFunction<R, R, K, W>(function),
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
    }

    /**
     * Produces a short display name for a function object.
     *
     * <p>For a non-anonymous class this is the class's simple name. For an anonymous class the
     * name is built from the simple name of its first implemented interface (or of its
     * superclass when it implements no interface), followed by the part of the class's binary
     * name that comes after the enclosing class's name.
     *
     * @param function function whose runtime class is inspected
     * @return the display name described above
     */
    private static String generateFunctionName(Function function) {
        final Class<?> functionClass = function.getClass();
        if (!functionClass.isAnonymousClass()) {
            return functionClass.getSimpleName();
        }

        final Class<?>[] interfaces = functionClass.getInterfaces();
        // No interface: the anonymous class extends an existing class, so name it after that.
        final Class<?> namedAfter =
                interfaces.length == 0 ? functionClass.getSuperclass() : interfaces[0];

        final int enclosingNameLength = functionClass.getEnclosingClass().getName().length();
        final String anonymousSuffix = functionClass.getName().substring(enclosingNameLength);
        return namedAfter.getSimpleName() + anonymousSuffix;
    }

    /**
     * Builds the operator name in the form
     * {@code Window(<assigner>, <TriggerClass>, [<EvictorClass>, ]<function1>[, <function2>])}.
     *
     * @param assigner window assigner, rendered through string conversion
     * @param trigger trigger whose simple class name is included
     * @param evictor optional evictor whose simple class name is included when non-null
     * @param function1 first function, named via {@code generateFunctionName}
     * @param function2 optional second function, named via {@code generateFunctionName}
     * @return the assembled operator name
     */
    private static String generateOperatorName(
            WindowAssigner<?, ?> assigner,
            Trigger<?, ?> trigger,
            @Nullable Evictor<?, ?> evictor,
            Function function1,
            @Nullable Function function2) {
        final StringBuilder name = new StringBuilder("Window(");
        name.append(assigner).append(", ");
        name.append(trigger.getClass().getSimpleName()).append(", ");
        if (evictor != null) {
            name.append(evictor.getClass().getSimpleName()).append(", ");
        }
        name.append(generateFunctionName(function1));
        if (function2 != null) {
            name.append(", ").append(generateFunctionName(function2));
        }
        return name.append(")").toString();
    }

    /**
     * Applies a {@link SumAggregator} configured for the given position to this windowed stream.
     *
     * @param positionToSum position handed to the {@link SumAggregator}
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        final SumAggregator<T> summing =
                new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig());
        return aggregate(summing);
    }

    /**
     * Applies a {@link SumAggregator} configured for the given field expression to this
     * windowed stream.
     *
     * @param field field expression handed to the {@link SumAggregator}
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> sum(String field) {
        final SumAggregator<T> summing =
                new SumAggregator<>(field, input.getType(), input.getExecutionConfig());
        return aggregate(summing);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MIN} for the given position.
     *
     * @param positionToMin position handed to the aggregator
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> min(int positionToMin) {
        final ComparableAggregator<T> minimum =
                new ComparableAggregator<>(
                        positionToMin,
                        input.getType(),
                        AggregationFunction.AggregationType.MIN,
                        input.getExecutionConfig());
        return aggregate(minimum);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MIN} for the given field expression.
     *
     * <p>The aggregator's boolean argument (the slot the {@code minBy} overloads fill with
     * {@code first}) is fixed to {@code false} here.
     *
     * @param field field expression handed to the aggregator
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> min(String field) {
        final ComparableAggregator<T> minimum =
                new ComparableAggregator<>(
                        field,
                        input.getType(),
                        AggregationFunction.AggregationType.MIN,
                        false,
                        input.getExecutionConfig());
        return aggregate(minimum);
    }

    /**
     * Shorthand for {@code minBy(positionToMinBy, true)}.
     *
     * @param positionToMinBy position forwarded to the two-argument overload
     * @return whatever the two-argument overload returns
     */
    public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
        final boolean first = true;
        return minBy(positionToMinBy, first);
    }

    /**
     * Shorthand for {@code minBy(field, true)}.
     *
     * @param field field expression forwarded to the two-argument overload
     * @return whatever the two-argument overload returns
     */
    public SingleOutputStreamOperator<T> minBy(String field) {
        final boolean first = true;
        return minBy(field, first);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MINBY} for the given position.
     *
     * @param positionToMinBy position handed to the aggregator
     * @param first boolean option handed to the aggregator unchanged
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
        final ComparableAggregator<T> minimumBy =
                new ComparableAggregator<>(
                        positionToMinBy,
                        input.getType(),
                        AggregationFunction.AggregationType.MINBY,
                        first,
                        input.getExecutionConfig());
        return aggregate(minimumBy);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MINBY} for the given field expression.
     *
     * @param field field expression handed to the aggregator
     * @param first boolean option handed to the aggregator unchanged
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
        final ComparableAggregator<T> minimumBy =
                new ComparableAggregator<>(
                        field,
                        input.getType(),
                        AggregationFunction.AggregationType.MINBY,
                        first,
                        input.getExecutionConfig());
        return aggregate(minimumBy);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MAX} for the given position.
     *
     * @param positionToMax position handed to the aggregator
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> max(int positionToMax) {
        final ComparableAggregator<T> maximum =
                new ComparableAggregator<>(
                        positionToMax,
                        input.getType(),
                        AggregationFunction.AggregationType.MAX,
                        input.getExecutionConfig());
        return aggregate(maximum);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MAX} for the given field expression.
     *
     * <p>The aggregator's boolean argument (the slot the {@code maxBy} overloads fill with
     * {@code first}) is fixed to {@code false} here.
     *
     * @param field field expression handed to the aggregator
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> max(String field) {
        final ComparableAggregator<T> maximum =
                new ComparableAggregator<>(
                        field,
                        input.getType(),
                        AggregationFunction.AggregationType.MAX,
                        false,
                        input.getExecutionConfig());
        return aggregate(maximum);
    }

    /**
     * Shorthand for {@code maxBy(positionToMaxBy, true)}.
     *
     * @param positionToMaxBy position forwarded to the two-argument overload
     * @return whatever the two-argument overload returns
     */
    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
        final boolean first = true;
        return maxBy(positionToMaxBy, first);
    }

    /**
     * Shorthand for {@code maxBy(field, true)}.
     *
     * @param field field expression forwarded to the two-argument overload
     * @return whatever the two-argument overload returns
     */
    public SingleOutputStreamOperator<T> maxBy(String field) {
        final boolean first = true;
        return maxBy(field, first);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MAXBY} for the given position.
     *
     * @param positionToMaxBy position handed to the aggregator
     * @param first boolean option handed to the aggregator unchanged
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
        final ComparableAggregator<T> maximumBy =
                new ComparableAggregator<>(
                        positionToMaxBy,
                        input.getType(),
                        AggregationFunction.AggregationType.MAXBY,
                        first,
                        input.getExecutionConfig());
        return aggregate(maximumBy);
    }

    /**
     * Applies a {@link ComparableAggregator} of type
     * {@link AggregationFunction.AggregationType#MAXBY} for the given field expression.
     *
     * @param field field expression handed to the aggregator
     * @param first boolean option handed to the aggregator unchanged
     * @return the stream produced by {@code aggregate} for that aggregator
     */
    public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
        final ComparableAggregator<T> maximumBy =
                new ComparableAggregator<>(
                        field,
                        input.getType(),
                        AggregationFunction.AggregationType.MAXBY,
                        first,
                        input.getExecutionConfig());
        return aggregate(maximumBy);
    }

    /**
     * Runs the given aggregation function through {@code reduce}; shared by the {@code sum},
     * {@code min}, {@code max}, {@code minBy} and {@code maxBy} methods.
     *
     * @param aggregator aggregation function to apply
     * @return the stream returned by {@code reduce}
     */
    private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
        final SingleOutputStreamOperator<T> reduced = reduce(aggregator);
        return reduced;
    }

    /**
     * Returns the execution environment of the underlying keyed input stream.
     *
     * @return {@code input.getExecutionEnvironment()}
     */
    public StreamExecutionEnvironment getExecutionEnvironment() {
        final StreamExecutionEnvironment environment = input.getExecutionEnvironment();
        return environment;
    }

    /**
     * Returns the type information of the underlying keyed input stream.
     *
     * @return {@code input.getType()}
     */
    public TypeInformation<T> getInputType() {
        final TypeInformation<T> inputType = input.getType();
        return inputType;
    }
}

