package org.springframework.cloud.stream.binder.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.aopalliance.aop.Advice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.UncategorizedAmqpException;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.support.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.class */
public class RabbitMessageChannelBinder extends AbstractBinder<MessageChannel, ExtendedConsumerProperties<RabbitConsumerProperties>, ExtendedProducerProperties<RabbitProducerProperties>> implements ExtendedPropertiesBinder<MessageChannel, RabbitConsumerProperties, RabbitProducerProperties> {
    private static final String DEAD_LETTER_EXCHANGE = "DLX";
    private final RabbitAdmin rabbitAdmin;
    private ConnectionFactory connectionFactory;
    private volatile String[] addresses;
    private volatile String[] adminAddresses;
    private volatile String[] nodes;
    private String username;
    private String password;
    private String vhost;
    private boolean useSSL;
    private Resource sslPropertiesLocation;
    private volatile boolean clustered;
    public static final AnonymousQueue.Base64UrlNamingStrategy ANONYMOUS_GROUP_NAME_GENERATOR = new AnonymousQueue.Base64UrlNamingStrategy("anonymous.");
    private static final MessagePropertiesConverter inboundMessagePropertiesConverter = new DefaultMessagePropertiesConverter() { // from class: org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.1
        public MessageProperties toMessageProperties(AMQP.BasicProperties basicProperties, Envelope envelope, String str) {
            MessageProperties messageProperties = super.toMessageProperties(basicProperties, envelope, str);
            messageProperties.setDeliveryMode((MessageDeliveryMode) null);
            return messageProperties;
        }
    };
    private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final GenericApplicationContext autoDeclareContext = new GenericApplicationContext();
    private MessagePostProcessor decompressingPostProcessor = new DelegatingDecompressingPostProcessor();
    private MessagePostProcessor compressingPostProcessor = new GZipPostProcessor();
    private RabbitExtendedBindingProperties extendedBindingProperties = new RabbitExtendedBindingProperties();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder$ReceivingHandler.class */
    public class ReceivingHandler extends AbstractReplyProducingMessageHandler {
        public ReceivingHandler() {
            setBeanFactory(RabbitMessageChannelBinder.this.getBeanFactory());
        }

        protected Object handleRequestMessage(Message<?> message) {
            return RabbitMessageChannelBinder.this.deserializePayloadIfNecessary(message).toMessage(getMessageBuilderFactory());
        }

        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder$SendingHandler.class */
    public class SendingHandler extends AbstractMessageHandler implements Lifecycle {
        private final MessageHandler delegate;
        private final String replyTo;
        private final ExtendedProducerProperties<RabbitProducerProperties> producerProperties;
        private final PartitionHandler partitionHandler;

        private SendingHandler(MessageHandler messageHandler, String str, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties) {
            this.delegate = messageHandler;
            this.replyTo = str;
            this.producerProperties = extendedProducerProperties;
            ConfigurableListableBeanFactory beanFactory = RabbitMessageChannelBinder.this.getBeanFactory();
            setBeanFactory(beanFactory);
            this.partitionHandler = new PartitionHandler(beanFactory, RabbitMessageChannelBinder.this.evaluationContext, RabbitMessageChannelBinder.this.partitionSelector, extendedProducerProperties);
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            MessageValues serializePayloadIfNecessary = RabbitMessageChannelBinder.this.serializePayloadIfNecessary(message);
            if (this.replyTo != null) {
                serializePayloadIfNecessary.put("amqp_replyTo", this.replyTo);
            }
            if (this.producerProperties.isPartitioned()) {
                serializePayloadIfNecessary.put("partition", Integer.valueOf(this.partitionHandler.determinePartition(message)));
            }
            this.delegate.handleMessage(serializePayloadIfNecessary.toMessage(getMessageBuilderFactory()));
        }

        public void start() {
            if (this.delegate instanceof Lifecycle) {
                this.delegate.start();
            }
        }

        public void stop() {
            if (this.delegate instanceof Lifecycle) {
                this.delegate.stop();
            }
        }

        public boolean isRunning() {
            if (this.delegate instanceof Lifecycle) {
                return this.delegate.isRunning();
            }
            return true;
        }
    }

    public RabbitMessageChannelBinder(ConnectionFactory connectionFactory) {
        Assert.notNull(connectionFactory, "connectionFactory must not be null");
        this.connectionFactory = connectionFactory;
        this.rabbitAdmin = new RabbitAdmin(connectionFactory);
        this.autoDeclareContext.refresh();
        this.rabbitAdmin.setApplicationContext(this.autoDeclareContext);
        this.rabbitAdmin.setIgnoreDeclarationExceptions(true);
        this.rabbitAdmin.afterPropertiesSet();
    }

    public void setDecompressingPostProcessor(MessagePostProcessor messagePostProcessor) {
        this.decompressingPostProcessor = messagePostProcessor;
    }

    public void setCompressingPostProcessor(MessagePostProcessor messagePostProcessor) {
        this.compressingPostProcessor = messagePostProcessor;
    }

    public void setAddresses(String[] strArr) {
        this.addresses = (String[]) Arrays.copyOf(strArr, strArr.length);
    }

    public void setAdminAddresses(String[] strArr) {
        this.adminAddresses = (String[]) Arrays.copyOf(strArr, strArr.length);
    }

    public void setNodes(String[] strArr) {
        this.nodes = (String[]) Arrays.copyOf(strArr, strArr.length);
        this.clustered = strArr.length > 1;
    }

    public void setUsername(String str) {
        this.username = str;
    }

    public void setPassword(String str) {
        this.password = str;
    }

    public void setExtendedBindingProperties(RabbitExtendedBindingProperties rabbitExtendedBindingProperties) {
        this.extendedBindingProperties = rabbitExtendedBindingProperties;
    }

    public void setVhost(String str) {
        this.vhost = str;
    }

    public void setUseSSL(boolean z) {
        this.useSSL = z;
    }

    public void setSslPropertiesLocation(Resource resource) {
        this.sslPropertiesLocation = resource;
    }

    public void onInit() {
        if (this.clustered) {
            Assert.state(this.addresses.length == this.adminAddresses.length && this.addresses.length == this.nodes.length, "'addresses', 'adminAddresses', and 'nodes' properties must have equal length");
            this.connectionFactory = new LocalizedQueueConnectionFactory(this.connectionFactory, this.addresses, this.adminAddresses, this.nodes, this.vhost, this.username, this.password, this.useSSL, this.sslPropertiesLocation);
        }
    }

    /* renamed from: getExtendedConsumerProperties, reason: merged with bridge method [inline-methods] */
    public RabbitConsumerProperties m5getExtendedConsumerProperties(String str) {
        return this.extendedBindingProperties.m2getExtendedConsumerProperties(str);
    }

    /* renamed from: getExtendedProducerProperties, reason: merged with bridge method [inline-methods] */
    public RabbitProducerProperties m4getExtendedProducerProperties(String str) {
        return this.extendedBindingProperties.m1getExtendedProducerProperties(str);
    }

    public Binding<MessageChannel> doBindConsumer(String str, String str2, MessageChannel messageChannel, ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        Queue queue;
        boolean z = !StringUtils.hasText(str2);
        String groupedName = z ? groupedName(str, ANONYMOUS_GROUP_NAME_GENERATOR.generateName()) : groupedName(str, str2);
        if (this.logger.isInfoEnabled()) {
            this.logger.info("declaring queue for inbound: " + groupedName + ", bound to: " + str);
        }
        String prefix = ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getPrefix();
        String applyPrefix = applyPrefix(prefix, str);
        TopicExchange topicExchange = new TopicExchange(applyPrefix);
        declareExchange(applyPrefix, topicExchange);
        String applyPrefix2 = applyPrefix(prefix, groupedName);
        boolean z2 = !z && extendedConsumerProperties.isPartitioned();
        boolean z3 = !z && ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).isDurableSubscription();
        if (z) {
            queue = new Queue(applyPrefix2, false, true, true);
        } else {
            if (z2) {
                applyPrefix2 = applyPrefix2 + ("-" + extendedConsumerProperties.getInstanceIndex());
            }
            queue = z3 ? new Queue(applyPrefix2, true, false, false, queueArgs(applyPrefix2, ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getPrefix(), ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).isAutoBindDlq())) : new Queue(applyPrefix2, false, false, true);
        }
        declareQueue(applyPrefix2, queue);
        if (z2) {
            declareBinding(queue.getName(), BindingBuilder.bind(queue).to(topicExchange).with(String.format("%s-%d", str, Integer.valueOf(extendedConsumerProperties.getInstanceIndex()))));
        } else {
            declareBinding(queue.getName(), BindingBuilder.bind(queue).to(topicExchange).with("#"));
        }
        Binding<MessageChannel> doRegisterConsumer = doRegisterConsumer(groupedName, str2, messageChannel, queue, extendedConsumerProperties);
        if (z3) {
            autoBindDLQ(applyPrefix(prefix, groupedName), applyPrefix2, ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getPrefix(), ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).isAutoBindDlq());
        }
        return doRegisterConsumer;
    }

    private Map<String, Object> queueArgs(String str, String str2, boolean z) {
        HashMap hashMap = new HashMap();
        if (z) {
            hashMap.put("x-dead-letter-exchange", applyPrefix(str2, DEAD_LETTER_EXCHANGE));
            hashMap.put("x-dead-letter-routing-key", str);
        }
        return hashMap;
    }

    private Binding<MessageChannel> doRegisterConsumer(final String str, String str2, MessageChannel messageChannel, Queue queue, final ExtendedConsumerProperties<RabbitConsumerProperties> extendedConsumerProperties) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(this.connectionFactory);
        simpleMessageListenerContainer.setAcknowledgeMode(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getAcknowledgeMode());
        simpleMessageListenerContainer.setChannelTransacted(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).isTransacted());
        simpleMessageListenerContainer.setDefaultRequeueRejected(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).isRequeueRejected());
        int concurrency = extendedConsumerProperties.getConcurrency();
        int i = concurrency > 0 ? concurrency : 1;
        simpleMessageListenerContainer.setConcurrentConsumers(i);
        int maxConcurrency = ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getMaxConcurrency();
        if (maxConcurrency > i) {
            simpleMessageListenerContainer.setMaxConcurrentConsumers(maxConcurrency);
        }
        simpleMessageListenerContainer.setPrefetchCount(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getPrefetch());
        simpleMessageListenerContainer.setTxSize(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getTxSize());
        simpleMessageListenerContainer.setTaskExecutor(new SimpleAsyncTaskExecutor(queue.getName() + "-"));
        simpleMessageListenerContainer.setQueues(new Queue[]{queue});
        int maxAttempts = extendedConsumerProperties.getMaxAttempts();
        if (maxAttempts > 1 || ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).isRepublishToDlq()) {
            simpleMessageListenerContainer.setAdviceChain(new Advice[]{(RetryOperationsInterceptor) RetryInterceptorBuilder.stateless().maxAttempts(maxAttempts).backOffOptions(extendedConsumerProperties.getBackOffInitialInterval(), extendedConsumerProperties.getBackOffMultiplier(), extendedConsumerProperties.getBackOffMaxInterval()).recoverer(determineRecoverer(str, ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getPrefix(), ((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).isRepublishToDlq())).build()});
        }
        simpleMessageListenerContainer.setAfterReceivePostProcessors(new MessagePostProcessor[]{this.decompressingPostProcessor});
        simpleMessageListenerContainer.setMessagePropertiesConverter(inboundMessagePropertiesConverter);
        simpleMessageListenerContainer.afterPropertiesSet();
        AmqpInboundChannelAdapter amqpInboundChannelAdapter = new AmqpInboundChannelAdapter(simpleMessageListenerContainer);
        amqpInboundChannelAdapter.setBeanFactory(getBeanFactory());
        DirectChannel directChannel = new DirectChannel();
        directChannel.setBeanFactory(getBeanFactory());
        directChannel.setBeanName(str + ".bridge");
        amqpInboundChannelAdapter.setOutputChannel(directChannel);
        amqpInboundChannelAdapter.setBeanName("inbound." + str);
        DefaultAmqpHeaderMapper defaultAmqpHeaderMapper = new DefaultAmqpHeaderMapper();
        defaultAmqpHeaderMapper.setRequestHeaderNames(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getRequestHeaderPatterns());
        defaultAmqpHeaderMapper.setReplyHeaderNames(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getReplyHeaderPatterns());
        amqpInboundChannelAdapter.setHeaderMapper(defaultAmqpHeaderMapper);
        amqpInboundChannelAdapter.afterPropertiesSet();
        DefaultBinding<MessageChannel> defaultBinding = new DefaultBinding<MessageChannel>(str, str2, messageChannel, amqpInboundChannelAdapter) { // from class: org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.2
            protected void afterUnbind() {
                RabbitMessageChannelBinder.this.cleanAutoDeclareContext(((RabbitConsumerProperties) extendedConsumerProperties.getExtension()).getPrefix(), str);
            }
        };
        ReceivingHandler receivingHandler = new ReceivingHandler();
        receivingHandler.setOutputChannel(messageChannel);
        receivingHandler.setBeanName(str + ".convert.bridge");
        receivingHandler.afterPropertiesSet();
        directChannel.subscribe(receivingHandler);
        amqpInboundChannelAdapter.start();
        return defaultBinding;
    }

    private MessageRecoverer determineRecoverer(String str, String str2, boolean z) {
        return z ? new RepublishMessageRecoverer(new RabbitTemplate(this.connectionFactory), deadLetterExchangeName(str2), applyPrefix(str2, str)) : new RejectAndDontRequeueRecoverer();
    }

    private AmqpOutboundEndpoint buildOutboundEndpoint(String str, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties, RabbitTemplate rabbitTemplate) {
        String prefix = ((RabbitProducerProperties) extendedProducerProperties.getExtension()).getPrefix();
        String applyPrefix = applyPrefix(prefix, str);
        TopicExchange topicExchange = new TopicExchange(applyPrefix);
        declareExchange(applyPrefix, topicExchange);
        AmqpOutboundEndpoint amqpOutboundEndpoint = new AmqpOutboundEndpoint(rabbitTemplate);
        amqpOutboundEndpoint.setExchangeName(topicExchange.getName());
        if (extendedProducerProperties.isPartitioned()) {
            amqpOutboundEndpoint.setExpressionRoutingKey(EXPRESSION_PARSER.parseExpression(buildPartitionRoutingExpression(str)));
        } else {
            amqpOutboundEndpoint.setRoutingKey(str);
        }
        for (String str2 : extendedProducerProperties.getRequiredGroups()) {
            String str3 = applyPrefix + "." + str2;
            if (extendedProducerProperties.isPartitioned()) {
                for (int i = 0; i < extendedProducerProperties.getPartitionCount(); i++) {
                    String str4 = "-" + i;
                    String str5 = str3 + str4;
                    Queue queue = new Queue(str5, true, false, false, queueArgs(str5, ((RabbitProducerProperties) extendedProducerProperties.getExtension()).getPrefix(), ((RabbitProducerProperties) extendedProducerProperties.getExtension()).isAutoBindDlq()));
                    declareQueue(queue.getName(), queue);
                    autoBindDLQ(str3, str3 + str4, ((RabbitProducerProperties) extendedProducerProperties.getExtension()).getPrefix(), ((RabbitProducerProperties) extendedProducerProperties.getExtension()).isAutoBindDlq());
                    declareBinding(queue.getName(), BindingBuilder.bind(queue).to(topicExchange).with(str + str4));
                }
            } else {
                Queue queue2 = new Queue(str3, true, false, false, queueArgs(str3, prefix, ((RabbitProducerProperties) extendedProducerProperties.getExtension()).isAutoBindDlq()));
                declareQueue(str3, queue2);
                autoBindDLQ(str3, str3, ((RabbitProducerProperties) extendedProducerProperties.getExtension()).getPrefix(), ((RabbitProducerProperties) extendedProducerProperties.getExtension()).isAutoBindDlq());
                declareBinding(str3, BindingBuilder.bind(queue2).to(topicExchange).with(str));
            }
        }
        configureOutboundHandler(amqpOutboundEndpoint, extendedProducerProperties);
        return amqpOutboundEndpoint;
    }

    private void configureOutboundHandler(AmqpOutboundEndpoint amqpOutboundEndpoint, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties) {
        DefaultAmqpHeaderMapper defaultAmqpHeaderMapper = new DefaultAmqpHeaderMapper();
        defaultAmqpHeaderMapper.setRequestHeaderNames(((RabbitProducerProperties) extendedProducerProperties.getExtension()).getRequestHeaderPatterns());
        defaultAmqpHeaderMapper.setReplyHeaderNames(((RabbitProducerProperties) extendedProducerProperties.getExtension()).getReplyHeaderPatterns());
        amqpOutboundEndpoint.setHeaderMapper(defaultAmqpHeaderMapper);
        amqpOutboundEndpoint.setDefaultDeliveryMode(((RabbitProducerProperties) extendedProducerProperties.getExtension()).getDeliveryMode());
        amqpOutboundEndpoint.setBeanFactory(getBeanFactory());
        amqpOutboundEndpoint.afterPropertiesSet();
    }

    public Binding<MessageChannel> doBindProducer(String str, MessageChannel messageChannel, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties) {
        String applyPrefix = applyPrefix(((RabbitProducerProperties) extendedProducerProperties.getExtension()).getPrefix(), str);
        declareExchange(applyPrefix, new TopicExchange(applyPrefix));
        return doRegisterProducer(str, messageChannel, buildOutboundEndpoint(str, extendedProducerProperties, buildRabbitTemplate((RabbitProducerProperties) extendedProducerProperties.getExtension())), extendedProducerProperties);
    }

    private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties rabbitProducerProperties) {
        BatchingRabbitTemplate batchingRabbitTemplate = rabbitProducerProperties.isBatchingEnabled() ? new BatchingRabbitTemplate(new SimpleBatchingStrategy(rabbitProducerProperties.getBatchSize(), rabbitProducerProperties.getBatchBufferLimit(), rabbitProducerProperties.getBatchTimeout()), (TaskScheduler) getApplicationContext().getBean("taskScheduler", TaskScheduler.class)) : new RabbitTemplate();
        batchingRabbitTemplate.setConnectionFactory(this.connectionFactory);
        if (rabbitProducerProperties.isCompress()) {
            batchingRabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor[]{this.compressingPostProcessor});
        }
        batchingRabbitTemplate.setChannelTransacted(rabbitProducerProperties.isTransacted());
        batchingRabbitTemplate.afterPropertiesSet();
        return batchingRabbitTemplate;
    }

    private Binding<MessageChannel> doRegisterProducer(String str, MessageChannel messageChannel, AmqpOutboundEndpoint amqpOutboundEndpoint, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties) {
        return doRegisterProducer(str, messageChannel, amqpOutboundEndpoint, null, extendedProducerProperties);
    }

    private Binding<MessageChannel> doRegisterProducer(String str, MessageChannel messageChannel, AmqpOutboundEndpoint amqpOutboundEndpoint, String str2, ExtendedProducerProperties<RabbitProducerProperties> extendedProducerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        EventDrivenConsumer eventDrivenConsumer = new EventDrivenConsumer((SubscribableChannel) messageChannel, new SendingHandler(amqpOutboundEndpoint, str2, extendedProducerProperties));
        eventDrivenConsumer.setBeanFactory(getBeanFactory());
        eventDrivenConsumer.setBeanName("outbound." + str);
        eventDrivenConsumer.afterPropertiesSet();
        DefaultBinding defaultBinding = new DefaultBinding(str, (String) null, messageChannel, eventDrivenConsumer);
        eventDrivenConsumer.start();
        return defaultBinding;
    }

    private void autoBindDLQ(String str, String str2, String str3, boolean z) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("autoBindDLQ=" + z + " for: " + str);
        }
        if (z) {
            String constructDLQName = constructDLQName(str);
            Queue queue = new Queue(constructDLQName);
            declareQueue(constructDLQName, queue);
            String deadLetterExchangeName = deadLetterExchangeName(str3);
            DirectExchange directExchange = new DirectExchange(deadLetterExchangeName);
            declareExchange(deadLetterExchangeName, directExchange);
            declareBinding(constructDLQName, BindingBuilder.bind(queue).to(directExchange).with(str2));
        }
    }

    public void declareQueue(String str, Queue queue) {
        try {
            this.rabbitAdmin.declareQueue(queue);
        } catch (UncategorizedAmqpException e) {
            if (!(e.getCause() instanceof NullPointerException)) {
                throw e;
            }
        } catch (AmqpConnectException e2) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Declaration of queue: " + queue.getName() + " deferred - connection not available");
            }
        }
        addToAutoDeclareContext(str, queue);
    }

    public void declareExchange(String str, Exchange exchange) {
        try {
            this.rabbitAdmin.declareExchange(exchange);
        } catch (AmqpConnectException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Declaration of exchange: " + exchange.getName() + " deferred - connection not available");
            }
        }
        addToAutoDeclareContext(str + ".exchange", exchange);
    }

    public void declareBinding(String str, org.springframework.amqp.core.Binding binding) {
        try {
            this.rabbitAdmin.declareBinding(binding);
        } catch (AmqpConnectException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Declaration of binding: " + str + ".binding deferred - connection not available");
            }
        }
        addToAutoDeclareContext(str + ".binding", binding);
    }

    private String deadLetterExchangeName(String str) {
        return str + DEAD_LETTER_EXCHANGE;
    }

    private void addToAutoDeclareContext(String str, Object obj) {
        synchronized (this.autoDeclareContext) {
            if (!this.autoDeclareContext.containsBean(str)) {
                this.autoDeclareContext.getBeanFactory().registerSingleton(str, obj);
            }
        }
    }

    public void cleanAutoDeclareContext(String str, String str2) {
        synchronized (this.autoDeclareContext) {
            removeSingleton(applyPrefix(str, str2) + ".binding");
            removeSingleton(applyPrefix(str, str2));
            String str3 = applyPrefix(str, str2) + ".dlq";
            removeSingleton(str3 + ".binding");
            removeSingleton(str3);
        }
    }

    private void removeSingleton(String str) {
        if (this.autoDeclareContext.containsBean(str)) {
            DefaultListableBeanFactory beanFactory = this.autoDeclareContext.getBeanFactory();
            if (beanFactory instanceof DefaultListableBeanFactory) {
                beanFactory.destroySingleton(str);
            }
        }
    }

    public void doManualAck(LinkedList<MessageHeaders> linkedList) {
        Iterator<MessageHeaders> it = linkedList.iterator();
        HashMap hashMap = new HashMap();
        while (it.hasNext()) {
            MessageHeaders next = it.next();
            if (next.containsKey("amqp_channel")) {
                hashMap.put((Channel) next.get("amqp_channel"), (Long) next.get("amqp_deliveryTag"));
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            try {
                ((Channel) entry.getKey()).basicAck(((Long) entry.getValue()).longValue(), true);
            } catch (IOException e) {
                this.logger.error("Exception while manually acknowledging " + e);
            }
        }
    }
}
