/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.dispatch.processor;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nonnull;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registry;
import reactor.core.dispatch.processor.Operation;
import reactor.core.support.Assert;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.fn.batch.BatchConsumer;
import reactor.jarjar.com.lmax.disruptor.BlockingWaitStrategy;
import reactor.jarjar.com.lmax.disruptor.EventFactory;
import reactor.jarjar.com.lmax.disruptor.EventHandler;
import reactor.jarjar.com.lmax.disruptor.ExceptionHandler;
import reactor.jarjar.com.lmax.disruptor.LifecycleAware;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.jarjar.com.lmax.disruptor.WaitStrategy;
import reactor.jarjar.com.lmax.disruptor.dsl.Disruptor;
import reactor.jarjar.com.lmax.disruptor.dsl.ProducerType;

public class Processor<T>
implements Supplier<Operation<T>> {
    private final int opsBufferSize;
    private final ExecutorService executor;
    private final Disruptor<Operation<T>> disruptor;
    private final RingBuffer<Operation<T>> ringBuffer;

    public Processor(final @Nonnull Supplier<T> dataSupplier, @Nonnull Consumer<T> consumer, @Nonnull Registry<Consumer<Throwable>> errorConsumers, WaitStrategy waitStrategy, boolean multiThreadedProducer, int opsBufferSize) {
        Assert.notNull(dataSupplier, "Data Supplier cannot be null.");
        Assert.notNull(consumer, "Consumer cannot be null.");
        Assert.notNull(errorConsumers, "Error Consumers Registry cannot be null.");
        this.executor = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("processor"));
        this.opsBufferSize = opsBufferSize < 1 ? 256 * Runtime.getRuntime().availableProcessors() : opsBufferSize;
        this.disruptor = new Disruptor(new EventFactory<Operation<T>>(){

            @Override
            public Operation<T> newInstance() {
                return new Operation<T>(dataSupplier.get()){

                    @Override
                    public void commit() {
                        Processor.this.ringBuffer.publish(this.id);
                    }
                };
            }
        }, this.opsBufferSize, this.executor, multiThreadedProducer ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy != null ? waitStrategy : new BlockingWaitStrategy());
        this.disruptor.handleExceptionsWith(new ConsumerExceptionHandler(errorConsumers));
        this.disruptor.handleEventsWith(new ConsumerEventHandler(consumer));
        this.ringBuffer = this.disruptor.start();
    }

    public void shutdown() {
        this.executor.shutdown();
        this.disruptor.shutdown();
    }

    public Operation<T> prepare() {
        long seqId = this.ringBuffer.next();
        Operation<T> op = this.ringBuffer.get(seqId);
        op.setId(seqId);
        return op;
    }

    public Processor<T> commit(List<Operation<T>> ops) {
        if (null == ops || ops.isEmpty()) {
            return this;
        }
        Operation<T> first = ops.get(0);
        Operation<T> last = ops.get(ops.size() - 1);
        long firstSeqId = first.id;
        long lastSeqId = last.id;
        if (lastSeqId > firstSeqId) {
            this.ringBuffer.publish(firstSeqId, lastSeqId);
        } else {
            this.ringBuffer.publish(firstSeqId);
        }
        return this;
    }

    public Processor<T> batch(int size, Consumer<T> mutator) {
        long start = -1L;
        long end = 0L;
        for (int i = 0; i < size; ++i) {
            if (i > 0 && i % this.opsBufferSize == 0) {
                this.ringBuffer.publish(start, end);
                start = -1L;
            }
            long l = this.ringBuffer.next();
            if (start < 0L) {
                start = l;
            }
            end = l;
            mutator.accept(this.ringBuffer.get(end).get());
        }
        this.ringBuffer.publish(start, end);
        return this;
    }

    @Override
    public Operation<T> get() {
        return this.prepare();
    }

    private static class ConsumerExceptionHandler
    implements ExceptionHandler {
        final Registry<Consumer<Throwable>> errorConsumers;

        private ConsumerExceptionHandler(Registry<Consumer<Throwable>> errorConsumers) {
            this.errorConsumers = errorConsumers;
        }

        @Override
        public void handleEventException(Throwable ex, long sequence, Object event) {
            for (Registration<Consumer<Throwable>> reg : this.errorConsumers.select(ex.getClass())) {
                reg.getObject().accept(ex);
            }
        }

        @Override
        public void handleOnStartException(Throwable ex) {
            this.handleEventException(ex, -1L, null);
        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
            this.handleEventException(ex, -1L, null);
        }
    }

    private static class ConsumerEventHandler<T>
    implements EventHandler<Operation<T>>,
    LifecycleAware {
        final Consumer<T> consumer;
        final boolean isBatchConsumer;

        private ConsumerEventHandler(Consumer<T> consumer) {
            this.consumer = consumer;
            this.isBatchConsumer = consumer instanceof BatchConsumer;
        }

        @Override
        public void onStart() {
            if (this.isBatchConsumer) {
                ((BatchConsumer)this.consumer).start();
            }
        }

        @Override
        public void onShutdown() {
            if (this.isBatchConsumer) {
                ((BatchConsumer)this.consumer).end();
            }
        }

        @Override
        public void onEvent(Operation<T> op, long sequence, boolean endOfBatch) throws Exception {
            this.consumer.accept(op.get());
        }
    }
}

