/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class EventTimeAllWindowCheckpointingITCase
extends TestLogger {
    private static final int PARALLELISM = 4;
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(EventTimeAllWindowCheckpointingITCase.getConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
        config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
        config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
        return config;
    }

    @Test
    public void testTumblingTimeWindow() {
        int numElementsPerKey = 3000;
        int windowSize = 100;
        boolean numKeys = true;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000)).rebalance().timeWindowAll(Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).apply((AllWindowFunction)new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    out.collect((Object)new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum)));
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSlidingTimeWindow() {
        int numElementsPerKey = 3000;
        int windowSize = 1000;
        int windowSlide = 100;
        boolean numKeys = true;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000)).rebalance().timeWindowAll(Time.of((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS), Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).apply((AllWindowFunction)new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    out.collect((Object)new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum)));
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100))).setParallelism(1);
            env.execute("Sliding Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedTumblingTimeWindow() {
        int numElementsPerKey = 3000;
        int windowSize = 100;
        boolean numKeys = true;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000)).rebalance().timeWindowAll(Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).reduce((ReduceFunction)new ReduceFunction<Tuple2<Long, IntType>>(){

                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                    return new Tuple2(a.f0, (Object)new IntType(((IntType)a.f1).value + ((IntType)b.f1).value));
                }
            }, (AllWindowFunction)new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(TimeWindow window, Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    for (Tuple2<Long, IntType> in : input) {
                        out.collect((Object)new Tuple4(in.f0, (Object)window.getStart(), (Object)window.getEnd(), in.f1));
                    }
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedFoldingTumblingTimeWindow() {
        int numElementsPerKey = 3000;
        int windowSize = 100;
        boolean numKeys = true;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000)).rebalance().timeWindowAll(Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).fold((Object)new Tuple4((Object)0L, (Object)0L, (Object)0L, (Object)new IntType(0)), (FoldFunction)new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>(){

                public Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> accumulator, Tuple2<Long, IntType> value) throws Exception {
                    accumulator.f0 = value.f0;
                    accumulator.f3 = new IntType(((IntType)accumulator.f3).value + ((IntType)value.f1).value);
                    return accumulator;
                }
            }, (AllWindowFunction)new RichAllWindowFunction<Tuple4<Long, Long, Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(TimeWindow window, Iterable<Tuple4<Long, Long, Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    for (Tuple4<Long, Long, Long, IntType> in : input) {
                        out.collect((Object)new Tuple4(in.f0, (Object)window.getStart(), (Object)window.getEnd(), in.f3));
                    }
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedSlidingTimeWindow() {
        int numElementsPerKey = 3000;
        int windowSize = 1000;
        int windowSlide = 100;
        boolean numKeys = true;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000)).rebalance().timeWindowAll(Time.of((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS), Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).reduce((ReduceFunction)new ReduceFunction<Tuple2<Long, IntType>>(){

                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                    return new Tuple2(a.f0, (Object)new IntType(((IntType)a.f1).value + ((IntType)b.f1).value));
                }
            }, (AllWindowFunction)new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(TimeWindow window, Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    for (Tuple2<Long, IntType> in : input) {
                        out.collect((Object)new Tuple4(in.f0, (Object)window.getStart(), (Object)window.getEnd(), in.f1));
                    }
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

