/*
 * Decompiled with CFR 0.152.
 */
package com.hotels.styx.api;

import com.hotels.styx.api.Buffer;
import com.hotels.styx.api.ByteStreamAggregator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

public class ByteStream
implements Publisher<Buffer> {
    private final Publisher<Buffer> stream;

    public ByteStream(Publisher<Buffer> stream) {
        this.stream = Objects.requireNonNull(stream);
    }

    public ByteStream map(Function<Buffer, Buffer> mapping) {
        return new ByteStream((Publisher<Buffer>)Flux.from(this.stream).map(ByteStream.releaseOldBuffers(mapping)));
    }

    private static Function<Buffer, Buffer> releaseOldBuffers(Function<Buffer, Buffer> mapping) {
        return buffer -> {
            Buffer buffer2 = (Buffer)mapping.apply((Buffer)buffer);
            if (buffer != buffer2) {
                buffer.delegate().release();
            }
            return buffer2;
        };
    }

    public ByteStream drop() {
        return new ByteStream((Publisher<Buffer>)Flux.from(this.stream).doOnNext(buffer -> buffer.delegate().release()).filter(buffer -> false));
    }

    public ByteStream doOnEnd(Consumer<Optional<Throwable>> action) {
        return new ByteStream((Publisher<Buffer>)Flux.from(this.stream).doOnError(cause -> action.accept(Optional.of(cause))).doOnComplete(() -> action.accept(Optional.empty())));
    }

    public ByteStream doOnCancel(Runnable action) {
        return new ByteStream((Publisher<Buffer>)Flux.from(this.stream).doOnCancel(action));
    }

    CompletableFuture<Buffer> aggregate(int maxContentBytes) {
        return new ByteStreamAggregator(this.stream, maxContentBytes).apply();
    }

    public void subscribe(Subscriber<? super Buffer> subscriber) {
        this.stream.subscribe(subscriber);
    }
}

