/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.OutputStream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.shaded.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
    protected B backend;

    protected abstract B getStateBackend() throws Exception;

    protected abstract void cleanup() throws Exception;

    @Before
    public void setup() throws Exception {
        this.backend = this.getStateBackend();
    }

    @After
    public void teardown() throws Exception {
        this.backend.dispose();
        this.cleanup();
    }

    @Test
    public void testValueState() throws Exception {
        this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
        kvId.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = (ValueState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
        KvState kv = (KvState)state;
        kv.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.value());
        state.update((Object)"1");
        kv.setCurrentKey((Object)2);
        Assert.assertNull((Object)state.value());
        state.update((Object)"2");
        kv.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)state.value());
        KvStateSnapshot snapshot1 = kv.snapshot(682375462378L, 2L);
        if (snapshot1 instanceof AsynchronousKvStateSnapshot) {
            snapshot1 = ((AsynchronousKvStateSnapshot)snapshot1).materialize();
        }
        kv.setCurrentKey((Object)1);
        state.update((Object)"u1");
        kv.setCurrentKey((Object)2);
        state.update((Object)"u2");
        kv.setCurrentKey((Object)3);
        state.update((Object)"u3");
        KvStateSnapshot snapshot2 = kv.snapshot(682375462379L, 4L);
        if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
            snapshot2 = ((AsynchronousKvStateSnapshot)snapshot2).materialize();
        }
        kv.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"u1", (Object)state.value());
        kv.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"u2", (Object)state.value());
        kv.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)state.value());
        kv.dispose();
        KvState restored1 = snapshot1.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 10L);
        snapshot1.discardState();
        ValueState restored1State = (ValueState)restored1;
        restored1.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)restored1State.value());
        restored1.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)restored1State.value());
        restored1.dispose();
        KvState restored2 = snapshot2.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 10L);
        snapshot2.discardState();
        ValueState restored2State = (ValueState)restored2;
        restored2.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"u1", (Object)restored2State.value());
        restored2.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"u2", (Object)restored2State.value());
        restored2.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)restored2State.value());
    }

    @Test
    public void testValueStateNullUpdate() throws Exception {
        try {
            LongSerializer.INSTANCE.serialize(null, (DataOutputView)new DataOutputViewStreamWrapper((OutputStream)new ByteArrayOutputStream()));
            Assert.fail((String)"Should faill with NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer)LongSerializer.INSTANCE, (Object)42L);
        kvId.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = (ValueState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
        KvState kv = (KvState)state;
        kv.setCurrentKey((Object)1);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        state.update((Object)1L);
        Assert.assertEquals((long)1L, (long)((Long)state.value()));
        kv.setCurrentKey((Object)2);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        kv.setCurrentKey((Object)1);
        state.clear();
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        state.update((Object)17L);
        Assert.assertEquals((long)17L, (long)((Long)state.value()));
        state.update(null);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        KvStateSnapshot snapshot1 = kv.snapshot(682375462378L, 2L);
        if (snapshot1 instanceof AsynchronousKvStateSnapshot) {
            snapshot1 = ((AsynchronousKvStateSnapshot)snapshot1).materialize();
        }
        kv.dispose();
        snapshot1.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 10L);
    }

    @Test
    public void testListState() {
        try {
            this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
            KvState kv = (KvState)state;
            Joiner joiner = Joiner.on((String)",");
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"", (Object)joiner.join((Iterable)state.get()));
            state.add((Object)"1");
            kv.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"", (Object)joiner.join((Iterable)state.get()));
            state.add((Object)"2");
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)state.get()));
            KvStateSnapshot snapshot1 = kv.snapshot(682375462378L, 2L);
            if (snapshot1 instanceof AsynchronousKvStateSnapshot) {
                snapshot1 = ((AsynchronousKvStateSnapshot)snapshot1).materialize();
            }
            kv.setCurrentKey((Object)1);
            state.add((Object)"u1");
            kv.setCurrentKey((Object)2);
            state.add((Object)"u2");
            kv.setCurrentKey((Object)3);
            state.add((Object)"u3");
            KvStateSnapshot snapshot2 = kv.snapshot(682375462379L, 4L);
            if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
                snapshot2 = ((AsynchronousKvStateSnapshot)snapshot2).materialize();
            }
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)state.get()));
            kv.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)state.get()));
            kv.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)state.get()));
            kv.dispose();
            KvState restored1 = snapshot1.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 10L);
            snapshot1.discardState();
            ListState restored1State = (ListState)restored1;
            restored1.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)restored1State.get()));
            restored1.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2", (Object)joiner.join((Iterable)restored1State.get()));
            restored1.dispose();
            KvState restored2 = snapshot2.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 20L);
            snapshot2.discardState();
            ListState restored2State = (ListState)restored2;
            restored2.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)restored2State.get()));
            restored2.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)restored2State.get()));
            restored2.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)restored2State.get()));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testReducingState() {
        try {
            this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new ReduceFunction<String>(){
                private static final long serialVersionUID = 1L;

                public String reduce(String value1, String value2) throws Exception {
                    return value1 + "," + value2;
                }
            }, String.class);
            ReducingState state = (ReducingState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
            KvState kv = (KvState)state;
            Joiner joiner = Joiner.on((String)",");
            kv.setCurrentKey((Object)1);
            Assert.assertEquals(null, (Object)state.get());
            state.add((Object)"1");
            kv.setCurrentKey((Object)2);
            Assert.assertEquals(null, (Object)state.get());
            state.add((Object)"2");
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)state.get());
            KvStateSnapshot snapshot1 = kv.snapshot(682375462378L, 2L);
            if (snapshot1 instanceof AsynchronousKvStateSnapshot) {
                snapshot1 = ((AsynchronousKvStateSnapshot)snapshot1).materialize();
            }
            kv.setCurrentKey((Object)1);
            state.add((Object)"u1");
            kv.setCurrentKey((Object)2);
            state.add((Object)"u2");
            kv.setCurrentKey((Object)3);
            state.add((Object)"u3");
            KvStateSnapshot snapshot2 = kv.snapshot(682375462379L, 4L);
            if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
                snapshot2 = ((AsynchronousKvStateSnapshot)snapshot2).materialize();
            }
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)state.get());
            kv.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)state.get());
            kv.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)state.get());
            kv.dispose();
            KvState restored1 = snapshot1.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 10L);
            snapshot1.discardState();
            ReducingState restored1State = (ReducingState)restored1;
            restored1.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)restored1State.get());
            restored1.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2", (Object)restored1State.get());
            restored1.dispose();
            KvState restored2 = snapshot2.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 20L);
            snapshot2.discardState();
            ReducingState restored2State = (ReducingState)restored2;
            restored2.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)restored2State.get());
            restored2.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)restored2State.get());
            restored2.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)restored2State.get());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFoldingState() {
        try {
            this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
            FoldingStateDescriptor kvId = new FoldingStateDescriptor("id", (Object)"Fold-Initial:", (FoldFunction)new FoldFunction<Integer, String>(){
                private static final long serialVersionUID = 1L;

                public String fold(String acc, Integer value) throws Exception {
                    return acc + "," + value;
                }
            }, String.class);
            FoldingState state = (FoldingState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
            KvState kv = (KvState)state;
            Joiner joiner = Joiner.on((String)",");
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Fold-Initial:", (Object)state.get());
            state.add((Object)1);
            kv.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"Fold-Initial:", (Object)state.get());
            state.add((Object)2);
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Fold-Initial:,1", (Object)state.get());
            KvStateSnapshot snapshot1 = kv.snapshot(682375462378L, 2L);
            if (snapshot1 instanceof AsynchronousKvStateSnapshot) {
                snapshot1 = ((AsynchronousKvStateSnapshot)snapshot1).materialize();
            }
            kv.setCurrentKey((Object)1);
            state.clear();
            state.add((Object)101);
            kv.setCurrentKey((Object)2);
            state.add((Object)102);
            kv.setCurrentKey((Object)3);
            state.add((Object)103);
            KvStateSnapshot snapshot2 = kv.snapshot(682375462379L, 4L);
            if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
                snapshot2 = ((AsynchronousKvStateSnapshot)snapshot2).materialize();
            }
            kv.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Fold-Initial:,101", (Object)state.get());
            kv.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"Fold-Initial:,2,102", (Object)state.get());
            kv.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"Fold-Initial:,103", (Object)state.get());
            kv.dispose();
            KvState restored1 = snapshot1.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 10L);
            snapshot1.discardState();
            FoldingState restored1State = (FoldingState)restored1;
            restored1.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Fold-Initial:,1", (Object)restored1State.get());
            restored1.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"Fold-Initial:,2", (Object)restored1State.get());
            restored1.dispose();
            KvState restored2 = snapshot2.restoreState(this.backend, (TypeSerializer)IntSerializer.INSTANCE, this.getClass().getClassLoader(), 20L);
            snapshot2.discardState();
            FoldingState restored2State = (FoldingState)restored2;
            restored2.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Fold-Initial:,101", (Object)restored2State.get());
            restored2.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"Fold-Initial:,2,102", (Object)restored2State.get());
            restored2.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"Fold-Initial:,103", (Object)restored2State.get());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testValueStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
            kvId.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState state = (ValueState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
            KvState kv = (KvState)state;
            kv.setCurrentKey((Object)1);
            state.update((Object)"1");
            kv.setCurrentKey((Object)2);
            state.update((Object)"2");
            KvStateSnapshot snapshot = kv.snapshot(682375462378L, System.currentTimeMillis());
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot)snapshot).materialize();
            }
            FloatSerializer fakeIntSerializer = FloatSerializer.INSTANCE;
            try {
                snapshot.restoreState(this.backend, (TypeSerializer)fakeIntSerializer, this.getClass().getClassLoader(), 1L);
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (IllegalArgumentException illegalArgumentException) {
            }
            catch (Exception e) {
                Assert.fail((String)"wrong exception");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testListStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
            KvState kv = (KvState)state;
            kv.setCurrentKey((Object)1);
            state.add((Object)"1");
            kv.setCurrentKey((Object)2);
            state.add((Object)"2");
            KvStateSnapshot snapshot = kv.snapshot(682375462378L, System.currentTimeMillis());
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot)snapshot).materialize();
            }
            kv.dispose();
            FloatSerializer fakeIntSerializer = FloatSerializer.INSTANCE;
            try {
                snapshot.restoreState(this.backend, (TypeSerializer)fakeIntSerializer, this.getClass().getClassLoader(), 1L);
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (IllegalArgumentException illegalArgumentException) {
            }
            catch (Exception e) {
                Assert.fail((String)("wrong exception " + e));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testReducingStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new ReduceFunction<String>(){

                public String reduce(String value1, String value2) throws Exception {
                    return value1 + "," + value2;
                }
            }, String.class);
            ReducingState state = (ReducingState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
            KvState kv = (KvState)state;
            kv.setCurrentKey((Object)1);
            state.add((Object)"1");
            kv.setCurrentKey((Object)2);
            state.add((Object)"2");
            KvStateSnapshot snapshot = kv.snapshot(682375462378L, System.currentTimeMillis());
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot)snapshot).materialize();
            }
            kv.dispose();
            FloatSerializer fakeIntSerializer = FloatSerializer.INSTANCE;
            try {
                snapshot.restoreState(this.backend, (TypeSerializer)fakeIntSerializer, this.getClass().getClassLoader(), 1L);
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (IllegalArgumentException illegalArgumentException) {
            }
            catch (Exception e) {
                Assert.fail((String)("wrong exception " + e));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCopyDefaultValue() {
        try {
            this.backend.initializeForJob((Environment)new DummyEnvironment("test", 1, 0), "test_op", (TypeSerializer)IntSerializer.INSTANCE);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
            kvId.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState state = (ValueState)this.backend.getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)kvId);
            KvState kv = (KvState)state;
            kv.setCurrentKey((Object)1);
            IntValue default1 = (IntValue)state.value();
            kv.setCurrentKey((Object)2);
            IntValue default2 = (IntValue)state.value();
            Assert.assertNotNull((Object)default1);
            Assert.assertNotNull((Object)default2);
            Assert.assertEquals((Object)default1, (Object)default2);
            Assert.assertFalse((default1 == default2 ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

