package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend.class */
public class DefaultOperatorStateBackend implements OperatorStateBackend {
    /** Logger of this backend class. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

    /** The literal {@code "_default_"}; it is not referenced anywhere else inside this class. */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /** Class loader installed as the thread context class loader while state is copied or restored. */
    private final ClassLoader userClassloader;

    /** Execution config passed to state descriptors when they initialize their serializers. */
    private final ExecutionConfig executionConfig;

    /** When {@code false}, the snapshot task is executed inline before {@code snapshot} returns. */
    private final boolean asynchronousSnapshots;

    /** Registry of streams opened by this backend; closed by {@link #close()} and {@link #dispose()}. */
    private final CloseableRegistry closeStreamOnCancelRegistry = new CloseableRegistry();

    /** Serializer used by the deprecated {@link #getSerializableListState(String)} access path. */
    private final JavaSerializer<Serializable> javaSerializer = new JavaSerializer<>();

    /** Every known list state by name, whether created on first access or by {@code restore}. */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();

    /** Every known broadcast state by name, whether created on first access or by {@code restore}. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates = new HashMap<>();

    /** List states that were already handed out through a descriptor, keyed by name. */
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName = new HashMap<>();

    /** Broadcast states that were already handed out through a descriptor, keyed by name. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName = new HashMap<>();

    /** Strategy object that {@code snapshot} delegates to. */
    private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy();

    /* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy.class */
    private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
        protected DefaultOperatorStateBackendSnapshotStrategy() {
            super("DefaultOperatorStateBackend snapshot");
        }

        @Override // org.apache.flink.runtime.state.SnapshotStrategy
        @Nonnull
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long j, long j2, @Nonnull final CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws IOException {
            if (DefaultOperatorStateBackend.this.registeredOperatorStates.isEmpty() && DefaultOperatorStateBackend.this.registeredBroadcastStates.isEmpty()) {
                return DoneFuture.of(SnapshotResult.empty());
            }
            final HashMap hashMap = new HashMap(DefaultOperatorStateBackend.this.registeredOperatorStates.size());
            final HashMap hashMap2 = new HashMap(DefaultOperatorStateBackend.this.registeredBroadcastStates.size());
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(DefaultOperatorStateBackend.this.userClassloader);
            try {
                if (!DefaultOperatorStateBackend.this.registeredOperatorStates.isEmpty()) {
                    for (Map.Entry entry : DefaultOperatorStateBackend.this.registeredOperatorStates.entrySet()) {
                        PartitionableListState partitionableListState = (PartitionableListState) entry.getValue();
                        if (null != partitionableListState) {
                            partitionableListState = partitionableListState.deepCopy();
                        }
                        hashMap.put(entry.getKey(), partitionableListState);
                    }
                }
                if (!DefaultOperatorStateBackend.this.registeredBroadcastStates.isEmpty()) {
                    for (Map.Entry entry2 : DefaultOperatorStateBackend.this.registeredBroadcastStates.entrySet()) {
                        BackendWritableBroadcastState backendWritableBroadcastState = (BackendWritableBroadcastState) entry2.getValue();
                        if (null != backendWritableBroadcastState) {
                            backendWritableBroadcastState = backendWritableBroadcastState.deepCopy();
                        }
                        hashMap2.put(entry2.getKey(), backendWritableBroadcastState);
                    }
                }
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>.AsyncSnapshotTask asyncSnapshotFutureTask = new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() { // from class: org.apache.flink.runtime.state.DefaultOperatorStateBackend.DefaultOperatorStateBackendSnapshotStrategy.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    /* JADX WARN: Can't rename method to resolve collision */
                    /* JADX WARN: Multi-variable type inference failed */
                    /* JADX WARN: Type inference failed for: r0v2, types: [java.io.OutputStream, org.apache.flink.core.fs.FSDataOutputStream, java.io.Closeable, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
                    @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
                    public SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                        ?? createCheckpointStateOutputStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                        registerCloseableForCancellation(createCheckpointStateOutputStream);
                        ArrayList arrayList = new ArrayList(hashMap.size());
                        Iterator it = hashMap.entrySet().iterator();
                        while (it.hasNext()) {
                            arrayList.add(((PartitionableListState) ((Map.Entry) it.next()).getValue()).getStateMetaInfo().snapshot());
                        }
                        ArrayList arrayList2 = new ArrayList(hashMap2.size());
                        Iterator it2 = hashMap2.entrySet().iterator();
                        while (it2.hasNext()) {
                            arrayList2.add(((BackendWritableBroadcastState) ((Map.Entry) it2.next()).getValue()).getStateMetaInfo().snapshot());
                        }
                        new OperatorBackendSerializationProxy(arrayList, arrayList2).write(new DataOutputViewStreamWrapper((OutputStream) createCheckpointStateOutputStream));
                        HashMap hashMap3 = new HashMap(hashMap.size() + hashMap2.size());
                        for (Map.Entry entry3 : hashMap.entrySet()) {
                            PartitionableListState partitionableListState2 = (PartitionableListState) entry3.getValue();
                            hashMap3.put(entry3.getKey(), new OperatorStateHandle.StateMetaInfo(partitionableListState2.write(createCheckpointStateOutputStream), partitionableListState2.getStateMetaInfo().getAssignmentMode()));
                        }
                        for (Map.Entry entry4 : hashMap2.entrySet()) {
                            BackendWritableBroadcastState backendWritableBroadcastState2 = (BackendWritableBroadcastState) entry4.getValue();
                            hashMap3.put(entry4.getKey(), new OperatorStateHandle.StateMetaInfo(new long[]{backendWritableBroadcastState2.write(createCheckpointStateOutputStream)}, backendWritableBroadcastState2.getStateMetaInfo().getAssignmentMode()));
                        }
                        if (!unregisterCloseableFromCancellation(createCheckpointStateOutputStream)) {
                            throw new IOException("Stream was already unregistered.");
                        }
                        StreamStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
                        return SnapshotResult.of(closeAndGetHandle != null ? new OperatorStreamStateHandle(hashMap3, closeAndGetHandle) : null);
                    }

                    @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
                    protected void cleanupProvidedResources() {
                    }

                    @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
                    protected void logAsyncSnapshotComplete(long j3) {
                        if (DefaultOperatorStateBackend.this.asynchronousSnapshots) {
                            DefaultOperatorStateBackendSnapshotStrategy.this.logAsyncCompleted(checkpointStreamFactory, j3);
                        }
                    }
                }.toAsyncSnapshotFutureTask(DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry);
                if (!DefaultOperatorStateBackend.this.asynchronousSnapshots) {
                    asyncSnapshotFutureTask.run();
                }
                return asyncSnapshotFutureTask;
            } catch (Throwable th) {
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend$PartitionableListState.class */
    /**
     * {@link ListState} implementation that keeps its elements in an in-memory {@link ArrayList}
     * and can write them to a stream while recording the start offset of every element.
     *
     * @param <S> type of the list elements
     */
    public static final class PartitionableListState<S> implements ListState<S> {

        /** Meta information (name, assignment mode, element serializer) of this state. */
        private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

        /** The elements of this state. */
        private final ArrayList<S> internalList;

        /** Serializer used to copy {@link #internalList} when a deep copy is taken. */
        private final ArrayListSerializer<S> internalListCopySerializer;

        /** Creates an empty state for the given meta information. */
        PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> registeredOperatorStateBackendMetaInfo) {
            this(registeredOperatorStateBackendMetaInfo, new ArrayList<S>());
        }

        /** Creates a state backed by the given list; neither argument may be {@code null}. */
        private PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> registeredOperatorStateBackendMetaInfo, ArrayList<S> arrayList) {
            this.stateMetaInfo = Preconditions.checkNotNull(registeredOperatorStateBackendMetaInfo);
            this.internalList = Preconditions.checkNotNull(arrayList);
            this.internalListCopySerializer = new ArrayListSerializer<>(registeredOperatorStateBackendMetaInfo.getPartitionStateSerializer());
        }

        /** Copy constructor: deep-copies both the meta information and the element list. */
        private PartitionableListState(PartitionableListState<S> partitionableListState) {
            this(partitionableListState.stateMetaInfo.deepCopy(), partitionableListState.internalListCopySerializer.copy(partitionableListState.internalList));
        }

        public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> registeredOperatorStateBackendMetaInfo) {
            this.stateMetaInfo = registeredOperatorStateBackendMetaInfo;
        }

        public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
            return stateMetaInfo;
        }

        /** Returns an independent copy of this state (meta information and elements). */
        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<>(this);
        }

        public void clear() {
            internalList.clear();
        }

        /**
         * Returns the live backing list of this state, not a copy.
         *
         * <p>This is the accessor under its original name; the decompiler had renamed it to
         * {@link #m2138get()}, leaving the class without the {@code get()} it declares via
         * {@link ListState}.
         */
        public Iterable<S> get() {
            return internalList;
        }

        /**
         * Decompiler-generated alias of {@link #get()}.
         *
         * @deprecated kept only so that existing callers of the generated name keep working;
         *     use {@link #get()}.
         */
        @Deprecated
        public Iterable<S> m2138get() {
            return get();
        }

        /** Appends one element; {@code null} is rejected. */
        public void add(S s) {
            Preconditions.checkNotNull(s, "You cannot add null to a ListState.");
            internalList.add(s);
        }

        @Override
        public String toString() {
            return "PartitionableListState{stateMetaInfo=" + stateMetaInfo + ", internalList=" + internalList + '}';
        }

        /**
         * Serializes all elements to the given stream, in list order.
         *
         * @param fSDataOutputStream stream to write to
         * @return the stream position at which each element starts, in list order
         * @throws IOException if serialization or querying the stream position fails
         */
        public long[] write(FSDataOutputStream fSDataOutputStream) throws IOException {
            final int elementCount = internalList.size();
            final long[] partitionOffsets = new long[elementCount];
            final DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(fSDataOutputStream);
            for (int i = 0; i < elementCount; i++) {
                final S element = internalList.get(i);
                // Record the offset before writing so that it points at the element's first byte.
                partitionOffsets[i] = fSDataOutputStream.getPos();
                getStateMetaInfo().getPartitionStateSerializer().serialize(element, outView);
            }
            return partitionOffsets;
        }

        /** Replaces the content of this state with the given elements. */
        public void update(List<S> list) {
            internalList.clear();
            addAll(list);
        }

        /**
         * Appends all given elements; a {@code null} or empty list is a no-op. Unlike
         * {@link #add(Object)}, the individual elements are not checked for {@code null}.
         */
        public void addAll(List<S> list) {
            if (list == null || list.isEmpty()) {
                return;
            }
            internalList.addAll(list);
        }
    }

    public DefaultOperatorStateBackend(ClassLoader classLoader, ExecutionConfig executionConfig, boolean z) {
        this.userClassloader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.executionConfig = executionConfig;
        this.asynchronousSnapshots = z;
    }

    /** Returns the execution config this backend was constructed with. */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Returns the names of all registered list states.
     *
     * <p>The result is the key-set view of the internal map, not a copy, so it reflects later
     * registrations.
     */
    public Set<String> getRegisteredStateNames() {
        final Set<String> stateNames = registeredOperatorStates.keySet();
        return stateNames;
    }

    /**
     * Returns the names of all registered broadcast states.
     *
     * <p>The result is the key-set view of the internal map, not a copy, so it reflects later
     * registrations.
     */
    public Set<String> getRegisteredBroadcastStateNames() {
        final Set<String> broadcastStateNames = registeredBroadcastStates.keySet();
        return broadcastStateNames;
    }

    /**
     * Closes the registry of streams opened by this backend. Registered state is left untouched.
     *
     * @throws IOException if closing the registry fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    /**
     * Quietly closes the stream registry and drops all registered list and broadcast states.
     * The maps of already accessed states are not cleared here.
     */
    @Override // org.apache.flink.runtime.state.OperatorStateBackend
    public void dispose() {
        IOUtils.closeQuietly(closeStreamOnCancelRegistry);
        registeredOperatorStates.clear();
        registeredBroadcastStates.clear();
    }

    /**
     * Returns the broadcast state registered under the descriptor's name, creating it on first use.
     *
     * <p>A state that was already handed out is returned directly after a name/mode consistency
     * check. Otherwise the state is either newly registered or, if it already exists (for example
     * after {@code restore}), its key and value serializers are updated to the descriptor's
     * serializers, which must not be incompatible.
     *
     * @param mapStateDescriptor descriptor carrying the state name and serializers; its name must
     *     not be {@code null}
     * @return the broadcast state for that name
     * @throws StateMigrationException if the new key or value serializer is incompatible with the
     *     one of the existing state
     */
    @SuppressWarnings("unchecked")
    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> mapStateDescriptor) throws StateMigrationException {
        Preconditions.checkNotNull(mapStateDescriptor);
        final String name = Preconditions.checkNotNull(mapStateDescriptor.getName());

        // The maps hold wildcard-typed states; the descriptor fixes <K, V> for this name, so the
        // casts are unchecked but consistent as long as callers reuse a name with the same types.
        final BackendWritableBroadcastState<K, V> previouslyAccessed =
                (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
        if (previouslyAccessed != null) {
            checkStateNameAndMode(
                    previouslyAccessed.getStateMetaInfo().getName(),
                    name,
                    previouslyAccessed.getStateMetaInfo().getAssignmentMode(),
                    OperatorStateHandle.Mode.BROADCAST);
            return previouslyAccessed;
        }

        mapStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        final TypeSerializer<K> keySerializer = Preconditions.checkNotNull(mapStateDescriptor.getKeySerializer());
        final TypeSerializer<V> valueSerializer = Preconditions.checkNotNull(mapStateDescriptor.getValueSerializer());

        BackendWritableBroadcastState<K, V> broadcastState =
                (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
        if (broadcastState == null) {
            broadcastState = new HeapBroadcastState<>(
                    new RegisteredBroadcastStateBackendMetaInfo<>(
                            name, OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer));
            registeredBroadcastStates.put(name, broadcastState);
        } else {
            // The state already exists: verify that the new access is compatible with it.
            checkStateNameAndMode(
                    broadcastState.getStateMetaInfo().getName(),
                    name,
                    broadcastState.getStateMetaInfo().getAssignmentMode(),
                    OperatorStateHandle.Mode.BROADCAST);

            final RegisteredBroadcastStateBackendMetaInfo<K, V> existingMetaInfo = broadcastState.getStateMetaInfo();
            if (existingMetaInfo.updateKeySerializer(keySerializer).isIncompatible()) {
                throw new StateMigrationException("The new key serializer for broadcast state must not be incompatible.");
            }
            if (existingMetaInfo.updateValueSerializer(valueSerializer).isIncompatible()) {
                throw new StateMigrationException("The new value serializer for broadcast state must not be incompatible.");
            }
            broadcastState.setStateMetaInfo(existingMetaInfo);
        }

        accessedBroadcastStatesByName.put(name, broadcastState);
        return broadcastState;
    }

    /**
     * Returns the list state for the descriptor using the {@code SPLIT_DISTRIBUTE} assignment mode.
     *
     * @param listStateDescriptor descriptor carrying the state name and element serializer
     * @return the list state registered under the descriptor's name
     * @throws Exception if the state cannot be obtained
     */
    public <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        final OperatorStateHandle.Mode assignmentMode = OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
        return getListState(listStateDescriptor, assignmentMode);
    }

    /**
     * Returns the list state for the descriptor using the {@code UNION} assignment mode.
     *
     * @param listStateDescriptor descriptor carrying the state name and element serializer
     * @return the list state registered under the descriptor's name
     * @throws Exception if the state cannot be obtained
     */
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        final OperatorStateHandle.Mode assignmentMode = OperatorStateHandle.Mode.UNION;
        return getListState(listStateDescriptor, assignmentMode);
    }

    /**
     * Delegates to {@link #getListState(ListStateDescriptor)}.
     *
     * @deprecated use {@link #getListState(ListStateDescriptor)} directly.
     */
    @Deprecated
    public <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        final ListState<S> listState = getListState(listStateDescriptor);
        return listState;
    }

    /**
     * Returns a list state of the given name whose elements are serialized with this backend's
     * {@code JavaSerializer}.
     *
     * @param str name of the state
     * @return the list state registered under that name
     * @throws Exception if the state cannot be obtained
     * @deprecated marked deprecated in the original code; the visible code does not name a
     *     replacement, though {@link #getListState(ListStateDescriptor)} is what it delegates to.
     */
    @Deprecated
    public <T extends Serializable> ListState<T> getSerializableListState(String str) throws Exception {
        // Raw descriptor on purpose: the element type T is only known to the caller.
        final ListStateDescriptor javaSerializedDescriptor = new ListStateDescriptor(str, javaSerializer);
        return getListState(javaSerializedDescriptor);
    }

    /**
     * Restores list and broadcast states from the given state handles.
     *
     * <p>For every non-null handle the meta information is read first and states that are not yet
     * registered are created; afterwards the values are read at the offsets recorded in the
     * handle. While a handle is processed, the user class loader is the thread context class
     * loader and the input stream is registered with the backend's stream registry.
     *
     * @param collection handles to restore from; {@code null} or empty means nothing to restore
     * @throws Exception if a previous serializer could not be loaded or reading the state fails
     */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<OperatorStateHandle> collection) throws Exception {
        if (collection == null || collection.isEmpty()) {
            return;
        }

        for (OperatorStateHandle stateHandle : collection) {
            if (stateHandle == null) {
                continue;
            }

            final FSDataInputStream in = stateHandle.openInputStream();
            closeStreamOnCancelRegistry.registerCloseable(in);

            final Thread currentThread = Thread.currentThread();
            final ClassLoader previousClassLoader = currentThread.getContextClassLoader();
            try {
                currentThread.setContextClassLoader(userClassloader);

                final OperatorBackendSerializationProxy serializationProxy =
                        new OperatorBackendSerializationProxy(userClassloader);
                serializationProxy.read(new DataInputViewStreamWrapper(in));

                // Step 1: register list states that are not known yet.
                for (StateMetaInfoSnapshot metaSnapshot : serializationProxy.getOperatorStateMetaInfoSnapshots()) {
                    final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
                            new RegisteredOperatorStateBackendMetaInfo<>(metaSnapshot);
                    if (restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
                        throw new IOException("Unable to restore operator state [" + metaSnapshot.getName() + "]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                    }
                    if (registeredOperatorStates.get(metaSnapshot.getName()) == null) {
                        final PartitionableListState<?> newListState = new PartitionableListState<>(restoredMetaInfo);
                        registeredOperatorStates.put(newListState.getStateMetaInfo().getName(), newListState);
                    }
                }

                // Step 2: register broadcast states that are not known yet.
                for (StateMetaInfoSnapshot metaSnapshot : serializationProxy.getBroadcastStateMetaInfoSnapshots()) {
                    final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
                            new RegisteredBroadcastStateBackendMetaInfo<>(metaSnapshot);
                    final boolean keySerializerMissing =
                            restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer;
                    if (keySerializerMissing || restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer) {
                        throw new IOException("Unable to restore broadcast state [" + metaSnapshot.getName() + "]. The previous key and value serializers of the state must be present; the serializers could have been removed from the classpath, or their implementations have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                    }
                    if (registeredBroadcastStates.get(metaSnapshot.getName()) == null) {
                        final BackendWritableBroadcastState<?, ?> newBroadcastState = new HeapBroadcastState<>(restoredMetaInfo);
                        registeredBroadcastStates.put(newBroadcastState.getStateMetaInfo().getName(), newBroadcastState);
                    }
                }

                // Step 3: read the values; a name is looked up as list state first, then as broadcast state.
                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets
                        : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
                    final String stateName = nameToOffsets.getKey();
                    final PartitionableListState<?> listState = registeredOperatorStates.get(stateName);
                    if (listState != null) {
                        deserializeOperatorStateValues(listState, in, nameToOffsets.getValue());
                    } else {
                        final BackendWritableBroadcastState<?, ?> broadcastState = registeredBroadcastStates.get(stateName);
                        Preconditions.checkState(broadcastState != null, "Found state without corresponding meta info: " + stateName);
                        deserializeBroadcastStateValues(broadcastState, in, nameToOffsets.getValue());
                    }
                }
            } finally {
                currentThread.setContextClassLoader(previousClassLoader);
                // Only close the stream here if the registry has not taken care of it already.
                if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
                    IOUtils.closeQuietly(in);
                }
            }
        }
    }

    /**
     * Takes a snapshot by delegating to the snapshot strategy and then logs the completion of the
     * synchronous part, measured from the moment this method was entered.
     *
     * @param j passed through to the strategy (presumably the checkpoint id; confirm against
     *     {@code SnapshotStrategy})
     * @param j2 passed through to the strategy (presumably the checkpoint timestamp; confirm
     *     against {@code SnapshotStrategy})
     * @param checkpointStreamFactory factory for the stream the state is written to
     * @param checkpointOptions passed through to the strategy
     * @return the future returned by the strategy
     * @throws Exception if the strategy fails to create the snapshot future
     */
    @Override // org.apache.flink.runtime.state.SnapshotStrategy
    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        final long syncStartTime = System.currentTimeMillis();

        final RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
                snapshotStrategy.snapshot(j, j2, checkpointStreamFactory, checkpointOptions);

        snapshotStrategy.logSyncCompleted(checkpointStreamFactory, syncStartTime);
        return snapshotRunner;
    }

    /**
     * Returns the list state registered under the descriptor's name, creating it on first use.
     *
     * <p>A state that was already handed out is returned directly after a name/mode consistency
     * check. Otherwise the state is either newly registered with the given mode or, if it already
     * exists (for example after {@code restore}), its element serializer is updated to a
     * duplicate of the descriptor's serializer, which must not be incompatible.
     *
     * @param listStateDescriptor descriptor carrying the state name and element serializer; its
     *     name must not be {@code null}
     * @param mode assignment mode the state is expected to have
     * @return the list state for that name
     * @throws StateMigrationException if the new serializer is incompatible with the one of the
     *     existing state
     */
    @SuppressWarnings("unchecked")
    private <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor, OperatorStateHandle.Mode mode) throws StateMigrationException {
        Preconditions.checkNotNull(listStateDescriptor);
        final String name = Preconditions.checkNotNull(listStateDescriptor.getName());

        // The maps hold wildcard-typed states; the descriptor fixes <S> for this name, so the
        // casts are unchecked but consistent as long as callers reuse a name with the same type.
        final PartitionableListState<S> previouslyAccessed = (PartitionableListState<S>) accessedStatesByName.get(name);
        if (previouslyAccessed != null) {
            checkStateNameAndMode(
                    previouslyAccessed.getStateMetaInfo().getName(),
                    name,
                    previouslyAccessed.getStateMetaInfo().getAssignmentMode(),
                    mode);
            return previouslyAccessed;
        }

        listStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        final TypeSerializer<S> elementSerializer = Preconditions.checkNotNull(listStateDescriptor.getElementSerializer());

        PartitionableListState<S> listState = (PartitionableListState<S>) registeredOperatorStates.get(name);
        if (listState == null) {
            listState = new PartitionableListState<>(
                    new RegisteredOperatorStateBackendMetaInfo<>(name, elementSerializer, mode));
            registeredOperatorStates.put(name, listState);
        } else {
            // The state already exists: verify that the new access is compatible with it.
            checkStateNameAndMode(
                    listState.getStateMetaInfo().getName(),
                    name,
                    listState.getStateMetaInfo().getAssignmentMode(),
                    mode);

            final RegisteredOperatorStateBackendMetaInfo<S> existingMetaInfo = listState.getStateMetaInfo();
            if (existingMetaInfo.updatePartitionStateSerializer(elementSerializer.duplicate()).isIncompatible()) {
                throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
            }
            listState.setStateMetaInfo(existingMetaInfo);
        }

        accessedStatesByName.put(name, listState);
        return listState;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Reads one element per recorded offset from the stream and appends it to the given state.
     *
     * @param partitionableListState state that receives the elements
     * @param fSDataInputStream stream positioned via {@code seek} before each element is read
     * @param stateMetaInfo holder of the offsets; nothing is read if it or its offsets are {@code null}
     * @throws IOException if seeking or deserialization fails
     */
    private static <S> void deserializeOperatorStateValues(PartitionableListState<S> partitionableListState, FSDataInputStream fSDataInputStream, OperatorStateHandle.StateMetaInfo stateMetaInfo) throws IOException {
        if (stateMetaInfo == null) {
            return;
        }
        final long[] offsets = stateMetaInfo.getOffsets();
        if (offsets == null) {
            return;
        }

        final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fSDataInputStream);
        final TypeSerializer<S> elementSerializer = partitionableListState.getStateMetaInfo().getPartitionStateSerializer();
        for (int i = 0; i < offsets.length; i++) {
            fSDataInputStream.seek(offsets[i]);
            final S element = elementSerializer.deserialize(inView);
            partitionableListState.add(element);
        }
    }

    /**
     * Reads a broadcast state from the stream: seeks to the first recorded offset, reads an entry
     * count, then that many key/value pairs, and puts each pair into the given state.
     *
     * @param backendWritableBroadcastState state that receives the entries
     * @param fSDataInputStream stream to read from
     * @param stateMetaInfo holder of the offsets; nothing is read if it or its offsets are
     *     {@code null}. Only the first offset is used, so the array is expected to be non-empty.
     * @throws Exception if seeking, deserialization or inserting into the state fails
     */
    private static <K, V> void deserializeBroadcastStateValues(BackendWritableBroadcastState<K, V> backendWritableBroadcastState, FSDataInputStream fSDataInputStream, OperatorStateHandle.StateMetaInfo stateMetaInfo) throws Exception {
        if (stateMetaInfo == null) {
            return;
        }
        final long[] offsets = stateMetaInfo.getOffsets();
        if (offsets == null) {
            return;
        }

        final TypeSerializer<K> keySerializer = backendWritableBroadcastState.getStateMetaInfo().getKeySerializer();
        final TypeSerializer<V> valueSerializer = backendWritableBroadcastState.getStateMetaInfo().getValueSerializer();

        fSDataInputStream.seek(offsets[0]);
        final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fSDataInputStream);

        final int entryCount = inView.readInt();
        int entriesRead = 0;
        while (entriesRead < entryCount) {
            final K key = keySerializer.deserialize(inView);
            final V value = valueSerializer.deserialize(inView);
            backendWritableBroadcastState.put(key, value);
            entriesRead++;
        }
    }

    /**
     * Verifies that a state's actual name and assignment mode equal the expected ones.
     *
     * @param str actual state name
     * @param str2 expected state name
     * @param mode actual assignment mode
     * @param mode2 expected assignment mode
     */
    private static void checkStateNameAndMode(String str, String str2, OperatorStateHandle.Mode mode, OperatorStateHandle.Mode mode2) {
        final boolean sameName = str.equals(str2);
        final String nameMismatchMessage = "Incompatible state names. Was [" + str + "], registered with [" + str2 + "].";
        Preconditions.checkState(sameName, nameMismatchMessage);

        final boolean sameMode = mode.equals(mode2);
        final String modeMismatchMessage = "Incompatible state assignment modes. Was [" + mode + "], registered with [" + mode2 + "].";
        Preconditions.checkState(sameMode, modeMismatchMessage);
    }
}
