/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.processor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.processor.CancelException;
import reactor.core.processor.ExecutorPoweredProcessor;
import reactor.core.processor.MutableSignal;
import reactor.core.processor.util.RingBufferSubscriberUtils;
import reactor.core.support.SpecificationExceptions;
import reactor.jarjar.com.lmax.disruptor.AlertException;
import reactor.jarjar.com.lmax.disruptor.EventFactory;
import reactor.jarjar.com.lmax.disruptor.EventProcessor;
import reactor.jarjar.com.lmax.disruptor.LiteBlockingWaitStrategy;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.jarjar.com.lmax.disruptor.Sequence;
import reactor.jarjar.com.lmax.disruptor.SequenceBarrier;
import reactor.jarjar.com.lmax.disruptor.TimeoutException;
import reactor.jarjar.com.lmax.disruptor.WaitStrategy;
import reactor.jarjar.com.lmax.disruptor.dsl.ProducerType;

public final class RingBufferProcessor<E>
extends ExecutorPoweredProcessor<E, E> {
    private final SequenceBarrier barrier;
    private final RingBuffer<MutableSignal<E>> ringBuffer;
    private final Sequence recentSequence;

    public static <E> RingBufferProcessor<E> create() {
        return RingBufferProcessor.create(RingBufferProcessor.class.getSimpleName(), 32, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> create(boolean autoCancel) {
        return RingBufferProcessor.create(RingBufferProcessor.class.getSimpleName(), 32, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> create(ExecutorService service) {
        return RingBufferProcessor.create(service, 32, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> create(ExecutorService service, boolean autoCancel) {
        return RingBufferProcessor.create(service, 32, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> create(String name, int bufferSize) {
        return RingBufferProcessor.create(name, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> create(String name, int bufferSize, boolean autoCancel) {
        return RingBufferProcessor.create(name, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize) {
        return RingBufferProcessor.create(service, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize, boolean autoCancel) {
        return RingBufferProcessor.create(service, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> create(String name, int bufferSize, WaitStrategy strategy) {
        return new RingBufferProcessor<E>(name, null, bufferSize, strategy, false, true);
    }

    public static <E> RingBufferProcessor<E> create(String name, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        return new RingBufferProcessor<E>(name, null, bufferSize, strategy, false, autoCancel);
    }

    public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize, WaitStrategy strategy) {
        return RingBufferProcessor.create(service, bufferSize, strategy, true);
    }

    public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        return new RingBufferProcessor<E>(null, service, bufferSize, strategy, false, autoCancel);
    }

    public static <E> RingBufferProcessor<E> share() {
        return RingBufferProcessor.share(RingBufferProcessor.class.getSimpleName(), 32, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> share(boolean autoCancel) {
        return RingBufferProcessor.share(RingBufferProcessor.class.getSimpleName(), 32, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> share(ExecutorService service) {
        return RingBufferProcessor.share(service, 32, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> share(ExecutorService service, boolean autoCancel) {
        return RingBufferProcessor.share(service, 32, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> share(String name, int bufferSize) {
        return RingBufferProcessor.share(name, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> share(String name, int bufferSize, boolean autoCancel) {
        return RingBufferProcessor.share(name, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize) {
        return RingBufferProcessor.share(service, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), true);
    }

    public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize, boolean autoCancel) {
        return RingBufferProcessor.share(service, bufferSize, (WaitStrategy)new LiteBlockingWaitStrategy(), autoCancel);
    }

    public static <E> RingBufferProcessor<E> share(String name, int bufferSize, WaitStrategy strategy) {
        return new RingBufferProcessor<E>(name, null, bufferSize, strategy, true, true);
    }

    public static <E> RingBufferProcessor<E> share(String name, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        return new RingBufferProcessor<E>(name, null, bufferSize, strategy, true, autoCancel);
    }

    public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize, WaitStrategy strategy) {
        return RingBufferProcessor.share(service, bufferSize, strategy, true);
    }

    public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        return new RingBufferProcessor<E>(null, service, bufferSize, strategy, true, autoCancel);
    }

    private RingBufferProcessor(String name, ExecutorService executor, int bufferSize, WaitStrategy waitStrategy, boolean shared, boolean autoCancel) {
        super(name, executor, autoCancel);
        this.ringBuffer = RingBuffer.create(shared ? ProducerType.MULTI : ProducerType.SINGLE, new EventFactory<MutableSignal<E>>(){

            @Override
            public MutableSignal<E> newInstance() {
                return new MutableSignal();
            }
        }, bufferSize, waitStrategy);
        this.recentSequence = new Sequence(-1L);
        this.barrier = this.ringBuffer.newBarrier(new Sequence[0]);
    }

    public void subscribe(Subscriber<? super E> subscriber) {
        if (null == subscriber) {
            throw new NullPointerException("Cannot subscribe NULL subscriber");
        }
        try {
            Sequence pendingRequest = new Sequence(0L);
            BatchSignalProcessor<E> signalProcessor = new BatchSignalProcessor<E>(this, pendingRequest, subscriber);
            if (this.incrementSubscribers()) {
                this.ringBuffer.addGatingSequences(signalProcessor.getSequence());
                signalProcessor.getSequence().set(this.recentSequence.get());
            } else {
                signalProcessor.getSequence().set(this.ringBuffer.getCursor());
                signalProcessor.nextSequence = signalProcessor.getSequence().get();
                this.ringBuffer.addGatingSequences(signalProcessor.getSequence());
            }
            signalProcessor.setSubscription(new RingBufferSubscription(pendingRequest, subscriber, signalProcessor));
            this.executor.execute(signalProcessor);
        }
        catch (Throwable t) {
            subscriber.onError(t);
        }
    }

    public void onNext(E o) {
        RingBufferSubscriberUtils.onNext(o, this.ringBuffer);
    }

    public void onError(Throwable t) {
        RingBufferSubscriberUtils.onError(t, this.ringBuffer);
    }

    @Override
    public void onComplete() {
        RingBufferSubscriberUtils.onComplete(this.ringBuffer);
        super.onComplete();
    }

    public String toString() {
        return "RingBufferProcessor{barrier=" + this.barrier + ", ringBuffer=" + this.ringBuffer + '}';
    }

    @Override
    public long getAvailableCapacity() {
        return this.ringBuffer.remainingCapacity();
    }

    @Override
    public long getCapacity() {
        return this.ringBuffer.getBufferSize();
    }

    private static final class BatchSignalProcessor<T>
    implements EventProcessor {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final Sequence sequence = new Sequence(-1L);
        private final RingBufferProcessor<T> processor;
        private final Sequence pendingRequest;
        private final Subscriber<? super T> subscriber;
        private Subscription subscription;
        long nextSequence = -1L;

        public BatchSignalProcessor(RingBufferProcessor<T> processor, Sequence pendingRequest, Subscriber<? super T> subscriber) {
            this.processor = processor;
            this.pendingRequest = pendingRequest;
            this.subscriber = subscriber;
        }

        public Subscription getSubscription() {
            return this.subscription;
        }

        public void setSubscription(Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public Sequence getSequence() {
            return this.sequence;
        }

        @Override
        public void halt() {
            this.running.set(false);
            ((RingBufferProcessor)this.processor).barrier.alert();
        }

        @Override
        public boolean isRunning() {
            return this.running.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        @Override
        public void run() {
            block22: {
                if (!this.running.compareAndSet(false, true)) {
                    this.subscriber.onError((Throwable)new IllegalStateException("Thread is already running"));
                    return;
                }
                try {
                    this.subscriber.onSubscribe(this.subscription);
                }
                catch (Throwable t) {
                    this.subscriber.onError(t);
                }
                event = null;
                this.nextSequence = this.sequence.get() + 1L;
                if (!RingBufferSubscriberUtils.waitRequestOrTerminalEvent(this.pendingRequest, RingBufferProcessor.access$000(this.processor), RingBufferProcessor.access$100(this.processor), this.subscriber, this.running)) {
                    return;
                }
                unbounded = this.pendingRequest.get() == 0x7FFFFFFFFFFFFFFFL;
                while (true) lbl-1000:
                // 4 sources

                {
                    try {
                        while (true) {
                            availableSequence = RingBufferProcessor.access$100(this.processor).waitFor(this.nextSequence);
                            while (this.nextSequence <= availableSequence) {
                                event = (MutableSignal)RingBufferProcessor.access$000(this.processor).get(this.nextSequence);
                                if (event.type == MutableSignal.Type.NEXT) {
                                    if (!unbounded && this.pendingRequest.addAndGet(-1L) < 0L) {
                                        this.pendingRequest.incrementAndGet();
                                        while (this.pendingRequest.addAndGet(-1L) < 0L) {
                                            this.pendingRequest.incrementAndGet();
                                            RingBufferProcessor.access$100(this.processor).checkAlert();
                                            LockSupport.parkNanos(1L);
                                        }
                                    }
                                    RingBufferSubscriberUtils.route(event, this.subscriber);
                                    ++this.nextSequence;
                                    continue;
                                }
                                this.running.set(false);
                                RingBufferSubscriberUtils.route(event, this.subscriber);
                                if (event.type == MutableSignal.Type.ERROR) {
                                    RingBufferProcessor.access$100(this.processor).alert();
                                }
                                throw AlertException.INSTANCE;
                            }
                            this.sequence.set(availableSequence);
                        }
                    }
                    catch (TimeoutException availableSequence) {
                    }
                    catch (CancelException | AlertException ex) {
                        if (!this.running.get()) {
                            break block22;
                        }
                        cursor = RingBufferProcessor.access$100(this.processor).getCursor();
                        if (((MutableSignal)RingBufferProcessor.access$000(this.processor).get((long)cursor)).type == MutableSignal.Type.ERROR) {
                            this.sequence.set(cursor);
                            this.nextSequence = cursor;
                        } else {
                            this.sequence.set(cursor - 1L);
                        }
                        RingBufferProcessor.access$100(this.processor).clearAlert();
                    }
                    catch (Throwable ex) {
                        this.subscriber.onError(ex);
                        this.sequence.set(this.nextSequence);
                        ++this.nextSequence;
                        continue;
                    }
                    break;
                }
                ** GOTO lbl-1000
                finally {
                    RingBufferProcessor.access$000(this.processor).removeGatingSequence(this.sequence);
                    this.running.set(false);
                }
            }
        }
    }

    private final class RingBufferSubscription
    implements Subscription {
        private final Sequence pendingRequest;
        private final Subscriber<? super E> subscriber;
        private final BatchSignalProcessor<E> eventProcessor;

        public RingBufferSubscription(Sequence pendingRequest, Subscriber<? super E> subscriber, BatchSignalProcessor<E> eventProcessor) {
            this.subscriber = subscriber;
            this.eventProcessor = eventProcessor;
            this.pendingRequest = pendingRequest;
        }

        public void request(long n) {
            Subscription parent;
            long toRequest;
            long buffered;
            if (n <= 0L) {
                this.subscriber.onError((Throwable)SpecificationExceptions.spec_3_09_exception(n));
                return;
            }
            if (!this.eventProcessor.isRunning()) {
                return;
            }
            long currentSequence = this.eventProcessor.nextSequence;
            long cursor = RingBufferProcessor.this.ringBuffer.getCursor();
            long l = currentSequence < cursor ? cursor - (currentSequence == -1L ? currentSequence + 1L : currentSequence) : (buffered = 0L);
            if (this.pendingRequest.addAndGet(n) < 0L) {
                this.pendingRequest.set(Long.MAX_VALUE);
            }
            if ((toRequest = buffered > 0L ? (n - buffered < 0L ? 0L : n - buffered) : n) > 0L && (parent = RingBufferProcessor.this.upstreamSubscription) != null) {
                parent.request(toRequest);
            }
        }

        public void cancel() {
            try {
                this.eventProcessor.halt();
            }
            finally {
                RingBufferProcessor.this.decrementSubscribers();
            }
        }
    }
}

