package org.apache.samza.table.batching;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.samza.util.HighResolutionClock;

/* loaded from: input_file:org/apache/samza/table/batching/BatchProcessor.class */
/**
 * Collects table operations into a {@link Batch} and hands each finished batch to a
 * {@link BatchHandler}.
 *
 * <p>A batch is created lazily when the first operation arrives. It is handed off either when
 * the batch reports itself closed after an operation was added, or when the timer scheduled for
 * it (see {@link Batch#getMaxBatchDelay()}) fires. All state changes made through the
 * operation-processing methods, the timer task and {@link #stop()} happen while holding an
 * internal {@link ReentrantLock}.
 *
 * @param <K> key type of the operations
 * @param <V> value type of the operations
 * @param <U> update type of the operations
 */
public class BatchProcessor<K, V, U> {
    private final ScheduledExecutorService scheduledExecutorService;
    /** Guards {@link #batch}, {@link #scheduledFuture} and {@link #batchOpenTimestamp}. */
    private final ReentrantLock lock = new ReentrantLock();
    private final BatchHandler<K, V, U> batchHandler;
    private final BatchProvider<K, V, U> batchProvider;
    /** May be null; metric updates are skipped in that case. */
    private final BatchMetrics batchMetrics;
    private final HighResolutionClock clock;
    /** The batch currently accepting operations; null until the first operation arrives. */
    private Batch<K, V, U> batch;
    /** Timer of the most recently started batch that had one; null if none was ever scheduled. */
    private ScheduledFuture<?> scheduledFuture;
    /** {@code clock.nanoTime()} reading taken when the current batch was started. */
    private long batchOpenTimestamp;

    /**
     * Creates a processor. No batch is opened and no timer is scheduled until the first operation.
     *
     * @param batchMetrics metrics sink; the only argument that is not validated, so a null value
     *                     is accepted and simply disables metric recording
     * @param batchHandler receives every finished batch
     * @param batchProvider supplies a new batch each time one is started
     * @param highResolutionClock time source used to measure how long a batch stayed open
     * @param scheduledExecutorService executor on which the per-batch timer task is scheduled
     * @throws NullPointerException if any argument other than {@code batchMetrics} is null
     */
    public BatchProcessor(BatchMetrics batchMetrics, BatchHandler<K, V, U> batchHandler, BatchProvider<K, V, U> batchProvider, HighResolutionClock highResolutionClock, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(batchHandler);
        Preconditions.checkNotNull(batchProvider);
        Preconditions.checkNotNull(highResolutionClock);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.batchHandler = batchHandler;
        this.batchProvider = batchProvider;
        this.scheduledExecutorService = scheduledExecutorService;
        this.batchMetrics = batchMetrics;
        this.clock = highResolutionClock;
    }

    /**
     * Adds the operation to the current batch, starting one first if none exists, and processes
     * the batch right away if it reports itself closed afterwards. Callers must hold {@link #lock}.
     *
     * @return the future returned by {@code Batch.addOperation} for this operation
     */
    private CompletableFuture<Void> addOperation(Operation<K, V, U> operation) {
        if (this.batch == null) {
            startNewBatch();
        }
        CompletableFuture<Void> added = this.batch.addOperation(operation);
        if (this.batch.isClosed()) {
            // Handed off before the timer fired, so the pending timer is cancelled.
            processBatch(true);
        }
        return added;
    }

    /**
     * Adds a get operation to the current batch.
     *
     * <p>Declared public here; the decompiler notes it was originally package-private.
     *
     * @param operation the operation to add; must be a {@code GetOperation}
     * @return the future exposed by the operation's {@code getCompletableFuture()}
     * @throws NullPointerException if {@code operation} is null
     * @throws IllegalArgumentException if {@code operation} is not a {@code GetOperation}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public CompletableFuture<V> processQueryOperation(Operation<K, V, U> operation) {
        Preconditions.checkNotNull(operation);
        Preconditions.checkArgument(operation instanceof GetOperation);
        this.lock.lock();
        try {
            // Raw type kept on purpose: GetOperation's type parameters are not visible from this file.
            GetOperation getOperation = (GetOperation) operation;
            addOperation(getOperation);
            return getOperation.getCompletableFuture();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Adds a put, delete or update operation to the current batch.
     *
     * <p>Declared public here; the decompiler notes it was originally package-private.
     *
     * @param operation the operation to add
     * @return the future returned by the batch for this operation
     * @throws NullPointerException if {@code operation} is null
     * @throws IllegalArgumentException if {@code operation} is not a {@code PutOperation},
     *                                  {@code DeleteOperation} or {@code UpdateOperation}
     */
    public CompletableFuture<Void> processPutDeleteOrUpdateOperations(Operation<K, V, U> operation) {
        Preconditions.checkNotNull(operation);
        Preconditions.checkArgument((operation instanceof PutOperation) || (operation instanceof DeleteOperation) || (operation instanceof UpdateOperation));
        this.lock.lock();
        try {
            return addOperation(operation);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Closes the current batch, passes it to the handler and starts a fresh batch.
     * Callers must hold {@link #lock}.
     *
     * @param cancelTimer whether the pending timer should be cancelled first; the timer task
     *                    itself passes {@code false}
     */
    private void processBatch(boolean cancelTimer) {
        mayCancelTimer(cancelTimer);
        closeBatch();
        this.batchHandler.handle(this.batch);
        startNewBatch();
    }

    /** Obtains a new batch from the provider, records its start time and schedules its timer. */
    private void startNewBatch() {
        this.batch = this.batchProvider.getBatch();
        this.batchOpenTimestamp = this.clock.nanoTime();
        // The constructor accepts null metrics; without this guard an NPE here would leave a
        // batch installed whose timer was never scheduled.
        if (this.batchMetrics != null) {
            this.batchMetrics.incBatchCount();
        }
        setBatchTimer(this.batch);
    }

    /** Closes the current batch and reports how long it was open, in clock nanoseconds. */
    private void closeBatch() {
        this.batch.close();
        if (this.batchMetrics != null) {
            this.batchMetrics.updateBatchDuration(this.clock.nanoTime() - this.batchOpenTimestamp);
        }
    }

    /** Cancels the pending timer when requested and one has been scheduled. */
    private void mayCancelTimer(boolean cancelTimer) {
        if (cancelTimer && this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
        }
    }

    /**
     * Schedules a task that processes {@code batch} after its maximum batch delay.
     * A delay of exactly {@link Integer#MAX_VALUE} milliseconds schedules no timer.
     */
    private void setBatchTimer(Batch<K, V, U> batch) {
        long maxDelayMillis = batch.getMaxBatchDelay().toMillis();
        if (maxDelayMillis == Integer.MAX_VALUE) {
            return;
        }
        this.scheduledFuture = this.scheduledExecutorService.schedule(() -> {
            this.lock.lock();
            try {
                // The task may already be waiting on the lock when its batch is processed by an
                // operation thread; cancel(true) cannot stop it there because lock() is not
                // interruptible. Only fire while the batch this timer was created for is still
                // current, otherwise the replacement batch would be closed early and its own
                // timer orphaned.
                if (this.batch == batch) {
                    processBatch(false);
                }
            } finally {
                this.lock.unlock();
            }
        }, maxDelayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the pending batch timer, if any. The current batch itself is not touched.
     *
     * <p>Declared public here; the decompiler notes it was originally package-private.
     */
    public void stop() {
        // scheduledFuture is only written under the lock, so it is read under the lock too.
        this.lock.lock();
        try {
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(true);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @return the size reported by the current batch, or 0 if no batch has been started yet
     */
    @VisibleForTesting
    int size() {
        if (this.batch == null) {
            return 0;
        }
        return this.batch.size();
    }

    /**
     * Finds the last put, delete or update operation for the given key in the current batch,
     * following the iteration order of {@code Batch.getOperations()}.
     *
     * @param k the key to look for
     * @return the last matching operation, or null if there is none or no batch has been started
     */
    @VisibleForTesting
    Operation<K, V, U> getLatestPutUpdateOrDelete(K k) {
        if (this.batch == null) {
            return null;
        }
        Operation<K, V, U> latest = null;
        for (Operation<K, V, U> candidate : this.batch.getOperations()) {
            boolean isWrite = (candidate instanceof PutOperation)
                    || (candidate instanceof DeleteOperation)
                    || (candidate instanceof UpdateOperation);
            if (isWrite && candidate.getKey().equals(k)) {
                latest = candidate;
            }
        }
        return latest;
    }
}
