/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rx.java;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.rx.java.ContextScheduler;
import io.vertx.rx.java.ObservableFuture;
import io.vertx.rx.java.ObservableHandler;
import io.vertx.rx.java.ReadStreamAdapter;
import io.vertx.rx.java.UnmarshallerOperator;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.plugins.RxJavaSchedulersHook;

public class RxHelper {
    public static <T> Observable<T> toObservable(ReadStream<T> stream) {
        return Observable.create(new ReadStreamAdapter<T>(stream));
    }

    public static <T> ObservableFuture<T> observableFuture() {
        return new ObservableFuture();
    }

    public static <T> ObservableHandler<T> observableHandler() {
        return RxHelper.observableHandler(false);
    }

    public static <T> ObservableHandler<T> observableHandler(boolean multi) {
        return new ObservableHandler(multi);
    }

    public static <T> Handler<AsyncResult<T>> toFuture(Observer<T> observer) {
        ObservableFuture<T> observable = RxHelper.observableFuture();
        observable.subscribe(observer);
        return observable.toHandler();
    }

    public static <T> Handler<T> toHandler(Observer<T> observer) {
        return RxHelper.toHandler(observer, false);
    }

    public static <T> Handler<T> toHandler(Observer<T> observer, boolean multi) {
        ObservableHandler<T> observable = RxHelper.observableHandler(multi);
        observable.subscribe(observer);
        return observable.toHandler();
    }

    public static <T> Handler<AsyncResult<T>> toFuture(Action1<T> onNext) {
        ObservableFuture<T> observable = RxHelper.observableFuture();
        observable.subscribe(onNext);
        return observable.toHandler();
    }

    public static <T> Handler<T> toHandler(Action1<T> onNext) {
        ObservableHandler<T> observable = RxHelper.observableHandler(true);
        observable.subscribe(onNext);
        return observable.toHandler();
    }

    public static <T> Handler<AsyncResult<T>> toFuture(Action1<T> onNext, Action1<Throwable> onError) {
        ObservableFuture<T> observable = RxHelper.observableFuture();
        observable.subscribe(onNext, onError);
        return observable.toHandler();
    }

    public static <T> Handler<AsyncResult<T>> toFuture(Action1<T> onNext, Action1<Throwable> onError, Action0 onComplete) {
        ObservableFuture<T> observable = RxHelper.observableFuture();
        observable.subscribe(onNext, onError, onComplete);
        return observable.toHandler();
    }

    public static Scheduler scheduler(Vertx vertx) {
        return new ContextScheduler(vertx, false);
    }

    public static Scheduler scheduler(Context context) {
        return new ContextScheduler(context, false);
    }

    public static Scheduler blockingScheduler(Vertx vertx) {
        return new ContextScheduler(vertx, true);
    }

    public static RxJavaSchedulersHook schedulerHook(final Context context) {
        return new RxJavaSchedulersHook(){

            public Scheduler getComputationScheduler() {
                return RxHelper.scheduler(context);
            }

            public Scheduler getIOScheduler() {
                return RxHelper.blockingScheduler(context.owner());
            }

            public Scheduler getNewThreadScheduler() {
                return RxHelper.scheduler(context);
            }
        };
    }

    public static RxJavaSchedulersHook schedulerHook(final Vertx vertx) {
        return new RxJavaSchedulersHook(){

            public Scheduler getComputationScheduler() {
                return RxHelper.scheduler(vertx);
            }

            public Scheduler getIOScheduler() {
                return RxHelper.blockingScheduler(vertx);
            }

            public Scheduler getNewThreadScheduler() {
                return RxHelper.scheduler(vertx);
            }
        };
    }

    public static <T> Observable.Operator<T, Buffer> unmarshaller(Class<T> mappedType) {
        return new UnmarshallerOperator<T, Buffer>((Class)mappedType){

            @Override
            public Buffer unwrap(Buffer buffer) {
                return buffer;
            }
        };
    }
}

