/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

/**
 * A step in which every received batch is handed to several "forked" processor threads, each of
 * which calls {@link #forkedProcess(int, int, Object)} on the same batch with its own id.
 *
 * <p>Received batches are wrapped in {@link Unit}s and appended to a singly linked list whose newest
 * element is {@code head} and whose most recently completed element is {@code tail}:
 * <ul>
 * <li>{@link #receive(long, Object)} appends a unit at {@code head};</li>
 * <li>each {@link ForkedProcessor} thread walks the list and processes every unit it takes part in;</li>
 * <li>the {@link CompletedBatchesSender} thread walks the list from {@code tail}, and once a unit has
 * been completed by all of its processors it forwards the unit downstream (if there is a downstream)
 * and advances {@code tail}.</li>
 * </ul>
 *
 * @param <T> type of batch this step receives.
 */
public abstract class ForkedProcessorStep<T>
extends AbstractStep<T> {
    /**
     * Processor id {@code 0}, exposed to subclasses.
     * NOTE(review): presumably meant for work that only a single ("main") processor should perform —
     * confirm against subclasses.
     */
    protected static final int MAIN = 0;
    // Field offsets used by Unit#processorDone to atomically add to Unit's volatile counters.
    private final long COMPLETED_PROCESSORS_OFFSET = UnsafeUtil.getFieldOffset(Unit.class, "completedProcessors");
    private final long PROCESSING_TIME_OFFSET = UnsafeUtil.getFieldOffset(Unit.class, "processingTime");
    private static final ParkStrategy PARK = new ParkStrategy.Park(10L, TimeUnit.MILLISECONDS);
    private final Object[] forkedProcessors;
    // Number of processors that units created from now on will expect; only modified under the monitor of this.
    private volatile int numberOfForkedProcessors;
    // Sentinel unit which both head and tail start out pointing at; it takes no processors and is never completed.
    private final Unit noop = new Unit(-1L, null, 0);
    private final AtomicReference<Unit> head = new AtomicReference<Unit>(this.noop);
    private final AtomicReference<Unit> tail = new AtomicReference<Unit>(this.noop);
    private final Thread downstreamSender = new CompletedBatchesSender();
    // Requested number of processors, applied lazily by applyProcessorCount().
    private volatile int numberOfProcessors = 1;
    private final int maxProcessors;
    // Written by the thread waiting in receive()/awaitEmpty() and read by the sender thread in order to
    // unpark it, hence volatile so that the sender is guaranteed to observe the write.
    private volatile Thread receiverThread;

    /**
     * Creates the step and immediately starts the initial forked processor thread(s).
     *
     * @param control {@link StageControl} passed on to {@link AbstractStep}.
     * @param name name of this step, also used as prefix for the names of the processor threads.
     * @param config {@link Configuration} passed on to {@link AbstractStep}; its
     * {@code maxNumberOfProcessors()} is the upper bound for the number of forked processors.
     */
    protected ForkedProcessorStep(StageControl control, String name, Configuration config) {
        super(control, name, config, new StatsProvider[0]);
        this.maxProcessors = config.maxNumberOfProcessors();
        this.forkedProcessors = new Object[this.maxProcessors];
        this.applyProcessorCount();
    }

    /**
     * Brings the number of forked processors in line with the requested number of processors.
     * Missing processors are created (and thereby started) with the current head as starting point.
     * When there are too many, only the count is lowered: the surplus threads terminate themselves once
     * they encounter a unit that expects fewer processors than their id. Afterwards this method waits
     * until all previously queued units have been completed.
     */
    private void applyProcessorCount() {
        if (this.numberOfForkedProcessors == this.numberOfProcessors) {
            return;
        }
        synchronized (this) {
            int processors = this.numberOfProcessors;
            while (this.numberOfForkedProcessors < processors) {
                this.forkedProcessors[this.numberOfForkedProcessors] = new ForkedProcessor(this.numberOfForkedProcessors, this.head.get());
                ++this.numberOfForkedProcessors;
            }
            if (this.numberOfForkedProcessors > processors) {
                this.numberOfForkedProcessors = processors;
            }
            this.awaitEmpty();
        }
    }

    /**
     * Parks the calling thread until the tickets of head and tail no longer differ, i.e. until the
     * sender has caught up with everything that has been received.
     */
    private void awaitEmpty() {
        while (this.head.get().ticket - this.tail.get().ticket > 0L) {
            // Publish who is waiting so that the sender can unpark this thread.
            this.receiverThread = Thread.currentThread();
            PARK.park(this.receiverThread);
        }
    }

    /**
     * Adjusts the requested number of processors by {@code delta}, keeping the result within
     * {@code [1, maxProcessors]}. The change takes effect the next time a batch is received.
     *
     * @param delta number of processors to add (positive) or remove (negative).
     * @return the requested number of processors after the adjustment.
     */
    @Override
    public synchronized int processors(int delta) {
        this.numberOfProcessors = Integer.max(1, Integer.min(this.numberOfProcessors + delta, this.maxProcessors));
        return this.numberOfProcessors;
    }

    /**
     * Starts this step as well as the thread which sends completed batches downstream.
     *
     * @param orderingGuarantees passed on to {@link AbstractStep#start(int)}.
     */
    @Override
    public void start(int orderingGuarantees) {
        super.start(orderingGuarantees);
        this.downstreamSender.start();
    }

    /**
     * Queues a batch for processing by the forked processors. Before queueing, a changed processor count
     * is applied and the calling thread is parked for as long as the ticket distance between head and
     * tail is {@code maxProcessors - 1} or more.
     *
     * @param ticket ticket of the batch.
     * @param batch the batch to process.
     * @return nanoseconds spent linking the batch into the queue; time spent waiting for room in the
     * queue is not included.
     */
    @Override
    public long receive(long ticket, T batch) {
        this.applyProcessorCount();
        while (this.head.get().ticket - this.tail.get().ticket >= (long)(this.maxProcessors - 1)) {
            // Publish who is waiting so that the sender can unpark this thread.
            this.receiverThread = Thread.currentThread();
            PARK.park(this.receiverThread);
        }
        this.queuedBatches.incrementAndGet();
        Unit unit = new Unit(ticket, batch, this.numberOfForkedProcessors);
        long time = System.nanoTime();
        // Swap in the new head first and link it from the previous head afterwards; the processors and
        // the sender discover the new unit through the (volatile) next reference.
        Unit previousHead = this.head.getAndSet(unit);
        previousHead.next = unit;
        return System.nanoTime() - time;
    }

    /**
     * Processes a batch. Called by every participating forked processor thread for every batch.
     *
     * @param var1 id of the calling processor, {@code 0 <= id < var2}.
     * @param var2 number of processors that process this batch.
     * @param var3 the batch to process.
     * @throws Throwable on any error, which results in a panic being issued for this step.
     */
    protected abstract void forkedProcess(int var1, int var2, T var3) throws Throwable;

    /**
     * Passes the batch of a completed unit on to the downstream step and adds the time reported back by
     * the downstream step to the downstream idle time.
     *
     * @param unit the completed unit.
     */
    void sendDownstream(Unit unit) {
        this.downstreamIdleTime.addAndGet(this.downstream.receive(unit.ticket, unit.batch));
    }

    /**
     * Thread which walks the linked list of units and calls
     * {@link ForkedProcessorStep#forkedProcess(int, int, Object)} for every unit it participates in.
     * The thread is started from its constructor.
     */
    class ForkedProcessor
    extends Thread {
        private final int id;
        // The most recent unit this processor is done with (or its starting point); the next one to look at is current.next.
        private Unit current;

        ForkedProcessor(int id, Unit startingUnit) {
            super(ForkedProcessorStep.this.name() + "-" + id);
            this.id = id;
            this.current = startingUnit;
            this.start();
        }

        @Override
        public void run() {
            try {
                while (!ForkedProcessorStep.this.isCompleted()) {
                    Unit candidate = this.current.next;
                    if (candidate == null) {
                        // Nothing new has been queued yet.
                        PARK.park(this);
                        continue;
                    }
                    if (this.id >= candidate.processors) {
                        // The processor count has been lowered below this id, this thread is no longer needed.
                        break;
                    }
                    long time = System.nanoTime();
                    ForkedProcessorStep.this.forkedProcess(this.id, candidate.processors, candidate.batch);
                    candidate.processorDone(System.nanoTime() - time);
                    this.current = candidate;
                }
            }
            catch (Throwable e) {
                ForkedProcessorStep.this.issuePanic(e, false);
            }
        }
    }

    /**
     * Thread which, in queue order, takes units that all of their processors have completed, sends them
     * downstream (if there is a downstream step), advances the tail and updates the statistics.
     */
    private final class CompletedBatchesSender
    extends Thread {
        private CompletedBatchesSender() {
        }

        @Override
        public void run() {
            try {
                Unit current = ForkedProcessorStep.this.tail.get();
                while (!ForkedProcessorStep.this.isCompleted()) {
                    Unit candidate = current.next;
                    if (candidate != null && candidate.isCompleted()) {
                        if (ForkedProcessorStep.this.downstream != null) {
                            ForkedProcessorStep.this.sendDownstream(candidate);
                        }
                        // Unlink the unit which is now behind the tail.
                        current.next = null;
                        current = candidate;
                        ForkedProcessorStep.this.tail.set(current);
                        ForkedProcessorStep.this.queuedBatches.decrementAndGet();
                        ForkedProcessorStep.this.doneBatches.incrementAndGet();
                        ForkedProcessorStep.this.totalProcessingTime.add(candidate.processingTime);
                        ForkedProcessorStep.this.checkNotifyEndDownstream();
                        continue;
                    }
                    // Nothing to send right now. Wake the receiver, which may be waiting for the tail to
                    // advance, then wait for more work. The field is read once so that the null check and
                    // the unpark operate on the same value.
                    Thread receiver = ForkedProcessorStep.this.receiverThread;
                    if (receiver != null) {
                        PARK.unpark(receiver);
                    }
                    PARK.park(this);
                }
            }
            catch (Throwable e) {
                // Same handling as in ForkedProcessor: without it a failure here would silently kill
                // this thread and leave the step waiting forever.
                ForkedProcessorStep.this.issuePanic(e, false);
            }
        }
    }

    /**
     * A received batch together with the bookkeeping about how many processors shall process it and how
     * many of them are done. Units form a singly linked list through {@link #next}.
     */
    class Unit {
        private final long ticket;
        private final T batch;
        // Number of processors expected to process this unit.
        private final int processors;
        // Updated atomically through UnsafeUtil in processorDone.
        private volatile int completedProcessors;
        // Sum of the processing times reported by the processors, updated atomically through UnsafeUtil.
        private volatile long processingTime;
        private volatile Unit next;

        Unit(long ticket, T batch, int processors) {
            this.ticket = ticket;
            this.batch = batch;
            this.processors = processors;
        }

        /**
         * @return whether all expected processors have reported this unit as done. A unit expecting no
         * processors at all (such as the initial sentinel) is never completed.
         */
        boolean isCompleted() {
            return this.processors > 0 && this.processors == this.completedProcessors;
        }

        /**
         * Called by a processor when it is done with this unit. The last processor to finish unparks the
         * thread which sends completed units downstream.
         *
         * @param time nanoseconds the calling processor spent processing this unit.
         */
        void processorDone(long time) {
            UnsafeUtil.getAndAddLong(this, ForkedProcessorStep.this.PROCESSING_TIME_OFFSET, time);
            int prevCompletedProcessors = UnsafeUtil.getAndAddInt(this, ForkedProcessorStep.this.COMPLETED_PROCESSORS_OFFSET, 1);
            assert (prevCompletedProcessors < this.processors);
            if (prevCompletedProcessors == this.processors - 1) {
                PARK.unpark(ForkedProcessorStep.this.downstreamSender);
            }
        }
    }
}

