package acromusashi.stream.component.rabbitmq.spout;

import acromusashi.stream.component.rabbitmq.RabbitmqClient;
import acromusashi.stream.component.rabbitmq.RabbitmqCommunicateException;
import acromusashi.stream.constants.FieldName;
import acromusashi.stream.helper.SpringContextHelper;
import acromusashi.stream.spout.AmConfigurationSpout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.text.MessageFormat;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/component/rabbitmq/spout/RabbitMqSpout.class */
/**
 * Spout that polls a RabbitMQ queue through a {@link RabbitmqClient} and emits every received
 * message as a two-field tuple: the extracted message key and the message's string form.
 *
 * <p>The client is obtained from the configured {@link SpringContextHelper} when the spout is
 * opened. The queue that is actually polled is the configured queue name with the task index
 * reported by the {@link TopologyContext} appended to it.
 */
public class RabbitMqSpout extends AmConfigurationSpout {
    private static final long serialVersionUID = -7039267927348254032L;

    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSpout.class);

    /** Client used to read from the queue; transient, and assigned in {@code open}. */
    protected transient RabbitmqClient rabbitmqClient;

    /** Configured queue name, used as the prefix of {@link #targetQueueName}. */
    protected String queueName;

    /** Queue this spout instance polls: {@link #queueName} followed by the task index. */
    protected String targetQueueName;

    /** Strategy that derives the message key from a received message. */
    protected MessageKeyExtractor messageKeyExtractor;

    /** Helper from which the {@link RabbitmqClient} component is looked up. */
    protected SpringContextHelper contextHelper;

    /**
     * Delegates to the superclass, then looks up the {@link RabbitmqClient} and computes the name
     * of the queue this instance reads from.
     *
     * @param map configuration map, passed through to the superclass
     * @param topologyContext context supplying this task's index
     * @param spoutOutputCollector collector, passed through to the superclass
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);

        this.rabbitmqClient = (RabbitmqClient) this.contextHelper.getComponent(RabbitmqClient.class);

        int taskIndex = topologyContext.getThisTaskIndex();
        this.targetQueueName = this.queueName + taskIndex;
    }

    /**
     * Reads one message from the target queue and emits it.
     *
     * <p>Nothing is emitted when the client returns {@code null}. A
     * {@link RabbitmqCommunicateException} raised while receiving, or while extracting the key and
     * emitting, is logged at WARN level and the call returns without emitting.
     */
    public void nextTuple() {
        final Object message;
        try {
            message = this.rabbitmqClient.receive(this.targetQueueName);
        } catch (RabbitmqCommunicateException receiveFailure) {
            String text = MessageFormat.format("Message receive failed. QueueName={0}", this.targetQueueName);
            logger.warn(text, receiveFailure);
            return;
        }

        // Nothing was received from the client, so there is nothing to emit.
        if (message == null) {
            return;
        }

        try {
            // Tuple layout matches declareOutputFields: key first, then the message's string form.
            getCollector().emit(
                    new Values(
                            this.messageKeyExtractor.extractMessageKey(message),
                            message.toString()));
        } catch (RabbitmqCommunicateException extractFailure) {
            String text = MessageFormat.format(
                    "MessageKey extract failed. QueueName={0}, ReceiveData={1}",
                    this.targetQueueName,
                    message.toString());
            logger.warn(text, extractFailure);
        }
    }

    /**
     * Declares the two output fields: {@link FieldName#MESSAGE_KEY} followed by
     * {@link FieldName#MESSAGE_VALUE}.
     *
     * @param outputFieldsDeclarer declarer that receives the field list
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        Fields outputFields = new Fields(FieldName.MESSAGE_KEY, FieldName.MESSAGE_VALUE);
        outputFieldsDeclarer.declare(outputFields);
    }

    /**
     * Sets the queue name to which the task index is appended when the spout is opened.
     *
     * @param str queue name prefix
     */
    public void setQueueName(String str) {
        this.queueName = str;
    }

    /**
     * Sets the extractor used to obtain the message key from each received message.
     *
     * @param messageKeyExtractor key extractor
     */
    public void setMessageKeyExtractor(MessageKeyExtractor messageKeyExtractor) {
        this.messageKeyExtractor = messageKeyExtractor;
    }

    /**
     * Sets the helper from which the {@link RabbitmqClient} is looked up when the spout is opened.
     *
     * @param springContextHelper context helper
     */
    public void setContextHelper(SpringContextHelper springContextHelper) {
        this.contextHelper = springContextHelper;
    }
}
