/*
 * Decompiled with CFR 0.152.
 */
package org.eobjects.analyzer.util.batch;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eobjects.analyzer.util.batch.ArrayBatchSource;
import org.eobjects.analyzer.util.batch.BatchEntry;
import org.eobjects.analyzer.util.batch.BatchEntryBatchSink;
import org.eobjects.analyzer.util.batch.BatchTransformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects single-item transformation requests from concurrent callers and
 * hands them to a {@link BatchTransformation} in batches.
 *
 * <p>Callers invoke {@link #transform(Object)}, which enqueues the input and
 * blocks until the batch containing it has been processed. A batch is
 * processed either when the queue is full (triggered by a caller that could
 * not enqueue) or periodically by the scheduled flush task started with
 * {@link #start()}.
 *
 * <p>Thread-safety: enqueueing an entry and draining the queue are both done
 * while holding a single private lock, so every entry is always paired with
 * the latch of the batch that will actually process it.
 *
 * @param <I> the input type of the transformation
 * @param <O> the output type of the transformation
 */
public class BatchTransformationBuffer<I, O> {
    private static final Logger logger = LoggerFactory.getLogger(BatchTransformationBuffer.class);

    /** Default interval between scheduled flushes, in milliseconds. */
    public static final int DEFAULT_FLUSH_INTERVAL = 1000;

    /** Default maximum number of entries in a batch (also the queue capacity). */
    public static final int DEFAULT_MAX_BATCH_SIZE = 20;

    private final BatchTransformation<I, O> _transformation;
    private final BlockingQueue<BatchEntry<I, O>> _queue;
    private final AtomicInteger _batchNo;
    private final int _maxBatchSize;
    private final ScheduledExecutorService _threadPool;
    private final int _flushInterval;

    /**
     * Guards the pairing of queue contents with {@link #_countDownLatch}. A
     * dedicated monitor is used because the latch itself is replaced on every
     * batch and therefore cannot serve as a stable lock.
     */
    private final Object _lock = new Object();

    /** Latch of the batch currently being collected; guarded by {@link #_lock}. */
    private CountDownLatch _countDownLatch;

    /**
     * Creates a buffer using {@link #DEFAULT_MAX_BATCH_SIZE} and
     * {@link #DEFAULT_FLUSH_INTERVAL}.
     *
     * @param transformation the batch transformation to delegate to
     */
    public BatchTransformationBuffer(BatchTransformation<I, O> transformation) {
        this(transformation, DEFAULT_MAX_BATCH_SIZE, DEFAULT_FLUSH_INTERVAL);
    }

    /**
     * Creates a buffer.
     *
     * @param transformation      the batch transformation to delegate to
     * @param maxBatchSize        capacity of the internal queue and thereby the
     *                            maximum number of entries per batch
     * @param flushIntervalMillis delay and period, in milliseconds, of the
     *                            scheduled flush task started by {@link #start()}
     */
    public BatchTransformationBuffer(BatchTransformation<I, O> transformation, int maxBatchSize, int flushIntervalMillis) {
        this._transformation = transformation;
        this._flushInterval = flushIntervalMillis;
        this._maxBatchSize = maxBatchSize;
        this._queue = new ArrayBlockingQueue<BatchEntry<I, O>>(maxBatchSize);
        this._countDownLatch = new CountDownLatch(1);
        this._batchNo = new AtomicInteger();
        this._threadPool = Executors.newScheduledThreadPool(1);
    }

    /**
     * Schedules the periodic flush task at the configured flush interval.
     */
    public void start() {
        logger.info("start()");
        this._threadPool.scheduleAtFixedRate(this.createFlushCommand(), this._flushInterval, this._flushInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds the task executed by the scheduler. Runtime failures are logged
     * and contained here: an exception escaping a task submitted with
     * {@code scheduleAtFixedRate} would suppress all subsequent executions.
     */
    private Runnable createFlushCommand() {
        return new Runnable(){

            @Override
            public void run() {
                try {
                    BatchTransformationBuffer.this.flushBuffer(true);
                }
                catch (RuntimeException e) {
                    logger.error("Scheduled flush failed", e);
                }
            }
        };
    }

    /**
     * @return the number of batches that have been started so far
     */
    public int getBatchCount() {
        return this._batchNo.get();
    }

    /**
     * Flushes the buffer if, and only if, the queue is currently full.
     */
    public void flushBuffer() {
        this.flushBuffer(false);
    }

    /**
     * Processes queued entries in batches.
     *
     * @param scheduled {@code true} when invoked by the scheduled task, in
     *                  which case partial batches are processed and flushing
     *                  repeats until the queue is observed empty;
     *                  {@code false} when invoked on demand, in which case at
     *                  most one batch is processed and only if the queue is full
     */
    private void flushBuffer(boolean scheduled) {
        // Iterative rather than recursive so sustained load cannot grow the stack.
        do {
            if (this._queue.isEmpty()) {
                return;
            }
            if (!scheduled && this._queue.size() < this._maxBatchSize) {
                logger.debug("Batch ignored, flush operation not scheduled and queue is not full");
                return;
            }
            if (!this.flushSingleBatch()) {
                return;
            }
        } while (scheduled && !this._queue.isEmpty());
    }

    /**
     * Drains the queue and runs the transformation on the drained entries.
     * The batch latch is always released, even if the transformation throws,
     * so callers waiting in {@link #transform(Object)} are never left blocked;
     * in that case they receive whatever output their entry holds at that time.
     *
     * @return {@code true} if a batch was processed, {@code false} if the queue
     *         turned out to be empty
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private boolean flushSingleBatch() {
        final ArrayList<BatchEntry<I, O>> entries = new ArrayList<BatchEntry<I, O>>(this._maxBatchSize);
        final CountDownLatch batchCountDownLatch;
        synchronized (this._lock) {
            if (this._queue.drainTo(entries) == 0) {
                logger.debug("Batch ignored, no elements left in queue");
                return false;
            }
            // Every drained entry was enqueued while this latch was current.
            batchCountDownLatch = this._countDownLatch;
            this._countDownLatch = new CountDownLatch(1);
        }
        final int batchSize = entries.size();
        try {
            final int batchNumber = this._batchNo.incrementAndGet();
            logger.info("Batch #{} - Preparing {} entries", (Object)Integer.valueOf(batchNumber), (Object)Integer.valueOf(batchSize));
            final Object[] input = new Object[batchSize];
            for (int i = 0; i < batchSize; ++i) {
                input[i] = entries.get(i).getInput();
            }
            // Raw types retained: the generic signatures of these helpers are
            // declared outside this file.
            ArrayBatchSource source = new ArrayBatchSource(input);
            BatchEntryBatchSink sink = new BatchEntryBatchSink(entries);
            this._transformation.map(source, sink);
            logger.info("Batch #{} - Finished", (Object)Integer.valueOf(batchNumber));
        }
        finally {
            batchCountDownLatch.countDown();
        }
        return true;
    }

    /**
     * Stops the scheduled flush task by shutting down the internal thread pool.
     */
    public void shutdown() {
        logger.info("shutdown()");
        this._threadPool.shutdown();
    }

    /**
     * Enqueues {@code input} and blocks until the batch containing it has been
     * processed. If the queue is full, the calling thread flushes it before
     * retrying. Note that a partially filled queue is only flushed by the
     * scheduled task, so this call waits until either the queue fills up or a
     * scheduled flush runs.
     *
     * @param input the value to transform
     * @return the output stored on the entry after its batch was processed
     * @throws IllegalStateException if the waiting thread is interrupted; the
     *                               thread's interrupt status is restored
     */
    public O transform(I input) {
        final BatchEntry<I, O> entry = new BatchEntry<I, O>(input);
        CountDownLatch batchCountDownLatch = null;
        while (batchCountDownLatch == null) {
            // Offer and latch capture are atomic with respect to draining, so
            // the captured latch always belongs to the batch holding the entry.
            synchronized (this._lock) {
                if (this._queue.offer(entry)) {
                    batchCountDownLatch = this._countDownLatch;
                }
            }
            if (batchCountDownLatch == null) {
                // Queue full: flush outside the lock, then retry.
                this.flushBuffer();
            }
        }
        try {
            batchCountDownLatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return entry.getOuput();
    }
}

