package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.class */
/**
 * A {@link ProcessorStep} that hands every incoming batch to a set of forked processor threads.
 *
 * <p>For each batch the thread calling {@link #process(Object, BatchSender)} (the "submitter")
 * publishes the batch, bumps a global ticket, unparks all forked processors and then waits until
 * every one of them has reported completion. Each forked processor receives the <em>same</em> batch
 * together with its own id and the current number of processors via
 * {@link #forkedProcess(int, int, Object)}. When only one processor is configured the submitter
 * calls {@code forkedProcess} itself, without any hand-off.
 *
 * <p>The number of forked processors can be changed at any time through {@link #processors(int)};
 * the change is applied at the start of the next {@code process} call.
 *
 * @param <T> type of batch this step processes.
 */
public abstract class ForkedProcessorStep<T> extends ProcessorStep<T> {
    protected static final int MAIN = 0;

    /** Number of forked processors that have not yet finished the current batch. */
    private final AtomicInteger doneSignal;
    /** Upper bound applied by {@link #processors(int)}. */
    private final int maxForkedProcessors;
    /** Live forked processors; only mutated by {@link #applyProcessorCount()}. */
    protected final List<ForkedProcessorStep<T>.ForkedProcessor> forkedProcessors;
    // Plain (non-volatile) fields below are published to the forked processors by the subsequent
    // volatile write to globalTicket, which the processors read before touching them.
    private T currentBatch;
    /** Incremented once per batch that is handed off to the forked processors. */
    private volatile long globalTicket;
    /** Desired number of forked processors, as last requested through {@link #processors(int)}. */
    private volatile int processorCount;
    // Written by a forked processor before it decrements doneSignal, read by the submitter after it
    // has observed doneSignal reaching zero, so the atomic provides the required ordering.
    private Throwable error;
    private Thread submitterThread;

    /**
     * Thread that processes its share of every batch published by the enclosing step.
     * The thread starts itself from its constructor.
     */
    public class ForkedProcessor extends Thread {
        private final int id;
        private volatile boolean halted;
        /** Ticket of the last batch this processor has completed. */
        private long localTicket;

        /**
         * Creates and immediately starts the processor thread.
         *
         * @param id id passed as first argument to {@code forkedProcess}; also used in the thread name.
         */
        ForkedProcessor(int id) {
            super(ForkedProcessorStep.this.name() + "-" + id);
            this.id = id;
            // Begin in sync with the submitter so that the next published ticket is picked up.
            this.localTicket = ForkedProcessorStep.this.globalTicket;
            start();
        }

        /**
         * Parks until woken, then processes the current batch if, and only if, exactly one new
         * ticket has been published since the last batch this processor completed. Completion is
         * always reported back to the submitter, whether or not processing succeeded.
         */
        @Override
        public void run() {
            while (!halted) {
                LockSupport.park();
                // park() may return for no reason, or because of halt(); only proceed when there is
                // exactly one new ticket to work on.
                if (halted || localTicket + 1 != globalTicket) {
                    continue;
                }
                try {
                    forkedProcess(id, forkedProcessors.size(), currentBatch);
                } catch (Throwable t) {
                    // Handed over to the submitter, which rethrows it from process().
                    error = t;
                } finally {
                    // Must run on failure too: otherwise the submitter would wait forever for
                    // doneSignal to reach zero and this processor would stay one ticket behind.
                    localTicket++;
                    doneSignal.decrementAndGet();
                    LockSupport.unpark(submitterThread);
                }
            }
        }

        /** Asks this processor to stop and wakes it up so that it notices the request. */
        void halt() {
            halted = true;
            LockSupport.unpark(this);
        }
    }

    /**
     * @param stageControl passed on to the {@link ProcessorStep} constructor.
     * @param name passed on to the {@link ProcessorStep} constructor.
     * @param configuration passed on to the {@link ProcessorStep} constructor; also the source of the
     * processor limit when {@code maxProcessors} is {@code 0}.
     * @param maxProcessors upper bound for the number of forked processors, or {@code 0} to use
     * {@link Configuration#maxNumberOfProcessors()}.
     */
    public ForkedProcessorStep(StageControl stageControl, String name, Configuration configuration,
            int maxProcessors) {
        super(stageControl, name, configuration, 1, new StatsProvider[0]);
        this.doneSignal = new AtomicInteger();
        this.forkedProcessors = new ArrayList<>();
        this.processorCount = 1;
        this.maxForkedProcessors = maxProcessors == 0 ? configuration.maxNumberOfProcessors() : maxProcessors;
        applyProcessorCount();
    }

    /**
     * Processes one batch, either directly on the calling thread (single processor) or by handing
     * it to all forked processors and waiting for them to finish. Afterwards the batch is sent
     * downstream, if there is a downstream step.
     *
     * @param batch batch to process.
     * @param sender used to send the batch downstream once processing is complete.
     * @throws Throwable whatever a forked processor, or {@code forkedProcess} itself, threw.
     */
    @Override
    protected void process(T batch, BatchSender sender) throws Throwable {
        applyProcessorCount();
        int processors = forkedProcessors.size();
        if (processors == 1) {
            // No need to involve another thread for a single processor.
            forkedProcess(0, 1, batch);
        } else {
            currentBatch = batch;
            submitterThread = Thread.currentThread();
            doneSignal.set(processors);
            // Volatile write: publishes the fields assigned above to the forked processors.
            globalTicket++;
            notifyProcessors();
            // Loop guards against park() returning before all processors have reported in.
            while (doneSignal.get() > 0) {
                LockSupport.park();
            }
            if (error != null) {
                throw error;
            }
        }
        if (this.downstream != null) {
            sender.send(batch);
        }
    }

    /** Wakes every forked processor so that it can look for a new ticket. */
    private void notifyProcessors() {
        for (ForkedProcessor processor : forkedProcessors) {
            LockSupport.unpark(processor);
        }
    }

    /**
     * Closes the step and then halts all forked processors. Does not wait for the processor
     * threads to terminate.
     */
    @Override
    public void close() throws Exception {
        super.close();
        for (ForkedProcessor processor : forkedProcessors) {
            processor.halt();
        }
    }

    /**
     * Processes this processor's share of the batch. All processors are given the same batch
     * instance; how the work is divided between them is up to the implementation.
     *
     * @param id id of the calling processor, {@code 0 <= id < processors}.
     * @param processors number of processors working on {@code batch}.
     * @param batch batch to process.
     */
    protected abstract void forkedProcess(int id, int processors, T batch);

    /**
     * Starts or halts forked processors until their number matches the requested processor count.
     * Processors are added to, and removed from, the end of the list.
     */
    private void applyProcessorCount() {
        // Read the volatile once so that a concurrent processors() call cannot move the target.
        int target = processorCount;
        while (forkedProcessors.size() != target) {
            if (forkedProcessors.size() < target) {
                forkedProcessors.add(new ForkedProcessor(forkedProcessors.size()));
            } else {
                forkedProcessors.remove(forkedProcessors.size() - 1).halt();
            }
        }
    }

    /**
     * Adjusts the requested number of forked processors by {@code delta}. The result is raised to
     * at least {@code 1} and then capped at the configured maximum. The new count takes effect the
     * next time a batch is processed.
     *
     * @param delta number of processors to add (positive) or remove (negative); {@code 0} merely
     * queries the current value.
     * @return the requested number of processors after the adjustment.
     */
    @Override
    public int processors(int delta) {
        int requested = Math.min(Math.max(processorCount + delta, 1), maxForkedProcessors);
        processorCount = requested;
        return requested;
    }
}
