package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBMapState.class */
public class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>> implements InternalMapState<K, N, UK, UV> {
    /** Logger of this class; used by {@code clear()} to report failures. */
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
    /** Serializer for the user keys of the map; taken from the {@code MapSerializer} in the constructor. */
    private final TypeSerializer<UK> userKeySerializer;
    /** Serializer for the user values of the map; taken from the {@code MapSerializer} in the constructor. */
    private final TypeSerializer<UV> userValueSerializer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBMapState$RocksDBMapEntry.class */
    /**
     * A map entry backed by a raw key/value pair read from RocksDB.
     *
     * <p>The user key and user value are deserialized lazily on first access. Calls to
     * {@link #setValue(Object)} and {@link #remove()} are written through to the database.
     */
    public class RocksDBMapEntry implements Map.Entry<UK, UV> {
        private final RocksDB db;

        /** The raw bytes of the key stored in RocksDB (prefix followed by the serialized user key). */
        private final byte[] rawKeyBytes;

        /** The raw bytes of the value stored in RocksDB; set to null once the entry is removed. */
        private byte[] rawValueBytes;

        /** True once {@link #remove()} has been called on this entry. */
        private boolean deleted = false;

        /** Lazily deserialized user key; null until first requested. */
        private UK userKey;

        /** Lazily deserialized user value; null until first requested (or if the stored value is null). */
        private UV userValue;

        /** Number of leading bytes of {@link #rawKeyBytes} to skip before the serialized user key. */
        private final int userKeyOffset;

        private TypeSerializer<UK> keySerializer;
        private TypeSerializer<UV> valueSerializer;

        RocksDBMapEntry(@Nonnull RocksDB rocksDB, @Nonnegative int i, @Nonnull byte[] bArr, @Nonnull byte[] bArr2, @Nonnull TypeSerializer<UK> typeSerializer, @Nonnull TypeSerializer<UV> typeSerializer2) {
            db = rocksDB;
            userKeyOffset = i;
            keySerializer = typeSerializer;
            valueSerializer = typeSerializer2;
            rawKeyBytes = bArr;
            rawValueBytes = bArr2;
        }

        /**
         * Marks this entry as deleted and deletes its key from RocksDB.
         *
         * @throws FlinkRuntimeException if RocksDB reports an error
         */
        public void remove() {
            // The entry is flagged before the database call, exactly like the in-memory view is updated first.
            deleted = true;
            rawValueBytes = null;

            try {
                db.delete(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, rawKeyBytes);
            } catch (RocksDBException e) {
                throw new FlinkRuntimeException("Error while removing data from RocksDB.", e);
            }
        }

        /**
         * Returns the user key, deserializing it from the raw key bytes on first access.
         *
         * @throws FlinkRuntimeException if the key cannot be deserialized
         */
        @Override
        public UK getKey() {
            if (userKey != null) {
                return userKey;
            }

            try {
                userKey = RocksDBMapState.this.deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Error while deserializing the user key.", e);
            }
            return userKey;
        }

        /**
         * Returns the user value, or null if this entry has been removed.
         *
         * @throws FlinkRuntimeException if the value cannot be deserialized
         */
        @Override
        public UV getValue() {
            if (deleted) {
                return null;
            }
            if (userValue != null) {
                return userValue;
            }

            try {
                userValue = RocksDBMapState.this.deserializeUserValue(rawValueBytes, valueSerializer);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Error while deserializing the user value.", e);
            }
            return userValue;
        }

        /**
         * Replaces the value of this entry and writes the new value to RocksDB.
         *
         * @param uv the new user value
         * @return the value held before the update
         * @throws IllegalStateException if this entry has already been removed
         * @throws FlinkRuntimeException if serialization or the RocksDB write fails
         */
        @Override
        public UV setValue(UV uv) {
            if (deleted) {
                throw new IllegalStateException("The value has already been deleted.");
            }

            final UV previousValue = getValue();
            try {
                userValue = uv;
                rawValueBytes = RocksDBMapState.this.serializeUserValue(uv, valueSerializer);
                db.put(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, rawKeyBytes, rawValueBytes);
            } catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while putting data into RocksDB.", e);
            }
            return previousValue;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBMapState$RocksDBMapIterator.class */
    /**
     * Base iterator over all map entries that share a given key prefix in RocksDB.
     *
     * <p>Entries are read from the database in batches of at most {@link #CACHE_SIZE_LIMIT}
     * and served from an in-memory cache. When the cache is exhausted, the next batch is
     * loaded starting from the last returned entry. Subclasses implement {@code next()} on
     * top of {@link #nextEntry()}.
     *
     * @param <T> the element type produced by the iterator
     */
    public abstract class RocksDBMapIterator<T> implements Iterator<T> {
        /** Maximum number of entries loaded into the cache per database scan. */
        private static final int CACHE_SIZE_LIMIT = 128;

        private final RocksDB db;

        /** The prefix bytes shared by every key this iterator may return. */
        @Nonnull
        private final byte[] keyPrefixBytes;

        /** The entry most recently returned by {@link #nextEntry()}; null before the first call. */
        private RocksDBMapEntry currentEntry;

        private final TypeSerializer<UK> keySerializer;
        private final TypeSerializer<UV> valueSerializer;

        /** True once a scan has run past the last entry carrying the prefix; no further loads happen. */
        private boolean expired = false;

        /** The current batch of entries. */
        private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();

        /** Index into {@link #cacheEntries} of the next entry to return. */
        private int cacheIndex = 0;

        RocksDBMapIterator(RocksDB rocksDB, byte[] bArr, TypeSerializer<UK> typeSerializer, TypeSerializer<UV> typeSerializer2) {
            this.db = rocksDB;
            this.keyPrefixBytes = bArr;
            this.keySerializer = typeSerializer;
            this.valueSerializer = typeSerializer2;
        }

        /** Returns true if another entry is available, loading the next batch if necessary. */
        @Override
        public boolean hasNext() {
            loadCache();
            return cacheIndex < cacheEntries.size();
        }

        /**
         * Removes the entry most recently returned by {@link #nextEntry()} from RocksDB.
         *
         * @throws IllegalStateException if no entry has been returned yet or it was already removed
         */
        @Override
        public void remove() {
            if (currentEntry == null || currentEntry.deleted) {
                throw new IllegalStateException("The remove operation must be called after a valid next operation.");
            }
            currentEntry.remove();
        }

        /**
         * Advances the iterator and returns the next entry.
         *
         * @return the next entry, or null if all entries with the prefix have been consumed
         * @throws IllegalStateException if the cache is empty although the scan has not finished
         */
        final RocksDBMapState<K, N, UK, UV>.RocksDBMapEntry nextEntry() {
            loadCache();
            if (cacheIndex == cacheEntries.size()) {
                if (expired) {
                    return null;
                }
                throw new IllegalStateException();
            }
            currentEntry = cacheEntries.get(cacheIndex);
            cacheIndex++;
            return currentEntry;
        }

        /**
         * Refills the cache when it has been fully consumed and the scan is not finished yet.
         *
         * <p>The scan is marked as {@code expired} only when the RocksDB iterator becomes invalid
         * or leaves the key prefix. Stopping because the cache is full must NOT mark the scan as
         * expired, otherwise entries beyond the first {@link #CACHE_SIZE_LIMIT} would never be
         * returned.
         */
        private void loadCache() {
            if (cacheIndex > cacheEntries.size()) {
                throw new IllegalStateException();
            }
            // Nothing to do while unread cached entries remain, or once the scan is complete.
            if (cacheIndex < cacheEntries.size() || expired) {
                return;
            }

            // try-with-resources closes the native iterator exactly once, even on failure.
            try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, RocksDBMapState.this.columnFamily)) {
                // First load starts at the prefix; later loads resume at the last returned entry.
                final byte[] startBytes = currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes;

                cacheEntries.clear();
                cacheIndex = 0;

                iterator.seek(startBytes);

                // If the last returned entry still exists, the seek lands on it again; skip it.
                if (currentEntry != null && !currentEntry.deleted) {
                    iterator.next();
                }

                while (true) {
                    if (!iterator.isValid() || !RocksDBMapState.this.startWithKeyPrefix(keyPrefixBytes, iterator.key())) {
                        // Ran past the last entry with this prefix: nothing more to load, ever.
                        expired = true;
                        break;
                    }
                    if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
                        // Batch is full but more entries may follow; leave 'expired' untouched.
                        break;
                    }
                    cacheEntries.add(new RocksDBMapEntry(db, keyPrefixBytes.length, iterator.key(), iterator.value(), keySerializer, valueSerializer));
                    iterator.next();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBMapState$StateSnapshotTransformerWrapper.class */
    /**
     * Adapts a transformer of serialized user values to the on-disk map value format, in which
     * every value is preceded by a one-byte boolean flag telling whether the user value is null.
     */
    static class StateSnapshotTransformerWrapper implements StateSnapshotTransformer<byte[]> {
        /** Serialized form of a null user value: just the boolean flag {@code true}. */
        private static final byte[] NULL_VALUE;

        /** The flag byte written in front of a non-null user value (boolean {@code false}). */
        private static final byte NON_NULL_VALUE_PREFIX;

        static {
            final ByteArrayDataOutputView flagView = new ByteArrayDataOutputView(1);
            final byte[] serializedNull;
            final byte nonNullFlag;
            try {
                flagView.writeBoolean(true);
                serializedNull = flagView.toByteArray();
                flagView.reset();
                flagView.writeBoolean(false);
                nonNullFlag = flagView.toByteArray()[0];
            } catch (IOException e) {
                throw new FlinkRuntimeException("Failed to serialize boolean flag of map user null value", e);
            }
            NULL_VALUE = serializedNull;
            NON_NULL_VALUE_PREFIX = nonNullFlag;
        }

        /** The wrapped transformer; it sees the value bytes without the leading flag byte. */
        private final StateSnapshotTransformer<byte[]> elementTransformer;

        /** Reusable input view for reading the leading flag byte. */
        private final ByteArrayDataInputView div = new ByteArrayDataInputView();

        public StateSnapshotTransformerWrapper(StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
            this.elementTransformer = stateSnapshotTransformer;
        }

        /**
         * Applies the wrapped transformer to the payload that follows the flag byte.
         *
         * @param bArr the serialized map value including its leading flag byte, may be null
         * @return the null marker if the input is null, flagged as null, or filtered out by the
         *     wrapped transformer; the input array itself if the wrapped transformer returned its
         *     argument unchanged; otherwise the transformed payload prefixed with the non-null flag
         */
        @Nullable
        public byte[] filterOrTransform(@Nullable byte[] bArr) {
            if (bArr == null || isNull(bArr)) {
                return NULL_VALUE;
            }

            final byte[] payload = Arrays.copyOfRange(bArr, 1, bArr.length);
            final byte[] transformed = elementTransformer.filterOrTransform(payload);

            if (transformed == null) {
                return NULL_VALUE;
            }
            if (transformed == payload) {
                // Same instance came back: the original bytes can be handed back as they are.
                return bArr;
            }
            return prependWithNonNullByte(transformed, bArr);
        }

        /** Reads the first byte of the given array as a boolean "is null" flag. */
        private boolean isNull(byte[] bArr) {
            try {
                div.setData(bArr, 0, 1);
                return div.readBoolean();
            } catch (IOException e) {
                throw new FlinkRuntimeException("Failed to deserialize boolean flag of map user null value", e);
            }
        }

        /**
         * Builds an array holding the non-null flag followed by {@code bArr}. The array
         * {@code bArr2} is reused as the target when it already has the required length.
         */
        private static byte[] prependWithNonNullByte(byte[] bArr, byte[] bArr2) {
            final int requiredLength = bArr.length + 1;
            final byte[] target;
            if (bArr2.length == requiredLength) {
                target = bArr2;
            } else {
                target = new byte[requiredLength];
            }
            target[0] = NON_NULL_VALUE_PREFIX;
            System.arraycopy(bArr, 0, target, 1, bArr.length);
            return target;
        }
    }

    /**
     * Creates a new map state.
     *
     * @param columnFamilyHandle the column family passed on to the superclass
     * @param typeSerializer the namespace serializer
     * @param typeSerializer2 the serializer of the whole map; must be a {@link MapSerializer}
     * @param map the default value passed on to the superclass
     * @param rocksDBKeyedStateBackend the backend passed on to the superclass
     * @throws IllegalStateException if {@code typeSerializer2} is not a {@link MapSerializer}
     */
    private RocksDBMapState(ColumnFamilyHandle columnFamilyHandle, TypeSerializer<N> typeSerializer, TypeSerializer<Map<UK, UV>> typeSerializer2, Map<UK, UV> map, RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
        super(columnFamilyHandle, typeSerializer, typeSerializer2, map, rocksDBKeyedStateBackend);

        Preconditions.checkState(typeSerializer2 instanceof MapSerializer, "Unexpected serializer type.");

        // The user key/value serializers are the component serializers of the map serializer.
        final MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, UV>) typeSerializer2;
        userKeySerializer = castedMapSerializer.getKeySerializer();
        userValueSerializer = castedMapSerializer.getValueSerializer();
    }

    /** Returns the key serializer obtained from the backend. */
    public TypeSerializer<K> getKeySerializer() {
        final TypeSerializer<K> backendKeySerializer = backend.getKeySerializer();
        return backendKeySerializer;
    }

    /** Returns the namespace serializer held by this state. */
    public TypeSerializer<N> getNamespaceSerializer() {
        final TypeSerializer<N> serializer = namespaceSerializer;
        return serializer;
    }

    /** Returns the serializer of the whole map value held by this state. */
    public TypeSerializer<Map<UK, UV>> getValueSerializer() {
        final TypeSerializer<Map<UK, UV>> mapSerializer = (TypeSerializer<Map<UK, UV>>) valueSerializer;
        return mapSerializer;
    }

    /**
     * Looks up the value stored under the given user key for the current key and namespace.
     *
     * @param uk the user key
     * @return the deserialized user value, or null if RocksDB holds no entry for the key
     * @throws IOException if (de)serialization fails
     * @throws RocksDBException if the database read fails
     */
    public UV get(UK uk) throws IOException, RocksDBException {
        final byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(uk);
        final byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

        return rawValueBytes == null ? null : deserializeUserValue(rawValueBytes);
    }

    /**
     * Stores the given user value under the given user key for the current key and namespace.
     *
     * @param uk the user key
     * @param uv the user value
     * @throws IOException if serialization fails
     * @throws RocksDBException if the database write fails
     */
    public void put(UK uk, UV uv) throws IOException, RocksDBException {
        // Key first, then value: both serializations go through the same shared stream.
        final byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(uk);
        final byte[] rawValueBytes = serializeUserValue(uv);

        backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
    }

    /**
     * Stores all entries of the given map for the current key and namespace, going through a
     * write batch wrapper.
     *
     * @param map the entries to add; a null map is ignored
     * @throws IOException if serialization fails
     * @throws RocksDBException if the batch cannot be written or closed
     */
    public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
        if (map == null) {
            return;
        }

        // try-with-resources guarantees the wrapper is closed exactly once; a failure while
        // closing after an earlier failure is recorded as a suppressed exception.
        try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
            for (Map.Entry<UK, UV> entry : map.entrySet()) {
                // Key first, then value: both serializations go through the same shared stream.
                final byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
                final byte[] rawValueBytes = serializeUserValue(entry.getValue());
                writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    /**
     * Deletes the entry stored under the given user key for the current key and namespace.
     *
     * @param uk the user key
     * @throws IOException if key serialization fails
     * @throws RocksDBException if the database delete fails
     */
    public void remove(UK uk) throws IOException, RocksDBException {
        final byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(uk);
        backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
    }

    /**
     * Tells whether RocksDB holds an entry for the given user key under the current key and
     * namespace.
     *
     * @param uk the user key
     * @return true if the database returns bytes for the serialized key
     * @throws IOException if key serialization fails
     * @throws RocksDBException if the database read fails
     */
    public boolean contains(UK uk) throws IOException, RocksDBException {
        final byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(uk);
        final byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
        return rawValueBytes != null;
    }

    /**
     * Returns all entries of the map for the current key and namespace.
     *
     * <p>The returned iterable always hands out the same iterator instance, so it can be
     * traversed only once.
     *
     * @return an iterable over the entries, or null if there are none
     * @throws IOException if the key prefix cannot be serialized
     */
    public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
        final Iterator<Map.Entry<UK, UV>> entryIterator = iterator();

        if (!entryIterator.hasNext()) {
            return null;
        }
        return () -> entryIterator;
    }

    /**
     * Returns the user keys of the map for the current key and namespace.
     *
     * <p>The key prefix is serialized once when this method is called; every call to
     * {@code iterator()} on the result starts a fresh scan with that prefix.
     *
     * @return an iterable over the user keys
     * @throws IOException if the key prefix cannot be serialized
     */
    public Iterable<UK> keys() throws IOException {
        final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

        return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
            @Override
            public UK next() {
                final RocksDBMapEntry entry = nextEntry();
                return entry == null ? null : entry.getKey();
            }
        };
    }

    /**
     * Returns the user values of the map for the current key and namespace.
     *
     * <p>The key prefix is serialized once when this method is called; every call to
     * {@code iterator()} on the result starts a fresh scan with that prefix.
     *
     * @return an iterable over the user values
     * @throws IOException if the key prefix cannot be serialized
     */
    public Iterable<UV> values() throws IOException {
        final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

        return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
            @Override
            public UV next() {
                final RocksDBMapEntry entry = nextEntry();
                return entry == null ? null : entry.getValue();
            }
        };
    }

    /**
     * Creates an iterator over the entries of the map for the current key and namespace.
     *
     * @return an iterator whose {@code next()} yields the entry or null once exhausted
     * @throws IOException if the key prefix cannot be serialized
     */
    public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
        final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

        return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
            @Override
            public Map.Entry<UK, UV> next() {
                return nextEntry();
            }
        };
    }

    /**
     * Deletes every map entry of the current key and namespace.
     *
     * <p>All keys starting with the serialized key/namespace prefix are collected into a single
     * write batch, which is then applied to the database. Failures are logged as a warning and
     * not propagated.
     */
    @Override // org.apache.flink.contrib.streaming.state.AbstractRocksDBState
    public void clear() {
        // Both native resources are closed exactly once, in reverse order of acquisition
        // (write batch first, then iterator), whether or not the body fails.
        try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
             WriteBatch writeBatch = new WriteBatch(128)) {

            final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
            iterator.seek(keyPrefixBytes);

            while (iterator.isValid()) {
                final byte[] keyBytes = iterator.key();
                if (!startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
                    // Left the range of keys belonging to this key/namespace.
                    break;
                }
                writeBatch.remove(columnFamily, keyBytes);
                iterator.next();
            }

            backend.db.write(writeOptions, writeBatch);
        } catch (Exception e) {
            // Deliberately best-effort: report the problem but do not fail the caller.
            LOG.warn("Error while cleaning the state.", e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Serializes the whole map stored for the given serialized key and namespace.
     *
     * @param bArr the serialized key and namespace
     * @param typeSerializer serializer for the key
     * @param typeSerializer2 serializer for the namespace
     * @param typeSerializer3 serializer for the map; cast to {@link MapSerializer}
     * @return the serialized map, or null if there is no entry for the key and namespace
     * @throws Exception if deserialization, serialization or the database access fails
     */
    @Override // org.apache.flink.contrib.streaming.state.AbstractRocksDBState
    public byte[] getSerializedValue(byte[] bArr, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, TypeSerializer<Map<UK, UV>> typeSerializer3) throws Exception {
        Preconditions.checkNotNull(bArr);
        Preconditions.checkNotNull(typeSerializer);
        Preconditions.checkNotNull(typeSerializer2);
        Preconditions.checkNotNull(typeSerializer3);

        final Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(bArr, typeSerializer, typeSerializer2);
        final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());

        // Build the key prefix in private buffers using the serializers supplied by the caller.
        final ByteArrayOutputStreamWithPos prefixStream = new ByteArrayOutputStreamWithPos(128);
        final DataOutputViewStreamWrapper prefixView = new DataOutputViewStreamWrapper(prefixStream);
        writeKeyWithGroupAndNamespace(keyGroup, keyAndNamespace.f0, typeSerializer, keyAndNamespace.f1, typeSerializer2, prefixStream, prefixView);
        final byte[] keyPrefixBytes = prefixStream.toByteArray();

        final MapSerializer<UK, UV> suppliedMapSerializer = (MapSerializer<UK, UV>) typeSerializer3;
        final TypeSerializer<UK> suppliedUserKeySerializer = suppliedMapSerializer.getKeySerializer();
        final TypeSerializer<UV> suppliedUserValueSerializer = suppliedMapSerializer.getValueSerializer();

        final Iterator<Map.Entry<UK, UV>> entryIterator =
                new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes, suppliedUserKeySerializer, suppliedUserValueSerializer) {
                    @Override
                    public Map.Entry<UK, UV> next() {
                        return nextEntry();
                    }
                };

        if (!entryIterator.hasNext()) {
            return null;
        }
        return KvStateSerializer.serializeMap(() -> entryIterator, suppliedUserKeySerializer, suppliedUserValueSerializer);
    }

    /**
     * Writes the current key (with key group) and namespace and returns the resulting bytes.
     *
     * @return the contents of the key serialization stream after the write
     * @throws IOException if serialization fails
     */
    private byte[] serializeCurrentKeyAndNamespace() throws IOException {
        writeCurrentKeyWithGroupAndNamespace();
        final byte[] prefixBytes = keySerializationStream.toByteArray();
        return prefixBytes;
    }

    /**
     * Serializes the current key and namespace followed by the given user key.
     *
     * @param uk the user key to append
     * @return the contents of the key serialization stream after both writes
     * @throws IOException if serialization fails
     */
    private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK uk) throws IOException {
        // The prefix is written first (its returned copy is not needed here); the user key
        // is then appended through the shared output view.
        serializeCurrentKeyAndNamespace();
        userKeySerializer.serialize(uk, keySerializationDataOutputView);

        final byte[] fullKeyBytes = keySerializationStream.toByteArray();
        return fullKeyBytes;
    }

    /**
     * Serializes a user value with this state's user value serializer.
     *
     * @param uv the user value, may be null
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    private byte[] serializeUserValue(UV uv) throws IOException {
        final byte[] rawValueBytes = serializeUserValue(uv, userValueSerializer);
        return rawValueBytes;
    }

    /**
     * Deserializes a user value with this state's user value serializer.
     *
     * @param bArr the serialized value bytes
     * @return the user value, possibly null
     * @throws IOException if deserialization fails
     */
    private UV deserializeUserValue(byte[] bArr) throws IOException {
        final UV value = deserializeUserValue(bArr, userValueSerializer);
        return value;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes a user value as a boolean null-flag followed, for non-null values, by the
     * output of the given serializer. The shared key serialization stream is reset and used
     * as the buffer.
     *
     * @param uv the user value, may be null
     * @param typeSerializer the serializer for non-null values
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    public byte[] serializeUserValue(UV uv, TypeSerializer<UV> typeSerializer) throws IOException {
        keySerializationStream.reset();

        final boolean valueIsNull = (uv == null);
        keySerializationDataOutputView.writeBoolean(valueIsNull);
        if (!valueIsNull) {
            typeSerializer.serialize(uv, keySerializationDataOutputView);
        }

        return keySerializationStream.toByteArray();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deserializes the user key found in the given raw key bytes after skipping a prefix.
     *
     * @param i the number of leading bytes to skip
     * @param bArr the raw key bytes
     * @param typeSerializer the serializer for the user key
     * @return the deserialized user key
     * @throws IOException if deserialization fails
     */
    public UK deserializeUserKey(int i, byte[] bArr, TypeSerializer<UK> typeSerializer) throws IOException {
        final ByteArrayInputStreamWithPos rawInput = new ByteArrayInputStreamWithPos(bArr);
        final DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(rawInput);

        inputView.skipBytes(i);
        return typeSerializer.deserialize(inputView);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deserializes a user value from bytes that start with a boolean null-flag.
     *
     * @param bArr the serialized value bytes
     * @param typeSerializer the serializer for non-null values
     * @return null if the flag is set, otherwise the value read by the serializer
     * @throws IOException if deserialization fails
     */
    public UV deserializeUserValue(byte[] bArr, TypeSerializer<UV> typeSerializer) throws IOException {
        final ByteArrayInputStreamWithPos rawInput = new ByteArrayInputStreamWithPos(bArr);
        final DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(rawInput);

        final boolean valueIsNull = inputView.readBoolean();
        return valueIsNull ? null : typeSerializer.deserialize(inputView);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether the raw key {@code bArr2} starts with the prefix {@code bArr}.
     *
     * <p>Bytes are compared from the end of the prefix backwards, down to the index given by
     * the backend's key-group prefix length; the bytes below that index are not compared.
     *
     * @param bArr the key prefix
     * @param bArr2 the raw key to test
     * @return true if the raw key is at least as long as the prefix and all compared bytes match
     */
    public boolean startWithKeyPrefix(byte[] bArr, byte[] bArr2) {
        if (bArr2.length < bArr.length) {
            return false;
        }

        for (int pos = bArr.length - 1; pos >= this.backend.getKeyGroupPrefixBytes(); pos--) {
            if (bArr2[pos] != bArr[pos]) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Incorrect return type in method signature: <UK:Ljava/lang/Object;UV:Ljava/lang/Object;K:Ljava/lang/Object;N:Ljava/lang/Object;SV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/api/java/tuple/Tuple2<Lorg/rocksdb/ColumnFamilyHandle;Lorg/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo<TN;TSV;>;>;Lorg/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend<TK;>;)TIS; */
    /**
     * Factory method creating a {@code RocksDBMapState}.
     *
     * @param stateDescriptor supplies the default value
     * @param tuple2 pair of column family handle ({@code f0}) and registered state meta info
     *     ({@code f1}), from which the namespace and state serializers are taken
     * @param rocksDBKeyedStateBackend the backend the state belongs to
     * @return the newly created state
     */
    public static State create(StateDescriptor stateDescriptor, Tuple2 tuple2, RocksDBKeyedStateBackend rocksDBKeyedStateBackend) {
        final ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle) tuple2.f0;
        final RegisteredKeyValueStateBackendMetaInfo metaInfo = (RegisteredKeyValueStateBackendMetaInfo) tuple2.f1;

        return new RocksDBMapState(
                columnFamilyHandle,
                metaInfo.getNamespaceSerializer(),
                metaInfo.getStateSerializer(),
                (Map) stateDescriptor.getDefaultValue(),
                rocksDBKeyedStateBackend);
    }
}
