package io.vertx.reactivex.impl;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.ArrayDeque;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/reactivex/impl/ReadStreamSubscriber.class */
public class ReadStreamSubscriber<R, J> implements Subscriber<R>, ReadStream<J> {
    private static final Runnable NOOP_ACTION = () -> {
    };
    private static final Throwable DONE_SENTINEL = new Throwable();
    public static final int BUFFER_SIZE = 16;
    private final Function<R, J> adapter;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<J> elementHandler;
    private Throwable completed;
    private Subscription subscription;
    private boolean paused = false;
    private ArrayDeque<R> pending = new ArrayDeque<>();
    private int requested = 0;

    public static <R, J> ReadStream<J> asReadStream(Flowable<R> flowable, Function<R, J> function) {
        ReadStreamSubscriber readStreamSubscriber = new ReadStreamSubscriber(function);
        flowable.subscribe(readStreamSubscriber);
        return readStreamSubscriber;
    }

    public static <R, J> ReadStream<J> asReadStream(Observable<R> observable, Function<R, J> function) {
        return asReadStream(observable.toFlowable(BackpressureStrategy.BUFFER), function);
    }

    public ReadStreamSubscriber(Function<R, J> function) {
        this.adapter = function;
    }

    public ReadStream<J> handler(Handler<J> handler) {
        synchronized (this) {
            this.elementHandler = handler;
        }
        checkStatus();
        return this;
    }

    public ReadStream<J> pause() {
        synchronized (this) {
            this.paused = true;
        }
        return this;
    }

    public ReadStream<J> fetch(long j) {
        throw new UnsupportedOperationException("todo");
    }

    public ReadStream<J> resume() {
        synchronized (this) {
            this.paused = false;
        }
        checkStatus();
        return this;
    }

    public void onSubscribe(Subscription subscription) {
        synchronized (this) {
            this.subscription = subscription;
        }
        checkStatus();
    }

    private void checkStatus() {
        Handler<Throwable> handler;
        Throwable th;
        Handler<J> handler2;
        J apply;
        Runnable runnable = NOOP_ACTION;
        while (true) {
            synchronized (this) {
                if (this.paused || (handler2 = this.elementHandler) == null || this.pending.size() <= 0) {
                    break;
                }
                this.requested--;
                apply = this.adapter.apply(this.pending.poll());
            }
            handler2.handle(apply);
        }
        if (this.completed != null) {
            if (this.pending.isEmpty()) {
                if (this.completed != DONE_SENTINEL) {
                    handler = this.exceptionHandler;
                    th = this.completed;
                    this.exceptionHandler = null;
                } else {
                    handler = null;
                    th = null;
                }
                Handler<Void> handler3 = this.endHandler;
                this.endHandler = null;
                Handler<Throwable> handler4 = handler;
                Throwable th2 = th;
                runnable = () -> {
                    if (handler4 != null) {
                        try {
                            handler4.handle(th2);
                        } finally {
                            if (handler3 != null) {
                                handler3.handle((Object) null);
                            }
                        }
                    }
                };
            }
        } else if (this.elementHandler != null && this.requested < 8) {
            int i = 16 - this.requested;
            runnable = () -> {
                this.subscription.request(i);
            };
            this.requested = 16;
        }
        runnable.run();
    }

    public ReadStream<J> endHandler(Handler<Void> handler) {
        synchronized (this) {
            if (this.completed == null || this.pending.size() > 0) {
                this.endHandler = handler;
            } else if (handler != null) {
                throw new IllegalStateException();
            }
        }
        return this;
    }

    public ReadStream<J> exceptionHandler(Handler<Throwable> handler) {
        synchronized (this) {
            if (this.completed == null || this.pending.size() > 0) {
                this.exceptionHandler = handler;
            } else if (handler != null) {
                throw new IllegalStateException();
            }
        }
        return this;
    }

    public void onComplete() {
        onError(DONE_SENTINEL);
    }

    public void onError(Throwable th) {
        synchronized (this) {
            if (this.completed != null) {
                return;
            }
            this.completed = th;
            checkStatus();
        }
    }

    public void onNext(R r) {
        synchronized (this) {
            this.pending.add(r);
        }
        checkStatus();
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m261exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
