/*
 * Decompiled with CFR 0.152.
 */
package examples;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.rxjava.core.RxHelper;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.eventbus.EventBus;
import io.vertx.rxjava.core.eventbus.MessageConsumer;
import io.vertx.rxjava.core.file.AsyncFile;
import io.vertx.rxjava.core.file.FileSystem;
import io.vertx.rxjava.core.http.HttpClient;
import io.vertx.rxjava.core.http.HttpClientRequest;
import io.vertx.rxjava.core.http.HttpClientResponse;
import io.vertx.rxjava.core.http.HttpServer;
import io.vertx.rxjava.core.http.HttpServerRequest;
import io.vertx.rxjava.core.http.ServerWebSocket;
import io.vertx.rxjava.core.http.WebSocket;
import io.vertx.rxjava.core.http.WebSocketStream;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

public class RxifiedExamples {
    public void readStream(Vertx vertx) {
        FileSystem fs = vertx.fileSystem();
        fs.open("/data.txt", new OpenOptions(), (Handler<AsyncResult<AsyncFile>>)((Handler)result -> {
            AsyncFile file = (AsyncFile)result.result();
            Observable<Buffer> observable = file.toObservable();
            observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
        }));
    }

    public void observableFuture(Vertx vertx) {
        vertx.createHttpServer(new HttpServerOptions().setPort(1234).setHost("localhost")).listenObservable().subscribe(server -> {}, failure -> {});
    }

    public void scheduler(Vertx vertx) {
        Scheduler scheduler = RxHelper.scheduler(vertx);
        Observable timer = Observable.timer((long)100L, (long)100L, (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)scheduler);
    }

    public void schedulerHook(Vertx vertx) {
        RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx);
        RxJavaPlugins.getInstance().registerSchedulersHook(hook);
    }

    public void unmarshaller(FileSystem fileSystem) {
        fileSystem.open("/data.txt", new OpenOptions(), (Handler<AsyncResult<AsyncFile>>)((Handler)result -> {
            AsyncFile file = (AsyncFile)result.result();
            Observable<Buffer> observable = file.toObservable();
            observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe(mypojo -> {});
        }));
    }

    public void deployVerticle(Vertx vertx, Verticle verticle) {
        Observable<String> deployment = RxHelper.deployVerticle(vertx, verticle);
        deployment.subscribe(id -> {}, err -> {});
    }

    public void get(HttpClient client) {
        Observable<HttpClientResponse> get = RxHelper.get(client, "http://the-server");
        get.subscribe(resp -> {}, err -> {});
    }

    public void embedded() {
        Vertx vertx = Vertx.vertx();
    }

    public void verticle() {
    }

    public void eventBusMessages(Vertx vertx) {
        EventBus eb = vertx.eventBus();
        MessageConsumer consumer = eb.consumer("the-address");
        Observable observable = consumer.toObservable();
        Subscription sub = observable.subscribe(msg -> {});
        vertx.setTimer(10000L, (Handler<Long>)((Handler)id -> sub.unsubscribe()));
    }

    public void eventBusBodies(Vertx vertx) {
        EventBus eb = vertx.eventBus();
        MessageConsumer consumer = eb.consumer("the-address");
        Observable observable = consumer.bodyStream().toObservable();
    }

    public void eventBusMapReduce(Vertx vertx) {
        Observable observable = vertx.eventBus().consumer("heat-sensor").bodyStream().toObservable();
        observable.buffer(1L, TimeUnit.SECONDS).map(samples -> samples.stream().collect(Collectors.averagingDouble(d -> d))).subscribe(heat -> vertx.eventBus().send("news-feed", "Current heat is " + heat));
    }

    public void websocketServer(HttpServer server) {
        Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable();
        socketObservable.subscribe(socket -> System.out.println("Web socket connect"), failure -> System.out.println("Should never be called"), () -> System.out.println("Subscription ended or server closed"));
    }

    public void websocketServerBuffer(Observable<ServerWebSocket> socketObservable) {
        socketObservable.subscribe(socket -> {
            Observable<Buffer> dataObs = socket.toObservable();
            dataObs.subscribe(buffer -> System.out.println("Got message " + buffer.toString("UTF-8")));
        });
    }

    public void websocketClient(Vertx vertx) {
        HttpClient client = vertx.createHttpClient(new HttpClientOptions());
        WebSocketStream stream = client.websocketStream(8080, "localhost", "/the_uri");
        stream.toObservable().subscribe(ws -> {}, error -> {});
    }

    public void websocketClientBuffer(Observable<WebSocket> socketObservable) {
        socketObservable.subscribe(socket -> {
            Observable<Buffer> dataObs = socket.toObservable();
            dataObs.subscribe(buffer -> System.out.println("Got message " + buffer.toString("UTF-8")));
        });
    }

    public void httpClientRequest(Vertx vertx) {
        HttpClient client = vertx.createHttpClient(new HttpClientOptions());
        HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri");
        request.toObservable().subscribe(response -> {}, error -> {});
        request.end();
    }

    public void httpClientResponse(HttpClientRequest request) {
        request.toObservable().subscribe(response -> {
            Observable<Buffer> observable = response.toObservable();
            observable.forEach(buffer -> {});
        });
    }

    public void httpClientResponseFlatMap(HttpClientRequest request) {
        request.toObservable().flatMap(HttpClientResponse::toObservable).forEach(buffer -> {});
    }

    public void httpClientResponseFlatMapUnmarshall(HttpClientRequest request) {
        request.toObservable().flatMap(HttpClientResponse::toObservable).lift(RxHelper.unmarshaller(MyPojo.class)).forEach(pojo -> {});
    }

    public void httpServerRequest(HttpServer server) {
        Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
        requestObservable.subscribe(request -> {});
    }

    public void httpServerRequestObservable(HttpServer server) {
        Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
        requestObservable.subscribe(request -> {
            Observable<Buffer> observable = request.toObservable();
        });
    }

    public void httpServerRequestObservableUnmarshall(HttpServer server) {
        Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
        requestObservable.subscribe(request -> {
            Observable observable = request.toObservable().lift(RxHelper.unmarshaller(MyPojo.class));
        });
    }

    public void timer(Vertx vertx) {
        vertx.timerStream(1000L).toObservable().subscribe(id -> System.out.println("Callback after 1 second"));
    }

    public void periodic(Vertx vertx) {
        vertx.periodicStream(1000L).toObservable().subscribe(id -> System.out.println("Callback every second"));
    }

    public void periodicUnsubscribe(Vertx vertx) {
        vertx.periodicStream(1000L).toObservable().subscribe((Subscriber)new Subscriber<Long>(){

            public void onNext(Long aLong) {
                this.unsubscribe();
            }

            public void onError(Throwable e) {
            }

            public void onCompleted() {
            }
        });
    }

    private class MyPojo {
        private MyPojo() {
        }
    }
}

