package org.springframework.cloud.stream.binder.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import io.micrometer.observation.ObservationRegistry;
import jakarta.validation.constraints.NotNull;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.class */
public class RabbitMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<RabbitConsumerProperties>, ExtendedProducerProperties<RabbitProducerProperties>, RabbitExchangeQueueProvisioner> implements ExtendedPropertiesBinder<MessageChannel, RabbitConsumerProperties, RabbitProducerProperties>, DisposableBean {
    /** Shared converter installed on every inbound adapter created by {@code createConsumerEndpoint}. */
    private static final SimplePassthroughMessageConverter passThoughConverter = new SimplePassthroughMessageConverter();
    /** Error-message strategy used by inbound adapters without retry and returned by {@code getErrorMessageStrategy()}. */
    private static final AmqpMessageHeaderErrorMessageStrategy errorMessageStrategy = new AmqpMessageHeaderErrorMessageStrategy();
    /** Properties converter set on non-stream listener containers; it blanks the delivery mode of each inbound message. */
    private static final MessagePropertiesConverter inboundMessagePropertiesConverter = new DefaultMessagePropertiesConverter() { // from class: org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.1
        public MessageProperties toMessageProperties(AMQP.BasicProperties basicProperties, Envelope envelope, String str) {
            MessageProperties messageProperties = super.toMessageProperties(basicProperties, envelope, str);
            // NOTE(review): presumably cleared so the inbound delivery mode is not carried onto a re-sent message - confirm.
            messageProperties.setDeliveryMode((MessageDeliveryMode) null);
            return messageProperties;
        }
    };
    /** Matches expression strings that mention {@code payload}, {@code #root} or {@code #this}; see {@code expressionInterceptorNeeded}. */
    private static final Pattern interceptorNeededPattern = Pattern.compile("(payload|#root|#this)");
    /** Boot RabbitMQ settings; read in {@code onInit()} when building the clustered connection factory. */
    private final RabbitProperties rabbitProperties;
    /** Set to true in {@code onInit()} after the binder wraps the connection factory; {@code destroy()} only disposes the factory when this is set. */
    private boolean destroyConnectionFactory;
    /** Connection factory used for containers and polled sources; replaced in {@code onInit()} when clustered. */
    private ConnectionFactory connectionFactory;
    /** Applied as the after-receive post processor on non-stream listener containers; defaults to a {@code DelegatingDecompressingPostProcessor}. */
    private MessagePostProcessor decompressingPostProcessor;
    /** Defaults to a {@code GZipPostProcessor}; its point of use is not within this part of the file. */
    private MessagePostProcessor compressingPostProcessor;
    /** Defensive copy of the array given to {@code setAdminAddresses}; must match {@code nodes} in length when clustered. */
    private volatile String[] adminAddresses;
    /** Defensive copy of the array given to {@code setNodes}. */
    private volatile String[] nodes;
    /** True when {@code setNodes} received more than one entry. */
    private volatile boolean clustered;
    /** Source of the per-binding consumer/producer extension properties. */
    private RabbitExtendedBindingProperties extendedBindingProperties;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder$SimplePassthroughMessageConverter.class */
    /**
     * Converter that leaves {@code byte[]} payloads alone on the way out and always
     * exposes the raw body on the way in.
     */
    public static final class SimplePassthroughMessageConverter extends AbstractMessageConverter {
        /** Fallback used for every outbound payload that is not already a byte array. */
        private static final SimpleMessageConverter converter = new SimpleMessageConverter();

        SimplePassthroughMessageConverter() {
        }

        /**
         * Wraps a byte array directly; any other payload is delegated to the fallback converter.
         */
        protected Message createMessage(Object obj, MessageProperties messageProperties) {
            if (obj instanceof byte[]) {
                byte[] rawBody = (byte[]) obj;
                return new Message(rawBody, messageProperties);
            }
            return converter.toMessage(obj, messageProperties);
        }

        /**
         * Returns the message body without any conversion.
         */
        public Object fromMessage(Message message) throws MessageConversionException {
            return message.getBody();
        }
    }

    /**
     * Creates a binder with neither a listener-container customizer nor a message-source customizer.
     *
     * @param connectionFactory the connection factory; must not be null
     * @param rabbitProperties the Boot RabbitMQ properties; must not be null
     * @param rabbitExchangeQueueProvisioner the provisioner handed to the superclass
     */
    public RabbitMessageChannelBinder(
            ConnectionFactory connectionFactory,
            RabbitProperties rabbitProperties,
            RabbitExchangeQueueProvisioner rabbitExchangeQueueProvisioner) {
        this(connectionFactory, rabbitProperties, rabbitExchangeQueueProvisioner, null, null);
    }

    /**
     * Creates a binder with a listener-container customizer but no message-source customizer.
     *
     * @param connectionFactory the connection factory; must not be null
     * @param rabbitProperties the Boot RabbitMQ properties; must not be null
     * @param rabbitExchangeQueueProvisioner the provisioner handed to the superclass
     * @param listenerContainerCustomizer customizer applied to created listener containers (may be null)
     */
    public RabbitMessageChannelBinder(
            ConnectionFactory connectionFactory,
            RabbitProperties rabbitProperties,
            RabbitExchangeQueueProvisioner rabbitExchangeQueueProvisioner,
            ListenerContainerCustomizer<MessageListenerContainer> listenerContainerCustomizer) {
        this(connectionFactory, rabbitProperties, rabbitExchangeQueueProvisioner, listenerContainerCustomizer, null);
    }

    /**
     * Primary constructor: validates the mandatory collaborators and installs the default
     * post processors and extension properties.
     *
     * @param connectionFactory the connection factory; must not be null
     * @param rabbitProperties the Boot RabbitMQ properties; must not be null
     * @param rabbitExchangeQueueProvisioner the provisioner handed to the superclass
     * @param listenerContainerCustomizer customizer applied to created listener containers (may be null)
     * @param messageSourceCustomizer customizer applied to polled message sources (may be null)
     */
    public RabbitMessageChannelBinder(
            ConnectionFactory connectionFactory,
            RabbitProperties rabbitProperties,
            RabbitExchangeQueueProvisioner rabbitExchangeQueueProvisioner,
            ListenerContainerCustomizer<MessageListenerContainer> listenerContainerCustomizer,
            MessageSourceCustomizer<AmqpMessageSource> messageSourceCustomizer) {
        super(new String[0], rabbitExchangeQueueProvisioner, listenerContainerCustomizer, messageSourceCustomizer);
        // Fail fast on missing collaborators before any state is populated.
        Assert.notNull(connectionFactory, "connectionFactory must not be null");
        Assert.notNull(rabbitProperties, "rabbitProperties must not be null");
        this.connectionFactory = connectionFactory;
        this.rabbitProperties = rabbitProperties;
        // Defaults; each can be replaced through the matching setter.
        this.extendedBindingProperties = new RabbitExtendedBindingProperties();
        this.compressingPostProcessor = new GZipPostProcessor();
        this.decompressingPostProcessor = new DelegatingDecompressingPostProcessor();
    }

    /**
     * Replaces the post processor that listener containers apply after receiving a message.
     *
     * @param messagePostProcessor the post processor to use from now on
     */
    public void setDecompressingPostProcessor(MessagePostProcessor messagePostProcessor) {
        this.decompressingPostProcessor = messagePostProcessor;
    }

    /**
     * Replaces the compressing post processor (a {@code GZipPostProcessor} by default).
     *
     * @param messagePostProcessor the post processor to use from now on
     */
    public void setCompressingPostProcessor(MessagePostProcessor messagePostProcessor) {
        this.compressingPostProcessor = messagePostProcessor;
    }

    /**
     * Stores a private copy of the admin addresses so later changes to the caller's array
     * are not observed by the binder.
     *
     * @param strArr the admin addresses; must not be null
     */
    public void setAdminAddresses(String[] strArr) {
        String[] snapshot = strArr.clone();
        this.adminAddresses = snapshot;
    }

    /**
     * Stores a private copy of the node names and flags the binder as clustered when
     * more than one node was supplied.
     *
     * @param strArr the node names; must not be null
     */
    public void setNodes(String[] strArr) {
        String[] snapshot = strArr.clone();
        this.nodes = snapshot;
        this.clustered = strArr.length > 1;
    }

    /**
     * Replaces the source of per-binding Rabbit extension properties.
     *
     * @param rabbitExtendedBindingProperties the extended binding properties to delegate to
     */
    public void setExtendedBindingProperties(RabbitExtendedBindingProperties rabbitExtendedBindingProperties) {
        this.extendedBindingProperties = rabbitExtendedBindingProperties;
    }

    /**
     * Runs the superclass initialization and, when more than one node was configured,
     * wraps the connection factory in a {@link LocalizedQueueConnectionFactory}.
     *
     * <p>The wrapped factory is owned by this binder, so {@code destroy()} disposes it.
     *
     * @throws IllegalStateException if {@code adminAddresses} was never set, or if the
     * addresses, admin addresses and nodes differ in length
     * @throws Exception if the superclass initialization fails
     */
    public void onInit() throws Exception {
        super.onInit();
        if (!this.clustered) {
            return;
        }
        String[] addresses = StringUtils.commaDelimitedListToStringArray(this.rabbitProperties.getAddresses());
        String[] adminUris = this.adminAddresses;
        String[] nodeNames = this.nodes;
        // 'nodes' is necessarily set when clustered; 'adminAddresses' has its own setter and may be missing.
        Assert.state(adminUris != null, "'adminAddresses' must be provided when more than one node is configured");
        Assert.state(addresses.length == adminUris.length && addresses.length == nodeNames.length, "'addresses', 'adminAddresses', and 'nodes' properties must have equal length");
        RabbitProperties.Ssl ssl = this.rabbitProperties.getSsl();
        // The 'enabled' flag is a nullable Boolean; unboxing it directly fails when the property is unset.
        boolean sslEnabled = Boolean.TRUE.equals(ssl.getEnabled());
        this.connectionFactory = new LocalizedQueueConnectionFactory(this.connectionFactory, addresses, adminUris, nodeNames, this.rabbitProperties.getVirtualHost(), this.rabbitProperties.getUsername(), this.rabbitProperties.getPassword(), sslEnabled, ssl.getKeyStore(), ssl.getTrustStore(), ssl.getKeyStorePassword(), ssl.getTrustStorePassword());
        this.destroyConnectionFactory = true;
    }

    /**
     * Disposes the connection factory, but only when this binder created it in
     * {@code onInit()} and it is actually a {@link DisposableBean}.
     *
     * @throws Exception if the connection factory fails to shut down
     */
    public void destroy() throws Exception {
        // A ConnectionFactory is not statically a DisposableBean, so test and cast explicitly.
        if (this.destroyConnectionFactory && this.connectionFactory instanceof DisposableBean) {
            ((DisposableBean) this.connectionFactory).destroy();
        }
    }

    /**
     * Returns the connection factory currently in use; after {@code onInit()} on a clustered
     * setup this is the wrapping factory created there rather than the one passed to the constructor.
     */
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Returns the Rabbit-specific consumer properties for the given binding.
     *
     * <p>This restores the method name declared by the extended-properties binder contract;
     * the decompiler had renamed it, leaving that contract unimplemented.
     *
     * @param str the binding (channel) name
     * @return the consumer extension properties supplied by the extended binding properties
     */
    public RabbitConsumerProperties getExtendedConsumerProperties(String str) {
        return (RabbitConsumerProperties) this.extendedBindingProperties.getExtendedConsumerProperties(str);
    }

    /**
     * Alias under the decompiler-generated name, kept so any existing call sites still compile.
     *
     * @param str the binding (channel) name
     * @return same value as {@code getExtendedConsumerProperties(str)}
     */
    public RabbitConsumerProperties m3getExtendedConsumerProperties(String str) {
        return getExtendedConsumerProperties(str);
    }

    /**
     * Returns the Rabbit-specific producer properties for the given binding.
     *
     * <p>This restores the method name declared by the extended-properties binder contract;
     * the decompiler had renamed it, leaving that contract unimplemented.
     *
     * @param str the binding (channel) name
     * @return the producer extension properties supplied by the extended binding properties
     */
    public RabbitProducerProperties getExtendedProducerProperties(String str) {
        return (RabbitProducerProperties) this.extendedBindingProperties.getExtendedProducerProperties(str);
    }

    /**
     * Alias under the decompiler-generated name, kept so any existing call sites still compile.
     *
     * @param str the binding (channel) name
     * @return same value as {@code getExtendedProducerProperties(str)}
     */
    public RabbitProducerProperties m2getExtendedProducerProperties(String str) {
        return getExtendedProducerProperties(str);
    }

    /**
     * Returns the defaults prefix reported by the extended binding properties.
     */
    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    /**
     * Returns the properties entry class reported by the extended binding properties.
     */
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    /**
     * Returns the superclass binder identity prefixed with {@code "rabbit-"}.
     */
    public String getBinderIdentity() {
        return "rabbit-" + super.getBinderIdentity();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the outbound handler for a producer binding: an AMQP endpoint when the
     * producer type is {@code AMQP}, otherwise a stream handler.
     *
     * @param producerDestination the provisioned destination
     * @param extendedProducerProperties producer properties including the Rabbit extension
     * @param messageChannel the error channel, or null when none is configured
     * @return the message handler for the binding
     * @throws IllegalStateException if embedded header mode was requested
     */
    public MessageHandler createProducerMessageHandler(ProducerDestination producerDestination, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties, MessageChannel messageChannel) {
        Assert.state(!HeaderMode.embeddedHeaders.equals(extendedProducerProperties.getHeaderMode()), "the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
        RabbitProducerProperties extension = (RabbitProducerProperties) extendedProducerProperties.getExtension();
        String prefix = extension.getPrefix();
        String fullName = producerDestination.getName();
        // Strip the configured prefix (if any) to obtain the bare destination name.
        String destination = fullName;
        if (StringUtils.hasText(prefix)) {
            destination = fullName.substring(prefix.length());
        }
        if (RabbitProducerProperties.ProducerType.AMQP.equals(extension.getProducerType())) {
            return amqpHandler(producerDestination, extendedProducerProperties, messageChannel, destination, extension);
        }
        return StreamUtils.createStreamMessageHandler(producerDestination, extendedProducerProperties, messageChannel, destination, extension, getApplicationContext(), this::configureHeaderMapper);
    }

    /**
     * Creates and configures the AMQP outbound endpoint for a producer binding.
     *
     * <p>Routing key and delay are taken either from static values, from the configured
     * expressions, or - when {@code expressionInterceptorNeeded} reports true - from the
     * {@code scst_routingKey} / {@code scst_delay} headers. When an error channel is given,
     * returns and confirms are wired to it.
     *
     * @param producerDestination the provisioned destination; its name is the exchange name
     * @param extendedProducerProperties producer properties (used for the partitioned flag)
     * @param messageChannel the error channel, or null when none is configured
     * @param str the destination name with the prefix removed; used as default routing key
     * @param rabbitProducerProperties the Rabbit producer extension
     * @return the configured endpoint
     */
    private AmqpOutboundEndpoint amqpHandler(ProducerDestination producerDestination, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties, MessageChannel messageChannel, String str, RabbitProducerProperties rabbitProducerProperties) {
        final boolean hasErrorChannel = messageChannel != null;
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(buildRabbitTemplate(rabbitProducerProperties, hasErrorChannel || rabbitProducerProperties.isUseConfirmHeader()));
        endpoint.setExchangeName(producerDestination.getName());

        final boolean viaHeaders = expressionInterceptorNeeded(rabbitProducerProperties);
        final Expression routingExpression = rabbitProducerProperties.getRoutingKeyExpression();
        final boolean partitioned = extendedProducerProperties.isPartitioned();

        // Routing key: static name, configured expression, or interceptor-populated header,
        // each optionally suffixed with the partition header.
        if (routingExpression == null) {
            if (partitioned) {
                endpoint.setRoutingKeyExpression(buildPartitionRoutingExpression(str, false));
            } else {
                endpoint.setRoutingKey(str);
            }
        } else if (partitioned) {
            String baseExpression = viaHeaders ? "headers['scst_routingKey']" : routingExpression.getExpressionString();
            endpoint.setRoutingKeyExpression(buildPartitionRoutingExpression(baseExpression, true));
        } else if (viaHeaders) {
            endpoint.setRoutingKeyExpressionString("headers['scst_routingKey']");
        } else {
            endpoint.setRoutingKeyExpression(routingExpression);
        }

        // Delay: only configured when a delay expression exists.
        if (rabbitProducerProperties.getDelayExpression() != null) {
            if (!viaHeaders) {
                endpoint.setDelayExpression(rabbitProducerProperties.getDelayExpression());
            } else {
                endpoint.setDelayExpressionString("headers['scst_delay']");
            }
        }

        endpoint.setHeaderMapper(configureHeaderMapper(rabbitProducerProperties));
        endpoint.setDefaultDeliveryMode(rabbitProducerProperties.getDeliveryMode());
        endpoint.setBeanFactory(getBeanFactory());

        if (hasErrorChannel) {
            checkConnectionFactoryIsErrorCapable();
            endpoint.setReturnChannel(messageChannel);
            if (!rabbitProducerProperties.isUseConfirmHeader()) {
                endpoint.setConfirmNackChannel(messageChannel);
                String ackChannelName = "nullChannel";
                if (StringUtils.hasText(rabbitProducerProperties.getConfirmAckChannel())) {
                    ackChannelName = rabbitProducerProperties.getConfirmAckChannel();
                }
                // Register the ack channel on demand when it is a real channel that does not exist yet.
                boolean mustRegister = !ackChannelName.equals("nullChannel") && !getApplicationContext().containsBean(ackChannelName);
                if (mustRegister) {
                    getApplicationContext().registerBean(ackChannelName, DirectChannel.class, () -> {
                        return new DirectChannel();
                    }, new BeanDefinitionCustomizer[0]);
                }
                endpoint.setConfirmAckChannelName(ackChannelName);
                endpoint.setConfirmCorrelationExpressionString("#root");
            } else {
                Assert.state(!StringUtils.hasText(rabbitProducerProperties.getConfirmAckChannel()), "You cannot specify a 'confirmAckChannel' when 'useConfirmHeader' is true");
            }
            endpoint.setErrorMessageStrategy(new DefaultErrorMessageStrategy());
        }
        endpoint.setHeadersMappedLast(true);
        return endpoint;
    }

    /**
     * Builds the outbound header mapper: a fixed set of exclusion patterns followed by the
     * user-configured header patterns.
     *
     * <p>The {@code scst_partition} exclusion is skipped for super-stream producers.
     *
     * @param rabbitProducerProperties the Rabbit producer extension
     * @return the configured outbound header mapper
     */
    private AmqpHeaderMapper configureHeaderMapper(RabbitProducerProperties rabbitProducerProperties) {
        DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
        String[] configuredPatterns = rabbitProducerProperties.getHeaderPatterns();
        List<String> patterns = new ArrayList<>(configuredPatterns.length + 3);
        if (!rabbitProducerProperties.isSuperStream()) {
            patterns.add("!scst_partition");
        }
        patterns.add("!sourceData");
        patterns.add("!deliveryAttempt");
        patterns.add("!rabbitmq_streamContext");
        // Exclusions go first so they take precedence over the configured patterns.
        for (String pattern : configuredPatterns) {
            patterns.add(pattern);
        }
        String[] headerNames = patterns.toArray(new String[patterns.size()]);
        mapper.setRequestHeaderNames(headerNames);
        return mapper;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds, at the head of the channel's interceptor list, an interceptor that evaluates the
     * routing-key and delay expressions - but only when {@code expressionInterceptorNeeded}
     * says those expressions require it.
     *
     * @param messageChannel the output channel being bound
     * @param extendedProducerProperties producer properties including the Rabbit extension
     */
    public void postProcessOutputChannel(MessageChannel messageChannel, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties) {
        RabbitProducerProperties extension = (RabbitProducerProperties) extendedProducerProperties.getExtension();
        if (!expressionInterceptorNeeded(extension)) {
            return;
        }
        RabbitExpressionEvaluatingInterceptor interceptor = new RabbitExpressionEvaluatingInterceptor(extension.getRoutingKeyExpression(), extension.getDelayExpression(), getEvaluationContext());
        ((AbstractMessageChannel) messageChannel).addInterceptor(0, interceptor);
    }

    /**
     * Reports whether the routing-key or delay expression text mentions {@code payload},
     * {@code #root} or {@code #this}.
     *
     * @param rabbitProducerProperties the Rabbit producer extension
     * @return true when either expression matches the interceptor pattern
     */
    private boolean expressionInterceptorNeeded(RabbitProducerProperties rabbitProducerProperties) {
        Expression routing = rabbitProducerProperties.getRoutingKeyExpression();
        Expression delay = rabbitProducerProperties.getDelayExpression();
        if (routing != null && interceptorNeededPattern.matcher(routing.getExpressionString()).find()) {
            return true;
        }
        if (delay == null) {
            return false;
        }
        return interceptorNeededPattern.matcher(delay.getExpressionString()).find();
    }

    /**
     * Logs a diagnostic when a producer error channel is enabled but the connection factory
     * cannot deliver all of the corresponding events (publisher confirms and/or returns).
     *
     * <p>Only {@link CachingConnectionFactory} can be inspected; any other type gets a
     * generic warning.
     */
    private void checkConnectionFactoryIsErrorCapable() {
        if (!(this.connectionFactory instanceof CachingConnectionFactory)) {
            this.logger.warn("Unknown connection factory type, cannot determine error capabilities: " + this.connectionFactory.getClass());
            return;
        }
        // Explicit downcast: the field is declared as the ConnectionFactory interface.
        CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) this.connectionFactory;
        boolean confirms = cachingConnectionFactory.isPublisherConfirms();
        boolean returns = cachingConnectionFactory.isPublisherReturns();
        if (!confirms && !returns) {
            this.logger.warn("Producer error channel is enabled, but the connection factory is not configured for returns or confirms; the error channel will receive no messages");
        } else if (!confirms) {
            this.logger.info("Producer error channel is enabled, but the connection factory is only configured to handle returned messages; negative acks will not be reported");
        } else if (!returns) {
            this.logger.info("Producer error channel is enabled, but the connection factory is only configured to handle negatively acked messages; returned messages will not be reported");
        }
    }

    /**
     * Parses a SpEL expression that appends {@code '-'} and the {@code scst_partition}
     * header to a routing key.
     *
     * @param str either a SpEL expression (when {@code z} is true) or a literal key
     * @param z true to treat {@code str} as an expression, false to quote it as a literal
     * @return the parsed expression
     */
    private Expression buildPartitionRoutingExpression(String str, boolean z) {
        String expressionText;
        if (z) {
            expressionText = str + " + '-' + headers['scst_partition']";
        } else {
            expressionText = "'" + str + "-' + headers['scst_partition']";
        }
        SpelExpressionParser parser = new SpelExpressionParser();
        return parser.parseExpression(expressionText);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the inbound channel adapter for a consumer binding, backed by a freshly
     * configured listener container.
     *
     * <p>Queue names come from the comma-separated destination name and are set on the
     * container unless it is a super-stream consumer on a stream container. Retry and
     * recovery are configured when more than one attempt is allowed; otherwise failures
     * go straight to the error channel.
     *
     * @param consumerDestination the provisioned destination
     * @param str the consumer group
     * @param extendedConsumerProperties consumer properties including the Rabbit extension
     * @return the configured inbound adapter
     * @throws IllegalStateException if embedded header mode was requested
     */
    public MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        Assert.state(!HeaderMode.embeddedHeaders.equals(extendedConsumerProperties.getHeaderMode()), "the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
        final String destinationName = consumerDestination.getName();
        final RabbitConsumerProperties extension = (RabbitConsumerProperties) extendedConsumerProperties.getExtension();
        final ObservableListenerContainer container = createAndConfigureContainer(consumerDestination, str, extendedConsumerProperties, destinationName, extension);

        String[] queueNames = StringUtils.tokenizeToStringArray(destinationName, ",", true, true);
        boolean superStreamConsumer = extension.getContainerType() == RabbitConsumerProperties.ContainerType.STREAM && extension.isSuperStream();
        if (!superStreamConsumer) {
            container.setQueueNames(queueNames);
        }
        getContainerCustomizer().configure(container, consumerDestination.getName(), str);
        // Observation is switched on whenever an ObservationRegistry bean is available.
        getApplicationContext().getBeanProvider(ObservationRegistry.class).ifAvailable(registry -> {
            container.setObservationEnabled(true);
        });
        container.afterPropertiesSet();

        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
        adapter.setBindSourceMessage(true);
        adapter.setBeanFactory(getBeanFactory());
        adapter.setBeanName("inbound." + destinationName);
        DefaultAmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
        headerMapper.setRequestHeaderNames(extension.getHeaderPatterns());
        adapter.setHeaderMapper(headerMapper);

        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(consumerDestination, str, extendedConsumerProperties);
        boolean retryEnabled = extendedConsumerProperties.getMaxAttempts() > 1;
        if (!retryEnabled) {
            adapter.setErrorMessageStrategy(errorMessageStrategy);
            adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
        } else {
            adapter.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
            adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
        }
        adapter.setMessageConverter(passThoughConverter);

        RabbitConsumerProperties.ContainerType containerType = extension.getContainerType();
        if (extendedConsumerProperties.isBatchMode() && extension.isEnableBatching()) {
            // Consumer-side batches are only extracted for the simple container type.
            if (RabbitConsumerProperties.ContainerType.SIMPLE.equals(containerType)) {
                adapter.setBatchMode(AmqpInboundChannelAdapter.BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
            }
        }
        if (containerType.equals(RabbitConsumerProperties.ContainerType.STREAM)) {
            StreamUtils.configureAdapter(adapter);
        }
        return adapter;
    }

    /**
     * Creates the listener container for a consumer binding and applies the binding's
     * Rabbit settings to it.
     *
     * <p>Stream containers are delegated to {@code StreamUtils}. For the direct and simple
     * container types the container is held as the common
     * {@link AbstractMessageListenerContainer} supertype, since the two concrete classes are
     * siblings and cannot be assigned or cast to one another.
     *
     * @param consumerDestination the provisioned destination
     * @param str the consumer group
     * @param extendedConsumerProperties consumer properties including the Rabbit extension
     * @param str2 the destination name (passed on to the stream container factory)
     * @param rabbitConsumerProperties the Rabbit consumer extension
     * @return the configured, not yet initialized, listener container
     */
    private ObservableListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties, String str2, RabbitConsumerProperties rabbitConsumerProperties) {
        if (rabbitConsumerProperties.getContainerType().equals(RabbitConsumerProperties.ContainerType.STREAM)) {
            return StreamUtils.createContainer(consumerDestination, str, extendedConsumerProperties, str2, getApplicationContext());
        }
        boolean directContainer = rabbitConsumerProperties.getContainerType().equals(RabbitConsumerProperties.ContainerType.DIRECT);
        AbstractMessageListenerContainer listenerContainer = directContainer ? new DirectMessageListenerContainer(this.connectionFactory) : new SimpleMessageListenerContainer(this.connectionFactory);
        listenerContainer.setBeanName(consumerDestination.getName() + "." + str + ".container");
        listenerContainer.setAcknowledgeMode(rabbitConsumerProperties.getAcknowledgeMode());
        listenerContainer.setChannelTransacted(rabbitConsumerProperties.isTransacted());
        listenerContainer.setDefaultRequeueRejected(rabbitConsumerProperties.isRequeueRejected());
        // A non-positive concurrency setting falls back to a single consumer.
        int requestedConcurrency = extendedConsumerProperties.getConcurrency();
        int concurrency = requestedConcurrency > 0 ? requestedConcurrency : 1;
        if (directContainer) {
            setDMLCProperties(extendedConsumerProperties, (DirectMessageListenerContainer) listenerContainer, concurrency);
        } else {
            setSMLCProperties(extendedConsumerProperties, (SimpleMessageListenerContainer) listenerContainer, concurrency);
        }
        listenerContainer.setPrefetchCount(rabbitConsumerProperties.getPrefetch());
        listenerContainer.setRecoveryInterval(rabbitConsumerProperties.getRecoveryInterval());
        listenerContainer.setTaskExecutor(new SimpleAsyncTaskExecutor(consumerDestination.getName() + "-"));
        listenerContainer.setAfterReceivePostProcessors(new MessagePostProcessor[]{this.decompressingPostProcessor});
        listenerContainer.setMessagePropertiesConverter(inboundMessagePropertiesConverter);
        listenerContainer.setExclusive(rabbitConsumerProperties.isExclusive());
        listenerContainer.setMissingQueuesFatal(rabbitConsumerProperties.getMissingQueuesFatal());
        if (rabbitConsumerProperties.getFailedDeclarationRetryInterval() != null) {
            listenerContainer.setFailedDeclarationRetryInterval(rabbitConsumerProperties.getFailedDeclarationRetryInterval().longValue());
        }
        // Prefer the dedicated event publisher; fall back to the application context.
        if (getApplicationEventPublisher() != null) {
            listenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
        } else if (getApplicationContext() != null) {
            listenerContainer.setApplicationEventPublisher(getApplicationContext());
        }
        if (StringUtils.hasText(rabbitConsumerProperties.getConsumerTagPrefix())) {
            // Consumer tags are "<prefix>#<n>" with n counting up per container.
            AtomicInteger tagIndex = new AtomicInteger();
            listenerContainer.setConsumerTagStrategy(queue -> {
                return rabbitConsumerProperties.getConsumerTagPrefix() + "#" + tagIndex.getAndIncrement();
            });
        }
        listenerContainer.setApplicationContext(getApplicationContext());
        return listenerContainer;
    }

    /**
     * Applies the settings that only the simple listener container supports: concurrency
     * bounds, batching, declaration retries and receive timeout.
     *
     * @param extendedConsumerProperties consumer properties including the Rabbit extension
     * @param simpleMessageListenerContainer the container to configure
     * @param i the resolved (positive) concurrency
     */
    private void setSMLCProperties(ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties, SimpleMessageListenerContainer simpleMessageListenerContainer, int i) {
        final RabbitConsumerProperties extension = (RabbitConsumerProperties) extendedConsumerProperties.getExtension();
        simpleMessageListenerContainer.setConcurrentConsumers(i);
        final int upperBound = extension.getMaxConcurrency();
        if (upperBound > i) {
            simpleMessageListenerContainer.setMaxConcurrentConsumers(upperBound);
        }
        // De-batching is the default unless the binding consumes in batch mode.
        boolean deBatch = !extendedConsumerProperties.isBatchMode();
        simpleMessageListenerContainer.setDeBatchingEnabled(deBatch);
        simpleMessageListenerContainer.setBatchSize(extension.getBatchSize());
        if (extension.getQueueDeclarationRetries() != null) {
            simpleMessageListenerContainer.setDeclarationRetries(extension.getQueueDeclarationRetries().intValue());
        }
        boolean consumerSideBatching = extendedConsumerProperties.isBatchMode() && extension.isEnableBatching();
        if (consumerSideBatching) {
            // Consumer-side batching re-enables de-batching on the container.
            simpleMessageListenerContainer.setConsumerBatchEnabled(true);
            simpleMessageListenerContainer.setDeBatchingEnabled(true);
        }
        if (extension.getReceiveTimeout() != null) {
            simpleMessageListenerContainer.setReceiveTimeout(extension.getReceiveTimeout().longValue());
        }
    }

    /**
     * Applies the concurrency to a direct listener container and warns about configured
     * options that this container type ignores.
     *
     * @param extendedConsumerProperties consumer properties including the Rabbit extension
     * @param directMessageListenerContainer the container to configure
     * @param i the resolved (positive) concurrency, used as consumers-per-queue
     */
    private void setDMLCProperties(ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties, DirectMessageListenerContainer directMessageListenerContainer, int i) {
        directMessageListenerContainer.setConsumersPerQueue(i);
        final RabbitConsumerProperties extension = (RabbitConsumerProperties) extendedConsumerProperties.getExtension();
        boolean maxConcurrencyRequested = extension.getMaxConcurrency() > i;
        if (maxConcurrencyRequested) {
            this.logger.warn("maxConcurrency is not supported by the direct container type");
        }
        boolean batchSizeRequested = extension.getBatchSize() > 1;
        if (batchSizeRequested) {
            this.logger.warn("batchSize is not supported by the direct container type");
        }
        boolean declarationRetriesRequested = extension.getQueueDeclarationRetries() != null;
        if (declarationRetriesRequested) {
            this.logger.warn("queueDeclarationRetries is not supported by the direct container type");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the polled message source for a consumer binding together with its error
     * infrastructure.
     *
     * @param str the binding name (not used here)
     * @param str2 the consumer group
     * @param consumerDestination the provisioned destination; its name is the queue polled
     * @param extendedConsumerProperties consumer properties including the Rabbit extension
     * @return the source and error infrastructure for the polled consumer
     * @throws IllegalArgumentException if the consumer is multiplexed
     */
    public AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String str, String str2, ConsumerDestination consumerDestination, ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        boolean singleQueue = !extendedConsumerProperties.isMultiplex();
        Assert.isTrue(singleQueue, "The Spring Integration polled MessageSource does not currently support muiltiple queues");
        AmqpMessageSource source = new AmqpMessageSource(this.connectionFactory, consumerDestination.getName());
        // Expose the raw AMQP message as a header so it can be picked up later.
        source.setRawMessageHeader(true);
        getMessageSourceCustomizer().configure(source, consumerDestination.getName(), str2);
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(consumerDestination, str2, extendedConsumerProperties, true);
        return new AbstractMessageChannelBinder.PolledConsumerResources(source, errorInfrastructure);
    }

    /**
     * Installs an attributes provider on the pollable source that copies the
     * {@code amqp_raw_message} header of each message, when present, into the
     * attribute accessor under the same key.
     *
     * @param defaultPollableMessageSource the pollable source to post-process
     */
    protected void postProcessPollableSource(DefaultPollableMessageSource defaultPollableMessageSource) {
        defaultPollableMessageSource.setAttributesProvider((attributes, polled) -> {
            Object rawMessage = polled.getHeaders().get("amqp_raw_message");
            if (rawMessage == null) {
                // Nothing to propagate when the header is absent.
                return;
            }
            attributes.setAttribute("amqp_raw_message", rawMessage);
        });
    }

    /**
     * Returns the error message strategy held in {@code errorMessageStrategy}.
     *
     * @return the error message strategy used by this binder
     */
    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return errorMessageStrategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Selects the handler for error messages of a consumer binding.
     *
     * <ul>
     * <li>When {@code republishToDlq} is set, returns a handler that republishes the
     * failed AMQP message(s) to the dead letter exchange with exception details
     * in the headers.</li>
     * <li>Otherwise, when more than one attempt is configured, returns a handler that
     * hands the failed message(s) to a {@code BatchCapableRejectAndDontRequeueRecoverer}.</li>
     * <li>Otherwise, delegates to the superclass.</li>
     * </ul>
     *
     * @param consumerDestination the consumer destination (only forwarded to the superclass)
     * @param str forwarded to the superclass
     * @param extendedConsumerProperties the consumer properties that drive the choice
     * @return the selected handler (whatever the superclass returns in the last case)
     */
    public MessageHandler getErrorMessageHandler(ConsumerDestination consumerDestination, String str, final ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        if (extendedConsumerProperties.getExtension().isRepublishToDlq()) {
            return new MessageHandler() {
                /** Time to wait for a publisher confirm, in milliseconds. */
                private static final long ACK_TIMEOUT = 10000;
                private final RabbitTemplate template;
                private final CachingConnectionFactory.ConfirmType confirmType;
                private final String exchange;
                private final String routingKey;
                private final int frameMaxHeadroom;
                /** Negative until it has been derived from the connection's max frame size. */
                private int maxStackTraceLength;
                /** Null until the presence of the dead letter exchange has been determined. */
                private Boolean dlxPresent;

                {
                    this.template = new RabbitTemplate(RabbitMessageChannelBinder.this.connectionFactory);
                    this.template.setUsePublisherConnection(true);
                    this.template.setChannelTransacted(extendedConsumerProperties.getExtension().isTransacted());
                    this.template.setMandatory(RabbitMessageChannelBinder.this.connectionFactory.isPublisherReturns());
                    if (RabbitMessageChannelBinder.this.connectionFactory.isSimplePublisherConfirms()) {
                        this.confirmType = CachingConnectionFactory.ConfirmType.SIMPLE;
                    } else if (RabbitMessageChannelBinder.this.connectionFactory.isPublisherConfirms()) {
                        this.confirmType = CachingConnectionFactory.ConfirmType.CORRELATED;
                    } else {
                        this.confirmType = CachingConnectionFactory.ConfirmType.NONE;
                    }
                    this.exchange = RabbitMessageChannelBinder.this.deadLetterExchangeName(extendedConsumerProperties.getExtension());
                    this.routingKey = extendedConsumerProperties.getExtension().getDeadLetterRoutingKey();
                    this.frameMaxHeadroom = extendedConsumerProperties.getExtension().getFrameMaxHeadroom();
                    this.maxStackTraceLength = -1;
                }

                /**
                 * Republishes the failed AMQP message(s) referenced by the error message
                 * and, in manual acknowledge mode, acks the original message afterwards.
                 */
                @Override
                public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
                    List<Message> amqpMessages = RabbitMessageChannelBinder.this.extractAmqpMessages(message, extendedConsumerProperties);
                    if (!(message instanceof ErrorMessage)) {
                        RabbitMessageChannelBinder.this.logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message);
                        return;
                    }
                    if (amqpMessages == null) {
                        RabbitMessageChannelBinder.this.logger.error("No raw message header in " + message);
                        return;
                    }
                    if (!checkDlx()) {
                        return;
                    }
                    Throwable cause = (Throwable) message.getPayload();
                    if (!shouldRepublish(cause)) {
                        if (RabbitMessageChannelBinder.this.logger.isDebugEnabled()) {
                            RabbitMessageChannelBinder.this.logger.debug("Skipping republish of: " + message);
                        }
                        return;
                    }
                    String stackTrace = RabbitMessageChannelBinder.this.getStackTraceAsString(cause);
                    if (this.maxStackTraceLength < 0) {
                        // Derive the limit once the connection can report its max frame size.
                        int maxFrame = RabbitUtils.getMaxFrame(this.template.getConnectionFactory());
                        if (maxFrame > 0) {
                            this.maxStackTraceLength = maxFrame - this.frameMaxHeadroom;
                        }
                    }
                    if (this.maxStackTraceLength > 0 && stackTrace.length() > this.maxStackTraceLength) {
                        stackTrace = stackTrace.substring(0, this.maxStackTraceLength);
                        RabbitMessageChannelBinder.this.logger.warn("Stack trace in republished message header truncated due to frame_max limitations; consider increasing frame_max on the broker or reduce the stack trace depth", cause);
                    }
                    for (Message amqpMessage : amqpMessages) {
                        // The headers must be adjusted for every republished message, whether or
                        // not a dead letter routing key is configured; only the routing key
                        // selection depends on that setting.
                        MessageProperties messageProperties = adjustMessagePropertiesHeader(cause, stackTrace, amqpMessage);
                        String targetRoutingKey = this.routingKey != null ? this.routingKey : messageProperties.getConsumerQueue();
                        doSend(this.exchange, targetRoutingKey, amqpMessage);
                    }
                    if (!extendedConsumerProperties.getExtension().getAcknowledgeMode().equals(AcknowledgeMode.MANUAL)) {
                        return;
                    }
                    org.springframework.messaging.Message<?> originalMessage = ((ErrorMessage) message).getOriginalMessage();
                    if (originalMessage == null) {
                        return;
                    }
                    // With manual acks the original delivery is acked here, after the republish.
                    try {
                        Channel channel = originalMessage.getHeaders().get("amqp_channel", Channel.class);
                        Long deliveryTag = originalMessage.getHeaders().get("amqp_deliveryTag", Long.class);
                        channel.basicAck(deliveryTag.longValue(), false);
                    } catch (IOException e) {
                        RabbitMessageChannelBinder.this.logger.debug("Failed to ack original message", e);
                    }
                }

                /**
                 * Adds the exception and original-routing headers to the message and, when
                 * configured, overrides its delivery mode.
                 *
                 * @return the (mutated) properties of {@code message}
                 */
                @NotNull
                private MessageProperties adjustMessagePropertiesHeader(Throwable th, String str2, Message message) {
                    MessageProperties messageProperties = message.getMessageProperties();
                    Map<String, Object> headers = messageProperties.getHeaders();
                    headers.put("x-exception-stacktrace", str2);
                    headers.put("x-exception-message", th.getCause() != null ? th.getCause().getMessage() : th.getMessage());
                    headers.put("x-original-exchange", messageProperties.getReceivedExchange());
                    headers.put("x-original-routingKey", messageProperties.getReceivedRoutingKey());
                    if (extendedConsumerProperties.getExtension().getRepublishDeliveyMode() != null) {
                        messageProperties.setDeliveryMode(extendedConsumerProperties.getExtension().getRepublishDeliveyMode());
                    }
                    return messageProperties;
                }

                /**
                 * Publishes the message, waiting for a confirm when the connection factory
                 * uses simple or correlated confirms.
                 *
                 * @throws AmqpRejectAndDontRequeueException on a negative ack, a returned
                 * message (correlated confirms), a timeout, or an interrupt
                 */
                private void doSend(String str2, String str3, Message message) {
                    if (CachingConnectionFactory.ConfirmType.SIMPLE.equals(this.confirmType)) {
                        this.template.invoke(rabbitOperations -> {
                            rabbitOperations.send(str2, str3, message);
                            if (rabbitOperations.waitForConfirms(ACK_TIMEOUT)) {
                                return null;
                            }
                            throw new AmqpRejectAndDontRequeueException("Negative ack for DLQ message received");
                        });
                        return;
                    }
                    if (!CachingConnectionFactory.ConfirmType.CORRELATED.equals(this.confirmType)) {
                        this.template.send(str2, str3, message);
                        return;
                    }
                    CorrelationData correlationData = new CorrelationData();
                    this.template.send(str2, str3, message, correlationData);
                    try {
                        CorrelationData.Confirm confirm = correlationData.getFuture().get(ACK_TIMEOUT, TimeUnit.MILLISECONDS);
                        if (!confirm.isAck()) {
                            throw new AmqpRejectAndDontRequeueException("Negative ack for DLQ message received");
                        }
                        if (correlationData.getReturned() != null) {
                            RabbitMessageChannelBinder.this.logger.error("DLQ message was returned: " + message);
                            throw new AmqpRejectAndDontRequeueException("DLQ message was returned");
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new AmqpRejectAndDontRequeueException(e);
                    } catch (ExecutionException e2) {
                        throw new AmqpRejectAndDontRequeueException(e2.getCause());
                    } catch (TimeoutException e3) {
                        throw new AmqpRejectAndDontRequeueException(e3);
                    }
                }

                /**
                 * Determines, once, whether the dead letter exchange is available: assumed
                 * present when {@code autoBindDlq} is set, otherwise checked with a passive
                 * declaration. The result is cached in {@code dlxPresent}.
                 */
                private boolean checkDlx() {
                    if (this.dlxPresent == null) {
                        if (extendedConsumerProperties.getExtension().isAutoBindDlq()) {
                            this.dlxPresent = Boolean.TRUE;
                        } else {
                            this.dlxPresent = this.template.execute(channel -> {
                                String deadLetterExchangeName = RabbitMessageChannelBinder.this.deadLetterExchangeName(extendedConsumerProperties.getExtension());
                                try {
                                    channel.exchangeDeclarePassive(deadLetterExchangeName);
                                    return Boolean.TRUE;
                                } catch (IOException e) {
                                    RabbitMessageChannelBinder.this.logger.warn("'republishToDlq' is true, but the '" + deadLetterExchangeName + "' dead letter exchange is not present; disabling 'republishToDlq'");
                                    return Boolean.FALSE;
                                }
                            });
                        }
                    }
                    return this.dlxPresent.booleanValue();
                }

                /**
                 * Walks the cause chain up to the first reject-and-don't-requeue or
                 * immediate-acknowledge exception; republishing is skipped only when that
                 * first match is an {@code ImmediateAcknowledgeAmqpException}.
                 */
                private boolean shouldRepublish(Throwable th) {
                    Throwable current = th;
                    while (current != null
                            && !(current instanceof AmqpRejectAndDontRequeueException)
                            && !(current instanceof ImmediateAcknowledgeAmqpException)) {
                        current = current.getCause();
                    }
                    return !(current instanceof ImmediateAcknowledgeAmqpException);
                }
            };
        }
        if (extendedConsumerProperties.getMaxAttempts() > 1) {
            return new MessageHandler() {
                private final BatchCapableRejectAndDontRequeueRecoverer recoverer = new BatchCapableRejectAndDontRequeueRecoverer();

                /**
                 * Passes the failed AMQP message(s) to the recoverer; anything that is not
                 * an error message with raw AMQP message data is rejected with a
                 * {@code ListenerExecutionFailedException}.
                 */
                @Override
                public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
                    List<Message> amqpMessages = RabbitMessageChannelBinder.this.extractAmqpMessages(message, extendedConsumerProperties);
                    if (!(message instanceof ErrorMessage)) {
                        RabbitMessageChannelBinder.this.logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message);
                        // No failed AMQP messages to report: pass an empty array, never a null varargs array.
                        throw new ListenerExecutionFailedException("Unexpected error message " + message, new AmqpRejectAndDontRequeueException(""), new Message[0]);
                    }
                    if (amqpMessages == null || amqpMessages.isEmpty()) {
                        RabbitMessageChannelBinder.this.logger.error("No raw message header in " + message);
                        // The list is null or empty here, so there is nothing to copy; dereferencing
                        // it would turn the null case into a NullPointerException.
                        throw new ListenerExecutionFailedException("Unexpected error message " + message, new AmqpRejectAndDontRequeueException(""), new Message[0]);
                    }
                    this.recoverer.recover(amqpMessages, (Throwable) message.getPayload());
                }
            };
        }
        return super.getErrorMessageHandler(consumerDestination, str, extendedConsumerProperties);
    }

    /**
     * Extracts the raw AMQP message(s) stored as source data on the given message.
     *
     * <p>The source data is inspected at runtime: a {@code List} is returned as is, a
     * single message is wrapped in a one-element list, and missing source data yields
     * {@code null} so that callers can report the missing raw message themselves
     * (wrapping {@code null} with {@code List.of} would throw instead).
     *
     * @param message the message carrying the source data
     * @param extendedConsumerProperties consumer properties; only used to decide whether
     * the batch-mode debug line is logged
     * @return the AMQP messages, or {@code null} when the message has no source data
     */
    @SuppressWarnings("unchecked")
    private List<Message> extractAmqpMessages(org.springframework.messaging.Message<?> message, ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        boolean batchConfigured = extendedConsumerProperties.isBatchMode()
                || extendedConsumerProperties.getExtension().isEnableBatching();
        if (batchConfigured) {
            this.logger.debug("Batch mode enabled: Extract list instead of single message");
        }
        Object sourceData = StaticMessageHeaderAccessor.getSourceData(message);
        if (sourceData == null) {
            return null;
        }
        if (sourceData instanceof List) {
            // Safe as long as the source data list only holds AMQP messages.
            return (List<Message>) sourceData;
        }
        return List.of((Message) sourceData);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the error message handler for a polled consumer.
     *
     * <p>Uses the handler from {@code getErrorMessageHandler} when there is one.
     * Otherwise returns a handler that, for an error message carrying a raw AMQP
     * message, requeues or rejects the failed message through its acknowledgment
     * callback depending on {@code requeueRejected}; error messages without a raw AMQP
     * message are passed to the superclass handler, if any.
     *
     * @param consumerDestination the consumer destination
     * @param str forwarded to the handler lookups
     * @param extendedConsumerProperties the consumer properties
     * @return the handler for polled consumer errors
     */
    public MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        MessageHandler errorMessageHandler = getErrorMessageHandler(consumerDestination, str, extendedConsumerProperties);
        if (errorMessageHandler != null) {
            return errorMessageHandler;
        }
        MessageHandler superHandler = super.getErrorMessageHandler(consumerDestination, str, extendedConsumerProperties);
        return message -> {
            // Named distinctly from the lambda parameter: the null check below is about the
            // raw AMQP message header, not about the error message itself.
            Message amqpMessage = (Message) message.getHeaders().get("amqp_raw_message");
            if (!(message instanceof ErrorMessage)) {
                this.logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message);
                return;
            }
            if (amqpMessage == null) {
                if (superHandler != null) {
                    superHandler.handleMessage(message);
                }
                return;
            }
            Object payload = message.getPayload();
            if (!(payload instanceof MessagingException)) {
                return;
            }
            org.springframework.messaging.Message<?> failedMessage = ((MessagingException) payload).getFailedMessage();
            if (failedMessage == null) {
                // Without a failed message there is no acknowledgment callback to look up.
                return;
            }
            AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage);
            if (acknowledgmentCallback == null) {
                return;
            }
            if (extendedConsumerProperties.getExtension().isRequeueRejected()) {
                acknowledgmentCallback.acknowledge(AcknowledgmentCallback.Status.REQUEUE);
            } else {
                acknowledgmentCallback.acknowledge(AcknowledgmentCallback.Status.REJECT);
            }
        };
    }

    /**
     * Resolves the dead letter exchange name: the configured name when one is set,
     * otherwise {@code "DLX"} with the configured prefix applied.
     *
     * @param rabbitCommonProperties the properties holding the exchange name and prefix
     * @return the dead letter exchange name
     */
    private String deadLetterExchangeName(RabbitCommonProperties rabbitCommonProperties) {
        if (rabbitCommonProperties.getDeadLetterExchange() != null) {
            return rabbitCommonProperties.getDeadLetterExchange();
        }
        String prefix = rabbitCommonProperties.getPrefix();
        return applyPrefix(prefix, "DLX");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asks the provisioning provider to clean the auto-declare context for the given
     * consumer destination.
     *
     * @param consumerDestination the consumer destination being cleaned up
     * @param str not used by this implementation
     * @param extendedConsumerProperties the consumer properties passed on to the provider
     */
    public void afterUnbindConsumer(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        this.provisioningProvider.cleanAutoDeclareContext(consumerDestination, extendedConsumerProperties);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asks the provisioning provider to clean the auto-declare context for the given
     * producer destination.
     *
     * @param producerDestination the producer destination being cleaned up
     * @param extendedProducerProperties the producer properties passed on to the provider
     */
    public void afterUnbindProducer(ProducerDestination producerDestination, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties) {
        this.provisioningProvider.cleanAutoDeclareContext(producerDestination, extendedProducerProperties);
    }

    /**
     * Builds and initializes the template used for publishing.
     *
     * <p>A {@code BatchingRabbitTemplate} is created when batching is enabled,
     * otherwise a plain {@code RabbitTemplate}. The template is then given the
     * pass-through converter, the transactional and mandatory flags, the connection
     * factory (using the publisher connection), optional compression, an optional retry
     * template built from the Rabbit template retry properties, and observation support
     * when an {@code ObservationRegistry} bean is available.
     *
     * @param rabbitProducerProperties the producer settings to apply
     * @param z value for the template's mandatory flag
     * @return the initialized template
     */
    private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties rabbitProducerProperties, boolean z) {
        // Declared as the common supertype: either branch must be assignable to it.
        RabbitTemplate rabbitTemplate = rabbitProducerProperties.isBatchingEnabled()
                ? new BatchingRabbitTemplate(getBatchingStrategy(rabbitProducerProperties), getApplicationContext().getBean("taskScheduler", TaskScheduler.class))
                : new RabbitTemplate();
        rabbitTemplate.setMessageConverter(passThoughConverter);
        rabbitTemplate.setChannelTransacted(rabbitProducerProperties.isTransacted());
        rabbitTemplate.setConnectionFactory(this.connectionFactory);
        rabbitTemplate.setUsePublisherConnection(true);
        if (rabbitProducerProperties.isCompress()) {
            rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor[]{this.compressingPostProcessor});
        }
        rabbitTemplate.setMandatory(z);
        if (this.rabbitProperties != null && this.rabbitProperties.getTemplate().getRetry().isEnabled()) {
            RabbitProperties.Retry retry = this.rabbitProperties.getTemplate().getRetry();
            SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retry.getMaxAttempts());
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(retry.getInitialInterval().toMillis());
            backOffPolicy.setMultiplier(retry.getMultiplier());
            backOffPolicy.setMaxInterval(retry.getMaxInterval().toMillis());
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setRetryPolicy(retryPolicy);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            rabbitTemplate.setRetryTemplate(retryTemplate);
        }
        AbstractApplicationContext applicationContext = getApplicationContext();
        // Observation is only switched on when a registry bean exists in the context.
        applicationContext.getBeanProvider(ObservationRegistry.class)
                .ifAvailable(observationRegistry -> rabbitTemplate.setObservationEnabled(true));
        rabbitTemplate.setApplicationContext(applicationContext);
        rabbitTemplate.afterPropertiesSet();
        return rabbitTemplate;
    }

    /**
     * Resolves the batching strategy for a producer: the bean with the configured name
     * when one is given, otherwise a {@code SimpleBatchingStrategy} built from the
     * configured batch size, buffer limit and timeout.
     *
     * @param rabbitProducerProperties the producer settings
     * @return the batching strategy to use
     */
    private BatchingStrategy getBatchingStrategy(RabbitProducerProperties rabbitProducerProperties) {
        if (rabbitProducerProperties.getBatchingStrategyBeanName() != null) {
            String beanName = rabbitProducerProperties.getBatchingStrategyBeanName();
            return getApplicationContext().getBean(beanName, BatchingStrategy.class);
        }
        return new SimpleBatchingStrategy(
                rabbitProducerProperties.getBatchSize(),
                rabbitProducerProperties.getBatchBufferLimit(),
                rabbitProducerProperties.getBatchTimeout());
    }

    /**
     * Renders the stack trace of the given throwable into a string.
     *
     * @param th the throwable to render
     * @return the text produced by {@code printStackTrace}
     */
    private String getStackTraceAsString(Throwable th) {
        StringWriter buffer = new StringWriter();
        PrintWriter printer = new PrintWriter((Writer) buffer, true);
        th.printStackTrace(printer);
        return buffer.toString();
    }
}
