/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.dispatch;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.dispatch.AbstractLifecycleDispatcher;
import reactor.core.dispatch.InsufficientCapacityException;
import reactor.core.dispatch.SingleThreadDispatcher;
import reactor.core.dispatch.wait.WaitingMood;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.SpecificationExceptions;
import reactor.fn.Consumer;
import reactor.jarjar.com.lmax.disruptor.BatchEventProcessor;
import reactor.jarjar.com.lmax.disruptor.BlockingWaitStrategy;
import reactor.jarjar.com.lmax.disruptor.EventFactory;
import reactor.jarjar.com.lmax.disruptor.EventHandler;
import reactor.jarjar.com.lmax.disruptor.ExceptionHandler;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.jarjar.com.lmax.disruptor.Sequence;
import reactor.jarjar.com.lmax.disruptor.SequenceBarrier;
import reactor.jarjar.com.lmax.disruptor.WaitStrategy;
import reactor.jarjar.com.lmax.disruptor.dsl.Disruptor;
import reactor.jarjar.com.lmax.disruptor.dsl.ProducerType;

public final class RingBufferDispatcher
extends SingleThreadDispatcher
implements WaitingMood {
    // Default ring buffer size; equals the size the name-only constructor passes on.
    private static final int DEFAULT_BUFFER_SIZE = 1024;
    // Per-instance logger named after the runtime class of this dispatcher.
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    // Single-thread executor handed to the disruptor; processors created via dispatch() are also submitted to it.
    private final ExecutorService executor;
    // Disruptor configured in the constructor with the task-running event handler.
    private final Disruptor<RingBufferTask> disruptor;
    // Ring buffer returned by disruptor.start(); task slots are claimed from and published to it.
    private final RingBuffer<RingBufferTask> ringBuffer;
    // The wait strategy viewed as a WaitingMood when it implements that interface, otherwise null.
    private final WaitingMood waitingMood;

    /**
     * Creates a dispatcher with the default ring buffer size.
     *
     * @param name name passed on to the thread factory of the backing executor
     */
    public RingBufferDispatcher(String name) {
        this(name, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a dispatcher with the given ring buffer size and no uncaught-exception consumer.
     *
     * @param name       name passed on to the thread factory of the backing executor
     * @param bufferSize size passed to the superclass and to the disruptor
     */
    public RingBufferDispatcher(String name, int bufferSize) {
        this(name, bufferSize, (Consumer<Throwable>) null);
    }

    /**
     * Creates a dispatcher that uses {@link ProducerType#MULTI} and a fresh {@link BlockingWaitStrategy}.
     *
     * @param name                     name passed on to the thread factory of the backing executor
     * @param bufferSize               size passed to the superclass and to the disruptor
     * @param uncaughtExceptionHandler consumer notified of disruptor exceptions; may be {@code null},
     *                                 in which case exceptions are logged instead
     */
    public RingBufferDispatcher(String name, int bufferSize, Consumer<Throwable> uncaughtExceptionHandler) {
        this(
                name,
                bufferSize,
                uncaughtExceptionHandler,
                ProducerType.MULTI,
                new BlockingWaitStrategy()
        );
    }

    public RingBufferDispatcher(String name, int bufferSize, final Consumer<Throwable> uncaughtExceptionHandler, ProducerType producerType, WaitStrategy waitStrategy) {
        super(bufferSize);
        this.waitingMood = WaitingMood.class.isAssignableFrom(waitStrategy.getClass()) ? (WaitingMood)((Object)waitStrategy) : null;
        this.executor = Executors.newSingleThreadExecutor(new NamedDaemonThreadFactory(name, this.getContext()));
        this.disruptor = new Disruptor<RingBufferTask>(new EventFactory<RingBufferTask>(){

            @Override
            public RingBufferTask newInstance() {
                return new RingBufferTask();
            }
        }, bufferSize, this.executor, producerType, waitStrategy);
        this.disruptor.handleExceptionsWith(new ExceptionHandler(){

            @Override
            public void handleEventException(Throwable ex, long sequence, Object event) {
                this.handleOnStartException(ex);
            }

            @Override
            public void handleOnStartException(Throwable ex) {
                if (null != uncaughtExceptionHandler) {
                    uncaughtExceptionHandler.accept(ex);
                } else {
                    RingBufferDispatcher.this.log.error(ex.getMessage(), ex);
                }
            }

            @Override
            public void handleOnShutdownException(Throwable ex) {
                this.handleOnStartException(ex);
            }
        });
        this.disruptor.handleEventsWith(new EventHandler<RingBufferTask>(){

            @Override
            public void onEvent(RingBufferTask task, long sequence, boolean endOfBatch) throws Exception {
                task.run();
            }
        });
        this.ringBuffer = this.disruptor.start();
    }

    /**
     * Creates a {@link Processor} that relays signals through this dispatcher's ring buffer.
     *
     * <p>If the dispatcher is still alive, the disruptor is shut down first.
     * NOTE(review): presumably this stops the disruptor's built-in task handler so the processor's own
     * event processor can run on the single-thread executor — confirm.
     *
     * @param <E> type of the signals passed through the processor
     * @return a new processor bound to a fresh barrier on the ring buffer
     */
    public <E> Processor<E, E> dispatch() {
        final boolean running = alive();
        if (running) {
            disruptor.shutdown();
        }
        return new RingBufferProcessor<E>(ringBuffer.newBarrier());
    }

    /**
     * Shuts the dispatcher down and waits up to the given timeout for the executor to terminate.
     *
     * <p>The outcome of the wait itself is not consulted: the method returns {@code true} whether or not
     * the executor terminated within the timeout, and {@code false} only when the waiting thread was
     * interrupted.
     *
     * @param timeout  maximum time to wait for executor termination
     * @param timeUnit unit of {@code timeout}
     * @return {@code false} if the calling thread was interrupted while waiting, {@code true} otherwise
     */
    @Override
    public boolean awaitAndShutdown(long timeout, TimeUnit timeUnit) {
        // Capture liveness before shutdown() changes it.
        boolean alive = this.alive();
        this.shutdown();
        try {
            this.executor.awaitTermination(timeout, timeUnit);
            if (alive) {
                this.disruptor.shutdown();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /**
     * Requests an orderly stop: the executor first, then the disruptor, then the superclass shutdown.
     */
    @Override
    public void shutdown() {
        executor.shutdown();
        disruptor.shutdown();
        super.shutdown();
    }

    /**
     * Stops immediately: calls {@code shutdownNow()} on the executor, halts the disruptor, then
     * delegates to the superclass forced shutdown.
     */
    @Override
    public void forceShutdown() {
        executor.shutdownNow();
        disruptor.halt();
        super.forceShutdown();
    }

    /**
     * Reports the remaining capacity of the ring buffer.
     *
     * @return the value of {@code ringBuffer.remainingCapacity()}
     */
    @Override
    public long remainingSlots() {
        final long free = ringBuffer.remainingCapacity();
        return free;
    }

    /**
     * Submits a task that calls {@code nervous()} on the wait strategy.
     * Does nothing when the wait strategy does not implement {@link WaitingMood}.
     */
    @Override
    public void nervous() {
        if (waitingMood == null) {
            return;
        }
        final Runnable switchMood = new Runnable() {
            @Override
            public void run() {
                RingBufferDispatcher.this.waitingMood.nervous();
            }
        };
        execute(switchMood);
    }

    /**
     * Submits a task that calls {@code calm()} on the wait strategy.
     * Does nothing when the wait strategy does not implement {@link WaitingMood}.
     */
    @Override
    public void calm() {
        if (waitingMood == null) {
            return;
        }
        final Runnable switchMood = new Runnable() {
            @Override
            public void run() {
                RingBufferDispatcher.this.waitingMood.calm();
            }
        };
        execute(switchMood);
    }

    /**
     * Claims the next ring buffer slot without blocking and returns its task, stamped with the claimed
     * sequence.
     *
     * @return the task stored in the claimed slot
     * @throws InsufficientCapacityException the shared {@code INSTANCE}, when the ring buffer reports
     *                                       that no slot is available
     */
    @Override
    protected AbstractLifecycleDispatcher.Task tryAllocateTask() throws InsufficientCapacityException {
        final long claimed;
        try {
            claimed = ringBuffer.tryNext();
        }
        catch (reactor.jarjar.com.lmax.disruptor.InsufficientCapacityException ignored) {
            // Translate the disruptor's exception into the dispatcher's own shared instance.
            throw InsufficientCapacityException.INSTANCE;
        }
        return ringBuffer.get(claimed).setSequenceId(claimed);
    }

    /**
     * Claims the next ring buffer slot via {@code ringBuffer.next()} and returns its task, stamped with
     * the claimed sequence.
     *
     * @return the task stored in the claimed slot
     */
    @Override
    protected AbstractLifecycleDispatcher.Task allocateTask() {
        final long claimed = ringBuffer.next();
        RingBufferTask slot = ringBuffer.get(claimed);
        slot.setSequenceId(claimed);
        return slot;
    }

    /**
     * Publishes the ring buffer sequence recorded on the given task.
     *
     * @param task a task previously returned by one of the allocate methods; it is cast to
     *             {@code RingBufferTask} to read its sequence id
     */
    @Override
    protected void execute(AbstractLifecycleDispatcher.Task task) {
        RingBufferTask allocated = (RingBufferTask) task;
        ringBuffer.publish(allocated.getSequenceId());
    }

    private class RingBufferProcessor<E>
    implements Processor<E, E>,
    Consumer<E> {
        private final SequenceBarrier barrier;
        private final Sequence pending = new Sequence(0L);
        private final Sequence batch = new Sequence(0L);
        final Consumer COMPLETE_SENTINEL = new Consumer(){

            public void accept(Object o) {
            }
        };
        final Consumer<Throwable> ERROR_SENTINEL = new Consumer<Throwable>(){

            @Override
            public void accept(Throwable throwable) {
            }
        };
        Subscription s;

        public RingBufferProcessor(SequenceBarrier barrier) {
            this.barrier = barrier;
        }

        public void subscribe(final Subscriber<? super E> sub) {
            Subscription subscription;
            RingBufferTaskEventHandler<? super E> eventHandler = new RingBufferTaskEventHandler<E>(this, sub);
            final BatchEventProcessor<RingBufferTask> p = new BatchEventProcessor<RingBufferTask>(RingBufferDispatcher.this.ringBuffer, this.barrier, eventHandler);
            p.getSequence().set(this.barrier.getCursor());
            RingBufferDispatcher.this.ringBuffer.addGatingSequences(p.getSequence());
            RingBufferDispatcher.this.executor.execute(p);
            eventHandler.s = subscription = new Subscription(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void request(long n) {
                    if (n <= 0L) {
                        sub.onError((Throwable)SpecificationExceptions.spec_3_09_exception(n));
                        return;
                    }
                    if (RingBufferProcessor.this.pending.get() == Long.MAX_VALUE) {
                        return;
                    }
                    3 var3_2 = this;
                    synchronized (var3_2) {
                        if (RingBufferProcessor.this.pending.addAndGet(n) < 0L) {
                            RingBufferProcessor.this.pending.set(Long.MAX_VALUE);
                        }
                    }
                    if (RingBufferProcessor.this.s != null) {
                        if (n == Long.MAX_VALUE) {
                            RingBufferProcessor.this.s.request(n);
                        } else {
                            int toRequest = (int)Math.min(Integer.MAX_VALUE, Math.min(n, RingBufferDispatcher.this.remainingSlots()));
                            if (toRequest > 0) {
                                RingBufferProcessor.this.s.request((long)toRequest);
                                if (RingBufferProcessor.this.batch.get() > 1L) {
                                    RingBufferDispatcher.this.ringBuffer.next(toRequest);
                                }
                            }
                        }
                    }
                }

                public void cancel() {
                    try {
                        RingBufferDispatcher.this.ringBuffer.removeGatingSequence(p.getSequence());
                        p.halt();
                    }
                    finally {
                        RingBufferProcessor.this.s.cancel();
                    }
                }
            };
            sub.onSubscribe(subscription);
        }

        public void onSubscribe(Subscription s) {
            if (this.s != null) {
                s.cancel();
                return;
            }
            this.s = s;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void publishSignal(Object data, Consumer consumer) {
            long seqId;
            long batch = this.batch.incrementAndGet();
            long demand = this.pending.get();
            if (batch > 1L) {
                seqId = RingBufferDispatcher.this.ringBuffer.getCursor() - demand + batch;
            } else if (demand == 0L || demand == Long.MAX_VALUE) {
                seqId = RingBufferDispatcher.this.ringBuffer.next();
            } else {
                int preallocate = (int)Math.min(Integer.MAX_VALUE, Math.min(RingBufferDispatcher.this.ringBuffer.remainingCapacity(), Math.abs(batch - demand)) + 1L);
                if ((long)preallocate > 0L) {
                    seqId = RingBufferDispatcher.this.ringBuffer.next(preallocate);
                    System.out.println(Thread.currentThread() + " is nexting " + preallocate + " with " + data + " on seq " + seqId);
                    seqId -= (long)preallocate - 1L;
                } else {
                    seqId = RingBufferDispatcher.this.ringBuffer.next();
                }
            }
            System.out.println(Thread.currentThread() + " is marking " + seqId + " with " + data + "=" + batch + "/" + demand + " : " + RingBufferDispatcher.this.ringBuffer.getCursor());
            ((RingBufferTask)RingBufferDispatcher.this.ringBuffer.get(seqId)).setSequenceId(seqId).setData(data).setEventConsumer(consumer);
            if (demand == Long.MAX_VALUE) {
                RingBufferDispatcher.this.ringBuffer.publish(seqId);
            } else if (demand == batch) {
                RingBufferProcessor ringBufferProcessor = this;
                synchronized (ringBufferProcessor) {
                    if (this.pending.addAndGet(-demand) < 0L) {
                        this.pending.set(0L);
                    }
                }
                this.batch.set(0L);
                RingBufferDispatcher.this.ringBuffer.publish(seqId - (batch - 1L), seqId);
            }
        }

        public void onNext(E o) {
            if (!RingBufferDispatcher.this.inContext()) {
                this.publishSignal(o, this);
            } else {
                RingBufferDispatcher.this.allocateRecursiveTask().setData(o).setEventConsumer(this);
            }
        }

        @Override
        public void accept(E e) {
        }

        public void onError(Throwable t) {
            try {
                this.pending.set(0L);
                this.publishSignal(t, this.ERROR_SENTINEL);
            }
            finally {
                if (this.s != null) {
                    this.s.cancel();
                }
            }
        }

        public void onComplete() {
            try {
                this.pending.set(0L);
                this.publishSignal(null, this.COMPLETE_SENTINEL);
            }
            finally {
                if (this.s != null) {
                    this.s.cancel();
                }
            }
        }

        public String toString() {
            return "RingBufferSubscriber{, barrier=" + this.barrier.getCursor() + ", pending=" + this.pending + '}';
        }
    }

    /**
     * Disruptor event handler that turns ring buffer tasks back into Reactive Streams signals for one
     * downstream subscriber.
     *
     * <p>The kind of signal is decided by comparing the task's event consumer, by identity, with the
     * owning processor and its two sentinel consumers.
     */
    private class RingBufferTaskEventHandler<E>
    implements EventHandler<RingBufferTask>,
    Consumer<E> {
        private final Subscriber<? super E> sub;
        private final RingBufferProcessor<E> owner;
        // Subscription handed to the downstream subscriber; assigned by the owning processor's subscribe().
        Subscription s;

        public RingBufferTaskEventHandler(RingBufferProcessor<E> owner, Subscriber<? super E> sub) {
            this.sub = sub;
            this.owner = owner;
        }

        /**
         * Delivers an element to the subscriber; anything thrown by {@code onNext} is routed to
         * {@code onError}.
         */
        @Override
        public void accept(E e) {
            try {
                this.sub.onNext(e);
            }
            catch (Throwable t) {
                this.sub.onError(t);
            }
        }

        /**
         * Handles one ring buffer event and then drains any tasks queued for tail recursion; anything
         * thrown along the way is routed to the subscriber's {@code onError}.
         */
        @Override
        public void onEvent(RingBufferTask ringBufferTask, long seq, boolean end) throws Exception {
            try {
                this.handleSubscriber(ringBufferTask, seq, end);
                this.recurse();
            }
            catch (Throwable e) {
                this.sub.onError(e);
            }
        }

        /**
         * Translates a task into the matching subscriber signal. Tasks whose event consumer is none of
         * the three known markers are ignored.
         *
         * @param task the task read from the ring buffer or from the tail-recursion pile
         * @param seq  sequence of the event, used for trace output only
         * @param end  end-of-batch flag, used for trace output only
         */
        @SuppressWarnings("unchecked") // task.data was written by the owning processor's onNext(E)
        private void handleSubscriber(AbstractLifecycleDispatcher.Task task, long seq, boolean end) {
            if (task.eventConsumer == this.owner.COMPLETE_SENTINEL) {
                if (this.s != null) {
                    this.s.cancel();
                    this.s = null;
                }
                this.sub.onComplete();
            } else if (task.eventConsumer == this.owner.ERROR_SENTINEL) {
                if (this.s != null) {
                    this.s.cancel();
                    this.s = null;
                }
                this.sub.onError((Throwable)task.data);
            } else if (task.eventConsumer == this.owner) {
                if (RingBufferDispatcher.this.log.isTraceEnabled()) {
                    RingBufferDispatcher.this.log.trace(Thread.currentThread() + "-" + RingBufferDispatcher.this.inContext() + " ] Task:" + task.data + " seq:" + seq + " " + end);
                }
                this.sub.onNext((E)task.data);
            }
        }

        /**
         * Processes every task on the dispatcher's tail-recursion pile up to {@code tailRecurseSeq},
         * then removes pile entries at indices of {@code backlog * 2} and above and resets the pile
         * counters.
         */
        void recurse() {
            // A negative sequence means nothing was queued recursively.
            if (RingBufferDispatcher.this.tailRecurseSeq < 0) {
                return;
            }
            int next = -1;
            while (next < RingBufferDispatcher.this.tailRecurseSeq) {
                this.handleSubscriber((AbstractLifecycleDispatcher.Task)RingBufferDispatcher.this.tailRecursionPile.get(++next), -1L, false);
            }
            // Trim the pile back down, removing from the highest used index downwards.
            next = RingBufferDispatcher.this.tailRecurseSeq;
            int max = RingBufferDispatcher.this.backlog * 2;
            while (next >= max) {
                RingBufferDispatcher.this.tailRecursionPile.remove(next--);
            }
            RingBufferDispatcher.this.tailRecursionPileSize = max;
            RingBufferDispatcher.this.tailRecurseSeq = -1;
        }
    }

    /**
     * Task stored in each ring buffer slot; on top of the inherited task state it remembers the
     * sequence it was claimed under so it can be published later.
     */
    private class RingBufferTask
    extends SingleThreadDispatcher.SingleThreadTask {
        // Ring buffer sequence this task was last claimed under.
        private long sequenceId;

        // Instances are only created from within the enclosing dispatcher.
        private RingBufferTask() {
        }

        /**
         * Records the claimed sequence.
         *
         * @param sequenceId the ring buffer sequence of this task's slot
         * @return this task, for chaining
         */
        public RingBufferTask setSequenceId(long sequenceId) {
            this.sequenceId = sequenceId;
            return this;
        }

        /**
         * @return the sequence last recorded with {@link #setSequenceId(long)}
         */
        public long getSequenceId() {
            return sequenceId;
        }
    }
}

