/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.MemoryLRUCache;
import org.apache.kafka.streams.state.internals.StoreChangeLogger;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class RocksDBStore<K, V>
implements KeyValueStore<K, V> {
    private static final int TTL_NOT_USED = -1;
    private static final int DEFAULT_UNENCODED_CACHE_SIZE = 1000;
    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
    private static final long WRITE_BUFFER_SIZE = 0x2000000L;
    private static final long BLOCK_CACHE_SIZE = 0x6400000L;
    private static final long BLOCK_SIZE = 4096L;
    private static final int TTL_SECONDS = -1;
    private static final int MAX_WRITE_BUFFERS = 3;
    private static final String DB_FILE_DIR = "rocksdb";
    private final String name;
    private final String parentDir;
    protected File dbDir;
    private StateSerdes<K, V> serdes;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private RocksDB db;
    private Options options;
    private WriteOptions wOptions;
    private FlushOptions fOptions;
    private boolean loggingEnabled = false;
    private int cacheSize = 1000;
    private Set<K> cacheDirtyKeys;
    private MemoryLRUCache<K, RocksDBCacheEntry> cache;
    private StoreChangeLogger<Bytes, byte[]> changeLogger;
    private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;

    public KeyValueStore<K, V> enableLogging() {
        this.loggingEnabled = true;
        return this;
    }

    public RocksDBStore<K, V> withCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
        return this;
    }

    public RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
        this(name, DB_FILE_DIR, keySerde, valueSerde);
    }

    public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
        this.name = name;
        this.parentDir = parentDir;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(0x6400000L);
        tableConfig.setBlockSize(4096L);
        this.options = new Options();
        this.options.setTableFormatConfig((TableFormatConfig)tableConfig);
        this.options.setWriteBufferSize(0x2000000L);
        this.options.setCompressionType(COMPRESSION_TYPE);
        this.options.setCompactionStyle(COMPACTION_STYLE);
        this.options.setMaxWriteBufferNumber(3);
        this.options.setCreateIfMissing(true);
        this.options.setErrorIfExists(false);
        this.wOptions = new WriteOptions();
        this.wOptions.setDisableWAL(true);
        this.fOptions = new FlushOptions();
        this.fOptions.setWaitForFlush(true);
    }

    public void openDB(ProcessorContext context) {
        this.serdes = new StateSerdes(this.name, (Serde<?>)(this.keySerde == null ? context.keySerde() : this.keySerde), (Serde<?>)(this.valueSerde == null ? context.valueSerde() : this.valueSerde));
        this.dbDir = new File(new File(context.stateDir(), this.parentDir), this.name);
        this.db = this.openDB(this.dbDir, this.options, -1);
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.openDB(context);
        StoreChangeLogger<Bytes, byte[]> storeChangeLogger = this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<Bytes, byte[]>(this.name, context, WindowStoreUtils.INNER_SERDES) : null;
        if (this.cacheSize > 0) {
            this.cache = new MemoryLRUCache(this.name, this.cacheSize, null, null).whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K, RocksDBCacheEntry>(){

                @Override
                public void apply(K key, RocksDBCacheEntry entry) {
                    if (entry.isDirty) {
                        RocksDBStore.this.flushCache();
                    }
                }
            });
            this.cacheDirtyKeys = new HashSet<K>();
        } else {
            this.cache = null;
            this.cacheDirtyKeys = null;
        }
        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>(){

            @Override
            public byte[] get(Bytes key) {
                return RocksDBStore.this.getInternal(key.get());
            }
        };
        context.register(root, this.loggingEnabled, new StateRestoreCallback(){

            @Override
            public void restore(byte[] key, byte[] value) {
                RocksDBStore.this.putInternal(key, value);
            }
        });
    }

    private RocksDB openDB(File dir, Options options, int ttl) {
        try {
            if (ttl == -1) {
                dir.getParentFile().mkdirs();
                return RocksDB.open((Options)options, (String)dir.getAbsolutePath());
            }
            throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
        }
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public V get(K key) {
        if (this.cache != null) {
            RocksDBCacheEntry entry = this.cache.get(key);
            if (entry == null) {
                byte[] byteValue = this.getInternal(this.serdes.rawKey(key));
                if (byteValue == null) {
                    return null;
                }
                V value = this.serdes.valueFrom(byteValue);
                this.cache.put(key, new RocksDBCacheEntry(value));
                return value;
            }
            return entry.value;
        }
        byte[] byteValue = this.getInternal(this.serdes.rawKey(key));
        if (byteValue == null) {
            return null;
        }
        return this.serdes.valueFrom(byteValue);
    }

    private byte[] getInternal(byte[] rawKey) {
        try {
            return this.db.get(rawKey);
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key " + this.serdes.keyFrom(rawKey) + " from store " + this.name, e);
        }
    }

    @Override
    public void put(K key, V value) {
        if (this.cache != null) {
            this.cacheDirtyKeys.add(key);
            this.cache.put(key, new RocksDBCacheEntry(value, true));
        } else {
            byte[] rawKey = this.serdes.rawKey(key);
            byte[] rawValue = this.serdes.rawValue(value);
            this.putInternal(rawKey, rawValue);
            if (this.loggingEnabled) {
                this.changeLogger.add(Bytes.wrap((byte[])rawKey));
                this.changeLogger.maybeLogChange(this.getter);
            }
        }
    }

    @Override
    public V putIfAbsent(K key, V value) {
        V originalValue = this.get(key);
        if (originalValue == null) {
            this.put(key, value);
        }
        return originalValue;
    }

    private void putInternal(byte[] rawKey, byte[] rawValue) {
        if (rawValue == null) {
            try {
                this.db.remove(this.wOptions, rawKey);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error while removing key " + this.serdes.keyFrom(rawKey) + " from store " + this.name, e);
            }
        }
        try {
            this.db.put(this.wOptions, rawKey, rawValue);
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error while executing put key " + this.serdes.keyFrom(rawKey) + " and value " + this.serdes.keyFrom(rawValue) + " from store " + this.name, e);
        }
    }

    @Override
    public void putAll(List<KeyValue<K, V>> entries) {
        for (KeyValue<K, V> entry : entries) {
            this.put(entry.key, entry.value);
        }
    }

    private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
        WriteBatch batch = new WriteBatch();
        try {
            for (KeyValue<byte[], byte[]> entry : entries) {
                batch.put((byte[])entry.key, (byte[])entry.value);
            }
            this.db.write(this.wOptions, batch);
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
        }
        finally {
            batch.dispose();
        }
    }

    @Override
    public V delete(K key) {
        V value = this.get(key);
        this.put(key, null);
        return value;
    }

    @Override
    public KeyValueIterator<K, V> range(K from, K to) {
        if (this.cache != null) {
            this.flushCache();
        }
        return new RocksDBRangeIterator<K, V>(this.db.newIterator(), this.serdes, from, to);
    }

    @Override
    public KeyValueIterator<K, V> all() {
        if (this.cache != null) {
            this.flushCache();
        }
        RocksIterator innerIter = this.db.newIterator();
        innerIter.seekToFirst();
        return new RocksDbIterator<K, V>(innerIter, this.serdes);
    }

    private void flushCache() {
        if (this.cache != null) {
            ArrayList<KeyValue<byte[], byte[]>> putBatch = new ArrayList<KeyValue<byte[], byte[]>>(this.cache.keys.size());
            ArrayList<byte[]> deleteBatch = new ArrayList<byte[]>(this.cache.keys.size());
            for (K k : this.cacheDirtyKeys) {
                RocksDBCacheEntry entry = this.cache.get(k);
                if (entry == null) continue;
                entry.isDirty = false;
                byte[] rawKey = this.serdes.rawKey(k);
                if (entry.value != null) {
                    putBatch.add(new KeyValue<byte[], byte[]>(rawKey, this.serdes.rawValue(entry.value)));
                    continue;
                }
                deleteBatch.add(rawKey);
            }
            this.putAllInternal(putBatch);
            if (this.loggingEnabled) {
                for (KeyValue keyValue : putBatch) {
                    this.changeLogger.add(Bytes.wrap((byte[])((byte[])keyValue.key)));
                }
            }
            for (byte[] byArray : deleteBatch) {
                try {
                    this.db.remove(this.wOptions, byArray);
                }
                catch (RocksDBException e) {
                    throw new ProcessorStateException("Error while deleting with key " + this.serdes.keyFrom(byArray) + " from store " + this.name, e);
                }
                if (!this.loggingEnabled) continue;
                this.changeLogger.delete(Bytes.wrap((byte[])byArray));
            }
            this.cacheDirtyKeys.clear();
        }
        if (this.loggingEnabled) {
            this.changeLogger.logChange(this.getter);
        }
    }

    @Override
    public void flush() {
        if (this.db == null) {
            return;
        }
        this.flushCache();
        this.flushInternal();
    }

    public void flushInternal() {
        try {
            this.db.flush(this.fOptions);
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
        }
    }

    @Override
    public void close() {
        if (this.db == null) {
            return;
        }
        this.flush();
        this.options.dispose();
        this.wOptions.dispose();
        this.fOptions.dispose();
        this.db.close();
        this.options = null;
        this.wOptions = null;
        this.fOptions = null;
        this.db = null;
    }

    private static class RocksDBRangeIterator<K, V>
    extends RocksDbIterator<K, V> {
        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
        private byte[] rawToKey;

        public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
            super(iter, serdes);
            iter.seek(serdes.rawKey(from));
            this.rawToKey = serdes.rawKey(to);
        }

        @Override
        public boolean hasNext() {
            return super.hasNext() && this.comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
        }
    }

    private static class RocksDbIterator<K, V>
    implements KeyValueIterator<K, V> {
        private final RocksIterator iter;
        private final StateSerdes<K, V> serdes;

        public RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) {
            this.iter = iter;
            this.serdes = serdes;
        }

        protected byte[] peekRawKey() {
            return this.iter.key();
        }

        protected KeyValue<K, V> getKeyValue() {
            return new KeyValue<K, V>(this.serdes.keyFrom(this.iter.key()), this.serdes.valueFrom(this.iter.value()));
        }

        @Override
        public boolean hasNext() {
            return this.iter.isValid();
        }

        @Override
        public KeyValue<K, V> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<K, V> entry = this.getKeyValue();
            this.iter.next();
            return entry;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
        }

        @Override
        public void close() {
            this.iter.dispose();
        }
    }

    private class RocksDBCacheEntry {
        public V value;
        public boolean isDirty;

        public RocksDBCacheEntry(V value) {
            this(value, false);
        }

        public RocksDBCacheEntry(V value, boolean isDirty) {
            this.value = value;
            this.isDirty = isDirty;
        }
    }
}

