package org.apache.samza.operators.impl.store;

import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.samza.storage.kv.ClosableIterator;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.util.TimestampedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.class */
public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesStoreImpl.class);
    private final KeyValueStore<TimeSeriesKey<K>, V> kvStore;
    private final AtomicLong seqNum;
    private final boolean appendMode;

    /* loaded from: input_file:org/apache/samza/operators/impl/store/TimeSeriesStoreImpl$BoundedClosableIterator.class */
    private static class BoundedClosableIterator<T> implements ClosableIterator<T> {
        private final AtomicInteger currentCount = new AtomicInteger(0);
        private final ClosableIterator<T> wrappedIterator;
        private final int maxCount;

        public BoundedClosableIterator(ClosableIterator<T> closableIterator, int i) {
            this.wrappedIterator = closableIterator;
            this.maxCount = i;
        }

        public boolean hasNext() {
            return this.wrappedIterator.hasNext() && this.currentCount.get() < this.maxCount;
        }

        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            this.currentCount.incrementAndGet();
            return (T) this.wrappedIterator.next();
        }

        public void remove() {
            this.wrappedIterator.remove();
        }

        public void close() {
            this.wrappedIterator.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/operators/impl/store/TimeSeriesStoreImpl$TimeSeriesStoreIterator.class */
    public static class TimeSeriesStoreIterator<K, V> implements ClosableIterator<TimestampedValue<V>> {
        private final KeyValueIterator<TimeSeriesKey<K>, V> wrappedIterator;

        public TimeSeriesStoreIterator(KeyValueIterator<TimeSeriesKey<K>, V> keyValueIterator) {
            this.wrappedIterator = keyValueIterator;
        }

        public void close() {
            this.wrappedIterator.close();
        }

        public boolean hasNext() {
            return this.wrappedIterator.hasNext();
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public TimestampedValue<V> m142next() {
            Entry entry = (Entry) this.wrappedIterator.next();
            return new TimestampedValue<>(entry.getValue(), ((TimeSeriesKey) entry.getKey()).getTimestamp());
        }

        public void remove() {
            this.wrappedIterator.remove();
        }
    }

    public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> keyValueStore, boolean z) {
        this.seqNum = new AtomicLong();
        this.kvStore = keyValueStore;
        this.appendMode = z;
    }

    public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> keyValueStore) {
        this(keyValueStore, true);
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public void put(K k, V v, long j) {
        if (this.appendMode) {
            this.seqNum.getAndIncrement();
        }
        TimeSeriesKey timeSeriesKey = new TimeSeriesKey(k, j, this.seqNum.get());
        LOG.trace("Inserting {} -> {} into the store", timeSeriesKey, v);
        this.kvStore.put(timeSeriesKey, v);
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public ClosableIterator<TimestampedValue<V>> get(K k, long j, long j2) {
        validateRange(j, j2);
        KeyValueIterator range = this.kvStore.range(new TimeSeriesKey(k, j, 0L), new TimeSeriesKey(k, j2, 0L));
        LOG.trace("Getting entries in the store for {} from {} to {}", new Object[]{k, Long.valueOf(j), Long.valueOf(j2)});
        return new TimeSeriesStoreIterator(range);
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public ClosableIterator<TimestampedValue<V>> get(K k, long j, long j2, int i) {
        return new BoundedClosableIterator(get(k, j, j2), i);
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public ClosableIterator<TimestampedValue<V>> get(K k, long j) {
        return get(k, j, j + 1);
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public void remove(K k, long j, long j2) {
        validateRange(j, j2);
        TimeSeriesKey timeSeriesKey = new TimeSeriesKey(k, j, 0L);
        TimeSeriesKey timeSeriesKey2 = new TimeSeriesKey(k, j2, 0L);
        LinkedList linkedList = new LinkedList();
        KeyValueIterator range = this.kvStore.range(timeSeriesKey, timeSeriesKey2);
        while (range.hasNext()) {
            try {
                linkedList.add(((Entry) range.next()).getKey());
            } finally {
                range.close();
            }
        }
        this.kvStore.deleteAll(linkedList);
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public void remove(K k, long j) {
        remove(k, j, j + 1);
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public void flush() {
        this.kvStore.flush();
    }

    @Override // org.apache.samza.operators.impl.store.TimeSeriesStore
    public void close() {
    }

    private void validateRange(long j, long j2) throws IllegalArgumentException {
        if (j < 0) {
            throw new IllegalArgumentException(String.format("Start timestamp :%d is less than zero", Long.valueOf(j)));
        }
        if (j2 < 0) {
            throw new IllegalArgumentException(String.format("End timestamp :%d is less than zero", Long.valueOf(j2)));
        }
        if (j2 < j) {
            throw new IllegalArgumentException(String.format("End timestamp :%d is less than start timestamp: %d", Long.valueOf(j2), Long.valueOf(j)));
        }
    }
}
