package org.apache.samza.table.remote;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Timer;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.AsyncReadWriteUpdateTable;
import org.apache.samza.table.BaseReadWriteUpdateTable;
import org.apache.samza.table.ReadWriteUpdateTable;
import org.apache.samza.table.RecordNotFoundException;
import org.apache.samza.table.batching.AsyncBatchingTable;
import org.apache.samza.table.batching.BatchProvider;
import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
import org.apache.samza.table.retry.AsyncRetriableTable;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.TableMetricsUtil;

/* loaded from: input_file:org/apache/samza/table/remote/RemoteTable.class */
public final class RemoteTable<K, V, U> extends BaseReadWriteUpdateTable<K, V, U> implements ReadWriteUpdateTable<K, V, U>, AsyncReadWriteUpdateTable<K, V, U> {
    protected final TableReadFunction<K, V> readFn;
    protected final TableWriteFunction<K, V, U> writeFn;
    protected final TableRateLimiter<K, V> readRateLimiter;
    protected final TableRateLimiter<K, V> writeRateLimiter;
    protected final TableRateLimiter<K, U> updateRateLimiter;
    protected final ExecutorService rateLimitingExecutor;
    protected final TableRetryPolicy readRetryPolicy;
    protected final TableRetryPolicy writeRetryPolicy;
    protected final ScheduledExecutorService retryExecutor;
    protected final BatchProvider<K, V, U> batchProvider;
    protected final ScheduledExecutorService batchExecutor;
    protected final ExecutorService callbackExecutor;
    protected final AsyncReadWriteUpdateTable<K, V, U> asyncTable;

    public RemoteTable(String str, TableReadFunction<K, V> tableReadFunction, TableWriteFunction<K, V, U> tableWriteFunction, TableRateLimiter<K, V> tableRateLimiter, TableRateLimiter<K, V> tableRateLimiter2, TableRateLimiter<K, U> tableRateLimiter3, ExecutorService executorService, TableRetryPolicy tableRetryPolicy, TableRetryPolicy tableRetryPolicy2, ScheduledExecutorService scheduledExecutorService, BatchProvider<K, V, U> batchProvider, ScheduledExecutorService scheduledExecutorService2, ExecutorService executorService2) {
        super(str);
        Preconditions.checkArgument((tableWriteFunction == null && tableReadFunction == null) ? false : true, "Must have one of TableReadFunction or TableWriteFunction");
        this.readFn = tableReadFunction;
        this.writeFn = tableWriteFunction;
        this.readRateLimiter = tableRateLimiter;
        this.writeRateLimiter = tableRateLimiter2;
        this.updateRateLimiter = tableRateLimiter3;
        this.rateLimitingExecutor = executorService;
        this.readRetryPolicy = tableRetryPolicy;
        this.writeRetryPolicy = tableRetryPolicy2;
        this.callbackExecutor = executorService2;
        this.retryExecutor = scheduledExecutorService;
        this.batchProvider = batchProvider;
        this.batchExecutor = scheduledExecutorService2;
        AsyncReadWriteUpdateTable asyncRemoteTable = new AsyncRemoteTable(tableReadFunction, tableWriteFunction);
        asyncRemoteTable = (tableRateLimiter == null && tableRateLimiter2 == null && tableRateLimiter3 == null) ? asyncRemoteTable : new AsyncRateLimitedTable(str, asyncRemoteTable, tableRateLimiter, tableRateLimiter2, tableRateLimiter3, executorService);
        asyncRemoteTable = (tableRetryPolicy == null && tableRetryPolicy2 == null) ? asyncRemoteTable : new AsyncRetriableTable(str, asyncRemoteTable, tableRetryPolicy, tableRetryPolicy2, scheduledExecutorService, tableReadFunction, tableWriteFunction);
        this.asyncTable = batchProvider != null ? new AsyncBatchingTable(str, asyncRemoteTable, batchProvider, scheduledExecutorService2) : asyncRemoteTable;
    }

    public V get(K k, Object... objArr) {
        try {
            return getAsync(k, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<V> getAsync(K k, Object... objArr) {
        Preconditions.checkNotNull(k, "null key");
        return instrument(() -> {
            return this.asyncTable.getAsync(k, objArr);
        }, this.metrics.numGets, this.metrics.getNs).handle((obj, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to get the records for " + k, th);
            }
            if (obj == null) {
                TableMetricsUtil.incCounter(this.metrics.numMissedLookups);
            }
            return obj;
        });
    }

    public Map<K, V> getAll(List<K> list, Object... objArr) {
        try {
            return getAllAsync(list, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Map<K, V>> getAllAsync(List<K> list, Object... objArr) {
        Preconditions.checkNotNull(list, "null keys");
        return list.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : instrument(() -> {
            return this.asyncTable.getAllAsync(list, objArr);
        }, this.metrics.numGetAlls, this.metrics.getAllNs).handle((map, th) -> {
            if (th != null) {
                throw new SamzaException("Failed to get the records for " + list, th);
            }
            map.values().stream().filter(Objects::isNull).forEach(obj -> {
                TableMetricsUtil.incCounter(this.metrics.numMissedLookups);
            });
            return map;
        });
    }

    public <T> T read(int i, Object... objArr) {
        try {
            return readAsync(i, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public <T> CompletableFuture<T> readAsync(int i, Object... objArr) {
        return instrument(() -> {
            return this.asyncTable.readAsync(i, objArr);
        }, this.metrics.numReads, this.metrics.readNs).exceptionally((Function) th -> {
            throw new SamzaException(String.format("Failed to read, opId=%d", Integer.valueOf(i)), th);
        });
    }

    public void put(K k, V v, Object... objArr) {
        try {
            putAsync(k, v, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> putAsync(K k, V v, Object... objArr) {
        Preconditions.checkNotNull(this.writeFn, "null write function");
        Preconditions.checkNotNull(k, "null key");
        return v == null ? deleteAsync(k, objArr) : instrument(() -> {
            return this.asyncTable.putAsync(k, v, objArr);
        }, this.metrics.numPuts, this.metrics.putNs).exceptionally(th -> {
            throw new SamzaException("Failed to put a record with key=" + k, th);
        });
    }

    public void putAll(List<Entry<K, V>> list, Object... objArr) {
        try {
            putAllAsync(list, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> list, Object... objArr) {
        Preconditions.checkNotNull(this.writeFn, "null write function");
        Preconditions.checkNotNull(list, "null records");
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        List<K> list2 = (List) list.stream().filter(entry -> {
            return entry.getValue() == null;
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
        List list3 = (List) list.stream().filter(entry2 -> {
            return entry2.getValue() != null;
        }).collect(Collectors.toList());
        return CompletableFuture.allOf(list2.isEmpty() ? CompletableFuture.completedFuture(null) : deleteAllAsync(list2, objArr), instrument(() -> {
            return this.asyncTable.putAllAsync(list3, objArr);
        }, this.metrics.numPutAlls, this.metrics.putAllNs)).exceptionally(th -> {
            throw new SamzaException("Failed to put records with keys=" + ((String) list.stream().map(entry3 -> {
                return entry3.getKey().toString();
            }).collect(Collectors.joining(","))), th);
        });
    }

    public void update(K k, U u) {
        try {
            updateAsync(k, u).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public void updateAll(List<Entry<K, U>> list) {
        try {
            updateAllAsync(list).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> updateAsync(K k, U u) {
        Preconditions.checkNotNull(this.writeFn, "null write function");
        Preconditions.checkNotNull(k, "null key");
        Preconditions.checkNotNull(u, "null update");
        return instrument(() -> {
            return this.asyncTable.updateAsync(k, u);
        }, this.metrics.numUpdates, this.metrics.updateNs).exceptionally(th -> {
            if (th.getCause() instanceof RecordNotFoundException) {
                throw th.getCause();
            }
            throw new SamzaException("Failed to update a record with key=" + k, th);
        });
    }

    public CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> list) {
        Preconditions.checkNotNull(this.writeFn, "null write function");
        Preconditions.checkNotNull(list, "null records");
        return list.isEmpty() ? CompletableFuture.completedFuture(null) : instrument(() -> {
            return this.asyncTable.updateAllAsync(list);
        }, this.metrics.numUpdateAlls, this.metrics.updateAllNs).exceptionally(th -> {
            throw new SamzaException("Failed to put records with keys=" + ((String) list.stream().map(entry -> {
                return entry.getKey().toString();
            }).collect(Collectors.joining(","))), th);
        });
    }

    public void delete(K k, Object... objArr) {
        try {
            deleteAsync(k, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> deleteAsync(K k, Object... objArr) {
        Preconditions.checkNotNull(this.writeFn, "null write function");
        Preconditions.checkNotNull(k, "null key");
        return instrument(() -> {
            return this.asyncTable.deleteAsync(k, objArr);
        }, this.metrics.numDeletes, this.metrics.deleteNs).exceptionally(th -> {
            throw new SamzaException("Failed to delete the record for " + k, th);
        });
    }

    public void deleteAll(List<K> list, Object... objArr) {
        try {
            deleteAllAsync(list, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public CompletableFuture<Void> deleteAllAsync(List<K> list, Object... objArr) {
        Preconditions.checkNotNull(this.writeFn, "null write function");
        Preconditions.checkNotNull(list, "null keys");
        return list.isEmpty() ? CompletableFuture.completedFuture(null) : instrument(() -> {
            return this.asyncTable.deleteAllAsync(list, objArr);
        }, this.metrics.numDeleteAlls, this.metrics.deleteAllNs).exceptionally(th -> {
            throw new SamzaException("Failed to delete records for " + list, th);
        });
    }

    public <T> T write(int i, Object... objArr) {
        try {
            return writeAsync(i, objArr).get();
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    public <T> CompletableFuture<T> writeAsync(int i, Object... objArr) {
        return instrument(() -> {
            return this.asyncTable.writeAsync(i, objArr);
        }, this.metrics.numWrites, this.metrics.writeNs).exceptionally((Function) th -> {
            throw new SamzaException(String.format("Failed to write, opId=%d", Integer.valueOf(i)), th);
        });
    }

    @Override // org.apache.samza.table.BaseReadWriteUpdateTable
    public void init(Context context) {
        super.init(context);
        this.asyncTable.init(context);
        if (this.readFn != null) {
            this.readFn.init(context, this);
        }
        if (this.writeFn != null) {
            this.writeFn.init(context, this);
        }
    }

    public void flush() {
        try {
            TableMetricsUtil.incCounter(this.metrics.numFlushes);
            long nanoTime = this.clock.nanoTime();
            this.asyncTable.flush();
            TableMetricsUtil.updateTimer(this.metrics.flushNs, this.clock.nanoTime() - nanoTime);
        } catch (Exception e) {
            this.logger.error("Failed to flush remote store", e);
            throw new SamzaException("Failed to flush remote store", e);
        }
    }

    public void close() {
        this.asyncTable.close();
    }

    public TableReadFunction<K, V> getReadFunction() {
        return this.readFn;
    }

    public TableWriteFunction<K, V, U> getWriteFunction() {
        return this.writeFn;
    }

    protected <T> CompletableFuture<T> instrument(BaseReadWriteUpdateTable.Func1<T> func1, Counter counter, Timer timer) {
        TableMetricsUtil.incCounter(counter);
        long nanoTime = this.clock.nanoTime();
        CompletableFuture<T> apply = func1.apply();
        if (this.callbackExecutor != null) {
            apply.thenApplyAsync((Function) obj -> {
                TableMetricsUtil.updateTimer(timer, this.clock.nanoTime() - nanoTime);
                return obj;
            }, (Executor) this.callbackExecutor);
        } else {
            apply.thenApply((Function) obj2 -> {
                TableMetricsUtil.updateTimer(timer, this.clock.nanoTime() - nanoTime);
                return obj2;
            });
        }
        return apply;
    }

    public BatchProvider<K, V, U> getBatchProvider() {
        return this.batchProvider;
    }
}
