/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.migration.runtime.state.memory;

import java.io.IOException;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.util.Preconditions;

@Deprecated
public abstract class AbstractMigrationRestoreStrategy<K, N, S>
implements MigrationRestoreSnapshot<K, N, S> {
    protected final TypeSerializer<K> keySerializer;
    protected final TypeSerializer<N> namespaceSerializer;
    protected final TypeSerializer<S> stateSerializer;

    public AbstractMigrationRestoreStrategy(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, TypeSerializer<S> stateSerializer) {
        this.keySerializer = (TypeSerializer)Preconditions.checkNotNull(keySerializer);
        this.namespaceSerializer = (TypeSerializer)Preconditions.checkNotNull(namespaceSerializer);
        this.stateSerializer = (TypeSerializer)Preconditions.checkNotNull(stateSerializer);
    }

    @Override
    public StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException {
        Preconditions.checkNotNull((Object)stateName, (String)"State name is null. Cannot deserialize snapshot.");
        Preconditions.checkNotNull(stateBackend, (String)"State backend is null. Cannot deserialize snapshot.");
        KeyGroupRange keyGroupRange = stateBackend.getKeyGroupRange();
        Preconditions.checkState((1 == keyGroupRange.getNumberOfKeyGroups() ? 1 : 0) != 0, (Object)"Unexpected number of key-groups for restoring from Flink 1.1");
        Object patchedNamespaceSerializer = this.namespaceSerializer;
        if (patchedNamespaceSerializer instanceof VoidSerializer) {
            patchedNamespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        }
        RegisteredKeyedBackendStateMetaInfo<N, S> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<N, S>(StateDescriptor.Type.UNKNOWN, stateName, patchedNamespaceSerializer, this.stateSerializer);
        StateTable<Object, Object, Object> stateTable = stateBackend.newStateTable(registeredKeyedBackendStateMetaInfo);
        DataInputView inView = this.openDataInputView();
        int keyGroup = keyGroupRange.getStartKeyGroup();
        int numNamespaces = inView.readInt();
        for (int i = 0; i < numNamespaces; ++i) {
            Object namespace = this.namespaceSerializer.deserialize(inView);
            if (null == namespace) {
                namespace = VoidNamespace.INSTANCE;
            }
            int numKV = inView.readInt();
            for (int j = 0; j < numKV; ++j) {
                Object key = this.keySerializer.deserialize(inView);
                Object value = this.stateSerializer.deserialize(inView);
                stateTable.put(key, keyGroup, namespace, value);
            }
        }
        return stateTable;
    }

    protected abstract DataInputView openDataInputView() throws IOException;
}

