/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.ArrayList;
import java.util.Iterator;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

public class ProducerStep<T>
extends AbstractStep<Iterator<T>> {
    private final int batchSize;

    public ProducerStep(StageControl control, String name, int batchSize) {
        super(control, name);
        this.batchSize = batchSize;
    }

    @Override
    public long receive(long ticket, final Iterator<T> input) {
        new Thread(){

            @Override
            public void run() {
                ProducerStep.this.assertHealthy();
                try {
                    ProducerStep.this.process(input);
                    ProducerStep.this.endOfUpstream();
                }
                catch (Throwable e) {
                    ProducerStep.this.issuePanic(e);
                }
            }
        }.start();
        return 0L;
    }

    protected void process(Iterator<T> input) {
        ArrayList<T> batch = new ArrayList<T>(this.batchSize);
        int size = 0;
        long startTime = System.currentTimeMillis();
        while (input.hasNext()) {
            batch.add(input.next());
            if (++size != this.batchSize) continue;
            this.totalProcessingTime.addAndGet(System.currentTimeMillis() - startTime);
            this.sendDownstream(this.nextTicket(), batch);
            batch = new ArrayList(this.batchSize);
            size = 0;
            this.assertHealthy();
            startTime = System.currentTimeMillis();
        }
        if (size > 0) {
            this.totalProcessingTime.addAndGet(System.currentTimeMillis() - startTime);
            this.sendDownstream(this.nextTicket(), batch);
        }
    }

    private long nextTicket() {
        long ticket = this.doneBatches.incrementAndGet();
        this.receivedBatches.incrementAndGet();
        return ticket;
    }
}

