package org.springframework.cloud.stream.reactive;

import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.class */
public class MessageChannelToInputFluxParameterAdapter implements StreamListenerParameterAdapter<Flux<?>, SubscribableChannel> {
    private final CompositeMessageConverter messageConverter;

    public MessageChannelToInputFluxParameterAdapter(CompositeMessageConverter compositeMessageConverter) {
        Assert.notNull(compositeMessageConverter, "cannot not be null");
        this.messageConverter = compositeMessageConverter;
    }

    public boolean supports(Class<?> cls, MethodParameter methodParameter) {
        return MessageChannel.class.isAssignableFrom(cls) && Flux.class.isAssignableFrom(methodParameter.getParameterType());
    }

    public Flux<?> adapt(SubscribableChannel subscribableChannel, MethodParameter methodParameter) {
        ResolvableType generic = ResolvableType.forMethodParameter(methodParameter).getGeneric(new int[]{0});
        Class<Object> rawClass = generic.getRawClass();
        Class<Object> cls = rawClass != null ? rawClass : Object.class;
        Object obj = new Object();
        if (!Message.class.isAssignableFrom(cls)) {
            return Flux.create(fluxSink -> {
                MessageHandler messageHandler = message -> {
                    synchronized (obj) {
                        if (cls.isAssignableFrom(message.getPayload().getClass())) {
                            fluxSink.next(message.getPayload());
                        } else {
                            fluxSink.next(this.messageConverter.fromMessage(message, cls));
                        }
                    }
                };
                subscribableChannel.subscribe(messageHandler);
                fluxSink.onCancel(() -> {
                    subscribableChannel.unsubscribe(messageHandler);
                });
            }).publish().autoConnect();
        }
        Class<Object> rawClass2 = generic.getGeneric(new int[]{0}).getRawClass();
        Class<Object> cls2 = rawClass2 != null ? rawClass2 : Object.class;
        return Flux.create(fluxSink2 -> {
            MessageHandler messageHandler = message -> {
                synchronized (obj) {
                    if (cls2.isAssignableFrom(message.getPayload().getClass())) {
                        fluxSink2.next(message);
                    } else {
                        fluxSink2.next(MessageBuilder.createMessage(this.messageConverter.fromMessage(message, cls2), message.getHeaders()));
                    }
                }
            };
            subscribableChannel.subscribe(messageHandler);
            fluxSink2.onCancel(() -> {
                subscribableChannel.unsubscribe(messageHandler);
            });
        }).publish().autoConnect();
    }
}
