/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing.utils;

import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class LegacyStatefulJobSavepointMigrationITCase
extends SavepointMigrationTestBase {
    // Element count for the test job; the pipelines and the accumulator expectations in this class use the same value (4).
    private static final int NUM_SOURCE_ELEMENTS = 4;
    // Flink version used to name the savepoint that writeSavepoint() produces.
    private final MigrationVersion flinkGenerateSavepointVersion = MigrationVersion.v1_4;
    // Backend type for savepoint generation; writeSavepoint() configures this same backend ("rocksdb").
    private final String flinkGenerateSavepointBackendType = "rocksdb";
    // Savepoint version under test, taken from the JUnit parameter tuple (f0).
    private final MigrationVersion testMigrateVersion;
    // State backend under test ("rocksdb" or "jobmanager"), taken from the JUnit parameter tuple (f1).
    private final String testStateBackend;

    /**
     * Supplies the parameter sets for the parameterized runner: every combination of savepoint
     * version (1.2, 1.3, 1.4) and state backend name ("jobmanager", "rocksdb").
     *
     * <p>The tuples are built without widening casts so that each element is a
     * {@code Tuple2<MigrationVersion, String>} and the list matches the declared return type.
     *
     * @return the (savepoint version, backend name) pairs, in version order with "jobmanager"
     *     before "rocksdb" for each version
     */
    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    public static Collection<Tuple2<MigrationVersion, String>> parameters() {
        return Arrays.asList(
            Tuple2.of(MigrationVersion.v1_2, "jobmanager"),
            Tuple2.of(MigrationVersion.v1_2, "rocksdb"),
            Tuple2.of(MigrationVersion.v1_3, "jobmanager"),
            Tuple2.of(MigrationVersion.v1_3, "rocksdb"),
            Tuple2.of(MigrationVersion.v1_4, "jobmanager"),
            Tuple2.of(MigrationVersion.v1_4, "rocksdb"));
    }

    public LegacyStatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
        this.testMigrateVersion = (MigrationVersion)testMigrateVersionAndBackend.f0;
        this.testStateBackend = (String)testMigrateVersionAndBackend.f1;
    }

    /**
     * Builds the stateful "legacy" job and hands it to {@code executeAndSavepoint} together with a
     * target path under {@code src/test/resources/}. The test is {@code @Ignore}d, so it only runs
     * when enabled by hand.
     *
     * <p>The backend and the element count come from {@code flinkGenerateSavepointBackendType}
     * and {@code NUM_SOURCE_ELEMENTS} instead of repeated literals, and the pipeline is built
     * without raw-type casts so the compiler checks the element types of every stage.
     *
     * <p>NOTE(review): the operator uids below must stay in sync with the uids used by
     * {@code testSavepointRestore()}, presumably because state is matched by uid on restore.
     *
     * @throws Exception if job construction or execution fails
     */
    @Test
    @Ignore
    public void writeSavepoint() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        switch (flinkGenerateSavepointBackendType) {
            case "rocksdb":
                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
                break;
            case "jobmanager":
                env.setStateBackend(new MemoryStateBackend());
                break;
            default:
                throw new UnsupportedOperationException(
                    "Unknown state backend type: " + flinkGenerateSavepointBackendType);
        }

        env.enableCheckpointing(500L);
        env.setParallelism(4);
        env.setMaxParallelism(4);

        env
            .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
            .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
            .keyBy(0)
            .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
            .keyBy(0)
            .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
            .keyBy(0)
            .transform(
                "custom_operator",
                new TypeHint<Tuple2<Long, Long>>(){}.getTypeInfo(),
                new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
            .keyBy(0)
            .transform(
                "timely_stateful_operator",
                new TypeHint<Tuple2<Long, Long>>(){}.getTypeInfo(),
                new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
            .addSink(new AccumulatorCountingSink<>());

        executeAndSavepoint(
            env,
            "src/test/resources/" + getSavepointPath(flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType),
            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    }

    /**
     * Builds the "checking" variant of the job, using the same operator uids as
     * {@code writeSavepoint()}, and passes it to {@code restoreAndExecute} with the savepoint
     * resource for the parameterized version and backend.
     *
     * <p>Expected accumulator values: the source check counts 1; every other check accumulator
     * and the sink counter count {@code NUM_SOURCE_ELEMENTS}.
     *
     * <p>The pipeline is built without raw-type casts, the expectation tuples are properly typed
     * {@code Tuple2<String, Integer>} values, and an unknown backend name is reported in the
     * exception message.
     *
     * @throws Exception if job construction or execution fails
     */
    @Test
    public void testSavepointRestore() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        switch (testStateBackend) {
            case "rocksdb":
                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
                break;
            case "jobmanager":
                env.setStateBackend(new MemoryStateBackend());
                break;
            default:
                throw new UnsupportedOperationException(
                    "Unknown state backend type: " + testStateBackend);
        }

        env.enableCheckpointing(500L);
        env.setParallelism(4);
        env.setMaxParallelism(4);

        env
            .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
            .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
            .keyBy(0)
            .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
            .keyBy(0)
            .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
            .keyBy(0)
            .transform(
                "custom_operator",
                new TypeHint<Tuple2<Long, Long>>(){}.getTypeInfo(),
                new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
            .keyBy(0)
            .transform(
                "timely_stateful_operator",
                new TypeHint<Tuple2<Long, Long>>(){}.getTypeInfo(),
                new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
            .addSink(new AccumulatorCountingSink<>());

        restoreAndExecute(
            env,
            getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
            new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
            new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    }

    /**
     * Derives the savepoint resource name for a version/backend pair.
     *
     * @param savepointVersion version whose string form is embedded in the name
     * @param backendType either {@code "rocksdb"} or {@code "jobmanager"}
     * @return {@code "stateful-udf-migration-itcase-flink" + savepointVersion} followed by
     *     {@code "-rocksdb-savepoint"} for rocksdb or {@code "-savepoint"} for jobmanager
     * @throws UnsupportedOperationException if {@code backendType} is any other value
     */
    private String getSavepointPath(MigrationVersion savepointVersion, String backendType) {
        final String suffix;
        // backendType is dereferenced first, so a null backend fails with a NullPointerException.
        if (backendType.equals("rocksdb")) {
            suffix = "-rocksdb-savepoint";
        } else if (backendType.equals("jobmanager")) {
            suffix = "-savepoint";
        } else {
            throw new UnsupportedOperationException();
        }
        return "stateful-udf-migration-itcase-flink" + savepointVersion + suffix;
    }

    /**
     * Sink that registers an {@link IntCounter} accumulator when opened and adds 1 to it for
     * every record passed to {@link #invoke(Object)}.
     *
     * @param <T> type of the consumed records
     */
    private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
        private static final long serialVersionUID = 1L;

        /** Name under which the element counter is registered. */
        public static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";

        /** Local tally of received records; it is only incremented within this class. */
        int count = 0;

        /** Registers the element counter with the runtime context. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, new IntCounter());
        }

        /** Bumps both the local tally and the registered accumulator by one. */
        public void invoke(T value) throws Exception {
            count += 1;
            getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
        }
    }

    /**
     * Operator for the restore run: for each element and each fired timer it reads the
     * {@code "state-name"} value state in the matching namespace, asserts on its content and
     * counts the successful check in a dedicated accumulator.
     */
    private static class CheckingTimelyStatefulOperator
            extends AbstractStreamOperator<Tuple2<Long, Long>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {

        private static final long serialVersionUID = 1L;

        /** Counts elements whose state matched in {@code processElement}. */
        public static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS";
        /** Counts event-time timers whose state matched. */
        public static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_ET_CHECKS";
        /** Counts processing-time timers whose state matched. */
        public static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PT_CHECKS";

        private final ValueStateDescriptor<Long> stateDescriptor =
            new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        private transient InternalTimerService<Long> timerService;

        /** Obtains the {@code "timer"} timer service and registers the three check counters. */
        public void open() throws Exception {
            super.open();
            timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);
            getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext().addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter());
        }

        /** Asserts that the state in namespace {@code f0} equals {@code f1}, then forwards the element. */
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            final Tuple2<Long, Long> record = element.getValue();
            final ValueState<Long> state = stateIn(record.f0);
            Assert.assertEquals(state.value(), record.f1);
            recordSuccess(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR);
            output.collect(element);
        }

        /** Asserts that the state in the timer's namespace equals that namespace. */
        public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
            final Long namespace = timer.getNamespace();
            Assert.assertEquals(stateIn(namespace).value(), namespace);
            recordSuccess(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR);
        }

        /** Asserts that the state in the timer's namespace equals that namespace. */
        public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
            final Long namespace = timer.getNamespace();
            Assert.assertEquals(stateIn(namespace).value(), namespace);
            recordSuccess(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR);
        }

        /** Looks up the value state for the given namespace on the keyed state backend. */
        private ValueState<Long> stateIn(Long namespace) throws Exception {
            return getKeyedStateBackend().getPartitionedState(namespace, LongSerializer.INSTANCE, stateDescriptor);
        }

        /** Adds 1 to the named accumulator. */
        private void recordSuccess(String accumulatorName) {
            getRuntimeContext().getAccumulator(accumulatorName).add(1);
        }
    }

    /**
     * Operator for the savepoint-writing run: for each element it stores {@code f1} in the
     * {@code "state-name"} value state under namespace {@code f0} and registers one event-time
     * and one processing-time timer in that namespace. Its timer callbacks do nothing.
     */
    private static class TimelyStatefulOperator
            extends AbstractStreamOperator<Tuple2<Long, Long>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {

        private static final long serialVersionUID = 1L;

        private final ValueStateDescriptor<Long> stateDescriptor =
            new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        private transient InternalTimerService<Long> timerService;

        /** Obtains the {@code "timer"} timer service with this operator as the callback target. */
        public void open() throws Exception {
            super.open();
            timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);
        }

        /** Writes state, registers both timers, then forwards the element unchanged. */
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            final Tuple2<Long, Long> record = element.getValue();
            final Long namespace = record.f0;

            final ValueState<Long> state =
                getKeyedStateBackend().getPartitionedState(namespace, LongSerializer.INSTANCE, stateDescriptor);
            state.update(record.f1);

            // Event-time timer 10 past the current watermark; processing-time timer 30000 past "now".
            timerService.registerEventTimeTimer(namespace, timerService.currentWatermark() + 10L);
            timerService.registerProcessingTimeTimer(namespace, timerService.currentProcessingTime() + 30000L);

            output.collect(element);
        }

        /** Intentionally empty. */
        public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
        }

        /** Intentionally empty. */
        public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
        }

        /** Forwards the watermark to the output without any further handling in this class. */
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }

    /**
     * UDF operator for the restore run: delegates each element to the wrapped flat-map function
     * and then adds 1 to its own restore-check accumulator.
     */
    private static class CheckingRestoringUdfOperator
            extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        /** Name of the accumulator incremented once per processed element. */
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringUdfOperator.class + "_RESTORE_CHECK";

        // Not read or written anywhere in this class.
        private String restoredState;

        /**
         * @param userFunction flat-map function each element is delegated to
         */
        public CheckingRestoringUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
            super(userFunction);
        }

        /** Registers the restore-check counter. */
        public void open() throws Exception {
            super.open();
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        /** Runs the wrapped function on the element's value, then counts the element. */
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            final TimestampedCollector<Tuple2<Long, Long>> collector = new TimestampedCollector<>(output);
            userFunction.flatMap(element.getValue(), collector);
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }

        /** Forwards the watermark to the output. */
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }

    /**
     * UDF operator for the savepoint-writing run: delegates each element to the wrapped flat-map
     * function and forwards watermarks.
     */
    private static class CheckpointedUdfOperator
            extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        // Not referenced anywhere in this class.
        private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";

        /**
         * @param userFunction flat-map function each element is delegated to
         */
        public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
            super(userFunction);
        }

        /** Runs the wrapped function on the element's value, collecting into the operator output. */
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            final TimestampedCollector<Tuple2<Long, Long>> collector = new TimestampedCollector<>(output);
            userFunction.flatMap(element.getValue(), collector);
        }

        /** Forwards the watermark to the output. */
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }

    /**
     * Flat map for the restore run: forwards each record, asserts that the {@code "state-name"}
     * value state equals the record's {@code f1}, and counts the successful check.
     */
    private static class CheckingKeyedStateFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        /** Name of the accumulator incremented once per verified record. */
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK";

        private final ValueStateDescriptor<Long> stateDescriptor =
            new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        /** Registers the restore-check counter. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        /**
         * Emits the record first, then verifies the keyed state.
         *
         * @throws RuntimeException if the state handle is {@code null}
         */
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(value);

            final ValueState<Long> keyedState = getRuntimeContext().getState(stateDescriptor);
            if (keyedState == null) {
                throw new RuntimeException("Missing key value state for " + value);
            }

            Assert.assertEquals(value.f1, keyedState.value());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    /**
     * Flat map for the savepoint-writing run: forwards each record and stores its {@code f1} in
     * the {@code "state-name"} value state.
     */
    private static class KeyedStateSettingFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        private final ValueStateDescriptor<Long> stateDescriptor =
            new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        /** Emits the record, then writes {@code value.f1} into the keyed state. */
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(value);
            final ValueState<Long> keyedState = getRuntimeContext().getState(stateDescriptor);
            keyedState.update(value.f1);
        }
    }

    /**
     * Flat map for the restore run that {@code testSavepointRestore()} wraps in a
     * {@code CheckingRestoringUdfOperator}: forwards each record, asserts that the
     * {@code "state-name"} value state equals the record's {@code f1}, and counts the check.
     */
    private static class CheckingRestoringFlatMapWithKeyedStateInOperator
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        /** Name of the accumulator incremented once per verified record. */
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedStateInOperator.class + "_RESTORE_CHECK";

        // Not read or written anywhere in this class.
        private transient Tuple2<String, Long> restoredState;

        private final ValueStateDescriptor<Long> stateDescriptor =
            new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        /** Registers the restore-check counter. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        /**
         * Emits the record first, then verifies the keyed state.
         *
         * @throws RuntimeException if the state handle is {@code null}
         */
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(value);

            final ValueState<Long> restored = getRuntimeContext().getState(stateDescriptor);
            if (restored == null) {
                throw new RuntimeException("Missing key value state for " + value);
            }

            Assert.assertEquals(value.f1, restored.value());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    /**
     * Flat map for the restore run: forwards each record, asserts that the {@code "state-name"}
     * value state equals the record's {@code f1}, and counts the successful check.
     */
    private static class CheckingRestoringFlatMapWithKeyedState
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        /** Name of the accumulator incremented once per verified record. */
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedState.class + "_RESTORE_CHECK";

        // Not read or written anywhere in this class.
        private transient Tuple2<String, Long> restoredState;

        private final ValueStateDescriptor<Long> stateDescriptor =
            new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        /** Registers the restore-check counter. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        /**
         * Emits the record first, then verifies the keyed state.
         *
         * @throws RuntimeException if the state handle is {@code null}
         */
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(value);

            final ValueState<Long> current = getRuntimeContext().getState(stateDescriptor);
            if (current == null) {
                throw new RuntimeException("Missing key value state for " + value);
            }

            Assert.assertEquals(value.f1, current.value());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    /**
     * Flat map for the savepoint-writing run: forwards each record, stores its {@code f1} in the
     * {@code "state-name"} value state and immediately asserts that the stored value reads back.
     */
    private static class LegacyCheckpointedFlatMapWithKeyedState
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        // Not used by any method of this class.
        public static Tuple2<String, Long> checkpointedTuple = new Tuple2<>("hello", 42L);

        private final ValueStateDescriptor<Long> stateDescriptor =
            new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        /** Emits the record, writes {@code value.f1} into the keyed state and re-reads it. */
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(value);

            getRuntimeContext().getState(stateDescriptor).update(value.f1);

            // The state handle is fetched a second time for the read-back, as a separate lookup.
            final Long readBack = getRuntimeContext().getState(stateDescriptor).value();
            Assert.assertEquals(value.f1, readBack);
        }
    }

    /**
     * Flat map for the restore run: forwards each record unchanged and adds 1 to its
     * restore-check accumulator per record. It performs no state assertion itself.
     */
    private static class CheckingRestoringFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        /** Name of the accumulator incremented once per forwarded record. */
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMap.class + "_RESTORE_CHECK";

        // Not read or written anywhere in this class.
        private transient Tuple2<String, Long> restoredState;

        /** Registers the restore-check counter. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        /** Emits the record, then counts it. */
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(value);
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    /** Flat map for the savepoint-writing run: an identity mapping that forwards each record. */
    private static class LegacyCheckpointedFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        // Not used by any method of this class.
        public static Tuple2<String, Long> checkpointedTuple = new Tuple2<>("hello", 42L);

        /** Emits the incoming record unchanged. */
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(value);
        }
    }

    /**
     * Source for the restore run: counts one successful check, emits a watermark of 1000, emits
     * the tuples {@code (i, i)} for {@code i} in {@code [0, numElements)} under the checkpoint
     * lock, and then idles until cancelled.
     */
    private static class CheckingRestoringSource extends RichSourceFunction<Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        /** Name of the accumulator incremented once at the start of {@code run}. */
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringSource.class + "_RESTORE_CHECK";

        // Cleared by cancel() to end the idle loop in run(); volatile because run() polls it.
        private volatile boolean isRunning = true;
        private final int numElements;

        // Not read or written anywhere in this class.
        private String restoredState;

        /**
         * @param numElements number of tuples to emit
         */
        public CheckingRestoringSource(int numElements) {
            this.numElements = numElements;
        }

        /** Registers the restore-check counter. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        /** Emits all elements in one go, then sleeps in 20 ms steps while the source is running. */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);

            ctx.emitWatermark(new Watermark(1000L));

            // All elements are emitted while holding the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                long next = 0L;
                while (next < numElements) {
                    ctx.collect(new Tuple2<>(next, next));
                    next++;
                }
            }

            while (isRunning) {
                Thread.sleep(20L);
            }
        }

        /** Signals {@code run} to leave its idle loop. */
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * Source for the savepoint-writing run: emits a watermark of 0, emits the tuples
     * {@code (i, i)} for {@code i} in {@code [0, numElements)} under the checkpoint lock, and
     * then idles until cancelled.
     */
    private static class LegacyCheckpointedSource implements SourceFunction<Tuple2<Long, Long>> {
        // Not used by any method of this class.
        public static String checkpointedString = "Here be dragons!";
        private static final long serialVersionUID = 1L;

        // Cleared by cancel() to end the idle loop in run(); volatile because run() polls it.
        private volatile boolean isRunning = true;
        private final int numElements;

        /**
         * @param numElements number of tuples to emit
         */
        public LegacyCheckpointedSource(int numElements) {
            this.numElements = numElements;
        }

        /** Emits all elements in one go, then sleeps in 20 ms steps while the source is running. */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            ctx.emitWatermark(new Watermark(0L));

            // All elements are emitted while holding the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                long next = 0L;
                while (next < numElements) {
                    ctx.collect(new Tuple2<>(next, next));
                    next++;
                }
            }

            while (isRunning) {
                Thread.sleep(20L);
            }
        }

        /** Signals {@code run} to leave its idle loop. */
        public void cancel() {
            isRunning = false;
        }
    }
}

