package org.springframework.cloud.stream.binder.rocketmq.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ClassUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.class */
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
    private ConsumerInstrumentation consumerInstrumentation;
    private InstrumentationManager instrumentationManager;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<? extends Object> recoveryCallback;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
    private final String destination;
    private final String group;
    private final ConsumersManager consumersManager;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter$CloudStreamMessageListener.class */
    public class CloudStreamMessageListener implements MessageListener, RetryListener {
        protected CloudStreamMessageListener() {
        }

        Acknowledgement consumeMessage(List<MessageExt> list) {
            try {
                if (RocketMQInboundChannelAdapter.this.retryTemplate != null) {
                    return (Acknowledgement) RocketMQInboundChannelAdapter.this.retryTemplate.execute(retryContext -> {
                        return doSendMsgs(list, retryContext);
                    }, new RecoveryCallback<Acknowledgement>() { // from class: org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter.CloudStreamMessageListener.1
                        /* renamed from: recover, reason: merged with bridge method [inline-methods] */
                        public Acknowledgement m4recover(RetryContext retryContext2) throws Exception {
                            RocketMQInboundChannelAdapter.this.recoveryCallback.recover(retryContext2);
                            return ClassUtils.isAssignable(getClass(), MessageListenerConcurrently.class) ? Acknowledgement.buildConcurrentlyInstance() : Acknowledgement.buildOrderlyInstance();
                        }
                    });
                }
                Acknowledgement doSendMsgs = doSendMsgs(list, null);
                Optional.ofNullable(RocketMQInboundChannelAdapter.this.instrumentationManager).ifPresent(instrumentationManager -> {
                    instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination).markConsumed();
                });
                return doSendMsgs;
            } catch (Exception e) {
                RocketMQInboundChannelAdapter.logger.error("RocketMQ Message hasn't been processed successfully. Caused by ", e);
                Optional.ofNullable(RocketMQInboundChannelAdapter.this.instrumentationManager).ifPresent(instrumentationManager2 -> {
                    instrumentationManager2.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination).markConsumedFailure();
                });
                throw new RuntimeException("RocketMQ Message hasn't been processed successfully. Caused by ", e);
            }
        }

        private Acknowledgement doSendMsgs(List<MessageExt> list, RetryContext retryContext) {
            ArrayList arrayList = new ArrayList();
            list.forEach(messageExt -> {
                String str = retryContext == null ? "" : "retryCount-" + String.valueOf(retryContext.getRetryCount()) + "|";
                RocketMQInboundChannelAdapter.logger.debug(str + "consuming msg:\n" + messageExt);
                RocketMQInboundChannelAdapter.logger.debug(str + "message body:\n" + new String(messageExt.getBody()));
                Acknowledgement acknowledgement = new Acknowledgement();
                Message build = MessageBuilder.withPayload(messageExt.getBody()).setHeaders(new RocketMQMessageHeaderAccessor().withAcknowledgment(acknowledgement).withTags(messageExt.getTags()).withKeys(messageExt.getKeys()).withFlag(Integer.valueOf(messageExt.getFlag())).withRocketMessage(messageExt)).build();
                arrayList.add(acknowledgement);
                RocketMQInboundChannelAdapter.this.sendMessage(build);
            });
            return (Acknowledgement) arrayList.get(0);
        }

        public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
            return true;
        }

        public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            if (th != null) {
                Optional.ofNullable(RocketMQInboundChannelAdapter.this.instrumentationManager).ifPresent(instrumentationManager -> {
                    instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination).markConsumedFailure();
                });
            } else {
                Optional.ofNullable(RocketMQInboundChannelAdapter.this.instrumentationManager).ifPresent(instrumentationManager2 -> {
                    instrumentationManager2.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination).markConsumed();
                });
            }
        }

        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
        }
    }

    /* loaded from: input_file:org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter$CloudStreamMessageListenerConcurrently.class */
    protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements MessageListenerConcurrently {
        protected CloudStreamMessageListenerConcurrently() {
            super();
        }

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            Acknowledgement consumeMessage = consumeMessage(list);
            consumeConcurrentlyContext.setDelayLevelWhenNextConsume(consumeMessage.getConsumeConcurrentlyDelayLevel().intValue());
            return consumeMessage.getConsumeConcurrentlyStatus();
        }
    }

    /* loaded from: input_file:org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter$CloudStreamMessageListenerOrderly.class */
    protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly {
        protected CloudStreamMessageListenerOrderly() {
            super();
        }

        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            Acknowledgement consumeMessage = consumeMessage(list);
            consumeOrderlyContext.setSuspendCurrentQueueTimeMillis(consumeMessage.getConsumeOrderlySuspendCurrentQueueTimeMill().longValue());
            return consumeMessage.getConsumeOrderlyStatus();
        }
    }

    public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties, String str, String str2, InstrumentationManager instrumentationManager) {
        this.consumersManager = consumersManager;
        this.consumerProperties = extendedConsumerProperties;
        this.destination = str;
        this.group = str2;
        this.instrumentationManager = instrumentationManager;
    }

    protected void doStart() {
        if (this.consumerProperties == null || !((RocketMQConsumerProperties) this.consumerProperties.getExtension()).getEnabled().booleanValue()) {
            return;
        }
        String tags = ((RocketMQConsumerProperties) this.consumerProperties.getExtension()).getTags();
        Boolean orderly = ((RocketMQConsumerProperties) this.consumerProperties.getExtension()).getOrderly();
        DefaultMQPushConsumer orCreateConsumer = this.consumersManager.getOrCreateConsumer(this.group, this.destination, this.consumerProperties);
        RetryListener cloudStreamMessageListenerOrderly = orderly.booleanValue() ? new CloudStreamMessageListenerOrderly() : new CloudStreamMessageListenerConcurrently();
        if (this.retryTemplate != null) {
            this.retryTemplate.registerListener(cloudStreamMessageListenerOrderly);
        }
        Iterable hashSet = tags == null ? new HashSet() : (Set) Arrays.stream(tags.split("\\|\\|")).map((v0) -> {
            return v0.trim();
        }).collect(Collectors.toSet());
        Optional.ofNullable(this.instrumentationManager).ifPresent(instrumentationManager -> {
            this.consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(this.destination);
            instrumentationManager.addHealthInstrumentation(this.consumerInstrumentation);
        });
        try {
            if (StringUtils.isEmpty(((RocketMQConsumerProperties) this.consumerProperties.getExtension()).getSql())) {
                orCreateConsumer.subscribe(this.destination, String.join(" || ", (Iterable<? extends CharSequence>) hashSet));
            } else {
                orCreateConsumer.subscribe(this.destination, MessageSelector.bySql(((RocketMQConsumerProperties) this.consumerProperties.getExtension()).getSql()));
            }
            Optional.ofNullable(this.consumerInstrumentation).ifPresent(consumerInstrumentation -> {
                consumerInstrumentation.markStartedSuccessfully();
            });
            orCreateConsumer.registerMessageListener(cloudStreamMessageListenerOrderly);
            try {
                this.consumersManager.startConsumer(this.group);
            } catch (MQClientException e) {
                logger.error("RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), e);
                throw new RuntimeException("RocketMQ Consumer startup failed.", e);
            }
        } catch (MQClientException e2) {
            Optional.ofNullable(this.consumerInstrumentation).ifPresent(consumerInstrumentation2 -> {
                consumerInstrumentation2.markStartFailed(e2);
            });
            logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e2.getErrorMessage(), e2);
            throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e2);
        }
    }

    protected void doStop() {
        this.consumersManager.stopConsumer(this.group);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }
}
