/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.OutputStream;
import java.util.HashMap;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.shaded.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
    /** Backend under test; assigned in {@link #setup()} and disposed in {@link #teardown()}. */
    protected B backend;

    /**
     * Supplies the state backend instance that the tests in this class exercise.
     *
     * <p>Invoked by {@link #setup()} before every test; the result is stored in {@link #backend}.
     *
     * @return the backend to test
     * @throws Exception if the concrete test cannot provide a backend
     */
    protected abstract B getStateBackend() throws Exception;

    /**
     * Hook for the concrete test to release whatever it set up for its backend.
     *
     * <p>Invoked by {@link #teardown()} after every test.
     *
     * @throws Exception if the cleanup fails
     */
    protected abstract void cleanup() throws Exception;

    /**
     * Asks the concrete subclass for a backend and stores it for the upcoming test.
     *
     * @throws Exception if the subclass fails to provide a backend
     */
    @Before
    public void setup() throws Exception {
        B freshBackend = getStateBackend();
        backend = freshBackend;
    }

    /**
     * Disposes the backend used by the test and then runs the subclass cleanup hook.
     *
     * <p>The cleanup hook runs even when disposing the backend throws, so a failing dispose does
     * not leave subclass resources behind. The backend may be {@code null} when {@link #setup()}
     * itself failed; in that case only the cleanup hook is invoked.
     *
     * @throws Exception if disposing the backend or the cleanup hook fails
     */
    @After
    public void teardown() throws Exception {
        try {
            if (backend != null) {
                backend.dispose();
            }
        } finally {
            cleanup();
        }
    }

    /**
     * Checks that {@link ValueState} keeps a separate value per key and that two snapshots taken
     * at different times each restore the values that were current when they were taken.
     *
     * @throws Exception if any backend operation fails
     */
    @Test
    public void testValueState() throws Exception {
        backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
        kvId.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = (ValueState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

        // Initial writes: every key starts out empty.
        backend.setCurrentKey(1);
        Assert.assertNull(state.value());
        state.update("1");
        backend.setCurrentKey(2);
        Assert.assertNull(state.value());
        state.update("2");
        backend.setCurrentKey(1);
        Assert.assertEquals("1", state.value());

        // First snapshot; asynchronous snapshots are replaced by their materialized form.
        HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
        for (Object key : snapshot1.keySet()) {
            Object snapshot = snapshot1.get(key);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
            }
        }

        // Overwrite the existing keys and add a third one.
        backend.setCurrentKey(1);
        state.update("u1");
        backend.setCurrentKey(2);
        state.update("u2");
        backend.setCurrentKey(3);
        state.update("u3");

        // Second snapshot, handled the same way as the first.
        HashMap snapshot2 = backend.snapshotPartitionedState(682375462379L, 4L);
        for (Object key : snapshot2.keySet()) {
            Object snapshot = snapshot2.get(key);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot2.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
            }
        }

        // The live state reflects the updates.
        backend.setCurrentKey(1);
        Assert.assertEquals("u1", state.value());
        backend.setCurrentKey(2);
        Assert.assertEquals("u2", state.value());
        backend.setCurrentKey(3);
        Assert.assertEquals("u3", state.value());

        // Restore the first snapshot into a re-initialized backend.
        backend.dispose();
        backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
        backend.injectKeyValueStateSnapshots(snapshot1);
        // The map is raw, so its elements are Objects and need the explicit cast.
        for (Object snapshot : snapshot1.values()) {
            ((KvStateSnapshot) snapshot).discardState();
        }

        ValueState restored1 = (ValueState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
        backend.setCurrentKey(1);
        Assert.assertEquals("1", restored1.value());
        backend.setCurrentKey(2);
        Assert.assertEquals("2", restored1.value());

        // Restore the second snapshot into a re-initialized backend.
        backend.dispose();
        backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
        backend.injectKeyValueStateSnapshots(snapshot2);
        for (Object snapshot : snapshot2.values()) {
            ((KvStateSnapshot) snapshot).discardState();
        }

        ValueState restored2 = (ValueState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
        backend.setCurrentKey(1);
        Assert.assertEquals("u1", restored2.value());
        backend.setCurrentKey(2);
        Assert.assertEquals("u2", restored2.value());
        backend.setCurrentKey(3);
        Assert.assertEquals("u3", restored2.value());
    }

    /**
     * Checks that updating a {@link ValueState} with {@code null} makes it report its default
     * value again, and that a snapshot taken afterwards can be injected and the state re-acquired
     * even though the state's serializer rejects {@code null}.
     *
     * @throws Exception if any backend operation fails
     */
    @Test
    public void testValueStateNullUpdate() throws Exception {
        // Precondition of the test: the serializer used for the state cannot serialize null.
        try {
            LongSerializer.INSTANCE.serialize(null, (DataOutputView) new DataOutputViewStreamWrapper((OutputStream) new ByteArrayOutputStream()));
            Assert.fail("Should faill with NullPointerException");
        } catch (NullPointerException expected) {
            // Expected: this is the behavior the rest of the test relies on.
        }

        backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer) LongSerializer.INSTANCE, (Object) 42L);
        kvId.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = (ValueState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

        // An untouched key reports the default value.
        backend.setCurrentKey(1);
        Assert.assertEquals(42L, (long) ((Long) state.value()));
        state.update(1L);
        Assert.assertEquals(1L, (long) ((Long) state.value()));

        backend.setCurrentKey(2);
        Assert.assertEquals(42L, (long) ((Long) state.value()));

        // Clearing brings the default value back.
        backend.setCurrentKey(1);
        state.clear();
        Assert.assertEquals(42L, (long) ((Long) state.value()));

        state.update(17L);
        Assert.assertEquals(17L, (long) ((Long) state.value()));

        // Updating with null behaves like clearing.
        state.update(null);
        Assert.assertEquals(42L, (long) ((Long) state.value()));

        // Snapshot; asynchronous snapshots are replaced by their materialized form.
        HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
        for (Object key : snapshot1.keySet()) {
            Object snapshot = snapshot1.get(key);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
            }
        }

        // Inject the snapshot into a re-initialized backend and re-acquire the state.
        backend.dispose();
        backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
        backend.injectKeyValueStateSnapshots(snapshot1);
        // The map is raw, so its elements are Objects and need the explicit cast.
        for (Object snapshot : snapshot1.values()) {
            ((KvStateSnapshot) snapshot).discardState();
        }

        backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
    }

    /**
     * Checks that {@link ListState} keeps a separate list per key and that two snapshots taken at
     * different times each restore the list contents that were current when they were taken.
     *
     * <p>Any exception raised by the backend is printed and turned into a test failure.
     */
    @Test
    public void testListState() {
        try {
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

            // List contents are compared through their comma-joined string form.
            Joiner joiner = Joiner.on(",");

            // Initial writes: every key starts out without a list.
            backend.setCurrentKey(1);
            Assert.assertEquals(null, state.get());
            state.add("1");
            backend.setCurrentKey(2);
            Assert.assertEquals(null, state.get());
            state.add("2");
            backend.setCurrentKey(1);
            Assert.assertEquals("1", joiner.join((Iterable) state.get()));

            // First snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshot1.keySet()) {
                Object snapshot = snapshot1.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Append to the existing keys and start a third one.
            backend.setCurrentKey(1);
            state.add("u1");
            backend.setCurrentKey(2);
            state.add("u2");
            backend.setCurrentKey(3);
            state.add("u3");

            // Second snapshot, handled the same way as the first.
            HashMap snapshot2 = backend.snapshotPartitionedState(682375462379L, 4L);
            for (Object key : snapshot2.keySet()) {
                Object snapshot = snapshot2.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot2.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // The live state reflects the appended elements.
            backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", joiner.join((Iterable) state.get()));
            backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", joiner.join((Iterable) state.get()));
            backend.setCurrentKey(3);
            Assert.assertEquals("u3", joiner.join((Iterable) state.get()));

            // Restore the first snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot1);
            // The map is raw, so its elements are Objects and need the explicit cast.
            for (Object snapshot : snapshot1.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            ListState restored1 = (ListState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
            backend.setCurrentKey(1);
            Assert.assertEquals("1", joiner.join((Iterable) restored1.get()));
            backend.setCurrentKey(2);
            Assert.assertEquals("2", joiner.join((Iterable) restored1.get()));

            // Restore the second snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot2);
            for (Object snapshot : snapshot2.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            ListState restored2 = (ListState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
            backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", joiner.join((Iterable) restored2.get()));
            backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", joiner.join((Iterable) restored2.get()));
            backend.setCurrentKey(3);
            Assert.assertEquals("u3", joiner.join((Iterable) restored2.get()));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that {@link ReducingState} combines the values added per key with the configured
     * reduce function and that two snapshots taken at different times each restore the reduced
     * values that were current when they were taken.
     *
     * <p>Any exception raised by the backend is printed and turned into a test failure.
     */
    @Test
    public void testReducingState() {
        try {
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction) new AppendingReduce(), String.class);
            ReducingState state = (ReducingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

            // Initial writes: every key starts out without a value.
            backend.setCurrentKey(1);
            Assert.assertEquals(null, state.get());
            state.add("1");
            backend.setCurrentKey(2);
            Assert.assertEquals(null, state.get());
            state.add("2");
            backend.setCurrentKey(1);
            Assert.assertEquals("1", state.get());

            // First snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshot1.keySet()) {
                Object snapshot = snapshot1.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Add to the existing keys and start a third one.
            backend.setCurrentKey(1);
            state.add("u1");
            backend.setCurrentKey(2);
            state.add("u2");
            backend.setCurrentKey(3);
            state.add("u3");

            // Second snapshot, handled the same way as the first.
            HashMap snapshot2 = backend.snapshotPartitionedState(682375462379L, 4L);
            for (Object key : snapshot2.keySet()) {
                Object snapshot = snapshot2.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot2.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // The live state reflects the reduced values.
            backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", state.get());
            backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", state.get());
            backend.setCurrentKey(3);
            Assert.assertEquals("u3", state.get());

            // Restore the first snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot1);
            // The map is raw, so its elements are Objects and need the explicit cast.
            for (Object snapshot : snapshot1.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            ReducingState restored1 = (ReducingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
            backend.setCurrentKey(1);
            Assert.assertEquals("1", restored1.get());
            backend.setCurrentKey(2);
            Assert.assertEquals("2", restored1.get());

            // Restore the second snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot2);
            for (Object snapshot : snapshot2.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            ReducingState restored2 = (ReducingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
            backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", restored2.get());
            backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", restored2.get());
            backend.setCurrentKey(3);
            Assert.assertEquals("u3", restored2.get());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that {@link FoldingState} folds the values added per key onto the initial value,
     * starts over from the initial value after {@code clear()}, and that two snapshots taken at
     * different times each restore the folded values that were current when they were taken.
     *
     * <p>Any exception raised by the backend is printed and turned into a test failure.
     */
    @Test
    public void testFoldingState() {
        try {
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

            FoldingStateDescriptor kvId = new FoldingStateDescriptor("id", (Object) "Fold-Initial:", (FoldFunction) new AppendingFold(), String.class);
            FoldingState state = (FoldingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

            // Initial writes: every key reports null until something was added.
            backend.setCurrentKey(1);
            Assert.assertEquals(null, state.get());
            state.add(1);
            backend.setCurrentKey(2);
            Assert.assertEquals(null, state.get());
            state.add(2);
            backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", state.get());

            // First snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshot1.keySet()) {
                Object snapshot = snapshot1.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Key 1 is cleared first, so its fold restarts from the initial value.
            backend.setCurrentKey(1);
            state.clear();
            state.add(101);
            backend.setCurrentKey(2);
            state.add(102);
            backend.setCurrentKey(3);
            state.add(103);

            // Second snapshot, handled the same way as the first.
            HashMap snapshot2 = backend.snapshotPartitionedState(682375462379L, 4L);
            for (Object key : snapshot2.keySet()) {
                Object snapshot = snapshot2.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot2.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // The live state reflects the folded values.
            backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", state.get());
            backend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", state.get());
            backend.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", state.get());

            // Restore the first snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot1);
            // The map is raw, so its elements are Objects and need the explicit cast.
            for (Object snapshot : snapshot1.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            FoldingState restored1 = (FoldingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
            backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", restored1.get());
            backend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2", restored1.get());

            // Restore the second snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot2);
            for (Object snapshot : snapshot2.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            FoldingState restored2 = (FoldingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
            backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", restored2.get());
            backend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", restored2.get());
            backend.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", restored2.get());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that accessing a restored {@link ValueState} through a descriptor whose serializer
     * differs from the one used when the snapshot was taken fails with a {@link RuntimeException}
     * whose message contains "Trying to access state using wrong StateDescriptor".
     *
     * <p>Any other exception is printed and turned into a test failure.
     */
    @Test
    public void testValueStateRestoreWithWrongSerializers() {
        try {
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
            kvId.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState state = (ValueState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

            backend.setCurrentKey(1);
            state.update("1");
            backend.setCurrentKey(2);
            state.update("2");

            // Snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshot1.keySet()) {
                Object snapshot = snapshot1.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Inject the snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot1);
            // The map is raw, so its elements are Objects and need the explicit cast.
            for (Object snapshot : snapshot1.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            // A float serializer stands in for the string serializer the state was written with.
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ValueStateDescriptor("id", (TypeSerializer) fakeStringSerializer, null);
                state = (ValueState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
                state.value();
                Assert.fail("should recognize wrong serializers");
            } catch (RuntimeException e) {
                // A RuntimeException without message must be reported as wrong, not cause an NPE.
                String message = e.getMessage();
                if (message == null || !message.contains("Trying to access state using wrong StateDescriptor")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that accessing a restored {@link ListState} through a descriptor whose serializer
     * differs from the one used when the snapshot was taken fails with a {@link RuntimeException}
     * whose message contains "Trying to access state using wrong StateDescriptor".
     *
     * <p>Any other exception is printed and turned into a test failure.
     */
    @Test
    public void testListStateRestoreWithWrongSerializers() {
        try {
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

            backend.setCurrentKey(1);
            state.add("1");
            backend.setCurrentKey(2);
            state.add("2");

            // Snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshot1.keySet()) {
                Object snapshot = snapshot1.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Inject the snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot1);
            // The map is raw, so its elements are Objects and need the explicit cast.
            for (Object snapshot : snapshot1.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            // A float serializer stands in for the string serializer the state was written with.
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ListStateDescriptor("id", (TypeSerializer) fakeStringSerializer);
                state = (ListState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
                state.get();
                Assert.fail("should recognize wrong serializers");
            } catch (RuntimeException e) {
                // A RuntimeException without message must be reported as wrong, not cause an NPE.
                String message = e.getMessage();
                if (message == null || !message.contains("Trying to access state using wrong StateDescriptor")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that accessing a restored {@link ReducingState} through a descriptor whose serializer
     * differs from the one used when the snapshot was taken fails with a {@link RuntimeException}
     * whose message contains "Trying to access state using wrong StateDescriptor".
     *
     * <p>Any other exception is printed and turned into a test failure.
     */
    @Test
    public void testReducingStateRestoreWithWrongSerializers() {
        try {
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);

            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction) new AppendingReduce(), (TypeSerializer) StringSerializer.INSTANCE);
            ReducingState state = (ReducingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);

            backend.setCurrentKey(1);
            state.add("1");
            backend.setCurrentKey(2);
            state.add("2");

            // Snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshot1 = backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshot1.keySet()) {
                Object snapshot = snapshot1.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Inject the snapshot into a re-initialized backend.
            backend.dispose();
            backend.initializeForJob((Environment) new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer) IntSerializer.INSTANCE);
            backend.injectKeyValueStateSnapshots(snapshot1);
            // The map is raw, so its elements are Objects and need the explicit cast.
            for (Object snapshot : snapshot1.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            // A float serializer stands in for the string serializer the state was written with.
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ReducingStateDescriptor("id", (ReduceFunction) new AppendingReduce(), (TypeSerializer) fakeStringSerializer);
                state = (ReducingState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) kvId);
                state.get();
                Assert.fail("should recognize wrong serializers");
            } catch (RuntimeException e) {
                // A RuntimeException without message must be reported as wrong, not cause an NPE.
                String message = e.getMessage();
                if (message == null || !message.contains("Trying to access state using wrong StateDescriptor")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that the default value handed out by a {@link ValueState} for two different keys is
     * equal but not the same object instance.
     *
     * @throws Exception if any backend operation fails
     */
    @Test
    public void testCopyDefaultValue() throws Exception {
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        backend.initializeForJob((Environment) environment, "test_op", (TypeSerializer) IntSerializer.INSTANCE);

        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", IntValue.class, (Object) new IntValue(-1));
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState valueState = (ValueState) backend.getPartitionedState(null, (TypeSerializer) VoidSerializer.INSTANCE, (StateDescriptor) descriptor);

        // Read the default through two different keys.
        backend.setCurrentKey((Object) 1);
        IntValue defaultForKeyOne = (IntValue) valueState.value();

        backend.setCurrentKey((Object) 2);
        IntValue defaultForKeyTwo = (IntValue) valueState.value();

        Assert.assertNotNull(defaultForKeyOne);
        Assert.assertNotNull(defaultForKeyTwo);
        Assert.assertEquals((Object) defaultForKeyOne, (Object) defaultForKeyTwo);
        // Equal content, but distinct instances.
        Assert.assertFalse(defaultForKeyOne == defaultForKeyTwo);
    }

    /**
     * Checks that snapshotting a backend on which no state was registered yields {@code null}
     * and that this {@code null} snapshot can be injected into a re-initialized backend.
     *
     * <p>Any exception raised by the backend is printed and turned into a test failure.
     */
    @Test
    public void testEmptyStateCheckpointing() {
        try {
            DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
            TypeSerializer keySerializer = (TypeSerializer) IntSerializer.INSTANCE;

            backend.initializeForJob((Environment) environment, "test_op", keySerializer);
            HashMap emptySnapshot = backend.snapshotPartitionedState(682375462379L, 1L);
            Assert.assertNull(emptySnapshot);
            backend.dispose();

            // Second life cycle of the backend, restoring from the (null) snapshot.
            backend.initializeForJob((Environment) environment, "test_op", keySerializer);
            backend.injectKeyValueStateSnapshots(emptySnapshot);
            backend.dispose();
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    private static class AppendingFold
    implements FoldFunction<Integer, String> {
        private static final long serialVersionUID = 1L;

        private AppendingFold() {
        }

        public String fold(String acc, Integer value) throws Exception {
            return acc + "," + value;
        }
    }

    private static class AppendingReduce
    implements ReduceFunction<String> {
        private AppendingReduce() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1 + "," + value2;
        }
    }
}

