/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheKeyValueStoreIterator;
import org.apache.kafka.streams.state.internals.PeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

class CachingKeyValueStore<K, V>
implements WrappedStateStore,
KeyValueStore<K, V>,
CachedStateStore<K, V> {
    private final KeyValueStore<Bytes, byte[]> underlying;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private CacheFlushListener<K, V> flushListener;
    private String cacheName;
    private ThreadCache cache;
    private InternalProcessorContext context;
    private StateSerdes<K, V> serdes;
    private Thread streamThread;

    CachingKeyValueStore(KeyValueStore<Bytes, byte[]> underlying, Serde<K> keySerde, Serde<V> valueSerde) {
        this.underlying = underlying;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @Override
    public String name() {
        return this.underlying.name();
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.underlying.init(context, root);
        this.initInternal(context);
        this.streamThread = Thread.currentThread();
    }

    void initInternal(final ProcessorContext context) {
        this.context = (InternalProcessorContext)context;
        this.serdes = new StateSerdes(ProcessorStateManager.storeChangelogTopic(context.applicationId(), this.underlying.name()), (Serde<?>)(this.keySerde == null ? context.keySerde() : this.keySerde), (Serde<?>)(this.valueSerde == null ? context.valueSerde() : this.valueSerde));
        this.cacheName = context.taskId() + "-" + this.underlying.name();
        this.cache = this.context.getCache();
        this.cache.addDirtyEntryFlushListener(this.cacheName, new ThreadCache.DirtyEntryFlushListener(){

            @Override
            public void apply(List<ThreadCache.DirtyEntry> entries) {
                for (ThreadCache.DirtyEntry entry : entries) {
                    CachingKeyValueStore.this.putAndMaybeForward(entry, (InternalProcessorContext)context);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void putAndMaybeForward(ThreadCache.DirtyEntry entry, InternalProcessorContext context) {
        RecordContext current = context.recordContext();
        try {
            context.setRecordContext(entry.recordContext());
            if (this.flushListener != null) {
                this.flushListener.apply(this.serdes.keyFrom(entry.key().get()), this.serdes.valueFrom(entry.newValue()), this.serdes.valueFrom((byte[])this.underlying.get(entry.key())));
            }
            this.underlying.put(entry.key(), entry.newValue());
        }
        finally {
            context.setRecordContext(current);
        }
    }

    @Override
    public void setFlushListener(CacheFlushListener<K, V> flushListener) {
        this.flushListener = flushListener;
    }

    @Override
    public synchronized void flush() {
        this.cache.flush(this.cacheName);
        this.underlying.flush();
    }

    @Override
    public void close() {
        this.flush();
        this.underlying.close();
        this.cache.close(this.cacheName);
    }

    @Override
    public boolean persistent() {
        return this.underlying.persistent();
    }

    @Override
    public boolean isOpen() {
        return this.underlying.isOpen();
    }

    @Override
    public synchronized V get(K key) {
        this.validateStoreOpen();
        if (key == null) {
            return null;
        }
        byte[] rawKey = this.serdes.rawKey(key);
        return this.get(rawKey);
    }

    private void validateStoreOpen() {
        if (!this.isOpen()) {
            throw new InvalidStateStoreException("Store " + this.name() + " is currently closed");
        }
    }

    @Override
    private V get(byte[] rawKey) {
        LRUCacheEntry entry = this.cache.get(this.cacheName, rawKey);
        if (entry == null) {
            byte[] rawValue = (byte[])this.underlying.get(Bytes.wrap((byte[])rawKey));
            if (rawValue == null) {
                return null;
            }
            if (Thread.currentThread().equals(this.streamThread)) {
                this.cache.put(this.cacheName, rawKey, new LRUCacheEntry(rawValue));
            }
            return this.serdes.valueFrom(rawValue);
        }
        if (entry.value == null) {
            return null;
        }
        return this.serdes.valueFrom(entry.value);
    }

    @Override
    public KeyValueIterator<K, V> range(K from, K to) {
        this.validateStoreOpen();
        byte[] origFrom = this.serdes.rawKey(from);
        byte[] origTo = this.serdes.rawKey(to);
        KeyValueIterator<Bytes, byte[]> storeIterator = this.underlying.range(Bytes.wrap((byte[])origFrom), Bytes.wrap((byte[])origTo));
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.range(this.cacheName, origFrom, origTo);
        return new MergedSortedCacheKeyValueStoreIterator<K, V>((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, storeIterator, this.serdes);
    }

    @Override
    public KeyValueIterator<K, V> all() {
        this.validateStoreOpen();
        DelegatingPeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<Bytes, byte[]>(this.name(), this.underlying.all());
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.all(this.cacheName);
        return new MergedSortedCacheKeyValueStoreIterator<K, V>((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, (KeyValueIterator<Bytes, byte[]>)storeIterator, this.serdes);
    }

    @Override
    public synchronized long approximateNumEntries() {
        this.validateStoreOpen();
        return this.underlying.approximateNumEntries();
    }

    @Override
    public synchronized void put(K key, V value) {
        this.validateStoreOpen();
        this.put(this.serdes.rawKey(key), value);
    }

    @Override
    private synchronized void put(byte[] rawKey, V value) {
        byte[] rawValue = this.serdes.rawValue(value);
        this.cache.put(this.cacheName, rawKey, new LRUCacheEntry(rawValue, true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
    }

    @Override
    public synchronized V putIfAbsent(K key, V value) {
        this.validateStoreOpen();
        byte[] rawKey = this.serdes.rawKey(key);
        V v = this.get(rawKey);
        if (v == null) {
            this.put(rawKey, value);
        }
        return v;
    }

    @Override
    public synchronized void putAll(List<KeyValue<K, V>> entries) {
        for (KeyValue<K, V> entry : entries) {
            this.put(entry.key, entry.value);
        }
    }

    @Override
    public synchronized V delete(K key) {
        this.validateStoreOpen();
        byte[] rawKey = this.serdes.rawKey(key);
        V v = this.get(rawKey);
        this.cache.delete(this.cacheName, this.serdes.rawKey(key));
        this.underlying.delete(Bytes.wrap((byte[])rawKey));
        return v;
    }

    KeyValueStore<Bytes, byte[]> underlying() {
        return this.underlying;
    }

    @Override
    public StateStore inner() {
        if (this.underlying instanceof WrappedStateStore) {
            return ((WrappedStateStore)((Object)this.underlying)).inner();
        }
        return this.underlying;
    }
}

