package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.class */
public class HeapSnapshotStrategy<K> extends AbstractSnapshotStrategy<KeyedStateHandle> implements SnapshotStrategySynchronicityBehavior<K> {
    private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait;
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
    private final StreamCompressionDecorator keyGroupCompressionDecorator;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final KeyGroupRange keyGroupRange;
    private final CloseableRegistry cancelStreamRegistry;
    private final StateSerializerProvider<K> keySerializerProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HeapSnapshotStrategy(SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityBehavior, Map<String, StateTable<K, ?, ?>> map, Map<String, HeapPriorityQueueSnapshotRestoreWrapper> map2, StreamCompressionDecorator streamCompressionDecorator, LocalRecoveryConfig localRecoveryConfig, KeyGroupRange keyGroupRange, CloseableRegistry closeableRegistry, StateSerializerProvider<K> stateSerializerProvider) {
        super("Heap backend snapshot");
        this.snapshotStrategySynchronicityTrait = snapshotStrategySynchronicityBehavior;
        this.registeredKVStates = map;
        this.registeredPQStates = map2;
        this.keyGroupCompressionDecorator = streamCompressionDecorator;
        this.localRecoveryConfig = localRecoveryConfig;
        this.keyGroupRange = keyGroupRange;
        this.cancelStreamRegistry = closeableRegistry;
        this.keySerializerProvider = stateSerializerProvider;
    }

    @Override // org.apache.flink.runtime.state.SnapshotStrategy
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull final CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws IOException {
        if (!hasRegisteredState()) {
            return DoneFuture.of(SnapshotResult.empty());
        }
        int size = this.registeredKVStates.size() + this.registeredPQStates.size();
        Preconditions.checkState(size <= 32767, "Too many states: " + size + ". Currently at most 32767 states are supported");
        ArrayList arrayList = new ArrayList(size);
        final HashMap hashMap = new HashMap(size);
        final HashMap hashMap2 = new HashMap(size);
        processSnapshotMetaInfoForAllStates(arrayList, hashMap2, hashMap, this.registeredKVStates, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
        processSnapshotMetaInfoForAllStates(arrayList, hashMap2, hashMap, this.registeredPQStates, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
        final KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(getKeySerializer(), arrayList, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.keyGroupCompressionDecorator));
        final SupplierWithException supplierWithException = (!this.localRecoveryConfig.isLocalRecoveryEnabled() || checkpointOptions.getCheckpointType().isSavepoint()) ? () -> {
            return CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
        } : () -> {
            return CheckpointStreamWithResultProvider.createDuplicatingStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory, this.localRecoveryConfig.getLocalStateDirectoryProvider());
        };
        AsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>>.AsyncSnapshotTask asyncSnapshotFutureTask = new AsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>>() { // from class: org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            /* JADX WARN: Multi-variable type inference failed */
            /* JADX WARN: Type inference failed for: r0v7, types: [java.io.OutputStream, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            public SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
                CheckpointStreamWithResultProvider checkpointStreamWithResultProvider = (CheckpointStreamWithResultProvider) supplierWithException.get();
                this.snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
                ?? checkpointOutputStream = checkpointStreamWithResultProvider.getCheckpointOutputStream();
                DataOutputView dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper((OutputStream) checkpointOutputStream);
                keyedBackendSerializationProxy.write(dataOutputViewStreamWrapper);
                long[] jArr = new long[HeapSnapshotStrategy.this.keyGroupRange.getNumberOfKeyGroups()];
                for (int i = 0; i < HeapSnapshotStrategy.this.keyGroupRange.getNumberOfKeyGroups(); i++) {
                    int keyGroupId = HeapSnapshotStrategy.this.keyGroupRange.getKeyGroupId(i);
                    jArr[i] = checkpointOutputStream.getPos();
                    dataOutputViewStreamWrapper.writeInt(keyGroupId);
                    for (Map.Entry entry : hashMap2.entrySet()) {
                        StateSnapshot.StateKeyGroupWriter keyGroupWriter = ((StateSnapshot) entry.getValue()).getKeyGroupWriter();
                        OutputStream decorateWithCompression = HeapSnapshotStrategy.this.keyGroupCompressionDecorator.decorateWithCompression((OutputStream) checkpointOutputStream);
                        Throwable th = null;
                        try {
                            try {
                                DataOutputViewStreamWrapper dataOutputViewStreamWrapper2 = new DataOutputViewStreamWrapper(decorateWithCompression);
                                dataOutputViewStreamWrapper2.writeShort(((Integer) hashMap.get(entry.getKey())).intValue());
                                keyGroupWriter.writeStateInKeyGroup(dataOutputViewStreamWrapper2, keyGroupId);
                                if (decorateWithCompression != null) {
                                    if (0 != 0) {
                                        try {
                                            decorateWithCompression.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        decorateWithCompression.close();
                                    }
                                }
                            } catch (Throwable th3) {
                                if (decorateWithCompression != null) {
                                    if (th != null) {
                                        try {
                                            decorateWithCompression.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    } else {
                                        decorateWithCompression.close();
                                    }
                                }
                                throw th3;
                            }
                        } finally {
                        }
                    }
                }
                if (!this.snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
                    throw new IOException("Stream already unregistered.");
                }
                return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(), new KeyGroupRangeOffsets(HeapSnapshotStrategy.this.keyGroupRange, jArr));
            }

            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            protected void cleanupProvidedResources() {
                Iterator it = hashMap2.values().iterator();
                while (it.hasNext()) {
                    ((StateSnapshot) it.next()).release();
                }
            }

            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            protected void logAsyncSnapshotComplete(long j3) {
                if (HeapSnapshotStrategy.this.snapshotStrategySynchronicityTrait.isAsynchronous()) {
                    HeapSnapshotStrategy.this.logAsyncCompleted(checkpointStreamFactory, j3);
                }
            }
        }.toAsyncSnapshotFutureTask(this.cancelStreamRegistry);
        finalizeSnapshotBeforeReturnHook(asyncSnapshotFutureTask);
        return asyncSnapshotFutureTask;
    }

    @Override // org.apache.flink.runtime.state.heap.SnapshotStrategySynchronicityBehavior
    public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
        this.snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
    }

    @Override // org.apache.flink.runtime.state.heap.SnapshotStrategySynchronicityBehavior
    public boolean isAsynchronous() {
        return this.snapshotStrategySynchronicityTrait.isAsynchronous();
    }

    @Override // org.apache.flink.runtime.state.heap.SnapshotStrategySynchronicityBehavior
    public <N, V> StateTable<K, N, V> newStateTable(InternalKeyContext<K> internalKeyContext, RegisteredKeyValueStateBackendMetaInfo<N, V> registeredKeyValueStateBackendMetaInfo, TypeSerializer<K> typeSerializer) {
        return this.snapshotStrategySynchronicityTrait.newStateTable(internalKeyContext, registeredKeyValueStateBackendMetaInfo, typeSerializer);
    }

    private void processSnapshotMetaInfoForAllStates(List<StateMetaInfoSnapshot> list, Map<StateUID, StateSnapshot> map, Map<StateUID, Integer> map2, Map<String, ? extends StateSnapshotRestore> map3, StateMetaInfoSnapshot.BackendStateType backendStateType) {
        for (Map.Entry<String, ? extends StateSnapshotRestore> entry : map3.entrySet()) {
            StateUID of = StateUID.of(entry.getKey(), backendStateType);
            map2.put(of, Integer.valueOf(map2.size()));
            StateSnapshotRestore value = entry.getValue();
            if (null != value) {
                StateSnapshot stateSnapshot = value.stateSnapshot();
                list.add(stateSnapshot.getMetaInfoSnapshot());
                map.put(of, stateSnapshot);
            }
        }
    }

    private boolean hasRegisteredState() {
        return (this.registeredKVStates.isEmpty() && this.registeredPQStates.isEmpty()) ? false : true;
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializerProvider.currentSchemaSerializer();
    }
}
