/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.neo4j.kernel.logging.DevNullLoggingService;
import org.neo4j.kernel.logging.Logging;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProducerStep;
import org.neo4j.unsafe.impl.batchimport.staging.SilentExecutionMonitor;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.staging.StageExecution;
import org.neo4j.unsafe.impl.batchimport.staging.Step;
import org.neo4j.unsafe.impl.batchimport.stats.Key;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.StepStats;

public class StageTest {
    @Test
    public void shouldReceiveBatchesInOrder() throws Exception {
        Configuration.Default config = new Configuration.Default();
        Stage stage = new Stage((Logging)new DevNullLoggingService(), "Test stage", (Configuration)config);
        int batchSize = 10;
        long batches = 1000L;
        final long items = batches * (long)batchSize;
        stage.add((Step)new ProducerStep<Object>(stage.control(), "Producer", batchSize){
            private long i;
            private final Object theObject;
            {
                super(x0, x1, x2);
                this.i = 0L;
                this.theObject = new Object();
            }

            protected Object nextOrNull() {
                return ++this.i > items ? null : this.theObject;
            }
        });
        for (int i = 0; i < 3; ++i) {
            stage.add((Step)new ReceiveOrderAssertingStep(stage.control(), "Step" + i, 20, 2, i));
        }
        stage.add((Step)new LastReceiveOrderAssertingStep(stage.control(), "Final step", 20, 2, 0L));
        StageExecution execution = stage.execute();
        new SilentExecutionMonitor().monitor(new StageExecution[]{execution});
        for (StepStats stats : execution.stats()) {
            Assert.assertEquals((long)batches, (long)stats.stat((Key)Keys.done_batches).asLong());
        }
    }

    private static class LastReceiveOrderAssertingStep
    extends ReceiveOrderAssertingStep {
        LastReceiveOrderAssertingStep(StageControl control, String name, int workAheadSize, int numberOfExecutors, long processingTime) {
            super(control, name, workAheadSize, numberOfExecutors, processingTime);
        }

        @Override
        protected Object process(long ticket, Object batch) {
            super.process(ticket, batch);
            return null;
        }
    }

    private static class ReceiveOrderAssertingStep
    extends ExecutorServiceStep<Object> {
        private final AtomicLong lastTicket = new AtomicLong();
        private final long processingTime;

        ReceiveOrderAssertingStep(StageControl control, String name, int workAheadSize, int numberOfExecutors, long processingTime) {
            super(control, name, workAheadSize, numberOfExecutors);
            this.processingTime = processingTime;
        }

        public long receive(long ticket, Object batch) {
            Assert.assertEquals((long)this.lastTicket.incrementAndGet(), (long)ticket);
            return super.receive(ticket, batch);
        }

        protected Object process(long ticket, Object batch) {
            try {
                Thread.sleep(this.processingTime);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return batch;
        }
    }
}

