package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.LongPredicate;
import java.util.function.Supplier;
import org.neo4j.helpers.FutureAdapter;
import org.neo4j.unsafe.impl.batchimport.Parallelizable;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

/* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.class */
public class TicketedProcessing<FROM, STATE, TO> implements Parallelizable {
    private static final ParkStrategy park = new ParkStrategy.Park(10, TimeUnit.MILLISECONDS);
    private final TaskExecutor<STATE> executor;
    private final BiFunction<FROM, STATE, TO> processor;
    private final ArrayBlockingQueue<TO> processed;
    private final AtomicLong submittedTicket = new AtomicLong(-1);
    private final AtomicLong processedTicket = new AtomicLong(-1);
    private final LongPredicate myTurnToAddToProcessedQueue = new LongPredicate() { // from class: org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing.1
        @Override // java.util.function.LongPredicate
        public boolean test(long j) {
            return TicketedProcessing.this.processedTicket.get() == j - 1;
        }
    };
    private final Runnable healthCheck;
    private volatile boolean done;

    public TicketedProcessing(String str, int i, BiFunction<FROM, STATE, TO> biFunction, Supplier<STATE> supplier) {
        this.processor = biFunction;
        this.executor = new DynamicTaskExecutor(1, i, i, park, str, supplier);
        TaskExecutor<STATE> taskExecutor = this.executor;
        taskExecutor.getClass();
        this.healthCheck = taskExecutor::assertHealthy;
        this.processed = new ArrayBlockingQueue<>(i);
    }

    public void submit(long j, FROM from) {
        this.submittedTicket.incrementAndGet();
        this.executor.submit(obj -> {
            TO apply = this.processor.apply(from, obj);
            Processing.await(this.myTurnToAddToProcessedQueue, j, this.healthCheck, park);
            do {
            } while (!this.processed.offer(apply, 10L, TimeUnit.MILLISECONDS));
            this.processedTicket.incrementAndGet();
        });
    }

    public Future<Void> slurp(Iterator<FROM> it, boolean z) {
        return FutureAdapter.future(() -> {
            long j = 0;
            while (true) {
                long j2 = j;
                if (!it.hasNext()) {
                    break;
                }
                submit(j2, it.next());
                j = j2 + 1;
            }
            if (!z) {
                return null;
            }
            shutdown(true);
            return null;
        });
    }

    public void shutdown(boolean z) {
        this.done = true;
        this.executor.shutdown(z ? 1 : 0);
    }

    public TO next() {
        while (true) {
            if (this.done && this.processedTicket.get() >= this.submittedTicket.get() && this.processed.isEmpty()) {
                return null;
            }
            try {
                TO poll = this.processed.poll(10L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    return poll;
                }
                this.healthCheck.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    @Override // org.neo4j.unsafe.impl.batchimport.Parallelizable
    public int processors(int i) {
        return this.executor.processors(i);
    }
}
