/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.FilteredCacheIterator;
import org.apache.kafka.streams.state.internals.HasNextCondition;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreIterator;
import org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SegmentedCacheFunction;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/**
 * A {@link WindowStore} wrapper that places writes into the {@link ThreadCache} obtained from
 * the processor context instead of writing them to the wrapped bytes store directly.
 *
 * <p>Entries reach the wrapped store through the flush listener registered in
 * {@link #init(ProcessorContext, StateStore)}: for every entry the cache hands to that listener,
 * the optional {@link CacheFlushListener} is notified and the raw value is then written to the
 * wrapped store. Reads combine a range scan over the cache with a fetch from the wrapped store.
 *
 * <p>{@code flush()}, both {@code put} overloads and the single-key {@code fetch} are
 * {@code synchronized}; {@code close()} and the key-range {@code fetch} are not.
 *
 * @param <K> key type
 * @param <V> value type
 */
class CachingWindowStore<K, V>
        extends WrappedStateStore.AbstractStateStore
        implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
    /** The wrapped store that holds serialized keys and values. */
    private final WindowStore<Bytes, byte[]> underlying;
    /** Key serde supplied by the caller; {@code null} means "use the context's key serde". */
    private final Serde<K> keySerde;
    /** Value serde supplied by the caller; {@code null} means "use the context's value serde". */
    private final Serde<V> valueSerde;
    /** Window size used to rebuild the time window of an entry when it is flushed. */
    private final long windowSize;
    private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();
    /** Cache namespace of this store: {@code <taskId>-<underlying store name>}. Set in init. */
    private String name;
    private ThreadCache cache;
    private InternalProcessorContext context;
    private StateSerdes<K, V> serdes;
    private CacheFlushListener<Windowed<K>, V> flushListener;
    /** Translates between binary window keys and the keys used inside the cache. */
    private final SegmentedCacheFunction cacheFunction;

    /**
     * @param underlying      the bytes store to wrap
     * @param keySerde        key serde, or {@code null} to fall back to the context's key serde
     * @param valueSerde      value serde, or {@code null} to fall back to the context's value serde
     * @param windowSize      window size used when reconstructing windowed keys on flush
     * @param segmentInterval segment interval handed to the {@link SegmentedCacheFunction}
     */
    CachingWindowStore(WindowStore<Bytes, byte[]> underlying, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, long segmentInterval) {
        super(underlying);
        this.underlying = underlying;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.windowSize = windowSize;
        this.cacheFunction = new SegmentedCacheFunction(this.keySchema, segmentInterval);
    }

    /**
     * Initializes the wrapped store first, then this store's serdes, cache namespace and flush
     * listener, and finally the key schema.
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        underlying.init(context, root);
        initInternal(context);
        keySchema.init(context.applicationId());
    }

    /**
     * Resolves the serdes, derives the cache namespace and registers the listener that writes
     * flushed cache entries to the wrapped store.
     *
     * @param context the processor context; must also be an {@link InternalProcessorContext}
     */
    @SuppressWarnings("unchecked") // the context only exposes its default serdes as Serde<?>
    private void initInternal(final ProcessorContext context) {
        final InternalProcessorContext internalContext = (InternalProcessorContext) context;
        this.context = internalContext;

        final String changelogTopic =
                ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name());
        final Serde<K> resolvedKeySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
        final Serde<V> resolvedValueSerde = valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde;
        this.serdes = new StateSerdes<>(changelogTopic, resolvedKeySerde, resolvedValueSerde);

        this.name = context.taskId() + "-" + underlying.name();
        this.cache = internalContext.getCache();
        this.cache.addDirtyEntryFlushListener(this.name, new ThreadCache.DirtyEntryFlushListener() {

            @Override
            public void apply(List<ThreadCache.DirtyEntry> entries) {
                for (ThreadCache.DirtyEntry entry : entries) {
                    // Strip the cache-specific part of the key to get back the binary window key.
                    final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey);
                    final Windowed<K> windowedKey = new Windowed<>(
                            WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes),
                            WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
                    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey);
                    // Forward before writing: maybeForward looks up the previous value in the
                    // wrapped store, so it has to run before the put below.
                    maybeForward(entry, key, windowedKey, internalContext);
                    underlying.put(key, entry.newValue(), timestamp);
                }
            }
        });
    }

    /**
     * Notifies the flush listener, if one is set, about a flushed entry.
     *
     * <p>While the listener runs, the context's record context is switched to the one stored
     * with the entry; the previous record context is restored afterwards, even if the listener
     * throws.
     *
     * @param entry       the flushed cache entry
     * @param key         serialized key used to look up the previous value in the wrapped store
     * @param windowedKey deserialized key together with its window
     * @param context     the context whose record context is temporarily replaced
     */
    private void maybeForward(ThreadCache.DirtyEntry entry, Bytes key, Windowed<K> windowedKey, InternalProcessorContext context) {
        if (flushListener == null) {
            return;
        }
        final RecordContext current = context.recordContext();
        context.setRecordContext(entry.recordContext());
        try {
            flushListener.apply(
                    windowedKey,
                    serdes.valueFrom(entry.newValue()),
                    fetchPrevious(key, windowedKey.window().start()));
        } finally {
            context.setRecordContext(current);
        }
    }

    @Override
    public void setFlushListener(CacheFlushListener<Windowed<K>, V> flushListener) {
        this.flushListener = flushListener;
    }

    /** Flushes this store's cache namespace and then the wrapped store. */
    @Override
    public synchronized void flush() {
        cache.flush(name);
        underlying.flush();
    }

    /**
     * Flushes, then closes this store's cache namespace, then closes the wrapped store.
     *
     * <p>Each step runs even if an earlier one throws, so a failing flush no longer leaves the
     * cache namespace and the wrapped store open. The failure still propagates to the caller;
     * if more than one step throws, the exception of the last failing step is the one thrown.
     */
    @Override
    public void close() {
        try {
            flush();
        } finally {
            try {
                cache.close(name);
            } finally {
                underlying.close();
            }
        }
    }

    /** Puts the value using the context's current timestamp. */
    @Override
    public synchronized void put(K key, V value) {
        put(key, value, context.timestamp());
    }

    /**
     * Stores the serialized value in the cache under the cache key derived from the binary
     * window key. The wrapped store is not written by this method.
     */
    @Override
    public synchronized void put(K key, V value, long timestamp) {
        // Inherited check; presumably rejects use of a store that is not open (confirm in
        // WrappedStateStore.AbstractStateStore). Needed here because nothing below touches
        // the wrapped store.
        validateStoreOpen();
        // NOTE(review): the literal 0 looks like the sequence-number part of the binary key;
        // confirm against WindowStoreUtils.toBinaryKey.
        final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
        final LRUCacheEntry entry = new LRUCacheEntry(
                serdes.rawValue(value),
                true,
                context.offset(),
                timestamp,
                context.partition(),
                context.topic());
        cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
    }

    /**
     * Fetches the values of one key within a time range.
     *
     * @return an iterator built over both the matching cache range and the wrapped store's
     *         result for the same key and time range
     */
    @Override
    public synchronized WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
        validateStoreOpen();
        final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key));
        final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo);

        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(keyBytes, timeFrom));
        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(keyBytes, timeTo));
        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);

        // The cache range is narrowed further by the schema's condition for this key/time range.
        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes, keyBytes, timeFrom, timeTo);
        final FilteredCacheIterator filteredCacheIterator =
                new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

        // The merged iterator is given a Long key serde together with this store's value serde.
        final StateSerdes<Long, V> timestampKeyedSerdes =
                new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde());
        return new MergedSortedCacheWindowStoreIterator<V>(filteredCacheIterator, underlyingIterator, timestampKeyedSerdes);
    }

    /**
     * Fetches the entries of all keys between {@code from} and {@code to} within a time range.
     *
     * @return an iterator built over both the matching cache range and the wrapped store's
     *         result for the same key and time range
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
        validateStoreOpen();
        final Bytes keyFromBytes = Bytes.wrap(serdes.rawKey(from));
        final Bytes keyToBytes = Bytes.wrap(serdes.rawKey(to));
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
                underlying.fetch(keyFromBytes, keyToBytes, timeFrom, timeTo);

        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFromBytes, timeFrom));
        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyToBytes, timeTo));
        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);

        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFromBytes, keyToBytes, timeFrom, timeTo);
        final FilteredCacheIterator filteredCacheIterator =
                new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

        return new MergedSortedCacheWindowStoreKeyValueIterator<K, V>(
                filteredCacheIterator, underlyingIterator, serdes, windowSize, cacheFunction);
    }

    /**
     * Reads the value currently held by the wrapped store for {@code key} at exactly
     * {@code timestamp}.
     *
     * @return the deserialized value of the first match, or {@code null} if there is none
     */
    private V fetchPrevious(Bytes key, long timestamp) {
        try (WindowStoreIterator<byte[]> iter = underlying.fetch(key, timestamp, timestamp)) {
            return iter.hasNext() ? serdes.valueFrom(iter.next().value) : null;
        }
    }
}

