package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.function.Factory;
import org.neo4j.function.primitive.PrimitiveLongPredicate;
import org.neo4j.graphdb.Resource;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.Task;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.class */
/**
 * A step that hands every received batch to a {@link TaskExecutor} and lets the subclass do the
 * actual work in {@link #process(Object, BatchSender)}.
 *
 * <p>Batches are submitted as tasks to the executor created in {@link #start(int)}. Each task is
 * handed a {@link Sender} by the executor, which the processing code uses to pass results on to
 * the downstream step. The class also keeps track of queue size and timing figures by updating
 * counters that it reads from its superclass ({@code queuedBatches}, {@code doneBatches},
 * {@code totalProcessingTime}, {@code upstreamIdleTime}, {@code downstreamIdleTime}).
 *
 * @param <T> type of batch this step receives
 */
public abstract class ProcessorStep<T> extends AbstractStep<T> {
    /** Created in {@link #start(int)}; {@code null} before that call. */
    private TaskExecutor<Sender> executor;
    private final int workAheadSize;
    /** Number of processors the executor is created with. */
    private final int initialProcessorCount = 1;
    private final int maxProcessors;

    /** Accepts when the number of queued batches is at or below the supplied limit. */
    private final PrimitiveLongPredicate catchUp = new PrimitiveLongPredicate() {
        @Override
        public boolean accept(long queueSizeThreshold) {
            return queuedBatches.get() <= queueSizeThreshold;
        }
    };

    /** Number of batches for which processing has begun, i.e. whose permit has been obtained. */
    protected final AtomicLong begunBatches = new AtomicLong();

    /** Accepts when exactly {@code ticket} batches have begun processing. */
    private final PrimitiveLongPredicate rightBeginTicket = new PrimitiveLongPredicate() {
        @Override
        public boolean accept(long ticket) {
            return begunBatches.get() == ticket;
        }
    };

    /** Wall-clock time (ms) when the queue last drained to zero; {@code 0} until that happens. */
    private final AtomicLong lastBatchEndTime = new AtomicLong();

    /**
     * {@link BatchSender} handed to {@link #process(Object, BatchSender)}. It forwards batches to
     * {@link #sendDownstream(long, Object)} using the ticket it was last initialized with, and
     * accumulates the time spent doing so, so that it can be subtracted from the processing time.
     */
    public class Sender implements BatchSender {
        /** Accumulated milliseconds spent inside {@link #send(Object)} since the last initialize. */
        private long sendTime;
        private long ticket;

        private Sender() {
        }

        /**
         * Sends {@code batch} downstream under the current ticket. The elapsed time is added to
         * {@code sendTime} whether or not the send throws.
         */
        @Override
        public void send(Object batch) {
            long startTime = System.currentTimeMillis();
            try {
                sendDownstream(ticket, batch);
            } finally {
                sendTime += System.currentTimeMillis() - startTime;
            }
        }

        /**
         * Prepares this sender for a new batch.
         *
         * @param ticket ticket to use for subsequent {@link #send(Object)} calls
         */
        public void initialize(long ticket) {
            this.ticket = ticket;
            this.sendTime = 0L;
        }
    }

    /**
     * @param control passed on to the superclass constructor
     * @param name passed on to the superclass constructor
     * @param config passed on to the superclass constructor; also supplies {@code workAheadSize()}
     * @param maxProcessors upper processor limit given to the executor
     * @param additionalStatsProviders passed on to the superclass constructor
     */
    public ProcessorStep(StageControl control, String name, Configuration config, int maxProcessors,
            StatsProvider... additionalStatsProviders) {
        super(control, name, config, additionalStatsProviders);
        this.workAheadSize = config.workAheadSize();
        this.maxProcessors = maxProcessors;
    }

    /**
     * Starts the step and creates the executor that will run the batch tasks.
     *
     * @param orderingGuarantees forwarded unchanged to the superclass; presumably the flags
     *        queried through {@code guarantees(int)} -- TODO confirm
     */
    @Override // org.neo4j.unsafe.impl.batchimport.staging.AbstractStep, org.neo4j.unsafe.impl.batchimport.staging.Step
    public void start(int orderingGuarantees) {
        super.start(orderingGuarantees);
        this.executor = new DynamicTaskExecutor<Sender>(initialProcessorCount, maxProcessors, workAheadSize,
                DynamicTaskExecutor.DEFAULT_PARK_STRATEGY, name(), new Factory<Sender>() {
                    @Override
                    public Sender newInstance() {
                        return new Sender();
                    }
                });
    }

    /**
     * Queues {@code batch} for processing on the executor.
     *
     * @param ticket ticket of the batch
     * @param batch the batch to process
     * @return the value returned by awaiting {@code catchUp} with {@code workAheadSize} before the
     *         batch was queued
     */
    @Override // org.neo4j.unsafe.impl.batchimport.staging.Step
    public long receive(final long ticket, final T batch) {
        long idleTime = await(catchUp, workAheadSize);
        incrementQueue();
        executor.submit(new Task<Sender>() {
            @Override // org.neo4j.unsafe.impl.batchimport.executor.Task
            public void run(Sender sender) {
                processBatch(ticket, batch, sender);
            }
        });
        return idleTime;
    }

    /**
     * Runs one batch on an executor thread: obtains the permit, calls
     * {@link #process(Object, BatchSender)}, updates the statistics and finally the queue counters.
     * Anything thrown after the health check is reported through {@code issuePanic}.
     */
    private void processBatch(long ticket, T batch, Sender sender) {
        assertHealthy();
        sender.initialize(ticket);
        try {
            // NOTE(review): 2 is presumably the "ordered processing" guarantee flag -- confirm
            // against the constants used with guarantees(int). When set, wait for our turn before
            // asking for the permit so that permits are requested in ticket order.
            if (guarantees(2)) {
                await(rightBeginTicket, ticket);
            }
            // try-with-resources so that the permit is released even when process() throws.
            try (Resource permit = permit(batch)) {
                begunBatches.incrementAndGet();
                long startTime = System.currentTimeMillis();
                process(batch, sender);
                if (downstream == null) {
                    // With no downstream step sendDownstream never runs, so count the batch here.
                    doneBatches.incrementAndGet();
                }
                // Time spent sending downstream is not counted as processing time.
                totalProcessingTime.add((System.currentTimeMillis() - startTime) - sender.sendTime);
            }
            decrementQueue();
            checkNotifyEndDownstream();
        } catch (Throwable e) {
            issuePanic(e);
        }
    }

    /**
     * Obtains a resource that is held while {@code batch} is being processed and closed
     * afterwards. This default implementation returns {@link Resource#EMPTY}.
     *
     * @param batch the batch about to be processed
     * @return resource to close once processing of the batch has finished
     * @throws Throwable on any error
     */
    protected Resource permit(T batch) throws Throwable {
        return Resource.EMPTY;
    }

    /**
     * Decrements the queued-batches counter and, when the queue becomes empty, records the
     * current time as the end of the last batch.
     */
    public void decrementQueue() {
        int queueSizeAfterThisJobDone = queuedBatches.decrementAndGet();
        assert queueSizeAfterThisJobDone >= 0 : "Negative queue size " + queueSizeAfterThisJobDone;
        if (queueSizeAfterThisJobDone == 0) {
            lastBatchEndTime.set(System.currentTimeMillis());
        }
    }

    /**
     * Increments the queued-batches counter. If the queue was empty and has been drained at least
     * once before, the time elapsed since it drained is added to {@code upstreamIdleTime}.
     */
    private void incrementQueue() {
        if (queuedBatches.getAndIncrement() == 0) {
            long lastEndTime = lastBatchEndTime.get();
            if (lastEndTime != 0) {
                upstreamIdleTime.addAndGet(System.currentTimeMillis() - lastEndTime);
            }
        }
    }

    /**
     * Processes one batch. Results are passed on by calling {@link BatchSender#send(Object)} on
     * the supplied sender.
     *
     * @param batch the batch to process
     * @param sender sender to use for passing results downstream
     * @throws Throwable on any error
     */
    protected abstract void process(T batch, BatchSender sender) throws Throwable;

    /**
     * Closes the superclass and then shuts the executor down. The flag passed to the executor's
     * shutdown is {@code true} only when no panic has been recorded.
     */
    @Override // org.neo4j.unsafe.impl.batchimport.staging.AbstractStep, org.neo4j.unsafe.impl.batchimport.staging.Step, java.lang.AutoCloseable
    public void close() throws Exception {
        super.close();
        executor.shutdown(panic == null);
    }

    /** Delegates to the executor. */
    @Override // org.neo4j.unsafe.impl.batchimport.staging.AbstractStep, org.neo4j.unsafe.impl.batchimport.Parallelizable
    public int numberOfProcessors() {
        return executor.numberOfProcessors();
    }

    /** Delegates to the executor. */
    @Override // org.neo4j.unsafe.impl.batchimport.staging.AbstractStep, org.neo4j.unsafe.impl.batchimport.Parallelizable
    public boolean incrementNumberOfProcessors() {
        return executor.incrementNumberOfProcessors();
    }

    /** Delegates to the executor. */
    @Override // org.neo4j.unsafe.impl.batchimport.staging.AbstractStep, org.neo4j.unsafe.impl.batchimport.Parallelizable
    public boolean decrementNumberOfProcessors() {
        return executor.decrementNumberOfProcessors();
    }

    /**
     * Passes {@code batch} to the downstream step, adds the value returned by the downstream
     * {@code receive} call to {@code downstreamIdleTime} and counts the batch as done.
     *
     * @param ticket ticket to send the batch under
     * @param batch the batch to send
     */
    public void sendDownstream(long ticket, Object batch) {
        // NOTE(review): 1 is presumably the "ordered send downstream" guarantee flag -- confirm
        // against the constants used with guarantees(int).
        if (guarantees(1)) {
            await(rightDoneTicket, ticket);
        }
        downstreamIdleTime.addAndGet(downstream.receive(ticket, batch));
        doneBatches.incrementAndGet();
    }

    /**
     * Gives the subclass a last chance to emit batches, using a fresh {@link Sender}, before
     * delegating to the superclass.
     */
    @Override // org.neo4j.unsafe.impl.batchimport.staging.AbstractStep
    public void done() {
        lastCallForEmittingOutstandingBatches(new Sender());
        super.done();
    }

    /**
     * Called from {@link #done()} before the superclass is notified. Does nothing by default.
     *
     * @param sender sender to use for emitting any outstanding batches
     */
    protected void lastCallForEmittingOutstandingBatches(BatchSender sender) {
    }
}
