package org.apache.samza.table.ratelimit;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.AsyncReadWriteUpdateTable;
import org.apache.samza.table.BaseReadWriteUpdateTable;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.utils.TableMetricsUtil;

/* loaded from: input_file:org/apache/samza/table/ratelimit/AsyncRateLimitedTable.class */
public class AsyncRateLimitedTable<K, V, U> implements AsyncReadWriteUpdateTable<K, V, U> {
    private final String tableId;
    private final AsyncReadWriteUpdateTable<K, V, U> table;
    private final TableRateLimiter<K, V> readRateLimiter;
    private final TableRateLimiter<K, V> writeRateLimiter;
    private final TableRateLimiter<K, U> updateRateLimiter;
    private final ExecutorService rateLimitingExecutor;

    public AsyncRateLimitedTable(String str, AsyncReadWriteUpdateTable<K, V, U> asyncReadWriteUpdateTable, TableRateLimiter<K, V> tableRateLimiter, TableRateLimiter<K, V> tableRateLimiter2, TableRateLimiter<K, U> tableRateLimiter3, ExecutorService executorService) {
        Preconditions.checkNotNull(str, "null tableId");
        Preconditions.checkNotNull(asyncReadWriteUpdateTable, "null table");
        Preconditions.checkNotNull(executorService, "null rateLimitingExecutor");
        Preconditions.checkArgument((tableRateLimiter == null && tableRateLimiter2 == null) ? false : true, "both readRateLimiter and writeRateLimiter are null");
        this.tableId = str;
        this.table = asyncReadWriteUpdateTable;
        this.readRateLimiter = tableRateLimiter;
        this.writeRateLimiter = tableRateLimiter2;
        this.updateRateLimiter = tableRateLimiter3;
        this.rateLimitingExecutor = executorService;
    }

    public CompletableFuture<V> getAsync(K k, Object... objArr) {
        return (CompletableFuture<V>) doRead(() -> {
            this.readRateLimiter.throttle(k, objArr);
        }, () -> {
            return this.table.getAsync(k, objArr);
        });
    }

    public CompletableFuture<Map<K, V>> getAllAsync(List<K> list, Object... objArr) {
        return (CompletableFuture<Map<K, V>>) doRead(() -> {
            this.readRateLimiter.throttle(list, objArr);
        }, () -> {
            return this.table.getAllAsync(list, objArr);
        });
    }

    public <T> CompletableFuture<T> readAsync(int i, Object... objArr) {
        return doRead(() -> {
            this.readRateLimiter.throttle(i, objArr);
        }, () -> {
            return this.table.readAsync(i, objArr);
        });
    }

    public CompletableFuture<Void> putAsync(K k, V v, Object... objArr) {
        return doWrite(() -> {
            this.writeRateLimiter.throttle(k, v, objArr);
        }, () -> {
            return this.table.putAsync(k, v, objArr);
        });
    }

    public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> list, Object... objArr) {
        return doWrite(() -> {
            this.writeRateLimiter.throttleRecords(list, new Object[0]);
        }, () -> {
            return this.table.putAllAsync(list, objArr);
        });
    }

    public CompletableFuture<Void> updateAsync(K k, U u) {
        return doUpdate(() -> {
            this.updateRateLimiter.throttle(k, u, new Object[0]);
        }, () -> {
            return this.table.updateAsync(k, u);
        });
    }

    public CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> list) {
        return doUpdate(() -> {
            this.updateRateLimiter.throttleRecords(list, new Object[0]);
        }, () -> {
            return this.table.updateAllAsync(list);
        });
    }

    public CompletableFuture<Void> deleteAsync(K k, Object... objArr) {
        return doWrite(() -> {
            this.writeRateLimiter.throttle(k, objArr);
        }, () -> {
            return this.table.deleteAsync(k, objArr);
        });
    }

    public CompletableFuture<Void> deleteAllAsync(List<K> list, Object... objArr) {
        return doWrite(() -> {
            this.writeRateLimiter.throttle(list, objArr);
        }, () -> {
            return this.table.deleteAllAsync(list, objArr);
        });
    }

    public <T> CompletableFuture<T> writeAsync(int i, Object... objArr) {
        return doWrite(() -> {
            this.writeRateLimiter.throttle(i, objArr);
        }, () -> {
            return this.table.writeAsync(i, objArr);
        });
    }

    public void init(Context context) {
        this.table.init(context);
        if (new MetricsConfig(context.getJobContext().getConfig()).getMetricsTimerEnabled()) {
            TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, this.tableId);
            if (isReadRateLimited()) {
                this.readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
            }
            if (isWriteRateLimited()) {
                this.writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
            }
            if (isUpdateRateLimited()) {
                this.updateRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("update-throttle-ns"));
            }
        }
    }

    public void flush() {
        this.table.flush();
    }

    public void close() {
        this.table.close();
    }

    private boolean isReadRateLimited() {
        return this.readRateLimiter != null;
    }

    private boolean isWriteRateLimited() {
        return this.writeRateLimiter != null;
    }

    private boolean isUpdateRateLimited() {
        return this.updateRateLimiter != null;
    }

    private <T> CompletableFuture<T> doRead(BaseReadWriteUpdateTable.Func0 func0, BaseReadWriteUpdateTable.Func1<T> func1) {
        return isReadRateLimited() ? CompletableFuture.runAsync(() -> {
            func0.apply();
        }, this.rateLimitingExecutor).thenCompose(r3 -> {
            return func1.apply();
        }) : func1.apply();
    }

    private <T> CompletableFuture<T> doWrite(BaseReadWriteUpdateTable.Func0 func0, BaseReadWriteUpdateTable.Func1<T> func1) {
        return isWriteRateLimited() ? CompletableFuture.runAsync(() -> {
            func0.apply();
        }, this.rateLimitingExecutor).thenCompose(r3 -> {
            return func1.apply();
        }) : func1.apply();
    }

    private <T> CompletableFuture<T> doUpdate(BaseReadWriteUpdateTable.Func0 func0, BaseReadWriteUpdateTable.Func1<T> func1) {
        return isUpdateRateLimited() ? CompletableFuture.runAsync(() -> {
            func0.apply();
        }, this.rateLimitingExecutor).thenCompose(r3 -> {
            return func1.apply();
        }) : func1.apply();
    }
}
