package org.springframework.cloud.stream.function;

import java.util.function.Consumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/cloud/stream/function/IntegrationFlowFunctionSupport.class */
public class IntegrationFlowFunctionSupport {
    private final FunctionCatalogWrapper functionCatalog;
    private final FunctionInspector functionInspector;
    private final CompositeMessageConverterFactory messageConverterFactory;
    private final StreamFunctionProperties functionProperties;

    @Autowired
    private MessageChannel errorChannel;

    /* JADX INFO: Access modifiers changed from: package-private */
    public IntegrationFlowFunctionSupport(FunctionCatalogWrapper functionCatalogWrapper, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory, StreamFunctionProperties streamFunctionProperties, BindingServiceProperties bindingServiceProperties) {
        Assert.notNull(functionCatalogWrapper, "'functionCatalog' must not be null");
        Assert.notNull(functionInspector, "'functionInspector' must not be null");
        Assert.notNull(compositeMessageConverterFactory, "'messageConverterFactory' must not be null");
        Assert.notNull(streamFunctionProperties, "'functionProperties' must not be null");
        this.functionCatalog = functionCatalogWrapper;
        this.functionInspector = functionInspector;
        this.messageConverterFactory = compositeMessageConverterFactory;
        this.functionProperties = streamFunctionProperties;
        this.functionProperties.setBindingServiceProperties(bindingServiceProperties);
    }

    public <T> boolean containsFunction(Class<T> cls) {
        return StringUtils.hasText(this.functionProperties.getDefinition()) && this.functionCatalog.contains(cls, this.functionProperties.getDefinition());
    }

    public <T> boolean containsFunction(Class<T> cls, String str) {
        return StringUtils.hasText(str) && this.functionCatalog.contains(cls, str);
    }

    public FunctionType getCurrentFunctionType() {
        return this.functionInspector.getRegistration(this.functionCatalog.lookup(this.functionProperties.getDefinition())).getType();
    }

    public IntegrationFlowBuilder integrationFlowFromNamedSupplier() {
        if (!StringUtils.hasText(this.functionProperties.getDefinition())) {
            throw new IllegalStateException("A Supplier is not specified in the 'spring.cloud.stream.function.definition' property.");
        }
        Supplier supplier = (Supplier) this.functionCatalog.lookup(Supplier.class, this.functionProperties.getDefinition());
        if (supplier instanceof FluxSupplier) {
            supplier = ((FluxSupplier) supplier).getTarget();
        }
        return integrationFlowFromProvidedSupplier(supplier).split();
    }

    public IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier) {
        return IntegrationFlows.from(supplier);
    }

    public IntegrationFlowBuilder integrationFlowFromChannel(SubscribableChannel subscribableChannel) {
        return IntegrationFlows.from(subscribableChannel).bridge();
    }

    public IntegrationFlowBuilder integrationFlowForFunction(SubscribableChannel subscribableChannel, MessageChannel messageChannel) {
        if (subscribableChannel instanceof IntegrationObjectSupport) {
            String componentName = ((IntegrationObjectSupport) subscribableChannel).getComponentName();
            if (StringUtils.hasText(componentName)) {
                this.functionProperties.setInputDestinationName(componentName);
            }
        }
        if (messageChannel instanceof IntegrationObjectSupport) {
            String componentName2 = ((IntegrationObjectSupport) messageChannel).getComponentName();
            if (StringUtils.hasText(componentName2)) {
                this.functionProperties.setOutputDestinationName(componentName2);
            }
        }
        IntegrationFlowBuilder integrationFlowBuilder = (IntegrationFlowBuilder) IntegrationFlows.from(subscribableChannel).bridge();
        if (!andThenFunction(integrationFlowBuilder, messageChannel, this.functionProperties)) {
            integrationFlowBuilder = (IntegrationFlowBuilder) integrationFlowBuilder.channel(messageChannel);
        }
        return integrationFlowBuilder;
    }

    public boolean andThenFunction(IntegrationFlowBuilder integrationFlowBuilder, MessageChannel messageChannel, StreamFunctionProperties streamFunctionProperties) {
        return andThenFunction(integrationFlowBuilder.toReactivePublisher(), messageChannel, streamFunctionProperties);
    }

    public <I, O> boolean andThenFunction(Publisher<?> publisher, MessageChannel messageChannel, StreamFunctionProperties streamFunctionProperties) {
        if (!StringUtils.hasText(streamFunctionProperties.getDefinition())) {
            return false;
        }
        FunctionInvoker<I, O> functionInvoker = new FunctionInvoker<>(streamFunctionProperties, this.functionCatalog, this.functionInspector, this.messageConverterFactory, this.errorChannel);
        if (messageChannel == null) {
            subscribeToInput(functionInvoker, publisher, null);
            return true;
        }
        messageChannel.getClass();
        subscribeToInput(functionInvoker, publisher, messageChannel::send);
        return true;
    }

    private <O> Mono<Void> subscribeToOutput(Consumer<Message<O>> consumer, Publisher<Message<O>> publisher) {
        return (consumer == null ? Flux.from(publisher) : Flux.from(publisher).doOnNext(consumer)).then();
    }

    private <I, O> void subscribeToInput(FunctionInvoker<I, O> functionInvoker, Publisher<?> publisher, Consumer<Message<O>> consumer) {
        subscribeToOutput(consumer, functionInvoker.apply((Flux) Flux.from(publisher))).subscribe();
    }
}
