/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.service.DeadLetterPolicy;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageContainerAdmin;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.service.boundedvm.TransientQueueSubscription;
import org.codehaus.activemq.service.impl.DefaultQueueList;

public class TransientQueueBoundedMessageContainer
implements MessageContainer,
Service,
Runnable,
MessageContainerAdmin {
    private MemoryBoundedQueueManager queueManager;
    private ActiveMQDestination destination;
    private SynchronizedBoolean started;
    private SynchronizedBoolean running;
    private MemoryBoundedQueue queue;
    private DefaultQueueList subscriptions;
    private Executor threadPool;
    private Log log;
    private long idleTimestamp;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;

    public TransientQueueBoundedMessageContainer(Executor threadPool, MemoryBoundedQueueManager queueManager, ActiveMQDestination destination, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.threadPool = threadPool;
        this.queueManager = queueManager;
        this.destination = destination;
        this.redeliveryPolicy = redeliveryPolicy;
        this.deadLetterPolicy = deadLetterPolicy;
        this.queue = queueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE:-" + destination.getPhysicalName());
        this.started = new SynchronizedBoolean(false);
        this.running = new SynchronizedBoolean(false);
        this.subscriptions = new DefaultQueueList();
        this.log = LogFactory.getLog((String)("TransientQueueBoundedMessageContainer:- " + destination));
    }

    public boolean isActive() {
        return !this.subscriptions.isEmpty();
    }

    public boolean isEmpty() {
        return this.queue.isEmpty();
    }

    public long getIdleTimestamp() {
        return this.idleTimestamp;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TransientQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client) throws JMSException {
        TransientQueueSubscription ts = this.findMatch(info);
        if (ts == null) {
            MemoryBoundedQueue queue = this.queueManager.getMemoryBoundedQueue(info.getConsumerId());
            ts = new TransientQueueSubscription(client, queue, filter, info);
            DefaultQueueList defaultQueueList = this.subscriptions;
            synchronized (defaultQueueList) {
                this.idleTimestamp = 0L;
                this.subscriptions.add(ts);
                if (this.started.get()) {
                    SynchronizedBoolean synchronizedBoolean = this.running;
                    synchronized (synchronizedBoolean) {
                        if (this.running.commit(false, true)) {
                            try {
                                this.threadPool.execute((Runnable)this);
                            }
                            catch (InterruptedException e) {
                                JMSException jmsEx = new JMSException(this.toString() + " Failed to start running dispatch thread");
                                jmsEx.setLinkedException((Exception)e);
                                throw jmsEx;
                            }
                        }
                    }
                }
            }
        }
        return ts;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeConsumer(ConsumerInfo info) throws JMSException {
        TransientQueueSubscription ts = this.findMatch(info);
        if (ts != null) {
            DefaultQueueList defaultQueueList = this.subscriptions;
            synchronized (defaultQueueList) {
                this.subscriptions.remove(ts);
                if (this.subscriptions.isEmpty()) {
                    this.running.commit(true, false);
                    this.idleTimestamp = System.currentTimeMillis();
                }
            }
            List list = ts.getUndeliveredMessages();
            ListIterator i = list.listIterator(list.size());
            while (i.hasPrevious()) {
                ActiveMQMessage message = (ActiveMQMessage)i.previous();
                message.setJMSRedelivered(true);
                this.queue.enqueueFirstNoBlock(message);
            }
            list.clear();
            ts.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws JMSException {
        if (this.started.commit(false, true) && !this.subscriptions.isEmpty()) {
            SynchronizedBoolean synchronizedBoolean = this.running;
            synchronized (synchronizedBoolean) {
                if (this.running.commit(false, true)) {
                    try {
                        this.threadPool.execute((Runnable)this);
                    }
                    catch (InterruptedException e) {
                        JMSException jmsEx = new JMSException(this.toString() + " Failed to start");
                        jmsEx.setLinkedException((Exception)e);
                        throw jmsEx;
                    }
                }
            }
        }
    }

    public void enqueue(ActiveMQMessage message) {
        this.queue.enqueue(message);
        this.startRunning();
    }

    public void enqueueFirst(ActiveMQMessage message) {
        this.queue.enqueueFirstNoBlock(message);
        this.startRunning();
    }

    public void stop() {
        this.started.set(false);
        this.running.set(false);
        this.queue.clear();
    }

    public void close() throws JMSException {
        if (this.started.get()) {
            this.stop();
        }
        this.queue.close();
        QueueListEntry entry = this.subscriptions.getFirstEntry();
        while (entry != null) {
            TransientQueueSubscription ts = (TransientQueueSubscription)entry.getElement();
            ts.close();
            entry = this.subscriptions.getNextEntry(entry);
        }
        this.subscriptions.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        boolean dispatched = false;
        boolean targeted = false;
        ActiveMQMessage message = null;
        int notDispatchedCount = 0;
        int sleepTime = 250;
        int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;
        HashMap<String, TransientQueueSubscription> messageParts = new HashMap<String, TransientQueueSubscription>();
        try {
            while (this.started.get() && this.running.get()) {
                QueueListEntry entry;
                dispatched = false;
                targeted = false;
                if (!this.subscriptions.isEmpty() && (message = this.queue.dequeue(2000L)) != null) {
                    if (!message.isExpired()) {
                        entry = this.subscriptions.getFirstEntry();
                        while (entry != null) {
                            TransientQueueSubscription ts = (TransientQueueSubscription)entry.getElement();
                            if (ts.isTarget(message)) {
                                targeted = true;
                                if (message.isMessagePart()) {
                                    TransientQueueSubscription sameTarget = (TransientQueueSubscription)messageParts.get(message.getParentMessageID());
                                    if (sameTarget == null) {
                                        sameTarget = ts;
                                        messageParts.put(message.getParentMessageID(), sameTarget);
                                    }
                                    sameTarget.doDispatch(message);
                                    if (message.isLastMessagePart()) {
                                        messageParts.remove(message.getParentMessageID());
                                    }
                                    message = null;
                                    dispatched = true;
                                    notDispatchedCount = 0;
                                    break;
                                }
                                if (ts.canAcceptMessages()) {
                                    ts.doDispatch(message);
                                    message = null;
                                    dispatched = true;
                                    notDispatchedCount = 0;
                                    this.subscriptions.rotate();
                                    break;
                                }
                            }
                            entry = this.subscriptions.getNextEntry(entry);
                        }
                    } else {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug((Object)("expired message: " + message));
                        }
                        this.deadLetterPolicy.sendToDeadLetter(message);
                        message = null;
                    }
                }
                if (dispatched) continue;
                if (message != null) {
                    if (targeted) {
                        this.queue.enqueueFirstNoBlock(message);
                    } else {
                        this.queue.enqueueNoBlock(message);
                    }
                }
                if (!this.running.get()) continue;
                if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping && this.queue.isEmpty()) {
                    entry = this.running;
                    synchronized (entry) {
                        this.running.commit(true, false);
                        continue;
                    }
                }
                Thread.sleep(sleepTime);
            }
        }
        catch (InterruptedException ie) {
        }
        catch (Throwable e) {
            this.log.warn((Object)"stop dispatching", e);
            this.stop();
        }
    }

    private TransientQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
        TransientQueueSubscription result = null;
        QueueListEntry entry = this.subscriptions.getFirstEntry();
        while (entry != null) {
            TransientQueueSubscription ts = (TransientQueueSubscription)entry.getElement();
            if (ts.getConsumerInfo().equals(info)) {
                result = ts;
                break;
            }
            entry = this.subscriptions.getNextEntry(entry);
        }
        return result;
    }

    public ActiveMQDestination getDestination() {
        return this.destination;
    }

    public String getDestinationName() {
        return this.destination.getPhysicalName();
    }

    public void addMessage(ActiveMQMessage msg) throws JMSException {
    }

    public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
    }

    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        return null;
    }

    public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
        return false;
    }

    protected void clear() {
        this.queue.clear();
    }

    protected void removeExpiredMessages() {
        long currentTime = System.currentTimeMillis();
        List list = this.queue.getContents();
        for (int i = 0; i < list.size(); ++i) {
            ActiveMQMessage msg = (ActiveMQMessage)list.get(i);
            if (!msg.isExpired(currentTime)) continue;
            this.queue.remove(msg);
            if (!this.log.isDebugEnabled()) continue;
            this.log.debug((Object)("expired message: " + msg));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startRunning() {
        if (!this.running.get() && this.started.get() && !this.subscriptions.isEmpty()) {
            SynchronizedBoolean synchronizedBoolean = this.running;
            synchronized (synchronizedBoolean) {
                if (this.running.commit(false, true)) {
                    try {
                        this.threadPool.execute((Runnable)this);
                    }
                    catch (InterruptedException e) {
                        this.log.error((Object)(this + " Couldn't start executing "), (Throwable)e);
                    }
                }
            }
        }
    }

    public MessageContainerAdmin getMessageContainerAdmin() {
        return this;
    }

    public void empty() throws JMSException {
    }

    public boolean isDeadLetterQueue() {
        return false;
    }
}

