/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rx.java;

import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.rx.java.RxHelper;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;

public class ObservableReadStream<T, R>
implements Observable.OnSubscribe<R> {
    private static final Throwable COMPLETED_SENTINEL = new Throwable();
    private static final Object RESUME = new Object();
    private static final Object PAUSE = new Object();
    private static final Object NO_ITEM = new Object();
    public static final long DEFAULT_MAX_BUFFER_SIZE = 256L;
    private final long highWaterMark;
    private final ReadStream<T> stream;
    private final Function<T, R> adapter;
    private final AtomicReference<Sub> subscription = new AtomicReference();
    private Throwable completed;

    public ObservableReadStream(ReadStream<T> stream, Function<T, R> adapter) {
        this(stream, adapter, 256L);
    }

    public ObservableReadStream(ReadStream<T> stream, Function<T, R> adapter, long maxBufferSize) {
        this.stream = stream;
        this.adapter = adapter;
        this.highWaterMark = maxBufferSize;
    }

    public long getRequested() {
        Sub sub = this.subscription.get();
        return sub != null ? sub.adapter.requested() : 0L;
    }

    public void call(Subscriber<? super R> subscriber) {
        SimpleAdapter adapter = new SimpleAdapter(subscriber);
        Sub sub = new Sub(adapter);
        if (!this.subscription.compareAndSet(null, sub)) {
            throw new IllegalStateException();
        }
        subscriber.setProducer((Producer)sub);
        subscriber.add((Subscription)sub);
        long requested = adapter.requested();
        if (requested != Long.MAX_VALUE) {
            sub.adapter = new QueueAdapter(requested, subscriber);
        }
        this.stream.exceptionHandler(sub.adapter::end);
        this.stream.endHandler(v -> sub.adapter.end(COMPLETED_SENTINEL));
        this.stream.handler((Handler)sub.adapter);
    }

    private class QueueAdapter
    extends Adapter {
        private final long lowWaterMark;
        private ArrayDeque<R> pending;
        private boolean draining;
        private boolean paused;
        private boolean subscribed;

        private QueueAdapter(long requested, Subscriber<? super R> subscriber) {
            super(subscriber);
            this.pending = new ArrayDeque();
            this.subscribed = true;
            this.requested = requested;
            this.lowWaterMark = ObservableReadStream.this.highWaterMark / 2L;
        }

        @Override
        boolean dispose() {
            if (!this.subscribed) {
                throw new AssertionError();
            }
            boolean resume = this.paused;
            this.paused = false;
            this.subscribed = false;
            return resume;
        }

        @Override
        public void request(long n) {
            super.request(n);
            this.drain();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void drain() {
            ObservableReadStream observableReadStream;
            ObservableReadStream observableReadStream2 = ObservableReadStream.this;
            synchronized (observableReadStream2) {
                if (this.draining) {
                    return;
                }
                this.draining = true;
            }
            while (true) {
                Object next;
                block27: {
                    next = NO_ITEM;
                    observableReadStream = ObservableReadStream.this;
                    synchronized (observableReadStream) {
                        block26: {
                            if (!this.subscribed) {
                                this.draining = false;
                                return;
                            }
                            if (this.pending.size() <= 0) break block26;
                            if (this.requested <= 0L) break block27;
                            next = this.pending.poll();
                            if (this.requested == Long.MAX_VALUE) break block27;
                            --this.requested;
                            break block27;
                        }
                        if (ObservableReadStream.this.completed != null) {
                            break;
                        }
                    }
                }
                if (next == NO_ITEM) break;
                this.subscriber.onNext(next);
            }
            Object action = null;
            observableReadStream = ObservableReadStream.this;
            synchronized (observableReadStream) {
                if (ObservableReadStream.this.completed != null) {
                    if (this.pending.size() == 0) {
                        action = ObservableReadStream.this.completed;
                    }
                } else if (this.paused && (long)this.pending.size() < this.lowWaterMark) {
                    this.paused = false;
                    action = RESUME;
                } else if (!this.paused && (long)this.pending.size() >= ObservableReadStream.this.highWaterMark) {
                    this.paused = true;
                    action = PAUSE;
                }
                this.draining = false;
            }
            if (action != null) {
                if (action == RESUME) {
                    ObservableReadStream.this.stream.resume();
                } else if (action == PAUSE) {
                    ObservableReadStream.this.stream.pause();
                } else if (action == COMPLETED_SENTINEL) {
                    this.subscriber.onCompleted();
                } else {
                    this.subscriber.onError((Throwable)action);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handle(T item) {
            ObservableReadStream observableReadStream = ObservableReadStream.this;
            synchronized (observableReadStream) {
                this.pending.add(ObservableReadStream.this.adapter.apply(item));
            }
            this.drain();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        void end(Throwable t) {
            ObservableReadStream observableReadStream = ObservableReadStream.this;
            synchronized (observableReadStream) {
                if (ObservableReadStream.this.completed != null) {
                    return;
                }
                ObservableReadStream.this.completed = t;
            }
            this.drain();
        }
    }

    private class SimpleAdapter
    extends Adapter {
        SimpleAdapter(Subscriber<? super R> subscriber) {
            super(subscriber);
        }

        @Override
        boolean dispose() {
            return false;
        }

        @Override
        void end(Throwable t) {
            if (t == COMPLETED_SENTINEL) {
                this.subscriber.onCompleted();
            } else {
                this.subscriber.onError(t);
            }
        }

        public void handle(T item) {
            this.subscriber.onNext(ObservableReadStream.this.adapter.apply(item));
        }
    }

    private abstract class Adapter
    implements Handler<T> {
        protected final Subscriber<? super R> subscriber;
        long requested;

        Adapter(Subscriber<? super R> subscriber) {
            this.subscriber = subscriber;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        synchronized long requested() {
            ObservableReadStream observableReadStream = ObservableReadStream.this;
            synchronized (observableReadStream) {
                return this.requested;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void request(long n) {
            ObservableReadStream observableReadStream = ObservableReadStream.this;
            synchronized (observableReadStream) {
                this.requested = n == Long.MAX_VALUE || n >= Long.MAX_VALUE - this.requested ? Long.MAX_VALUE : (this.requested += n);
            }
        }

        abstract boolean dispose();

        abstract void end(Throwable var1);
    }

    private class Sub
    implements Subscription,
    Producer {
        private Adapter adapter;

        Sub(Adapter queue) {
            this.adapter = queue;
        }

        public void request(long n) {
            if (n < 0L) {
                throw new IllegalArgumentException("Cannot request negative items:" + n);
            }
            this.adapter.request(n);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void unsubscribe() {
            if (ObservableReadStream.this.subscription.compareAndSet(this, null)) {
                boolean resume;
                ObservableReadStream observableReadStream = ObservableReadStream.this;
                synchronized (observableReadStream) {
                    resume = this.adapter.dispose() && ObservableReadStream.this.completed == null;
                }
                if (resume) {
                    ObservableReadStream.this.stream.resume();
                }
                RxHelper.setNullHandlers(ObservableReadStream.this.stream);
            }
        }

        public boolean isUnsubscribed() {
            return ObservableReadStream.this.subscription.get() != this;
        }
    }
}

