package org.apache.flink.runtime.state.heap;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.beam.repackaged.beam_sdks_java_core.net.bytebuddy.asm.Advice;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.class */
public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HeapKeyedStateBackend.class);
    private final Map<String, StateTable<K, ?, ?>> stateTables;
    private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final HeapKeyedStateBackend<K>.HeapSnapshotStrategy snapshotStrategy;

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$AsyncSnapshotStrategySynchronicityBehavior.class */
    private class AsyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> {
        private AsyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void logOperationCompleted(CheckpointStreamFactory checkpointStreamFactory, long j) {
            HeapKeyedStateBackend.LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", checkpointStreamFactory, Thread.currentThread(), Long.valueOf(System.currentTimeMillis() - j));
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public boolean isAsynchronous() {
            return true;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> registeredKeyedBackendStateMetaInfo) {
            return new CopyOnWriteStateTable(HeapKeyedStateBackend.this, registeredKeyedBackendStateMetaInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$HeapSnapshotStrategy.class */
    public class HeapSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>>, SnapshotStrategySynchronicityBehavior<K> {
        private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait;

        public HeapSnapshotStrategy(SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityBehavior) {
            this.snapshotStrategySynchronicityTrait = snapshotStrategySynchronicityBehavior;
        }

        @Override // org.apache.flink.runtime.state.SnapshotStrategy
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long j, long j2, final CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
            if (!HeapKeyedStateBackend.this.hasRegisteredState()) {
                return DoneFuture.of(SnapshotResult.empty());
            }
            long currentTimeMillis = System.currentTimeMillis();
            Preconditions.checkState(HeapKeyedStateBackend.this.stateTables.size() <= 32767, "Too many KV-States: " + HeapKeyedStateBackend.this.stateTables.size() + ". Currently at most " + Advice.MethodSizeHandler.UNDEFINED_SIZE + " states are supported");
            ArrayList arrayList = new ArrayList(HeapKeyedStateBackend.this.stateTables.size());
            final HashMap hashMap = new HashMap(HeapKeyedStateBackend.this.stateTables.size());
            final HashMap hashMap2 = new HashMap(HeapKeyedStateBackend.this.stateTables.size());
            for (Map.Entry entry : HeapKeyedStateBackend.this.stateTables.entrySet()) {
                String str = (String) entry.getKey();
                hashMap.put(str, Integer.valueOf(hashMap.size()));
                StateTable stateTable = (StateTable) entry.getValue();
                if (null != stateTable) {
                    arrayList.add(stateTable.getMetaInfo().snapshot());
                    hashMap2.put(str, stateTable.createSnapshot());
                }
            }
            final KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(HeapKeyedStateBackend.this.keySerializer, arrayList, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, HeapKeyedStateBackend.this.keyGroupCompressionDecorator));
            final SupplierWithException supplierWithException = HeapKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> {
                return CheckpointStreamWithResultProvider.createDuplicatingStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory, HeapKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider());
            } : () -> {
                return CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
            };
            AsyncStoppableTaskWithCallback from = AsyncStoppableTaskWithCallback.from(new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { // from class: org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.HeapSnapshotStrategy.1
                CheckpointStreamWithResultProvider streamAndResultExtractor = null;

                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                protected void acquireResources() throws Exception {
                    this.streamAndResultExtractor = (CheckpointStreamWithResultProvider) supplierWithException.get();
                    HeapKeyedStateBackend.this.cancelStreamRegistry.registerCloseable(this.streamAndResultExtractor);
                }

                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                protected void releaseResources() {
                    unregisterAndCloseStreamAndResultExtractor();
                    Iterator it = hashMap2.values().iterator();
                    while (it.hasNext()) {
                        ((StateTableSnapshot) it.next()).release();
                    }
                }

                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                protected void stopOperation() {
                    unregisterAndCloseStreamAndResultExtractor();
                }

                private void unregisterAndCloseStreamAndResultExtractor() {
                    if (HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                        IOUtils.closeQuietly(this.streamAndResultExtractor);
                        this.streamAndResultExtractor = null;
                    }
                }

                /* JADX INFO: Access modifiers changed from: protected */
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                @Nonnull
                public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                    long currentTimeMillis2 = System.currentTimeMillis();
                    CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = this.streamAndResultExtractor.getCheckpointOutputStream();
                    DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(checkpointOutputStream);
                    keyedBackendSerializationProxy.write(dataOutputViewStreamWrapper);
                    long[] jArr = new long[HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                    for (int i = 0; i < HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); i++) {
                        int keyGroupId = HeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(i);
                        jArr[i] = checkpointOutputStream.getPos();
                        dataOutputViewStreamWrapper.writeInt(keyGroupId);
                        for (Map.Entry entry2 : hashMap2.entrySet()) {
                            OutputStream decorateWithCompression = HeapKeyedStateBackend.this.keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
                            Throwable th = null;
                            try {
                                try {
                                    String str2 = (String) entry2.getKey();
                                    DataOutputViewStreamWrapper dataOutputViewStreamWrapper2 = new DataOutputViewStreamWrapper(decorateWithCompression);
                                    dataOutputViewStreamWrapper2.writeShort(((Integer) hashMap.get(str2)).intValue());
                                    ((StateTableSnapshot) entry2.getValue()).writeMappingsInKeyGroup(dataOutputViewStreamWrapper2, keyGroupId);
                                    if (decorateWithCompression != null) {
                                        if (0 != 0) {
                                            try {
                                                decorateWithCompression.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        } else {
                                            decorateWithCompression.close();
                                        }
                                    }
                                } catch (Throwable th3) {
                                    if (decorateWithCompression != null) {
                                        if (th != null) {
                                            try {
                                                decorateWithCompression.close();
                                            } catch (Throwable th4) {
                                                th.addSuppressed(th4);
                                            }
                                        } else {
                                            decorateWithCompression.close();
                                        }
                                    }
                                    throw th3;
                                }
                            } finally {
                            }
                        }
                    }
                    if (!HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                        return SnapshotResult.empty();
                    }
                    KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(HeapKeyedStateBackend.this.keyGroupRange, jArr);
                    SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult = this.streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
                    this.streamAndResultExtractor = null;
                    HeapSnapshotStrategy.this.logOperationCompleted(checkpointStreamFactory, currentTimeMillis2);
                    return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(closeAndFinalizeCheckpointStreamResult, keyGroupRangeOffsets);
                }
            });
            finalizeSnapshotBeforeReturnHook(from);
            HeapKeyedStateBackend.LOG.info("Heap backend snapshot (" + checkpointStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
            return from;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            this.snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void logOperationCompleted(CheckpointStreamFactory checkpointStreamFactory, long j) {
            this.snapshotStrategySynchronicityTrait.logOperationCompleted(checkpointStreamFactory, j);
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public boolean isAsynchronous() {
            return this.snapshotStrategySynchronicityTrait.isAsynchronous();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> registeredKeyedBackendStateMetaInfo) {
            return this.snapshotStrategySynchronicityTrait.newStateTable(registeredKeyedBackendStateMetaInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$SnapshotStrategySynchronicityBehavior.class */
    public interface SnapshotStrategySynchronicityBehavior<K> {
        default void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
        }

        default void logOperationCompleted(CheckpointStreamFactory checkpointStreamFactory, long j) {
        }

        boolean isAsynchronous();

        <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> registeredKeyedBackendStateMetaInfo);
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$SyncSnapshotStrategySynchronicityBehavior.class */
    private class SyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> {
        private SyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            runnable.run();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public boolean isAsynchronous() {
            return false;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> registeredKeyedBackendStateMetaInfo) {
            return new NestedMapsStateTable(HeapKeyedStateBackend.this, registeredKeyedBackendStateMetaInfo);
        }
    }

    public HeapKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, int i, KeyGroupRange keyGroupRange, boolean z, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig) {
        super(taskKvStateRegistry, typeSerializer, classLoader, i, keyGroupRange, executionConfig);
        this.stateTables = new HashMap();
        this.localRecoveryConfig = (LocalRecoveryConfig) Preconditions.checkNotNull(localRecoveryConfig);
        this.snapshotStrategy = new HeapSnapshotStrategy(z ? new AsyncSnapshotStrategySynchronicityBehavior() : new SyncSnapshotStrategySynchronicityBehavior());
        LOG.info("Initializing heap keyed state backend with stream factory.");
        this.restoredKvStateMetaInfos = new HashMap();
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> typeSerializer, StateDescriptor<?, V> stateDescriptor) throws StateMigrationException {
        StateTable<K, ?, ?> stateTable = this.stateTables.get(stateDescriptor.getName());
        if (stateTable != null) {
            RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> snapshot = this.restoredKvStateMetaInfos.get(stateDescriptor.getName());
            Preconditions.checkState(snapshot != null, "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            stateTable.setMetaInfo(RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(snapshot, typeSerializer, stateDescriptor));
        } else {
            stateTable = this.snapshotStrategy.newStateTable(new RegisteredKeyedBackendStateMetaInfo<>(stateDescriptor.getType(), stateDescriptor.getName(), typeSerializer, stateDescriptor.getSerializer()));
            this.stateTables.put(stateDescriptor.getName(), stateTable);
        }
        return (StateTable<K, N, V>) stateTable;
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public <N> Stream<K> getKeys(String str, N n) {
        return !this.stateTables.containsKey(str) ? Stream.empty() : this.stateTables.get(str).getKeys(n);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean hasRegisteredState() {
        return !this.stateTables.isEmpty();
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, V> InternalValueState<K, N, V> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<V> valueStateDescriptor) throws Exception {
        StateTable<K, N, V> tryRegisterStateTable = tryRegisterStateTable(typeSerializer, valueStateDescriptor);
        return new HeapValueState(tryRegisterStateTable, this.keySerializer, tryRegisterStateTable.getStateSerializer(), tryRegisterStateTable.getNamespaceSerializer(), valueStateDescriptor.getDefaultValue());
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T> InternalListState<K, N, T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        StateTable<K, N, V> tryRegisterStateTable = tryRegisterStateTable(typeSerializer, listStateDescriptor);
        return new HeapListState(tryRegisterStateTable, this.keySerializer, tryRegisterStateTable.getStateSerializer(), tryRegisterStateTable.getNamespaceSerializer(), (List) listStateDescriptor.getDefaultValue());
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T> InternalReducingState<K, N, T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
        StateTable<K, N, V> tryRegisterStateTable = tryRegisterStateTable(typeSerializer, reducingStateDescriptor);
        return new HeapReducingState(tryRegisterStateTable, this.keySerializer, tryRegisterStateTable.getStateSerializer(), tryRegisterStateTable.getNamespaceSerializer(), reducingStateDescriptor.getDefaultValue(), reducingStateDescriptor.getReduceFunction());
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(TypeSerializer<N> typeSerializer, AggregatingStateDescriptor<T, ACC, R> aggregatingStateDescriptor) throws Exception {
        StateTable<K, N, V> tryRegisterStateTable = tryRegisterStateTable(typeSerializer, aggregatingStateDescriptor);
        return new HeapAggregatingState(tryRegisterStateTable, this.keySerializer, tryRegisterStateTable.getStateSerializer(), tryRegisterStateTable.getNamespaceSerializer(), aggregatingStateDescriptor.getDefaultValue(), aggregatingStateDescriptor.getAggregateFunction());
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
        StateTable<K, N, V> tryRegisterStateTable = tryRegisterStateTable(typeSerializer, foldingStateDescriptor);
        return new HeapFoldingState(tryRegisterStateTable, this.keySerializer, tryRegisterStateTable.getStateSerializer(), tryRegisterStateTable.getNamespaceSerializer(), foldingStateDescriptor.getDefaultValue(), foldingStateDescriptor.getFoldFunction());
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(TypeSerializer<N> typeSerializer, MapStateDescriptor<UK, UV> mapStateDescriptor) throws Exception {
        StateTable<K, N, V> tryRegisterStateTable = tryRegisterStateTable(typeSerializer, mapStateDescriptor);
        return new HeapMapState(tryRegisterStateTable, this.keySerializer, tryRegisterStateTable.getStateSerializer(), tryRegisterStateTable.getNamespaceSerializer(), mapStateDescriptor.getDefaultValue());
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
        return this.snapshotStrategy.performSnapshot(j, j2, checkpointStreamFactory, checkpointOptions);
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<KeyedStateHandle> collection) throws Exception {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", collection);
        }
        restorePartitionedState(collection);
    }

    private void restorePartitionedState(Collection<KeyedStateHandle> collection) throws Exception {
        HashMap hashMap = new HashMap();
        int i = 0;
        this.stateTables.clear();
        boolean z = false;
        for (KeyedStateHandle keyedStateHandle : collection) {
            if (keyedStateHandle != null) {
                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                    throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
                }
                KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
                FSDataInputStream openInputStream = keyGroupsStateHandle.openInputStream();
                this.cancelStreamRegistry.registerCloseable(openInputStream);
                try {
                    DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(openInputStream);
                    KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader, true);
                    keyedBackendSerializationProxy.read(dataInputViewStreamWrapper);
                    if (!z) {
                        if (CompatibilityUtil.resolveCompatibilityResult(keyedBackendSerializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, keyedBackendSerializationProxy.getKeySerializerConfigSnapshot(), this.keySerializer).isRequiresMigration()) {
                            throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
                        }
                        z = true;
                    }
                    List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = keyedBackendSerializationProxy.getStateMetaInfoSnapshots();
                    for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> snapshot : stateMetaInfoSnapshots) {
                        this.restoredKvStateMetaInfos.put(snapshot.getName(), snapshot);
                        if (null == this.stateTables.get(snapshot.getName())) {
                            this.stateTables.put(snapshot.getName(), this.snapshotStrategy.newStateTable(new RegisteredKeyedBackendStateMetaInfo(snapshot.getStateType(), snapshot.getName(), snapshot.getNamespaceSerializer(), snapshot.getStateSerializer())));
                            hashMap.put(Integer.valueOf(i), snapshot.getName());
                            i++;
                        }
                    }
                    StreamCompressionDecorator streamCompressionDecorator = keyedBackendSerializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
                    Iterator<Tuple2<Integer, Long>> it = keyGroupsStateHandle.getGroupRangeOffsets().iterator();
                    while (it.hasNext()) {
                        Tuple2<Integer, Long> next = it.next();
                        int intValue = next.f0.intValue();
                        long longValue = next.f1.longValue();
                        Preconditions.checkState(this.keyGroupRange.contains(intValue), "The key group must belong to the backend.");
                        openInputStream.seek(longValue);
                        int readInt = dataInputViewStreamWrapper.readInt();
                        InputStream decorateWithCompression = streamCompressionDecorator.decorateWithCompression(openInputStream);
                        Throwable th = null;
                        try {
                            try {
                                DataInputViewStreamWrapper dataInputViewStreamWrapper2 = new DataInputViewStreamWrapper(decorateWithCompression);
                                Preconditions.checkState(readInt == intValue, "Unexpected key-group in restore.");
                                for (int i2 = 0; i2 < stateMetaInfoSnapshots.size(); i2++) {
                                    StateTableByKeyGroupReaders.readerForVersion(this.stateTables.get(hashMap.get(Integer.valueOf(dataInputViewStreamWrapper2.readShort()))), keyedBackendSerializationProxy.getReadVersion()).readMappingsInKeyGroup(dataInputViewStreamWrapper2, intValue);
                                }
                                if (decorateWithCompression != null) {
                                    if (0 != 0) {
                                        try {
                                            decorateWithCompression.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        decorateWithCompression.close();
                                    }
                                }
                            } finally {
                            }
                        } finally {
                        }
                    }
                    if (this.cancelStreamRegistry.unregisterCloseable(openInputStream)) {
                        IOUtils.closeQuietly((InputStream) openInputStream);
                    }
                } catch (Throwable th3) {
                    if (this.cancelStreamRegistry.unregisterCloseable(openInputStream)) {
                        IOUtils.closeQuietly((InputStream) openInputStream);
                    }
                    throw th3;
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointComplete(long j) {
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend, org.apache.flink.runtime.state.KeyedStateBackend
    public <N, S extends State, T> void applyToAllKeys(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> keyedStateFunction) throws Exception {
        Stream keys = getKeys(stateDescriptor.getName(), n);
        Throwable th = null;
        try {
            try {
                List list = (List) keys.collect(Collectors.toList());
                State partitionedState = getPartitionedState(n, typeSerializer, stateDescriptor);
                for (Object obj : list) {
                    setCurrentKey(obj);
                    keyedStateFunction.process(obj, partitionedState);
                }
                if (keys != null) {
                    if (0 == 0) {
                        keys.close();
                        return;
                    }
                    try {
                        keys.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (keys != null) {
                if (th != null) {
                    try {
                        keys.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    keys.close();
                }
            }
            throw th4;
        }
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    @VisibleForTesting
    public int numStateEntries() {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.stateTables.values().iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    @VisibleForTesting
    public int numStateEntries(Object obj) {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.stateTables.values().iterator();
        while (it.hasNext()) {
            i += it.next().sizeOfNamespace(obj);
        }
        return i;
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public boolean supportsAsynchronousSnapshots() {
        return this.snapshotStrategy.isAsynchronous();
    }

    @VisibleForTesting
    public LocalRecoveryConfig getLocalRecoveryConfig() {
        return this.localRecoveryConfig;
    }
}
