/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.function.rsocket;

import io.rsocket.frame.FrameType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.rsocket.FunctionRSocketUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class RSocketListenerFunction
implements Function<Object, Publisher<?>> {
    private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction;

    RSocketListenerFunction(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
        Assert.isTrue((targetFunction != null ? 1 : 0) != 0, (String)"Failed to discover target function. \nTo fix it you should either provide 'spring.cloud.function.definition' property or if you are using RSocketRequester provide valid function definition via 'route' operator (e.g., requester.route(\"echo\"))");
        this.targetFunction = targetFunction;
    }

    @Override
    public Publisher<?> apply(Object input) {
        Message inputMessage = (Message)input;
        FrameType frameType = RSocketFrameTypeMessageCondition.getFrameType((Message)inputMessage);
        switch (frameType) {
            case REQUEST_FNF: {
                return this.handle((Message<Publisher<Object>>)inputMessage);
            }
            case REQUEST_RESPONSE: 
            case REQUEST_STREAM: 
            case REQUEST_CHANNEL: {
                return this.handleAndReply((Message<Publisher<Object>>)inputMessage);
            }
        }
        throw new UnsupportedOperationException();
    }

    private Mono<Void> handle(Message<Publisher<Object>> messageToProcess) {
        if (this.targetFunction.isRoutingFunction()) {
            Flux dataFlux = Flux.from((Publisher)((Publisher)messageToProcess.getPayload())).map(payload -> MessageBuilder.createMessage((Object)payload, (MessageHeaders)messageToProcess.getHeaders()));
            return dataFlux.doOnNext((Consumer)this.targetFunction).then();
        }
        if (this.targetFunction.isConsumer()) {
            Flux dataFlux = Flux.from((Publisher)((Publisher)messageToProcess.getPayload())).map(payload -> this.buildReceivedMessage(payload, messageToProcess.getHeaders()));
            dataFlux = FunctionTypeUtils.isPublisher((Type)this.targetFunction.getInputType()) ? dataFlux.transform((Function)this.targetFunction) : dataFlux.doOnNext((Consumer)this.targetFunction);
            return dataFlux.then();
        }
        return Mono.error((Throwable)new IllegalStateException("Only 'Consumer' can handle 'fire-and-forget' RSocket frame."));
    }

    private Flux<?> handleAndReply(Message<Publisher<Object>> messageToProcess) {
        Flux dataFlux = Flux.from((Publisher)((Publisher)messageToProcess.getPayload())).map(payload -> this.buildReceivedMessage(payload, messageToProcess.getHeaders()));
        dataFlux = this.targetFunction.getInputType() != null && FunctionTypeUtils.isPublisher((Type)this.targetFunction.getInputType()) ? dataFlux.transform((Function)this.targetFunction) : dataFlux.flatMap(data -> {
            Map<String, Object> messageMap = FunctionRSocketUtils.sanitizeMessageToMap((Message)data);
            Message sanitizedMessage = MessageBuilder.withPayload((Object)messageMap.remove(FunctionRSocketUtils.PAYLOAD)).copyHeaders((Map)messageMap.get(FunctionRSocketUtils.HEADERS)).build();
            Object result = this.targetFunction.isSupplier() ? this.targetFunction.apply(null) : this.targetFunction.apply((Object)sanitizedMessage);
            Publisher resultPublisher = result instanceof Publisher ? (Publisher)result : Mono.just((Object)result);
            return Flux.from((Publisher)resultPublisher).map(v -> this.extractPayloadIfNecessary(v));
        });
        return dataFlux;
    }

    private Message<?> buildReceivedMessage(Object mayBeMessage, MessageHeaders messageHeaders) {
        return mayBeMessage instanceof Message ? MessageBuilder.fromMessage((Message)((Message)mayBeMessage)).copyHeadersIfAbsent((Map)messageHeaders).build() : MessageBuilder.withPayload((Object)mayBeMessage).copyHeadersIfAbsent((Map)messageHeaders).build();
    }

    private Object extractPayloadIfNecessary(Object output) {
        if (output instanceof Message) {
            Message resultMessage = (Message)output;
            Object contentType = resultMessage.getHeaders().get((Object)"contentType");
            if (contentType != null && contentType.toString().equals("application/json")) {
                return output;
            }
            return resultMessage.getPayload();
        }
        return output;
    }
}

