package org.apache.storm.jms.spout;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.jms.JmsTupleProducer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/jms/spout/JmsSpout.class */
public class JmsSpout extends BaseRichSpout implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
    private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
    private static final int POLL_INTERVAL_MS = 50;
    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
    private static final int RECOVERY_DELAY_MS = 10;
    private JmsTupleProducer tupleProducer;
    private JmsProvider jmsProvider;
    private LinkedBlockingQueue<Message> queue;
    private TreeSet<JmsMessageID> toCommit;
    private HashMap<JmsMessageID, Message> pendingMessages;
    private SpoutOutputCollector collector;
    private transient Connection connection;
    private transient Session session;
    private int jmsAcknowledgeMode = 1;
    private boolean distributed = true;
    private long messageSequence = 0;
    private boolean hasFailures = false;
    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
    private Timer recoveryTimer = null;
    private long recoveryPeriodMs = -1;

    /* loaded from: input_file:org/apache/storm/jms/spout/JmsSpout$RecoveryTask.class */
    private class RecoveryTask extends TimerTask {
        private RecoveryTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            synchronized (JmsSpout.this.recoveryMutex) {
                if (JmsSpout.this.hasFailures()) {
                    try {
                        JmsSpout.RECOVERY_TASK_LOG.info("Recovering from a message failure.");
                        JmsSpout.this.getSession().recover();
                        JmsSpout.this.recovered();
                    } catch (JMSException e) {
                        JmsSpout.RECOVERY_TASK_LOG.warn("Could not recover jms session.", e);
                    }
                }
            }
        }
    }

    public void setJmsAcknowledgeMode(int i) {
        switch (i) {
            case 1:
            case 2:
            case 3:
                this.jmsAcknowledgeMode = i;
                return;
            default:
                throw new IllegalArgumentException("Unknown Acknowledge mode: " + i + " (See javax.jms.Session for valid values)");
        }
    }

    public int getJmsAcknowledgeMode() {
        return this.jmsAcknowledgeMode;
    }

    public void setJmsProvider(JmsProvider jmsProvider) {
        this.jmsProvider = jmsProvider;
    }

    public void setJmsTupleProducer(JmsTupleProducer jmsTupleProducer) {
        this.tupleProducer = jmsTupleProducer;
    }

    public void onMessage(Message message) {
        try {
            LOG.debug("Queuing msg [" + message.getJMSMessageID() + "]");
        } catch (JMSException e) {
        }
        this.queue.offer(message);
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        if (this.jmsProvider == null) {
            throw new IllegalStateException("JMS provider has not been set.");
        }
        if (this.tupleProducer == null) {
            throw new IllegalStateException("JMS Tuple Producer has not been set.");
        }
        Number number = (Number) map.get("topology.message.timeout.secs");
        Long valueOf = Long.valueOf(number != null ? number.longValue() : 30L);
        if (TimeUnit.SECONDS.toMillis(valueOf.longValue()) > this.recoveryPeriodMs) {
            LOG.warn("*** WARNING *** : Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured 'topology.message.timeout.secs' of " + valueOf + " secs. This could lead to a message replay flood!");
        }
        this.queue = new LinkedBlockingQueue<>();
        this.toCommit = new TreeSet<>();
        this.pendingMessages = new HashMap<>();
        this.collector = spoutOutputCollector;
        try {
            ConnectionFactory connectionFactory = this.jmsProvider.connectionFactory();
            Destination destination = this.jmsProvider.destination();
            this.connection = connectionFactory.createConnection();
            this.session = this.connection.createSession(false, this.jmsAcknowledgeMode);
            this.session.createConsumer(destination).setMessageListener(this);
            this.connection.start();
            if (isDurableSubscription() && this.recoveryPeriodMs > 0) {
                this.recoveryTimer = new Timer();
                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10L, this.recoveryPeriodMs);
            }
        } catch (Exception e) {
            LOG.warn("Error creating JMS connection.", e);
        }
    }

    public void close() {
        try {
            LOG.debug("Closing JMS connection.");
            this.session.close();
            this.connection.close();
        } catch (JMSException e) {
            LOG.warn("Error closing JMS connection.", e);
        }
    }

    public void nextTuple() {
        Message poll = this.queue.poll();
        if (poll == null) {
            Utils.sleep(50L);
            return;
        }
        LOG.debug("sending tuple: " + poll);
        try {
            Values tuple = this.tupleProducer.toTuple(poll);
            LOG.debug("Requested deliveryMode: " + toDeliveryModeString(poll.getJMSDeliveryMode()));
            LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
            if (isDurableSubscription()) {
                LOG.debug("Requesting acks.");
                long j = this.messageSequence;
                this.messageSequence = j + 1;
                JmsMessageID jmsMessageID = new JmsMessageID(j, poll.getJMSMessageID());
                this.collector.emit(tuple, jmsMessageID);
                this.pendingMessages.put(jmsMessageID, poll);
                this.toCommit.add(jmsMessageID);
            } else {
                this.collector.emit(tuple);
            }
        } catch (JMSException e) {
            LOG.warn("Unable to convert JMS message: " + poll);
        }
    }

    public void ack(Object obj) {
        Message remove = this.pendingMessages.remove(obj);
        if (!obj.equals(this.toCommit.first())) {
            this.toCommit.remove(obj);
            return;
        }
        if (remove == null) {
            LOG.warn("Couldn't acknowledge unknown JMS message ID: " + obj);
            return;
        }
        try {
            LOG.debug("Committing...");
            remove.acknowledge();
            LOG.debug("JMS Message acked: " + obj);
            this.toCommit.remove(obj);
        } catch (JMSException e) {
            LOG.warn("Error acknowldging JMS message: " + obj, e);
        }
    }

    public void fail(Object obj) {
        LOG.warn("Message failed: " + obj);
        this.pendingMessages.clear();
        this.toCommit.clear();
        synchronized (this.recoveryMutex) {
            this.hasFailures = true;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        this.tupleProducer.declareOutputFields(outputFieldsDeclarer);
    }

    public boolean hasFailures() {
        return this.hasFailures;
    }

    protected void recovered() {
        this.hasFailures = false;
    }

    public void setRecoveryPeriodMs(long j) {
        this.recoveryPeriodMs = j;
    }

    public boolean isDistributed() {
        return this.distributed;
    }

    public void setDistributed(boolean z) {
        this.distributed = z;
    }

    private static String toDeliveryModeString(int i) {
        switch (i) {
            case 1:
                return "AUTO_ACKNOWLEDGE";
            case 2:
                return "CLIENT_ACKNOWLEDGE";
            case 3:
                return "DUPS_OK_ACKNOWLEDGE";
            default:
                return "UNKNOWN";
        }
    }

    protected Session getSession() {
        return this.session;
    }

    private boolean isDurableSubscription() {
        return this.jmsAcknowledgeMode != 1;
    }
}
