/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq;

import java.util.Iterator;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQMessageConsumer;
import org.codehaus.activemq.ActiveMQSession;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;

/**
 * Pulls messages off a session's {@link MemoryBoundedQueue} on a dedicated
 * thread and hands each one to every consumer of the session that the
 * message targets.
 *
 * <p>The {@code closed} and {@code doDispatch} flags are written by caller
 * threads and read by the dispatcher thread, so both are {@code volatile}.
 */
public class ActiveMQSessionExecutor
implements Runnable {
    private static final Log log = LogFactory.getLog(ActiveMQSessionExecutor.class);

    /** How long a single {@code dequeue} call waits before the loop re-checks its flags. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    private final ActiveMQSession session;
    private final MemoryBoundedQueue messageQueue;
    /** Set once by {@link #close()}; read by the dispatcher thread. */
    private volatile boolean closed;
    /** Dispatcher thread; created at most once by {@link #start()} and never reset. */
    private Thread runner;
    /** When {@code false} the dispatch loop exits and {@link #start()} spawns no thread. */
    private volatile boolean doDispatch;

    /**
     * Creates an executor with dispatching enabled.
     *
     * @param session the session whose consumers receive the messages
     * @param queue   the queue messages are buffered in
     */
    ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
        this.session = session;
        this.messageQueue = queue;
        this.doDispatch = true;
    }

    /**
     * Enables or disables dispatching by the internal thread.
     *
     * @param value {@code true} to dispatch, {@code false} to stop the loop
     */
    void setDoDispatch(boolean value) {
        this.doDispatch = value;
    }

    /**
     * Enqueues a message on the underlying queue.
     *
     * @param message the message to enqueue
     */
    void execute(ActiveMQMessage message) {
        this.messageQueue.enqueue(message);
    }

    /**
     * Enqueues a message via the queue's {@code enqueueFirstNoBlock}
     * (presumably at the head, without blocking - confirm against
     * {@code MemoryBoundedQueue}).
     *
     * @param message the message to enqueue
     */
    void executeFirst(ActiveMQMessage message) {
        this.messageQueue.enqueueFirstNoBlock(message);
    }

    /**
     * Dispatch loop: polls the queue until the executor is closed or
     * dispatching is disabled.
     *
     * <p>An interrupt received while polling does not end the loop; it is
     * remembered and the thread's interrupt status is restored on exit so the
     * signal is not lost.
     */
    public void run() {
        boolean interrupted = false;
        try {
            while (!this.closed && this.doDispatch) {
                ActiveMQMessage message = null;
                try {
                    message = this.messageQueue.dequeue(POLL_TIMEOUT_MILLIS);
                }
                catch (InterruptedException ie) {
                    // Re-asserting the flag here would make every following
                    // dequeue throw immediately, so defer it to the finally.
                    interrupted = true;
                }
                if (this.closed || message == null) {
                    continue;
                }
                if (this.doDispatch) {
                    this.dispatch(message);
                } else {
                    // Dispatching was switched off after the message was
                    // taken: hand it back to the queue.
                    this.messageQueue.enqueueFirstNoBlock(message);
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Delivers a shallow copy of the message to each session consumer it
     * targets; a {@link JMSException} from one consumer is reported to the
     * connection and does not stop delivery to the others.
     */
    private void dispatch(ActiveMQMessage message) {
        Iterator i = this.session.consumers.iterator();
        while (i.hasNext()) {
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
            if (!message.isConsumerTarget(consumer.getConsumerNumber())) {
                continue;
            }
            try {
                consumer.processMessage(message.shallowCopy());
            }
            catch (JMSException e) {
                this.session.connection.handleAsyncException(e);
            }
        }
    }

    /**
     * Starts the queue and, if no dispatcher thread has been created yet and
     * dispatching is enabled, creates and starts one at maximum priority.
     */
    synchronized void start() {
        this.messageQueue.start();
        if (this.runner == null && this.doDispatch) {
            this.runner = new Thread(this, "JmsSessionDispatcher: " + this.session.getSessionId());
            this.runner.setPriority(Thread.MAX_PRIORITY);
            this.runner.start();
        }
    }

    /** Stops the underlying queue; the dispatcher thread is left running. */
    synchronized void stop() {
        this.messageQueue.stop();
    }

    /** Marks the executor closed, which ends the dispatch loop, and closes the queue. */
    synchronized void close() {
        this.closed = true;
        this.messageQueue.close();
    }

    /** Clears the underlying queue. */
    void clear() {
        this.messageQueue.clear();
    }

    /**
     * Takes a message from the queue without waiting.
     *
     * @return the message returned by the queue, or {@code null} if the
     *         calling thread was interrupted (its interrupt status is then
     *         restored)
     */
    ActiveMQMessage dequeueNoWait() {
        try {
            return this.messageQueue.dequeueNoWait();
        }
        catch (InterruptedException ie) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Clears the underlying queue; currently identical to {@link #clear()}. */
    protected void clearMessagesInProgress() {
        this.messageQueue.clear();
    }
}

