package org.apache.samza.table.caching;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.BaseReadWriteUpdateTable;
import org.apache.samza.table.ReadWriteUpdateTable;
import org.apache.samza.table.utils.TableMetricsUtil;

/* loaded from: input_file:org/apache/samza/table/caching/CachingTable.class */
public class CachingTable<K, V, U> extends BaseReadWriteUpdateTable<K, V, U> implements ReadWriteUpdateTable<K, V, U> {
    private final ReadWriteUpdateTable<K, V, U> table;
    private final ReadWriteUpdateTable<K, V, U> cache;
    private final boolean isWriteAround;
    private AtomicLong hitCount;
    private AtomicLong missCount;

    public CachingTable(String str, ReadWriteUpdateTable<K, V, U> readWriteUpdateTable, ReadWriteUpdateTable<K, V, U> readWriteUpdateTable2, boolean z) {
        super(str);
        this.hitCount = new AtomicLong();
        this.missCount = new AtomicLong();
        this.table = readWriteUpdateTable;
        this.cache = readWriteUpdateTable2;
        this.isWriteAround = z;
    }

    @Override // org.apache.samza.table.BaseReadWriteUpdateTable
    public void init(Context context) {
        super.init(context);
        TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, this.tableId);
        tableMetricsUtil.newGauge("hit-rate", () -> {
            return Double.valueOf(hitRate());
        });
        tableMetricsUtil.newGauge("miss-rate", () -> {
            return Double.valueOf(missRate());
        });
        tableMetricsUtil.newGauge("req-count", () -> {
            return Long.valueOf(requestCount());
        });
    }

    private List<K> lookupCache(List<K> list, Map<K, V> map, Object... objArr) {
        ArrayList arrayList = new ArrayList();
        map.putAll(this.cache.getAll(list, objArr));
        list.forEach(obj -> {
            if (map.containsKey(obj)) {
                return;
            }
            arrayList.add(obj);
        });
        return arrayList;
    }

    public V get(K k, Object... objArr) {
        try {
            return getAsync(k, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<V> getAsync(K k, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numGets);
        Object obj = this.cache.get(k, objArr);
        if (obj != null) {
            this.hitCount.incrementAndGet();
            return CompletableFuture.completedFuture(obj);
        }
        long nanoTime = this.clock.nanoTime();
        this.missCount.incrementAndGet();
        return this.table.getAsync(k, objArr).handle((obj2, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to get the record for " + k, th);
            }
            if (obj2 != null) {
                this.cache.put(k, obj2, objArr);
            }
            TableMetricsUtil.updateTimer(this.metrics.getNs, this.clock.nanoTime() - nanoTime);
            return obj2;
        });
    }

    public Map<K, V> getAll(List<K> list, Object... objArr) {
        try {
            return getAllAsync(list, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Map<K, V>> getAllAsync(List<K> list, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numGetAlls);
        HashMap hashMap = new HashMap();
        List<K> lookupCache = lookupCache(list, hashMap, new Object[0]);
        if (lookupCache.isEmpty()) {
            return CompletableFuture.completedFuture(hashMap);
        }
        long nanoTime = this.clock.nanoTime();
        return this.table.getAllAsync(lookupCache, objArr).handle((map, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to get records for " + list, th);
            }
            if (map != null) {
                this.cache.putAll((List) map.entrySet().stream().map(entry -> {
                    return new Entry(entry.getKey(), entry.getValue());
                }).collect(Collectors.toList()), objArr);
                hashMap.putAll(map);
            }
            TableMetricsUtil.updateTimer(this.metrics.getAllNs, this.clock.nanoTime() - nanoTime);
            return hashMap;
        });
    }

    public void put(K k, V v, Object... objArr) {
        try {
            putAsync(k, v, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> putAsync(K k, V v, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numPuts);
        Preconditions.checkNotNull(this.table, "Cannot write to a read-only table: " + this.table);
        long nanoTime = this.clock.nanoTime();
        return this.table.putAsync(k, v, objArr).handle((r12, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to put a record, key=" + k + ", value=" + v, th);
            }
            if (!this.isWriteAround) {
                if (v == null) {
                    this.cache.delete(k, objArr);
                } else {
                    this.cache.put(k, v, objArr);
                }
            }
            TableMetricsUtil.updateTimer(this.metrics.putNs, this.clock.nanoTime() - nanoTime);
            return r12;
        });
    }

    public void putAll(List<Entry<K, V>> list, Object... objArr) {
        try {
            putAllAsync(list, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> list, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numPutAlls);
        long nanoTime = this.clock.nanoTime();
        Preconditions.checkNotNull(this.table, "Cannot write to a read-only table: " + this.table);
        return this.table.putAllAsync(list, objArr).handle((r11, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to put records " + list, th);
            }
            if (!this.isWriteAround) {
                this.cache.putAll(list, objArr);
            }
            TableMetricsUtil.updateTimer(this.metrics.putAllNs, this.clock.nanoTime() - nanoTime);
            return r11;
        });
    }

    public void update(K k, U u) {
        throw new SamzaException("Caching not supported with updates");
    }

    public void updateAll(List<Entry<K, U>> list) {
        throw new SamzaException("Caching not supported with updates");
    }

    public CompletableFuture<Void> updateAsync(K k, U u) {
        throw new SamzaException("Caching not supported with updates");
    }

    public CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> list) {
        throw new SamzaException("Caching not supported with updates");
    }

    public void delete(K k, Object... objArr) {
        try {
            deleteAsync(k, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> deleteAsync(K k, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numDeletes);
        long nanoTime = this.clock.nanoTime();
        Preconditions.checkNotNull(this.table, "Cannot delete from a read-only table: " + this.table);
        return this.table.deleteAsync(k, objArr).handle((r11, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to delete the record for " + k, th);
            }
            if (!this.isWriteAround) {
                this.cache.delete(k, objArr);
            }
            TableMetricsUtil.updateTimer(this.metrics.deleteNs, this.clock.nanoTime() - nanoTime);
            return r11;
        });
    }

    public void deleteAll(List<K> list, Object... objArr) {
        try {
            deleteAllAsync(list, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> deleteAllAsync(List<K> list, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numDeleteAlls);
        long nanoTime = this.clock.nanoTime();
        Preconditions.checkNotNull(this.table, "Cannot delete from a read-only table: " + this.table);
        return this.table.deleteAllAsync(list, objArr).handle((r11, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to delete the record for " + list, th);
            }
            if (!this.isWriteAround) {
                this.cache.deleteAll(list, objArr);
            }
            TableMetricsUtil.updateTimer(this.metrics.deleteAllNs, this.clock.nanoTime() - nanoTime);
            return r11;
        });
    }

    public <T> CompletableFuture<T> readAsync(int i, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numReads);
        long nanoTime = this.clock.nanoTime();
        return this.table.readAsync(i, objArr).handle((BiFunction) (obj, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to read, opId=" + i, th);
            }
            TableMetricsUtil.updateTimer(this.metrics.readNs, this.clock.nanoTime() - nanoTime);
            return obj;
        });
    }

    public <T> CompletableFuture<T> writeAsync(int i, Object... objArr) {
        TableMetricsUtil.incCounter(this.metrics.numWrites);
        long nanoTime = this.clock.nanoTime();
        return this.table.writeAsync(i, objArr).handle((BiFunction) (obj, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to write, opId=" + i, th);
            }
            TableMetricsUtil.updateTimer(this.metrics.writeNs, this.clock.nanoTime() - nanoTime);
            return obj;
        });
    }

    public synchronized void flush() {
        TableMetricsUtil.incCounter(this.metrics.numFlushes);
        long nanoTime = this.clock.nanoTime();
        Preconditions.checkNotNull(this.table, "Cannot flush a read-only table: " + this.table);
        this.table.flush();
        TableMetricsUtil.updateTimer(this.metrics.flushNs, this.clock.nanoTime() - nanoTime);
    }

    public void close() {
        this.cache.close();
        this.table.close();
    }

    double hitRate() {
        long requestCount = requestCount();
        if (requestCount == 0) {
            return 1.0d;
        }
        return this.hitCount.get() / requestCount;
    }

    double missRate() {
        long requestCount = requestCount();
        if (requestCount == 0) {
            return 1.0d;
        }
        return this.missCount.get() / requestCount;
    }

    long requestCount() {
        return this.hitCount.get() + this.missCount.get();
    }
}
