/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.stream;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.codec.Codec;
import reactor.rx.Stream;
import reactor.rx.action.support.DefaultSubscriber;

/**
 * A {@link Stream} that feeds every element emitted by a source {@link Publisher} through a
 * decoder obtained from a {@link Codec} and hands the decoder's output to the downstream
 * {@link Subscriber}.
 *
 * <p>A new decoder is requested from the codec for every call to
 * {@link #subscribe(Subscriber)}. Demand and cancellation from the downstream subscriber are
 * forwarded unchanged to the source subscription.
 *
 * @param <SRC> type of the elements emitted by the source publisher
 * @param <IN>  type of the decoded elements delivered downstream
 */
public final class DecoderStream<SRC, IN> extends Stream<IN> {
    private final Codec<SRC, IN, ?> codec;
    private final Publisher<? extends SRC> publisher;

    /**
     * Creates a stream that decodes the elements of {@code publisher} with {@code codec}.
     *
     * @param codec     codec supplying the decoder function; must not be {@code null}
     * @param publisher source of encoded elements; must not be {@code null}
     * @throws NullPointerException if {@code codec} or {@code publisher} is {@code null}
     */
    public DecoderStream(Codec<SRC, IN, ?> codec, Publisher<? extends SRC> publisher) {
        // Both collaborators are dereferenced unconditionally in subscribe(); fail fast here
        // instead of surfacing the NullPointerException at subscription time.
        if (codec == null) {
            throw new NullPointerException("codec");
        }
        if (publisher == null) {
            throw new NullPointerException("publisher");
        }
        this.codec = codec;
        this.publisher = publisher;
    }

    /**
     * Subscribes {@code subscriber} to the decoded view of the source publisher.
     *
     * <p>Terminal signals ({@code onError}, {@code onComplete}) from the source are relayed
     * as-is. {@code request(n)} and {@code cancel()} calls made by {@code subscriber} are
     * passed straight through to the source subscription.
     *
     * @param subscriber downstream consumer of decoded elements; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(final Subscriber<? super IN> subscriber) {
        // Reject a null subscriber before touching the codec or the source, so the failure is
        // reported to the caller rather than from inside an upstream signal.
        if (subscriber == null) {
            throw new NullPointerException("subscriber");
        }

        // The consumer handed to the codec forwards whatever the decoder passes to it.
        // NOTE(review): presumably the decoder invokes this callback once per decoded
        // element - confirm against the Codec implementation in use.
        final Function<SRC, IN> decoder = this.codec.decoder(new Consumer<IN>() {

            @Override
            public void accept(IN in) {
                subscriber.onNext(in);
            }
        });

        this.publisher.subscribe(new DefaultSubscriber<SRC>() {

            @Override
            public void onSubscribe(final Subscription s) {
                // Expose a thin delegate so downstream demand and cancellation reach the source.
                subscriber.onSubscribe(new Subscription() {

                    @Override
                    public void request(long n) {
                        s.request(n);
                    }

                    @Override
                    public void cancel() {
                        s.cancel();
                    }
                });
            }

            @Override
            public void onNext(SRC src) {
                // The return value of apply() is not used here; the only path to the
                // subscriber in this class is the Consumer given to codec.decoder(...).
                decoder.apply(src);
            }

            @Override
            public void onError(Throwable t) {
                subscriber.onError(t);
            }

            @Override
            public void onComplete() {
                subscriber.onComplete();
            }
        });
    }
}

