/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.reactive;

import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

public class MessageChannelToInputFluxParameterAdapter
implements StreamListenerParameterAdapter<Flux<?>, SubscribableChannel> {
    private final CompositeMessageConverter messageConverter;

    public MessageChannelToInputFluxParameterAdapter(CompositeMessageConverter messageConverter) {
        Assert.notNull((Object)messageConverter, (String)"cannot not be null");
        this.messageConverter = messageConverter;
    }

    public boolean supports(Class<?> bindingTargetType, MethodParameter methodParameter) {
        return MessageChannel.class.isAssignableFrom(bindingTargetType) && Flux.class.isAssignableFrom(methodParameter.getParameterType());
    }

    public Flux<?> adapt(SubscribableChannel bindingTarget, MethodParameter parameter) {
        ResolvableType fluxResolvableType = ResolvableType.forMethodParameter((MethodParameter)parameter);
        ResolvableType fluxTypeParameter = fluxResolvableType.getGeneric(new int[]{0});
        Class<Object> fluxTypeParameterRawClass = fluxTypeParameter.getRawClass();
        Class<Object> fluxTypeParameterClass = fluxTypeParameterRawClass != null ? fluxTypeParameterRawClass : Object.class;
        Object monitor = new Object();
        if (Message.class.isAssignableFrom(fluxTypeParameterClass)) {
            ResolvableType payloadTypeParameter = fluxTypeParameter.getGeneric(new int[]{0});
            Class<Object> payloadTypeParameterRawClass = payloadTypeParameter.getRawClass();
            Class<Object> payloadTypeParameterClass = payloadTypeParameterRawClass != null ? payloadTypeParameterRawClass : Object.class;
            return Flux.create(emitter -> {
                MessageHandler messageHandler = message -> {
                    Object object = monitor;
                    synchronized (object) {
                        if (payloadTypeParameterClass.isAssignableFrom(message.getPayload().getClass())) {
                            emitter.next((Object)message);
                        } else {
                            emitter.next((Object)MessageBuilder.createMessage((Object)this.messageConverter.fromMessage(message, payloadTypeParameterClass), (MessageHeaders)message.getHeaders()));
                        }
                    }
                };
                bindingTarget.subscribe(messageHandler);
                emitter.onCancel(() -> bindingTarget.unsubscribe(messageHandler));
            }).publish().autoConnect();
        }
        return Flux.create(emitter -> {
            MessageHandler messageHandler = message -> {
                Object object = monitor;
                synchronized (object) {
                    if (fluxTypeParameterClass.isAssignableFrom(message.getPayload().getClass())) {
                        emitter.next(message.getPayload());
                    } else {
                        emitter.next(this.messageConverter.fromMessage(message, fluxTypeParameterClass));
                    }
                }
            };
            bindingTarget.subscribe(messageHandler);
            emitter.onCancel(() -> bindingTarget.unsubscribe(messageHandler));
        }).publish().autoConnect();
    }
}

