/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.TreeSet;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StateTable} backed by a chained hash table that is keyed by the pair (key, namespace).
 *
 * <p>Two mechanisms are visible in this class:
 * <ul>
 *   <li><b>Incremental rehash:</b> when the table grows, a second table of twice the size is
 *       allocated and every subsequent operation migrates a few buckets from the primary table to
 *       it, until the new table replaces the primary one.</li>
 *   <li><b>Versioned entries for snapshots:</b> {@link #snapshotTableArrays()} hands out a shallow
 *       copy of the bucket array and bumps {@code stateTableVersion}. Afterwards, any entry (or
 *       state object) whose version is lower than {@code highestRequiredSnapshotVersion} is copied
 *       before it is modified, so that the arrays handed out earlier keep seeing the old
 *       objects.</li>
 * </ul>
 *
 * <p>Only {@code snapshotVersions} bookkeeping is guarded by a lock (the set itself is used as the
 * monitor); all other fields are accessed without synchronization.
 *
 * @param <K> type of key
 * @param <N> type of namespace
 * @param <S> type of state
 */
public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S>
        implements Iterable<StateEntry<K, N, S>> {

    // NOTE(review): the logger is registered under HeapKeyedStateBackend rather than this class.
    // Left untouched so that existing logging configuration keeps matching.
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);

    /** Smallest table size that is ever allocated; must be a power of two. */
    private static final int MINIMUM_CAPACITY = 4;

    /** Largest table size that is ever allocated (2^30); must be a power of two. */
    private static final int MAXIMUM_CAPACITY = 0x40000000;

    /** Table size used by the package-private constructor. */
    public static final int DEFAULT_CAPACITY = 1024;

    /** Upper bound for the number of entries once the table cannot grow anymore. */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /** Minimum number of entries each call of {@link #incrementalRehash()} tries to migrate. */
    private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;

    /**
     * Shared placeholder that marks "no rehash table present" (see {@link #isRehashing()}).
     * It is shared by all instances and therefore must never be written to.
     */
    private static final StateTableEntry<?, ?, ?>[] EMPTY_TABLE =
            new StateTableEntry<?, ?, ?>[MINIMUM_CAPACITY >>> 1];

    /** Dummy entry with {@code next == null} that {@link StateEntryIterator} starts from. */
    private static final StateTableEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY =
            new StateTableEntry<>(new Object(), new Object(), new Object(), 0, null, 0, 0);

    /** Versions of all snapshots that were taken and not yet released; also used as lock. */
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();

    /** The table all operations work on while no rehash is in progress. */
    private StateTableEntry<K, N, S>[] primaryTable = emptyTable();

    /** Target table of an ongoing rehash, or {@link #EMPTY_TABLE} if there is none. */
    private StateTableEntry<K, N, S>[] incrementalRehashTable = emptyTable();

    /** Number of entries currently stored in {@link #primaryTable}. */
    private int primaryTableSize = 0;

    /** Number of entries currently stored in {@link #incrementalRehashTable}. */
    private int incrementalRehashTableSize = 0;

    /** Buckets of the primary table below this index were already migrated. */
    private int rehashIndex = 0;

    /** Version stamped onto entries and states that are created or replaced. */
    private int stateTableVersion = 0;

    /** Entries/states with a lower version must be copied before they are modified. */
    private int highestRequiredSnapshotVersion = 0;

    /** Most recently inserted namespace; reused for equal namespaces of new entries. */
    private N lastNamespace;

    /** A put that finds {@code size() > threshold} starts a new rehash. */
    private int threshold;

    /** Counts insertions of new entries and removals; used by {@link StateEntryIterator}. */
    private int modCount;

    /**
     * Creates an empty table with {@link #DEFAULT_CAPACITY}.
     *
     * @param keyContext provides the current key for the namespace-only operations
     * @param metaInfo meta information, including the state and namespace serializers
     */
    CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
        this(keyContext, metaInfo, DEFAULT_CAPACITY);
    }

    /**
     * Creates an empty table whose size is {@code capacity} clamped to
     * [{@link #MINIMUM_CAPACITY}, {@link #MAXIMUM_CAPACITY}] and rounded up to a power of two.
     *
     * <p>A capacity of zero is treated like any other value below the minimum. Leaving the primary
     * table pointing at the shared {@link #EMPTY_TABLE} would make the first insertion write into
     * that static array, which every other instance iterates as its (idle) rehash table.
     *
     * @param keyContext provides the current key for the namespace-only operations
     * @param metaInfo meta information, including the state and namespace serializers
     * @param capacity requested initial capacity
     * @throws IllegalArgumentException if {@code capacity} is negative
     */
    private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo, int capacity) {
        super(keyContext, metaInfo);
        if (capacity < 0) {
            throw new IllegalArgumentException("Capacity: " + capacity);
        }

        final int tableCapacity;
        if (capacity < MINIMUM_CAPACITY) {
            tableCapacity = MINIMUM_CAPACITY;
        } else if (capacity > MAXIMUM_CAPACITY) {
            tableCapacity = MAXIMUM_CAPACITY;
        } else {
            tableCapacity = MathUtils.roundUpToPowerOfTwo(capacity);
        }
        this.primaryTable = makeTable(tableCapacity);
    }

    // ------------------------------------------------------------------------------------------
    //  Public API from StateTable
    // ------------------------------------------------------------------------------------------

    /** Returns the number of entries held by the primary and the rehash table together. */
    @Override
    public int size() {
        return primaryTableSize + incrementalRehashTableSize;
    }

    /**
     * Looks up the state for the given key and namespace.
     *
     * <p>If the state object found is older than the highest required snapshot version, it is
     * replaced by a copy (made with the state serializer) before it is returned, because the
     * caller may modify the returned object.
     *
     * @return the state, or {@code null} if there is no mapping
     */
    @Override
    public S get(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final int requiredVersion = highestRequiredSnapshotVersion;
        final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
        final int index = hash & (tab.length - 1);

        for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
            if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
                // copy-on-write check for the state object
                if (e.stateVersion < requiredVersion) {
                    // copy-on-write check for the entry itself
                    if (e.entryVersion < requiredVersion) {
                        e = handleChainedEntryCopyOnWrite(tab, index, e);
                    }
                    e.stateVersion = stateTableVersion;
                    e.state = getStateSerializer().copy(e.state);
                }
                return e.state;
            }
        }
        return null;
    }

    /** Returns the keys of all entries whose namespace equals the given one. */
    @Override
    public Stream<K> getKeys(N namespace) {
        return StreamSupport.stream(spliterator(), false)
                .filter(entry -> entry.getNamespace().equals(namespace))
                .map(StateEntry::getKey);
    }

    /** Same as {@link #put(Object, Object, Object)}; the key group is not used here. */
    @Override
    public void put(K key, int keyGroup, N namespace, S state) {
        put(key, namespace, state);
    }

    /** {@link #get(Object, Object)} for the current key of the key context. */
    @Override
    public S get(N namespace) {
        return get(keyContext.getCurrentKey(), namespace);
    }

    /** {@link #containsKey(Object, Object)} for the current key of the key context. */
    @Override
    public boolean containsKey(N namespace) {
        return containsKey(keyContext.getCurrentKey(), namespace);
    }

    /** {@link #put(Object, Object, Object)} for the current key of the key context. */
    @Override
    public void put(N namespace, S state) {
        put(keyContext.getCurrentKey(), namespace, state);
    }

    /** {@link #putAndGetOld(Object, Object, Object)} for the current key of the key context. */
    @Override
    public S putAndGetOld(N namespace, S state) {
        return putAndGetOld(keyContext.getCurrentKey(), namespace, state);
    }

    /** {@link #remove(Object, Object)} for the current key of the key context. */
    @Override
    public void remove(N namespace) {
        remove(keyContext.getCurrentKey(), namespace);
    }

    /** {@link #removeAndGetOld(Object, Object)} for the current key of the key context. */
    @Override
    public S removeAndGetOld(N namespace) {
        return removeAndGetOld(keyContext.getCurrentKey(), namespace);
    }

    /**
     * {@link #transform(Object, Object, Object, StateTransformationFunction)} for the current key
     * of the key context.
     */
    @Override
    public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
        transform(keyContext.getCurrentKey(), namespace, value, transformation);
    }

    // ------------------------------------------------------------------------------------------
    //  Operations with explicit key
    // ------------------------------------------------------------------------------------------

    /** Returns whether an entry exists for the given key and namespace. */
    boolean containsKey(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
        final int index = hash & (tab.length - 1);

        for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
            if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
                return true;
            }
        }
        return false;
    }

    /** Maps (key, namespace) to the given state, creating the entry if necessary. */
    void put(K key, N namespace, S value) {
        final StateTableEntry<K, N, S> e = putEntry(key, namespace);
        e.state = value;
        e.stateVersion = stateTableVersion;
    }

    /**
     * Maps (key, namespace) to the given state and returns the previous state.
     *
     * @return the previous state ({@code null} for a new entry); a copy is returned if the
     *     previous state object is older than the highest required snapshot version
     */
    S putAndGetOld(K key, N namespace, S value) {
        final StateTableEntry<K, N, S> e = putEntry(key, namespace);
        final S oldState = e.stateVersion < highestRequiredSnapshotVersion
                ? getStateSerializer().copy(e.state)
                : e.state;
        e.state = value;
        e.stateVersion = stateTableVersion;
        return oldState;
    }

    /** Removes the entry for (key, namespace), if there is one. */
    void remove(K key, N namespace) {
        removeEntry(key, namespace);
    }

    /**
     * Removes the entry for (key, namespace) and returns its state.
     *
     * @return the removed state, or {@code null} if there was no entry; a copy is returned if the
     *     state object is older than the highest required snapshot version
     */
    S removeAndGetOld(K key, N namespace) {
        final StateTableEntry<K, N, S> e = removeEntry(key, namespace);
        if (e == null) {
            return null;
        }
        return e.stateVersion < highestRequiredSnapshotVersion
                ? getStateSerializer().copy(e.state)
                : e.state;
    }

    /**
     * Replaces the state of (key, namespace) with the result of applying {@code transformation}
     * to the current state and {@code value}.
     *
     * <p>The entry is created first if it does not exist, in which case the transformation
     * receives {@code null} as the current state. A state object that is older than the highest
     * required snapshot version is copied before it is passed to the transformation.
     *
     * @throws Exception whatever the transformation function throws
     */
    <T> void transform(K key, N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
        final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
        final S currentState = entry.stateVersion < highestRequiredSnapshotVersion
                ? getStateSerializer().copy(entry.state)
                : entry.state;
        entry.state = transformation.apply(currentState, value);
        entry.stateVersion = stateTableVersion;
    }

    /**
     * Returns the entry for (key, namespace) in a form that may be modified, inserting a new entry
     * with {@code null} state if none exists.
     *
     * <p>An existing entry that is older than the highest required snapshot version is replaced
     * by a copy. Inserting a new entry increments {@code modCount} and, if the table is over its
     * threshold, starts a rehash.
     */
    private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
        final int index = hash & (tab.length - 1);

        for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
            if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
                // copy-on-write check for the entry
                if (e.entryVersion < highestRequiredSnapshotVersion) {
                    e = handleChainedEntryCopyOnWrite(tab, index, e);
                }
                return e;
            }
        }

        ++modCount;
        if (size() > threshold) {
            doubleCapacity();
        }
        return addNewStateTableEntry(tab, key, namespace, hash);
    }

    /**
     * Unlinks the entry for (key, namespace) from its bucket chain.
     *
     * <p>If the predecessor in the chain is older than the highest required snapshot version, the
     * chain up to the predecessor is copied first, because unlinking writes the predecessor's
     * {@code next} pointer.
     *
     * @return the unlinked entry, or {@code null} if there was none
     */
    private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
        final int index = hash & (tab.length - 1);

        StateTableEntry<K, N, S> prev = null;
        for (StateTableEntry<K, N, S> e = tab[index]; e != null; prev = e, e = e.next) {
            if (e.hash != hash || !key.equals(e.key) || !namespace.equals(e.namespace)) {
                continue;
            }

            if (prev == null) {
                tab[index] = e.next;
            } else {
                // copy-on-write check for the predecessor
                if (prev.entryVersion < highestRequiredSnapshotVersion) {
                    prev = handleChainedEntryCopyOnWrite(tab, index, prev);
                }
                prev.next = e.next;
            }

            ++modCount;
            if (tab == primaryTable) {
                --primaryTableSize;
            } else {
                --incrementalRehashTableSize;
            }
            return e;
        }
        return null;
    }

    /**
     * Rejects {@code null} keys and namespaces.
     *
     * @throws NullPointerException if either argument is {@code null}
     */
    private void checkKeyNamespacePreconditions(K key, N namespace) {
        Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
        Preconditions.checkNotNull(namespace, "Provided namespace is null.");
    }

    // ------------------------------------------------------------------------------------------
    //  Meta data
    // ------------------------------------------------------------------------------------------

    /** Returns the state serializer registered in the meta info. */
    @Override
    public TypeSerializer<S> getStateSerializer() {
        return metaInfo.getStateSerializer();
    }

    /** Returns the namespace serializer registered in the meta info. */
    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return metaInfo.getNamespaceSerializer();
    }

    /** Returns the meta info of this table. */
    @Override
    public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
        return metaInfo;
    }

    /** Replaces the meta info of this table. */
    @Override
    public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
        this.metaInfo = metaInfo;
    }

    // ------------------------------------------------------------------------------------------
    //  Iteration and snapshots
    // ------------------------------------------------------------------------------------------

    /**
     * Returns an iterator over all entries. The iterator throws
     * {@link ConcurrentModificationException} from {@code next()} once an entry was inserted or
     * removed after the iterator was created.
     */
    @Override
    @Nonnull
    public Iterator<StateEntry<K, N, S>> iterator() {
        return new StateEntryIterator();
    }

    /**
     * Unregisters the given snapshot version and lowers the highest required snapshot version to
     * the largest remaining registered version (or zero if none remains).
     *
     * @param snapshotVersion version that was registered by {@link #snapshotTableArrays()}
     * @throws IllegalStateException if the version is not registered
     */
    @VisibleForTesting
    void releaseSnapshot(int snapshotVersion) {
        synchronized (snapshotVersions) {
            Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to release unknown snapshot version");
            highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
        }
    }

    /**
     * Registers a new snapshot version and returns a shallow copy of the bucket array(s).
     *
     * <p>The returned array has at least {@code size()} slots. While a rehash is in progress it
     * contains the not yet migrated tail of the primary table followed by the two regions of the
     * rehash table that can already hold entries: {@code [0, rehashIndex)} and
     * {@code [length / 2, length / 2 + rehashIndex)}.
     *
     * @return array of bucket heads; the entries themselves are not copied
     * @throws IllegalStateException if the version counter overflows
     */
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    StateTableEntry<K, N, S>[] snapshotTableArrays() {
        synchronized (snapshotVersions) {
            if (++stateTableVersion < 0) {
                throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");
            }
            highestRequiredSnapshotVersion = stateTableVersion;
            snapshotVersions.add(highestRequiredSnapshotVersion);
        }

        StateTableEntry<K, N, S>[] table = primaryTable;
        final int totalTableIndexSize = rehashIndex + table.length;
        final int copiedArraySize = Math.max(totalTableIndexSize, size());
        // generic array creation is not possible, the raw array only ever holds <K, N, S> entries
        final StateTableEntry<K, N, S>[] copy =
                (StateTableEntry<K, N, S>[]) new StateTableEntry[copiedArraySize];

        if (isRehashing()) {
            final int localRehashIndex = rehashIndex;
            final int localCopyLength = table.length - localRehashIndex;
            // primary table: every bucket that was not migrated yet
            System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);

            // rehash table: lower and upper image of the already migrated buckets
            table = incrementalRehashTable;
            System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
            System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
        } else {
            System.arraycopy(table, 0, copy, 0, table.length);
        }
        return copy;
    }

    // ------------------------------------------------------------------------------------------
    //  Table management
    // ------------------------------------------------------------------------------------------

    /**
     * Allocates a bucket array of the given size and adjusts {@code threshold} for it: 3/4 of the
     * capacity below {@link #MAXIMUM_CAPACITY}, {@link #MAX_ARRAY_SIZE} otherwise.
     *
     * @throws IllegalStateException if the maximum capacity is requested while the table already
     *     holds more than {@link #MAX_ARRAY_SIZE} entries
     */
    @SuppressWarnings("unchecked")
    private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
        if (newCapacity < MAXIMUM_CAPACITY) {
            threshold = (newCapacity >> 1) + (newCapacity >> 2);
        } else {
            if (size() > MAX_ARRAY_SIZE) {
                throw new IllegalStateException("Maximum capacity of CopyOnWriteStateTable is reached and the job cannot continue. Please consider scaling-out your job or using a different keyed state backend implementation!");
            }
            LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead to more collisions and lower performance. Please consider scaling-out your job or using a different keyed state backend implementation!");
            threshold = MAX_ARRAY_SIZE;
        }
        // generic array creation is not possible, the raw array only ever holds <K, N, S> entries
        return (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity];
    }

    /**
     * Creates an entry with {@code null} state, stamped with the current table version, and puts
     * it at the head of its bucket chain in {@code table}.
     */
    private StateTableEntry<K, N, S> addNewStateTableEntry(StateTableEntry<K, N, S>[] table, K key, N namespace, int hash) {
        // reuse the instance of the previous insertion when the namespaces are equal
        if (namespace.equals(lastNamespace)) {
            namespace = lastNamespace;
        } else {
            lastNamespace = namespace;
        }

        final int index = hash & (table.length - 1);
        final StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
                key, namespace, null, hash, table[index], stateTableVersion, stateTableVersion);
        table[index] = newEntry;

        if (table == primaryTable) {
            ++primaryTableSize;
        } else {
            ++incrementalRehashTableSize;
        }
        return newEntry;
    }

    /**
     * Picks the table that currently holds the bucket for the given hash: the primary table if
     * the bucket index in the primary table was not migrated yet, the rehash table otherwise.
     */
    private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
        final int primaryIndex = hashCode & (primaryTable.length - 1);
        return primaryIndex >= rehashIndex ? primaryTable : incrementalRehashTable;
    }

    /**
     * Starts a rehash into a table of twice the current size. Does nothing if the primary table
     * already has {@link #MAXIMUM_CAPACITY}.
     *
     * @throws IllegalStateException if a rehash is already in progress
     */
    private void doubleCapacity() {
        Preconditions.checkState(!isRehashing(), "There is already a rehash in progress.");

        final int oldCapacity = primaryTable.length;
        if (oldCapacity == MAXIMUM_CAPACITY) {
            return;
        }
        incrementalRehashTable = makeTable(oldCapacity * 2);
    }

    /** Returns whether a rehash table is currently installed. */
    @VisibleForTesting
    boolean isRehashing() {
        return EMPTY_TABLE != incrementalRehashTable;
    }

    /**
     * Common prologue of all keyed operations: validates the arguments, performs one step of an
     * ongoing rehash and returns the hash of (key, namespace).
     */
    private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);
        if (isRehashing()) {
            incrementalRehash();
        }
        return compositeHash(key, namespace);
    }

    /**
     * Migrates whole buckets, starting at {@code rehashIndex}, from the primary table to the
     * rehash table until at least {@link #MIN_TRANSFERRED_PER_INCREMENTAL_REHASH} entries were
     * moved or the primary table is exhausted. In the latter case the rehash table becomes the
     * new primary table.
     */
    private void incrementalRehash() {
        final StateTableEntry<K, N, S>[] oldTable = primaryTable;
        final StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;

        final int oldCapacity = oldTable.length;
        final int newMask = newTable.length - 1;
        final int requiredVersion = highestRequiredSnapshotVersion;
        int rhIdx = rehashIndex;
        int transferred = 0;

        while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
            StateTableEntry<K, N, S> e = oldTable[rhIdx];

            while (e != null) {
                // relinking writes 'next', so entries still needed by a snapshot are copied first
                if (e.entryVersion < requiredVersion) {
                    e = new StateTableEntry<>(e, stateTableVersion);
                }
                final StateTableEntry<K, N, S> n = e.next;
                final int pos = e.hash & newMask;
                e.next = newTable[pos];
                newTable[pos] = e;
                e = n;
                ++transferred;
            }

            oldTable[rhIdx] = null;
            if (++rhIdx == oldCapacity) {
                // rehash complete: swap tables and reset the rehash bookkeeping
                primaryTable = newTable;
                incrementalRehashTable = emptyTable();
                primaryTableSize += incrementalRehashTableSize;
                incrementalRehashTableSize = 0;
                rehashIndex = 0;
                return;
            }
        }

        // publish the local bookkeeping
        primaryTableSize -= transferred;
        incrementalRehashTableSize += transferred;
        rehashIndex = rhIdx;
    }

    /**
     * Walks the bucket chain at {@code tab[tableIdx]} up to and including {@code untilEntry} and
     * replaces every entry that is older than the highest required snapshot version by a copy
     * stamped with the current table version.
     *
     * @return {@code untilEntry} itself, or its copy if it had to be copied
     */
    private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(StateTableEntry<K, N, S>[] tab, int tableIdx, StateTableEntry<K, N, S> untilEntry) {
        final int required = highestRequiredSnapshotVersion;

        StateTableEntry<K, N, S> current = tab[tableIdx];
        StateTableEntry<K, N, S> copy;

        if (current.entryVersion < required) {
            copy = new StateTableEntry<>(current, stateTableVersion);
            tab[tableIdx] = copy;
        } else {
            copy = current;
        }

        while (current != untilEntry) {
            current = current.next;
            if (current.entryVersion < required) {
                copy.next = new StateTableEntry<>(current, stateTableVersion);
                copy = copy.next;
            } else {
                copy = current;
            }
        }
        return copy;
    }

    /** Returns {@link #EMPTY_TABLE} typed for the caller; the array must only be read. */
    @SuppressWarnings("unchecked")
    private static <K, N, S> StateTableEntry<K, N, S>[] emptyTable() {
        return (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
    }

    /** Returns {@link #ITERATOR_BOOTSTRAP_ENTRY} typed for the caller. */
    @SuppressWarnings("unchecked")
    private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
        return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
    }

    /** Combines the hash codes of key and namespace and mixes the bits of the result. */
    private static int compositeHash(Object key, Object namespace) {
        return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
    }

    /** Returns the current version of this table. */
    int getStateTableVersion() {
        return stateTableVersion;
    }

    /** Creates a snapshot object for this table. */
    @Override
    @Nonnull
    public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() {
        return new CopyOnWriteStateTableSnapshot<>(this);
    }

    /**
     * Releases the version registered for the given snapshot.
     *
     * @throws IllegalArgumentException if the snapshot is not owned by this table
     */
    void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) {
        Preconditions.checkArgument(snapshotToRelease.isOwner(this), "Cannot release snapshot which is owned by a different state table.");
        releaseSnapshot(snapshotToRelease.getSnapshotVersion());
    }

    /** Counts the entries whose namespace equals the given one by iterating the whole table. */
    @Override
    public int sizeOfNamespace(Object namespace) {
        int count = 0;
        for (StateEntry<K, N, S> entry : this) {
            if (null != entry && namespace.equals(entry.getNamespace())) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Creates a visitor that returns the table's entries bucket chain by bucket chain.
     *
     * @param recommendedMaxNumberOfReturnedRecords upper bound for the number of bucket positions
     *     that are scanned per call while searching for the next non-empty bucket
     */
    @Override
    public InternalKvState.StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
        return new StateIncrementalVisitorImpl(recommendedMaxNumberOfReturnedRecords);
    }

    /**
     * Incremental visitor that hands out one bucket chain per call and forwards modifications to
     * the enclosing table.
     */
    class StateIncrementalVisitorImpl
            implements InternalKvState.StateIncrementalVisitor<K, N, S> {

        private final StateEntryChainIterator chainIterator;

        /** Reused between calls of {@link #nextEntries()}. */
        private final Collection<StateEntry<K, N, S>> chainToReturn = new ArrayList<>(5);

        StateIncrementalVisitorImpl(int recommendedMaxNumberOfReturnedRecords) {
            this.chainIterator = new StateEntryChainIterator(recommendedMaxNumberOfReturnedRecords);
        }

        @Override
        public boolean hasNext() {
            return chainIterator.hasNext();
        }

        /**
         * Returns the entries of the next bucket chain.
         *
         * @return {@code null} if {@link #hasNext()} is false; otherwise a collection that is
         *     cleared and refilled by the next call, and that is empty if no non-empty bucket
         *     was found within the scan limit
         */
        @Override
        public Collection<StateEntry<K, N, S>> nextEntries() {
            if (!hasNext()) {
                return null;
            }

            chainToReturn.clear();
            for (StateTableEntry<K, N, S> nextEntry = chainIterator.next(); nextEntry != null; nextEntry = nextEntry.next) {
                chainToReturn.add(nextEntry);
            }
            return chainToReturn;
        }

        @Override
        public void remove(StateEntry<K, N, S> stateEntry) {
            CopyOnWriteStateTable.this.remove(stateEntry.getKey(), stateEntry.getNamespace());
        }

        @Override
        public void update(StateEntry<K, N, S> stateEntry, S newValue) {
            CopyOnWriteStateTable.this.put(stateEntry.getKey(), stateEntry.getNamespace(), newValue);
        }
    }

    /**
     * Iterator over all entries of the table. It always holds the entry that the next call of
     * {@link #next()} returns, and follows the {@code next} pointers of a chain before asking the
     * chain iterator for the next bucket.
     */
    class StateEntryIterator
            implements Iterator<StateEntry<K, N, S>> {

        private final StateEntryChainIterator chainIterator;
        private StateTableEntry<K, N, S> nextEntry;
        private final int expectedModCount;

        StateEntryIterator() {
            this.chainIterator = new StateEntryChainIterator();
            this.expectedModCount = modCount;
            // the bootstrap entry has no successor, so advancing once loads the first real entry
            this.nextEntry = getBootstrapEntry();
            advanceIterator();
        }

        @Override
        public boolean hasNext() {
            return nextEntry != null;
        }

        /**
         * @throws ConcurrentModificationException if an entry was inserted into or removed from
         *     the table after this iterator was created
         * @throws NoSuchElementException if the iteration has no more entries
         */
        @Override
        public StateEntry<K, N, S> next() {
            if (modCount != expectedModCount) {
                throw new ConcurrentModificationException();
            }
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return advanceIterator();
        }

        /** Moves to the following entry and returns the entry that was current before. */
        StateTableEntry<K, N, S> advanceIterator() {
            final StateTableEntry<K, N, S> entryToReturn = nextEntry;
            StateTableEntry<K, N, S> next = nextEntry.next;
            if (next == null) {
                next = chainIterator.next();
            }
            nextEntry = next;
            return entryToReturn;
        }
    }

    /**
     * Iterator over the heads of the non-empty bucket chains, first of the primary table and then
     * of the rehash table. {@link #next()} may return {@code null}.
     */
    class StateEntryChainIterator
            implements Iterator<StateTableEntry<K, N, S>> {

        StateTableEntry<K, N, S>[] activeTable;
        private int nextTablePosition;
        private final int maxTraversedTablePositions;

        StateEntryChainIterator() {
            this(Integer.MAX_VALUE);
        }

        /**
         * @param maxTraversedTablePositions maximum number of empty bucket positions that one
         *     scan of the active table skips before giving up
         */
        StateEntryChainIterator(int maxTraversedTablePositions) {
            this.maxTraversedTablePositions = maxTraversedTablePositions;
            this.activeTable = primaryTable;
            this.nextTablePosition = 0;
        }

        /**
         * Returns {@code true} while the table is non-empty and either positions of the active
         * table are left or the rehash table was not visited yet. A subsequent {@link #next()} can
         * still return {@code null}.
         */
        @Override
        public boolean hasNext() {
            return size() > 0 && (nextTablePosition < activeTable.length || activeTable == primaryTable);
        }

        /**
         * Returns the head of the next non-empty bucket, switching from the primary table to the
         * rehash table once the primary table is exhausted.
         *
         * @return the next chain head, or {@code null} if the scan limit was hit or both tables
         *     are exhausted
         */
        @Override
        public StateTableEntry<K, N, S> next() {
            while (true) {
                final StateTableEntry<K, N, S> next = nextActiveTablePosition();

                final boolean activeTableExhausted = nextTablePosition >= activeTable.length;
                final boolean rehashTableStillToVisit =
                        activeTable != incrementalRehashTable && activeTable == primaryTable;

                if (next != null || !activeTableExhausted || !rehashTableStillToVisit) {
                    return next;
                }

                // continue with the rehash table (the shared empty table if no rehash is running)
                activeTable = incrementalRehashTable;
                nextTablePosition = 0;
            }
        }

        /**
         * Scans the active table from the current position for a non-empty bucket, skipping at
         * most {@code maxTraversedTablePositions} empty positions.
         */
        private StateTableEntry<K, N, S> nextActiveTablePosition() {
            final StateTableEntry<K, N, S>[] tab = activeTable;
            int traversedPositions = 0;
            while (nextTablePosition < tab.length && traversedPositions < maxTraversedTablePositions) {
                final StateTableEntry<K, N, S> next = tab[nextTablePosition++];
                if (next != null) {
                    return next;
                }
                ++traversedPositions;
            }
            return null;
        }
    }

    /**
     * One node of a bucket chain: an immutable (key, namespace, hash) triple plus the mutable
     * state, the link to the next node and the versions used for the copy-on-write checks.
     *
     * @param <K> type of key
     * @param <N> type of namespace
     * @param <S> type of state
     */
    @VisibleForTesting
    protected static class StateTableEntry<K, N, S>
            implements StateEntry<K, N, S> {

        @Nonnull
        final K key;

        @Nonnull
        final N namespace;

        @Nullable
        S state;

        /** Successor in the bucket chain, or {@code null} at the end of the chain. */
        @Nullable
        StateTableEntry<K, N, S> next;

        /** Table version at which this entry object was created. */
        int entryVersion;

        /** Table version at which the state object was last set or copied. */
        int stateVersion;

        /** Cached composite hash of key and namespace. */
        final int hash;

        /** Copies {@code other}, including its state version, under a new entry version. */
        StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) {
            this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion);
        }

        StateTableEntry(@Nonnull K key, @Nonnull N namespace, @Nullable S state, int hash, @Nullable StateTableEntry<K, N, S> next, int entryVersion, int stateVersion) {
            this.key = key;
            this.namespace = namespace;
            this.hash = hash;
            this.next = next;
            this.entryVersion = entryVersion;
            this.state = state;
            this.stateVersion = stateVersion;
        }

        /**
         * Sets the state and its version, unless {@code value} is the very object that is already
         * stored (reference comparison), in which case nothing changes.
         */
        public final void setState(@Nullable S value, int mapVersion) {
            if (value != state) {
                state = value;
                stateVersion = mapVersion;
            }
        }

        @Override
        @Nonnull
        public K getKey() {
            return key;
        }

        @Override
        @Nonnull
        public N getNamespace() {
            return namespace;
        }

        @Override
        @Nullable
        public S getState() {
            return state;
        }

        /** Two entries are equal if key, namespace and state are equal; versions are ignored. */
        @Override
        public final boolean equals(Object o) {
            if (!(o instanceof StateTableEntry)) {
                return false;
            }
            final StateEntry<?, ?, ?> e = (StateEntry<?, ?, ?>) o;
            return e.getKey().equals(key)
                    && e.getNamespace().equals(namespace)
                    && Objects.equals(e.getState(), state);
        }

        @Override
        public final int hashCode() {
            return key.hashCode() ^ namespace.hashCode() ^ Objects.hashCode(state);
        }

        @Override
        public final String toString() {
            return "(" + key + "|" + namespace + ")=" + state;
        }
    }
}

