/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.ExceptionUtils;
import org.apache.kafka.streams.state.internals.FilteredCacheIterator;
import org.apache.kafka.streams.state.internals.HasNextCondition;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheSessionStoreIterator;
import org.apache.kafka.streams.state.internals.PeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.SegmentedCacheFunction;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CachingSessionStore
extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
implements SessionStore<Bytes, byte[]>,
CachedStateStore<byte[], byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(CachingSessionStore.class);
    private final SessionKeySchema keySchema = new SessionKeySchema();
    private final SegmentedCacheFunction cacheFunction;
    private String cacheName;
    private InternalProcessorContext context;
    private CacheFlushListener<byte[], byte[]> flushListener;
    private boolean sendOldValues;
    private long maxObservedTimestamp;

    CachingSessionStore(SessionStore<Bytes, byte[]> bytesStore, long segmentInterval) {
        super(bytesStore);
        this.cacheFunction = new SegmentedCacheFunction(this.keySchema, segmentInterval);
        this.maxObservedTimestamp = -1L;
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.initInternal((InternalProcessorContext)context);
        super.init(context, root);
    }

    private void initInternal(InternalProcessorContext context) {
        this.context = context;
        this.cacheName = context.taskId() + "-" + this.name();
        context.registerCacheFlushListener(this.cacheName, entries -> {
            for (ThreadCache.DirtyEntry entry : entries) {
                this.putAndMaybeForward(entry, context);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void putAndMaybeForward(ThreadCache.DirtyEntry entry, InternalProcessorContext context) {
        Bytes binaryKey = this.cacheFunction.key(entry.key());
        Windowed<Bytes> bytesKey = SessionKeySchema.from(binaryKey);
        if (this.flushListener != null) {
            byte[] oldValueBytes;
            byte[] newValueBytes = entry.newValue();
            byte[] byArray = oldValueBytes = newValueBytes == null || this.sendOldValues ? (byte[])((SessionStore)this.wrapped()).fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end()) : null;
            if (newValueBytes != null || oldValueBytes != null) {
                ((SessionStore)this.wrapped()).put(bytesKey, entry.newValue());
                ProcessorRecordContext current = context.recordContext();
                context.setRecordContext(entry.entry().context());
                try {
                    this.flushListener.apply(binaryKey.get(), newValueBytes, (byte[])(this.sendOldValues ? oldValueBytes : null), entry.entry().context().timestamp());
                }
                finally {
                    context.setRecordContext(current);
                }
            }
        } else {
            ((SessionStore)this.wrapped()).put(bytesKey, entry.newValue());
        }
    }

    @Override
    public boolean setFlushListener(CacheFlushListener<byte[], byte[]> flushListener, boolean sendOldValues) {
        this.flushListener = flushListener;
        this.sendOldValues = sendOldValues;
        return true;
    }

    @Override
    public void put(Windowed<Bytes> key, byte[] value) {
        this.validateStoreOpen();
        Bytes binaryKey = SessionKeySchema.toBinary(key);
        LRUCacheEntry entry = new LRUCacheEntry(value, this.context.headers(), true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic());
        this.context.cache().put(this.cacheName, this.cacheFunction.cacheKey(binaryKey), entry);
        this.maxObservedTimestamp = Math.max(this.keySchema.segmentTimestamp(binaryKey), this.maxObservedTimestamp);
    }

    @Override
    public void remove(Windowed<Bytes> sessionKey) {
        this.validateStoreOpen();
        this.put(sessionKey, null);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes key, long earliestSessionEndTime, long latestSessionStartTime) {
        this.validateStoreOpen();
        CacheIteratorWrapper cacheIterator = ((SessionStore)this.wrapped()).persistent() ? new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime) : this.context.cache().range(this.cacheName, this.cacheFunction.cacheKey(this.keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), this.cacheFunction.cacheKey(this.keySchema.upperRangeFixedSize(key, latestSessionStartTime)));
        KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = ((SessionStore)this.wrapped()).findSessions(key, earliestSessionEndTime, latestSessionStartTime);
        HasNextCondition hasNextCondition = this.keySchema.hasNextCondition(key, key, earliestSessionEndTime, latestSessionStartTime);
        FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, this.cacheFunction);
        return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, this.cacheFunction);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes keyFrom, Bytes keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
        if (keyFrom.compareTo(keyTo) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        this.validateStoreOpen();
        Bytes cacheKeyFrom = this.cacheFunction.cacheKey(this.keySchema.lowerRange(keyFrom, earliestSessionEndTime));
        Bytes cacheKeyTo = this.cacheFunction.cacheKey(this.keySchema.upperRange(keyTo, latestSessionStartTime));
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.context.cache().range(this.cacheName, cacheKeyFrom, cacheKeyTo);
        KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = ((SessionStore)this.wrapped()).findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
        HasNextCondition hasNextCondition = this.keySchema.hasNextCondition(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
        FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, this.cacheFunction);
        return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, this.cacheFunction);
    }

    @Override
    public byte[] fetchSession(Bytes key, long startTime, long endTime) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        if (this.context.cache() == null) {
            return (byte[])((SessionStore)this.wrapped()).fetchSession(key, startTime, endTime);
        }
        Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime);
        Bytes cacheKey = this.cacheFunction.cacheKey(bytesKey);
        LRUCacheEntry entry = this.context.cache().get(this.cacheName, cacheKey);
        if (entry == null) {
            return (byte[])((SessionStore)this.wrapped()).fetchSession(key, startTime, endTime);
        }
        return entry.value();
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        return this.findSessions(key, 0L, Long.MAX_VALUE);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes from, Bytes to) {
        Objects.requireNonNull(from, "from cannot be null");
        Objects.requireNonNull(to, "to cannot be null");
        return this.findSessions(from, to, 0L, Long.MAX_VALUE);
    }

    @Override
    public void flush() {
        this.context.cache().flush(this.cacheName);
        ((SessionStore)this.wrapped()).flush();
    }

    @Override
    public void close() {
        Runnable[] runnableArray = new Runnable[3];
        runnableArray[0] = () -> this.context.cache().flush(this.cacheName);
        runnableArray[1] = () -> this.context.cache().close(this.cacheName);
        runnableArray[2] = ((SessionStore)this.wrapped())::close;
        LinkedList<RuntimeException> suppressed = ExceptionUtils.executeAll(runnableArray);
        if (!suppressed.isEmpty()) {
            ExceptionUtils.throwSuppressed("Caught an exception while closing caching session store for store " + this.name(), suppressed);
        }
    }

    private class CacheIteratorWrapper
    implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
        private final long segmentInterval;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        private final long latestSessionStartTime;
        private long lastSegmentId;
        private long currentSegmentId;
        private Bytes cacheKeyFrom;
        private Bytes cacheKeyTo;
        private ThreadCache.MemoryLRUCacheBytesIterator current;

        private CacheIteratorWrapper(Bytes key, long earliestSessionEndTime, long latestSessionStartTime) {
            this(key, key, earliestSessionEndTime, latestSessionStartTime);
        }

        private CacheIteratorWrapper(Bytes keyFrom, Bytes keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.latestSessionStartTime = latestSessionStartTime;
            this.lastSegmentId = CachingSessionStore.this.cacheFunction.segmentId(CachingSessionStore.this.maxObservedTimestamp);
            this.segmentInterval = CachingSessionStore.this.cacheFunction.getSegmentInterval();
            this.currentSegmentId = CachingSessionStore.this.cacheFunction.segmentId(earliestSessionEndTime);
            this.setCacheKeyRange(earliestSessionEndTime, this.currentSegmentLastTime());
            this.current = CachingSessionStore.this.context.cache().range(CachingSessionStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
        }

        @Override
        public boolean hasNext() {
            if (this.current == null) {
                return false;
            }
            if (this.current.hasNext()) {
                return true;
            }
            while (!this.current.hasNext()) {
                this.getNextSegmentIterator();
                if (this.current != null) continue;
                return false;
            }
            return true;
        }

        @Override
        public Bytes peekNextKey() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.current.peekNextKey();
        }

        @Override
        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.current.peekNext();
        }

        @Override
        public KeyValue<Bytes, LRUCacheEntry> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.current.next();
        }

        @Override
        public void close() {
            this.current.close();
        }

        private long currentSegmentBeginTime() {
            return this.currentSegmentId * this.segmentInterval;
        }

        private long currentSegmentLastTime() {
            return this.currentSegmentBeginTime() + this.segmentInterval - 1L;
        }

        private void getNextSegmentIterator() {
            ++this.currentSegmentId;
            this.lastSegmentId = CachingSessionStore.this.cacheFunction.segmentId(CachingSessionStore.this.maxObservedTimestamp);
            if (this.currentSegmentId > this.lastSegmentId) {
                this.current = null;
                return;
            }
            this.setCacheKeyRange(this.currentSegmentBeginTime(), this.currentSegmentLastTime());
            this.current.close();
            this.current = CachingSessionStore.this.context.cache().range(CachingSessionStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
        }

        private void setCacheKeyRange(long lowerRangeEndTime, long upperRangeEndTime) {
            if (CachingSessionStore.this.cacheFunction.segmentId(lowerRangeEndTime) != CachingSessionStore.this.cacheFunction.segmentId(upperRangeEndTime)) {
                throw new IllegalStateException("Error iterating over segments: segment interval has changed");
            }
            if (this.keyFrom == this.keyTo) {
                this.cacheKeyFrom = CachingSessionStore.this.cacheFunction.cacheKey(this.segmentLowerRangeFixedSize(this.keyFrom, lowerRangeEndTime));
                this.cacheKeyTo = CachingSessionStore.this.cacheFunction.cacheKey(this.segmentUpperRangeFixedSize(this.keyTo, upperRangeEndTime));
            } else {
                this.cacheKeyFrom = CachingSessionStore.this.cacheFunction.cacheKey(CachingSessionStore.this.keySchema.lowerRange(this.keyFrom, lowerRangeEndTime), this.currentSegmentId);
                this.cacheKeyTo = CachingSessionStore.this.cacheFunction.cacheKey(CachingSessionStore.this.keySchema.upperRange(this.keyTo, this.latestSessionStartTime), this.currentSegmentId);
            }
        }

        private Bytes segmentLowerRangeFixedSize(Bytes key, long segmentBeginTime) {
            Windowed<Bytes> sessionKey = new Windowed<Bytes>(key, new SessionWindow(0L, Math.max(0L, segmentBeginTime)));
            return SessionKeySchema.toBinary(sessionKey);
        }

        private Bytes segmentUpperRangeFixedSize(Bytes key, long segmentEndTime) {
            Windowed<Bytes> sessionKey = new Windowed<Bytes>(key, new SessionWindow(Math.min(this.latestSessionStartTime, segmentEndTime), segmentEndTime));
            return SessionKeySchema.toBinary(sessionKey);
        }
    }
}

