/*
 * Decompiled with CFR 0.152.
 */
package com.hotels.styx.api;

import com.google.common.base.Preconditions;
import com.hotels.styx.api.ContentOverflowException;
import com.hotels.styx.api.FlowControlDisableOperator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import rx.Observable;
import rx.Subscriber;

public final class HttpMessageBody {
    public static final HttpMessageBody NO_BODY = new HttpMessageBody((Observable<ByteBuf>)Observable.empty());
    private final Observable<ByteBuf> content;

    public HttpMessageBody(Observable<ByteBuf> content) {
        this.content = (Observable)Preconditions.checkNotNull(content);
    }

    public Observable<ByteBuf> content() {
        return this.content;
    }

    public <T> Observable<T> decode(Function<ByteBuf, T> decoder, int maxContentBytes) {
        CompositeByteBuf byteBufs = Unpooled.compositeBuffer();
        return this.content.lift(FlowControlDisableOperator.disableFlowControl()).doOnError(e -> byteBufs.release()).collect(() -> byteBufs, (composite, part) -> {
            long newSize = composite.readableBytes() + part.readableBytes();
            if (newSize > (long)maxContentBytes) {
                ReferenceCountUtil.release((Object)composite);
                ReferenceCountUtil.release((Object)part);
                throw new ContentOverflowException(String.format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxContentBytes));
            }
            composite.addComponent(part);
            composite.writerIndex(composite.writerIndex() + part.readableBytes());
        }).map(aggregate -> HttpMessageBody.decodeAndRelease(decoder, aggregate));
    }

    private static <T> T decodeAndRelease(Function<ByteBuf, T> decoder, CompositeByteBuf aggregate) {
        try {
            T t = decoder.apply((ByteBuf)aggregate);
            return t;
        }
        finally {
            aggregate.release();
        }
    }

    public Observable<ByteBuf> aggregate(int maxContentBytes) {
        CompositeByteBuf byteBufs = Unpooled.compositeBuffer();
        return this.content.lift(FlowControlDisableOperator.disableFlowControl()).doOnError(e -> byteBufs.release()).collect(() -> byteBufs, (composite, part) -> {
            long newSize = composite.readableBytes() + part.readableBytes();
            if (newSize > (long)maxContentBytes) {
                ReferenceCountUtil.release((Object)composite);
                ReferenceCountUtil.release((Object)part);
                throw new ContentOverflowException(String.format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxContentBytes));
            }
            composite.addComponent(part);
            composite.writerIndex(composite.writerIndex() + part.readableBytes());
        }).map(composite -> composite);
    }

    public CompletableFuture<Boolean> releaseContentBuffers() {
        final CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        this.content.subscribe((Subscriber)new Subscriber<ByteBuf>(){

            public void onCompleted() {
                future.complete(true);
            }

            public void onError(Throwable e) {
            }

            public void onNext(ByteBuf byteBuf) {
                byteBuf.release();
            }
        });
        return future;
    }

    public static Function<ByteBuf, String> utf8String() {
        return bytes -> bytes.toString(StandardCharsets.UTF_8);
    }
}

