package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.class */
/**
 * Connects function-style Kafka Streams beans ({@link Function}, {@link Consumer},
 * {@link BiFunction}, {@link BiConsumer}, and curried chains of {@link Function}/{@link Consumer})
 * to their bindings.
 *
 * <p>For a given function bean this class:
 * <ol>
 *   <li>derives, from the function's {@link ResolvableType}, which generic type belongs to which
 *       input binding and which generic type is the outbound type ({@link #buildTypeMap});</li>
 *   <li>builds one inbound argument per input binding ({@link #adaptAndRetrieveInboundArguments});</li>
 *   <li>invokes the bean with those arguments and, when a result is produced, wraps it onto the
 *       output binding target(s) ({@link #setupFunctionInvokerForKafkaStreams}).</li>
 * </ol>
 */
public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderProcessor implements BeanFactoryAware {
    private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionProcessor.class);

    /** Key under which {@link #buildTypeMap} stores the outbound type next to the inbound ones. */
    private static final String OUTBOUND = "outbound";

    private final BindingServiceProperties bindingServiceProperties;

    /** One {@link StreamsBuilderFactoryBean} per function name, created on first use. */
    private final Map<String, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();

    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
    private final StreamFunctionProperties streamFunctionProperties;
    private final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties;

    /** Injected through {@link #setBeanFactory(BeanFactory)}; used to look up the function beans. */
    private BeanFactory beanFactory;

    public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate, CleanupConfig cleanupConfig, StreamFunctionProperties streamFunctionProperties, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
        super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, cleanupConfig);
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
        this.streamFunctionProperties = streamFunctionProperties;
        this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
    }

    /**
     * Maps each input binding name of the proxy factory to the generic type it receives, and
     * stores the outbound type under {@link #OUTBOUND}.
     *
     * @param resolvableType the declared type of the function bean; may be {@code null}
     * @param kafkaStreamsBindableProxyFactory supplies the input binding names, consumed in iteration order
     * @return an insertion-ordered map; empty when {@code resolvableType} or its raw class is {@code null}
     */
    private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType, KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory) {
        Map<String, ResolvableType> typeMap = new LinkedHashMap<>();
        if (resolvableType == null || resolvableType.getRawClass() == null) {
            return typeMap;
        }

        // A two-argument functional type consumes two inputs at the first level and keeps its
        // result type at generic index 2; a one-argument type keeps it at index 1.
        int firstLevelInputs = isBiFunctionOrBiConsumer(resolvableType.getRawClass()) ? 2 : 1;

        // Count the total number of inputs by following nested Function/Consumer result types.
        int inputCount = firstLevelInputs;
        ResolvableType nested = resolvableType.getGeneric(firstLevelInputs);
        while (nested.getRawClass() != null && functionOrConsumerFound(nested)) {
            inputCount++;
            nested = nested.getGeneric(1);
        }

        Iterator<String> inputNames = new LinkedHashSet<String>(kafkaStreamsBindableProxyFactory.getInputs()).iterator();
        populateResolvableTypeMap(resolvableType, typeMap, inputNames);

        ResolvableType current = resolvableType;
        ResolvableType outboundType;
        if (inputCount == firstLevelInputs) {
            // Not curried: the result type sits directly on the top-level type.
            outboundType = current.getGeneric(firstLevelInputs);
        } else {
            // NOTE(review): the walk below always descends through generic index 1, even when the
            // top-level type is a two-argument one whose result is at index 2 - confirm whether
            // curried chains under a BiFunction are expected to work.
            int consumed = firstLevelInputs;
            while (consumed < inputCount && inputNames.hasNext()) {
                current = current.getGeneric(1);
                if (current.getRawClass() != null && functionOrConsumerFound(current)) {
                    populateResolvableTypeMap(current, typeMap, inputNames);
                }
                consumed++;
            }
            outboundType = current.getGeneric(1);
        }
        typeMap.put(OUTBOUND, outboundType);
        return typeMap;
    }

    /**
     * Tells whether {@link BiFunction} or {@link BiConsumer} is assignable to the given raw class.
     *
     * @param rawClass a non-null raw class
     */
    private static boolean isBiFunctionOrBiConsumer(Class<?> rawClass) {
        return rawClass.isAssignableFrom(BiFunction.class) || rawClass.isAssignableFrom(BiConsumer.class);
    }

    /**
     * Tells whether the raw class of the given type is exactly {@link Function} or {@link Consumer}.
     * The caller must make sure the raw class is not {@code null}.
     */
    private boolean functionOrConsumerFound(ResolvableType resolvableType) {
        Class<?> rawClass = resolvableType.getRawClass();
        return rawClass.equals(Function.class) || rawClass.equals(Consumer.class);
    }

    /**
     * Assigns the next input name to generic 0 of the given type and, for a two-argument type
     * with another input name available, the following input name to generic 1.
     *
     * @throws java.util.NoSuchElementException when {@code inputNames} is already exhausted
     */
    private void populateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> typeMap, Iterator<String> inputNames) {
        typeMap.put(inputNames.next(), resolvableType.getGeneric(0));
        Class<?> rawClass = resolvableType.getRawClass();
        if (rawClass != null && isBiFunctionOrBiConsumer(rawClass) && inputNames.hasNext()) {
            typeMap.put(inputNames.next(), resolvableType.getGeneric(1));
        }
    }

    /**
     * Invokes the function bean named {@code str} with arguments built from its input bindings
     * and, when the invocation yields a non-null result, wraps that result onto the output
     * binding target(s).
     *
     * @param resolvableType the declared type of the function bean
     * @param str the bean name of the function
     * @param kafkaStreamsBindableProxyFactory the proxy factory holding the function's bindings
     * @throws BeanInitializationException when invoking the bean or binding its result fails
     */
    // The function beans are only known by name, so they are invoked through raw functional types.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setupFunctionInvokerForKafkaStreams(ResolvableType resolvableType, String str, KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory) {
        Map<String, ResolvableType> inboundTypes = buildTypeMap(resolvableType, kafkaStreamsBindableProxyFactory);
        ResolvableType outboundType = inboundTypes.remove(OUTBOUND);
        Object[] arguments = adaptAndRetrieveInboundArguments(inboundTypes, str);
        try {
            Class<?> functionClass = resolvableType.getRawClass();
            if (Consumer.class.equals(functionClass)) {
                ((Consumer) this.beanFactory.getBean(str)).accept(argumentAt(arguments, 0, str));
                return;
            }
            if (BiConsumer.class.equals(functionClass)) {
                ((BiConsumer) this.beanFactory.getBean(str)).accept(argumentAt(arguments, 0, str), argumentAt(arguments, 1, str));
                return;
            }

            Object result;
            if (BiFunction.class.equals(functionClass)) {
                result = ((BiFunction) this.beanFactory.getBean(str)).apply(argumentAt(arguments, 0, str), argumentAt(arguments, 1, str));
            } else {
                result = ((Function) this.beanFactory.getBean(str)).apply(argumentAt(arguments, 0, str));
            }

            // Curried form: keep feeding the remaining arguments while the result is itself
            // a Function or a Consumer. A terminal Consumer leaves no result to bind.
            int nextArgument = 1;
            while (result instanceof Function || result instanceof Consumer) {
                if (result instanceof Function) {
                    result = ((Function) result).apply(argumentAt(arguments, nextArgument, str));
                } else {
                    ((Consumer) result).accept(argumentAt(arguments, nextArgument, str));
                    result = null;
                }
                nextArgument++;
            }

            if (result != null) {
                bindOutboundResult(result, resolvableType, outboundType, str, kafkaStreamsBindableProxyFactory);
            }
        } catch (Exception e) {
            throw new BeanInitializationException("Cannot setup function invoker for this Kafka Streams function.", e);
        }
    }

    /**
     * Returns {@code arguments[index]}, failing with a descriptive message instead of a bare
     * {@link ArrayIndexOutOfBoundsException} when fewer inbound arguments were resolved than the
     * function shape requires.
     */
    private static Object argumentAt(Object[] arguments, int index, String functionName) {
        if (index >= arguments.length) {
            throw new IllegalStateException("Function '" + functionName + "' needs an inbound argument at position "
                    + index + " but only " + arguments.length + " input binding(s) were resolved.");
        }
        return arguments[index];
    }

    /**
     * Records the outbound type in the catalogue and wraps the function result onto the output
     * binding target(s): one registered bean per element for an array result, otherwise the first
     * (sorted) output of the proxy factory, if there is one.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void bindOutboundResult(Object result, ResolvableType resolvableType, ResolvableType outboundType, String functionName, KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory) {
        this.kafkaStreamsBindingInformationCatalogue.setOutboundKStreamResolvable(outboundType != null ? outboundType : resolvableType.getGeneric(1));
        Iterator<String> outputNames = new TreeSet<String>(kafkaStreamsBindableProxyFactory.getOutputs()).iterator();

        if (!result.getClass().isArray()) {
            if (outputNames.hasNext()) {
                ((KStreamBoundElementFactory.KStreamWrapper) this.applicationContext.getBean(outputNames.next())).wrap((KStream) result);
            }
            return;
        }

        Object[] outboundStreams = (Object[]) result;
        int length = outboundStreams.length;
        List<String> bindingNames = getOutputBindings(functionName, length);
        // Validate up front so that no bean definition is registered for a function that cannot
        // be fully bound.
        if (bindingNames.size() < length) {
            throw new IllegalStateException("Function '" + functionName + "' returned " + length
                    + " outbound stream(s) but only " + bindingNames.size() + " output binding(s) are configured.");
        }
        // The output beans are registered dynamically, which requires the registry view of the factory.
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.beanFactory;
        Iterator<String> bindingNameIterator = bindingNames.iterator();
        for (int index = 0; index < length; index++) {
            final String bindingName = bindingNameIterator.next();
            kafkaStreamsBindableProxyFactory.addOutputBinding(bindingName, KStream.class);
            RootBeanDefinition definition = new RootBeanDefinition();
            definition.setInstanceSupplier(() -> kafkaStreamsBindableProxyFactory.getOutputHolders().get(bindingName).getBoundTarget());
            registry.registerBeanDefinition(bindingName, definition);
            ((KStreamBoundElementFactory.KStreamWrapper) this.applicationContext.getBean(bindingName)).wrap((KStream) outboundStreams[index]);
        }
    }

    /**
     * Returns the output binding names for a function: the configured names from
     * {@link StreamFunctionProperties} when there are any, otherwise {@code i} generated names.
     *
     * @param str the function name
     * @param i the number of names to generate when none are configured
     * @return a new mutable list of binding names
     */
    private List<String> getOutputBindings(String str, int i) {
        List<String> configured = this.streamFunctionProperties.getOutputBindings().get(str);
        if (!CollectionUtils.isEmpty(configured)) {
            return new ArrayList<>(configured);
        }
        List<String> generated = new ArrayList<>();
        for (int index = 0; index < i; index++) {
            generated.add(String.format("%s_%s_%d", str, KafkaStreamsBindableProxyFactory.DEFAULT_OUTPUT_SUFFIX, index));
        }
        return generated;
    }

    /**
     * Builds one inbound argument per entry of {@code map}, in the map's iteration order.
     *
     * <p>Inputs whose raw type {@link KStream} is assignable to are built here; every other input
     * is delegated to {@code handleKTableGlobalKTableInputs}.
     *
     * @param map input binding name to declared inbound type
     * @param str the function name, used as the key for the per-function streams builder
     * @return the inbound arguments, index-aligned with the iteration order of {@code map}
     * @throws IllegalStateException when a binding name is {@code null} or when building an argument fails
     */
    @SuppressWarnings("unchecked")
    private Object[] adaptAndRetrieveInboundArguments(Map<String, ResolvableType> map, String str) {
        Object[] arguments = new Object[map.size()];
        int index = 0;
        for (Map.Entry<String, ResolvableType> entry : map.entrySet()) {
            String bindingName = entry.getKey();
            // Reject a missing binding name before it is used for any lookup.
            if (bindingName == null) {
                throw new IllegalStateException("A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.");
            }
            ResolvableType inboundType = entry.getValue();
            Class<?> parameterType = inboundType.getRawClass();
            Object bean = this.applicationContext.getBean(bindingName);
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(bindingName);
            if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(str)) {
                this.methodStreamsBuilderFactoryBeanMap.put(str, buildStreamsBuilderAndRetrieveConfig(str, this.applicationContext, bindingName, this.kafkaStreamsBinderConfigurationProperties));
            }
            try {
                StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap.get(str);
                StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
                KafkaStreamsConsumerProperties consumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(bindingName);

                Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(consumerProperties, inboundType);
                LOG.info("Key Serde used for " + bindingName + ": " + keySerde.getClass().getName());

                // Without native decoding the value is read as a byte array here and converted later.
                Serde<?> valueSerde = this.bindingServiceProperties.getConsumerProperties(bindingName).isUseNativeDecoding() ? getValueSerde(bindingName, consumerProperties, inboundType) : Serdes.ByteArray();
                LOG.info("Value Serde used for " + bindingName + ": " + valueSerde.getClass().getName());

                Topology.AutoOffsetReset autoOffsetReset = getAutoOffsetReset(bindingName, consumerProperties);
                if (parameterType.isAssignableFrom(KStream.class)) {
                    KStream<?, ?> stream = getKStream(bindingName, bindingProperties, streamsBuilder, keySerde, valueSerde, autoOffsetReset);
                    // The bound bean is used both as the wrapper that receives the real stream and
                    // as the KStream key under which the catalogue stores binding information.
                    ((KStreamBoundElementFactory.KStreamWrapper) bean).wrap((KStream<Object, Object>) stream);
                    KStream<?, ?> boundStream = (KStream<?, ?>) bean;
                    this.kafkaStreamsBindingInformationCatalogue.addKeySerde(boundStream, keySerde);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    if (KStream.class.isAssignableFrom(parameterType)) {
                        Class<?> declaredValueClass = inboundType.getGeneric(1).getRawClass();
                        Class<?> valueClass = declaredValueClass != null ? declaredValueClass : Object.class;
                        if (this.kafkaStreamsBindingInformationCatalogue.isUseNativeDecoding(boundStream)) {
                            arguments[index] = stream;
                        } else {
                            arguments[index] = this.kafkaStreamsMessageConversionDelegate.deserializeOnInbound(valueClass, stream);
                        }
                    }
                    if (arguments[index] == null) {
                        arguments[index] = stream;
                    }
                    Assert.notNull(arguments[index], "Problems encountered while adapting the function argument.");
                } else {
                    handleKTableGlobalKTableInputs(arguments, index, bindingName, parameterType, bean, streamsBuilderFactoryBean, streamsBuilder, consumerProperties, keySerde, valueSerde, autoOffsetReset);
                }
                index++;
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        return arguments;
    }

    /**
     * Plain delegate to {@code getKStream}. Not referenced anywhere else in this class; kept as-is.
     */
    private KStream<?, ?> getkStream(String str, BindingProperties bindingProperties, StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, Topology.AutoOffsetReset autoOffsetReset) {
        return getKStream(str, bindingProperties, streamsBuilder, serde, serde2, autoOffsetReset);
    }

    /**
     * Stores the bean factory used to look up function beans. When a function returns an array,
     * this factory is additionally cast to {@link BeanDefinitionRegistry} to register output beans.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }
}
