/*
 * Decompiled with CFR 0.152.
 */
package org.skyscreamer.nevado.jms;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.skyscreamer.nevado.jms.NevadoMessageConsumer;
import org.skyscreamer.nevado.jms.util.BackoffSleeper;

public class AsyncConsumerRunner
implements Runnable {
    private final Log _log = LogFactory.getLog(this.getClass());
    private final Connection _connection;
    private final Set<NevadoMessageConsumer> _asyncConsumers = new CopyOnWriteArraySet<NevadoMessageConsumer>();
    private volatile boolean _running = false;
    private final BackoffSleeper _sleeper = new BackoffSleeper(50L, 5000L, 1.5);
    private Thread runner;

    protected AsyncConsumerRunner(Connection connection) {
        this._connection = connection;
    }

    @Override
    public void run() {
        this._log.debug((Object)"Starting async loop");
        block0: while (this._running) {
            this._log.debug((Object)"Running async loop");
            boolean messageProcessed = false;
            for (NevadoMessageConsumer consumer : this._asyncConsumers) {
                boolean bl = messageProcessed = this.processMessage(consumer) || messageProcessed;
                if (this._running) continue;
                break block0;
            }
            if (messageProcessed) {
                this._sleeper.reset();
            }
            this._log.debug((Object)"Sleeping async loop");
            this._sleeper.sleep();
        }
        this._log.debug((Object)"Exiting async loop");
    }

    public void addAsyncConsumer(NevadoMessageConsumer asyncConsumer) {
        this._asyncConsumers.add(asyncConsumer);
    }

    public void removeAsyncConsumer(NevadoMessageConsumer asyncConsumer) {
        this._asyncConsumers.remove(asyncConsumer);
    }

    public int numAsyncConsumers() {
        return this._asyncConsumers.size();
    }

    private boolean processMessage(NevadoMessageConsumer consumer) {
        boolean messageProcessed;
        block6: {
            messageProcessed = false;
            if (consumer.getMessageListener() != null) {
                try {
                    if (consumer.processAsyncMessage()) {
                        messageProcessed = true;
                    }
                }
                catch (Throwable t) {
                    String errorMessage = "Unable to process message for consumer on " + consumer.getDestination();
                    this._log.error((Object)errorMessage, t);
                    ExceptionListener exceptionListener = null;
                    try {
                        exceptionListener = this._connection.getExceptionListener();
                    }
                    catch (JMSException e1) {
                        this._log.error((Object)"Unable to retrieve exception listener from connection", (Throwable)e1);
                    }
                    if (exceptionListener == null) break block6;
                    JMSException e = t instanceof JMSException ? (JMSException)t : new JMSException(errorMessage + ": " + t.getMessage());
                    exceptionListener.onException(e);
                }
            }
        }
        return messageProcessed;
    }

    synchronized void start() {
        if (!this._running) {
            this.runner = new Thread(this);
            this.runner.start();
            this._running = true;
        }
    }

    synchronized void stop() throws InterruptedException {
        if (this._running) {
            this._running = false;
            this._sleeper.interrupt();
            this.runner.join();
        }
    }
}

