/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.DataOutputStream;
import java.util.HashMap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.runtime.state.AbstractHeapKvState;
import org.apache.flink.runtime.state.filesystem.FsHeapKvStateSnapshot;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

public class FsHeapKvState<K, V>
extends AbstractHeapKvState<K, V, FsStateBackend> {
    private final FsStateBackend backend;

    public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue, FsStateBackend backend) {
        super(keySerializer, valueSerializer, defaultValue);
        this.backend = backend;
    }

    public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue, HashMap<K, V> state, FsStateBackend backend) {
        super(keySerializer, valueSerializer, defaultValue, state);
        this.backend = backend;
    }

    public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
        try (FsStateBackend.FsCheckpointStateOutputStream out = this.backend.createCheckpointStateOutputStream(checkpointId, timestamp);){
            OutputViewDataOutputStreamWrapper outView = new OutputViewDataOutputStreamWrapper(new DataOutputStream(out));
            outView.writeInt(this.size());
            this.writeStateToOutputView((DataOutputView)outView);
            outView.flush();
            FsHeapKvStateSnapshot fsHeapKvStateSnapshot = new FsHeapKvStateSnapshot(this.getKeySerializer(), this.getValueSerializer(), out.closeAndGetPath());
            return fsHeapKvStateSnapshot;
        }
    }
}

