package org.springframework.cloud.function.stream.config;

import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.stream.config.AbstractStreamListeningInvoker;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.class */
/**
 * Stream listener that consumes messages from the {@code "input"} binding and
 * dispatches them, group by group, to a
 * {@link AbstractStreamListeningInvoker.FluxMessageProcessor}.
 *
 * <p>The grouping key comes from {@code select}, which is not defined in this
 * class (presumably inherited from {@link AbstractStreamListeningInvoker} -
 * confirm against the superclass).
 */
public class StreamListeningConsumerInvoker extends AbstractStreamListeningInvoker {

    /**
     * Creates the invoker by forwarding every argument, unchanged and in the same
     * order, to the superclass constructor.
     *
     * @param functionCatalog catalog handed to the superclass
     * @param functionInspector inspector handed to the superclass
     * @param compositeMessageConverterFactory converter factory handed to the superclass
     * @param str string setting handed to the superclass (meaning is defined there;
     *            NOTE(review): original parameter name was lost - confirm)
     * @param z boolean flag handed to the superclass (meaning is defined there;
     *          NOTE(review): original parameter name was lost - confirm)
     */
    public StreamListeningConsumerInvoker(
            FunctionCatalog functionCatalog,
            FunctionInspector functionInspector,
            CompositeMessageConverterFactory compositeMessageConverterFactory,
            String str,
            boolean z) {
        super(functionCatalog, functionInspector, compositeMessageConverterFactory, str, z);
    }

    /**
     * Handles the incoming message stream bound to {@code "input"}.
     *
     * <p>Messages are grouped by the key that {@code select} returns for each of them;
     * every group is then passed to the {@code process} method of its own key, and the
     * merged result is subscribed to without any callbacks.
     *
     * @param flux the messages arriving on the {@code "input"} binding
     */
    @StreamListener
    public void handle(@Input("input") Flux<Message<?>> flux) {
        flux
                .groupBy(this::select)
                // The key of each group is itself the processor that handles that group.
                .flatMap(group ->
                        ((AbstractStreamListeningInvoker.FluxMessageProcessor) group.key()).process(group))
                .subscribe();
    }
}
