package io.micronaut.http.server.netty.binders;

import io.micronaut.context.BeanLocator;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.subscriber.TypedSubscriber;
import io.micronaut.core.bind.ArgumentBinder;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionError;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.convert.exceptions.ConversionErrorException;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.bind.binders.DefaultBodyAnnotationBinder;
import io.micronaut.http.bind.binders.NonBlockingBodyArgumentBinder;
import io.micronaut.http.netty.stream.StreamedHttpRequest;
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.netty.DefaultHttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentSubscriberFactory;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.micronaut.web.router.exceptions.UnsatisfiedRouteException;
import io.micronaut.web.router.qualifier.ConsumesMediaTypeQualifier;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.EmptyByteBuf;
import io.netty.util.ReferenceCounted;
import java.util.Optional;
import javax.inject.Singleton;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Internal
/* loaded from: input_file:io/micronaut/http/server/netty/binders/PublisherBodyBinder.class */
public class PublisherBodyBinder extends DefaultBodyAnnotationBinder<Publisher> implements NonBlockingBodyArgumentBinder<Publisher> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServer.class);
    private static final Argument<Publisher> TYPE = Argument.of(Publisher.class);
    private final BeanLocator beanLocator;
    private final HttpServerConfiguration httpServerConfiguration;

    public PublisherBodyBinder(ConversionService conversionService, BeanLocator beanLocator, HttpServerConfiguration httpServerConfiguration) {
        super(conversionService);
        this.beanLocator = beanLocator;
        this.httpServerConfiguration = httpServerConfiguration;
    }

    public Argument<Publisher> argumentType() {
        return TYPE;
    }

    public ArgumentBinder.BindingResult<Publisher> bind(ArgumentConversionContext<Publisher> argumentConversionContext, HttpRequest<?> httpRequest) {
        if (httpRequest instanceof NettyHttpRequest) {
            NettyHttpRequest nettyHttpRequest = (NettyHttpRequest) httpRequest;
            if (nettyHttpRequest.getNativeRequest() instanceof StreamedHttpRequest) {
                Optional contentType = httpRequest.getContentType();
                Argument argument = (Argument) argumentConversionContext.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
                HttpContentProcessor httpContentProcessor = (HttpContentProcessor) contentType.flatMap(mediaType -> {
                    return this.beanLocator.findBean(HttpContentSubscriberFactory.class, new ConsumesMediaTypeQualifier(mediaType));
                }).map(httpContentSubscriberFactory -> {
                    return httpContentSubscriberFactory.build(nettyHttpRequest);
                }).orElse(new DefaultHttpContentProcessor(nettyHttpRequest, this.httpServerConfiguration));
                return () -> {
                    return Optional.of(subscriber -> {
                        httpContentProcessor.subscribe(new TypedSubscriber<Object>(argumentConversionContext.getArgument()) { // from class: io.micronaut.http.server.netty.binders.PublisherBodyBinder.1
                            Subscription s;

                            protected void doOnSubscribe(Subscription subscription) {
                                this.s = subscription;
                                subscriber.onSubscribe(subscription);
                            }

                            protected void doOnNext(Object obj) {
                                if (PublisherBodyBinder.LOG.isTraceEnabled()) {
                                    PublisherBodyBinder.LOG.trace("Server received streaming message for argument [{}]: {}", argumentConversionContext.getArgument(), obj);
                                }
                                ArgumentConversionContext with = argumentConversionContext.with(argument);
                                if (obj instanceof ByteBufHolder) {
                                    obj = ((ByteBufHolder) obj).content();
                                    if (obj instanceof EmptyByteBuf) {
                                        return;
                                    }
                                }
                                Optional convert = PublisherBodyBinder.this.conversionService.convert(obj, with);
                                if (convert.isPresent()) {
                                    subscriber.onNext(convert.get());
                                } else {
                                    try {
                                        Optional lastError = with.getLastError();
                                        if (lastError.isPresent()) {
                                            if (PublisherBodyBinder.LOG.isDebugEnabled()) {
                                                PublisherBodyBinder.LOG.debug("Cannot convert message for argument [" + argumentConversionContext.getArgument() + "] and value: " + obj, lastError.get());
                                            }
                                            subscriber.onError(new ConversionErrorException(argumentConversionContext.getArgument(), (ConversionError) lastError.get()));
                                        } else {
                                            if (PublisherBodyBinder.LOG.isDebugEnabled()) {
                                                PublisherBodyBinder.LOG.debug("Cannot convert message for argument [{}] and value: {}", argumentConversionContext.getArgument(), obj);
                                            }
                                            subscriber.onError(new UnsatisfiedRouteException(argumentConversionContext.getArgument()));
                                        }
                                    } finally {
                                        this.s.cancel();
                                    }
                                }
                                if (obj instanceof ReferenceCounted) {
                                    ((ReferenceCounted) obj).release();
                                }
                            }

                            protected void doOnError(Throwable th) {
                                if (PublisherBodyBinder.LOG.isTraceEnabled()) {
                                    PublisherBodyBinder.LOG.trace("Server received error for argument [" + argumentConversionContext.getArgument() + "]: " + th.getMessage(), th);
                                }
                                try {
                                    subscriber.onError(th);
                                } finally {
                                    this.s.cancel();
                                }
                            }

                            protected void doOnComplete() {
                                if (PublisherBodyBinder.LOG.isTraceEnabled()) {
                                    PublisherBodyBinder.LOG.trace("Done receiving messages for argument: {}", argumentConversionContext.getArgument());
                                }
                                subscriber.onComplete();
                            }
                        });
                    });
                };
            }
        }
        return ArgumentBinder.BindingResult.EMPTY;
    }

    public /* bridge */ /* synthetic */ ArgumentBinder.BindingResult bind(ArgumentConversionContext argumentConversionContext, Object obj) {
        return bind((ArgumentConversionContext<Publisher>) argumentConversionContext, (HttpRequest<?>) obj);
    }
}
