/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.BoundedActiveMQMessageQueue;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageContainerAdmin;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
import org.codehaus.activemq.service.boundedvm.TransientTopicSubscription;

/**
 * Holds the transient topic subscriptions registered for a single
 * {@link BrokerClient} and forwards matching messages to that client.
 *
 * <p>Messages are either dispatched directly on the calling thread or, when the
 * manager reports decoupled dispatch, placed on a {@link BoundedActiveMQMessageQueue}
 * that is drained by a dedicated worker thread (see {@link #run()}).
 *
 * <p>The message-store style methods of {@link MessageContainer} and
 * {@link MessageContainerAdmin} are no-ops (or return a fixed value) in this class.
 */
public class TransientTopicBoundedMessageContainer
implements MessageContainer,
Service,
Runnable,
MessageContainerAdmin {
    /** Priority given to the dispatcher worker thread. */
    private static final int DISPATCHER_PRIORITY = 7;
    /** Timeout, in the unit expected by {@code queue.dequeue(long)}, for each poll of the queue. */
    private static final long DEQUEUE_TIMEOUT = 2000L;
    /** Number of dispatched messages after which the worker yields the CPU. */
    private static final int YIELD_INTERVAL = 250;

    private final SynchronizedBoolean started;
    private final TransientTopicBoundedMessageManager manager;
    private final BrokerClient client;
    private final BoundedActiveMQMessageQueue queue;
    private Thread worker;
    private final CopyOnWriteArrayList subscriptions;
    private final Log log;

    /**
     * Creates a stopped container with no subscriptions.
     *
     * @param manager consulted to decide between decoupled (queued) and direct dispatch
     * @param client the broker client that messages are dispatched to
     * @param queue the queue used to buffer messages when dispatch is decoupled
     */
    public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client, BoundedActiveMQMessageQueue queue) {
        this.manager = manager;
        this.client = client;
        this.queue = queue;
        this.started = new SynchronizedBoolean(false);
        this.subscriptions = new CopyOnWriteArrayList();
        this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
    }

    /**
     * @return {@code true} when no subscriptions are registered
     */
    public boolean isInactive() {
        return subscriptions.isEmpty();
    }

    /**
     * @return the broker client this container dispatches to
     */
    public BrokerClient getBrokerClient() {
        return client;
    }

    /**
     * Registers a subscription for the given consumer, reusing an existing one
     * whose {@link ConsumerInfo} equals {@code info}.
     *
     * @param filter the filter handed to a newly created subscription
     * @param info the consumer to subscribe
     * @return the existing or newly created subscription
     */
    public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) {
        TransientTopicSubscription subscription = findMatch(info);
        if (subscription != null) {
            return subscription;
        }
        subscription = new TransientTopicSubscription(filter, info, client);
        subscriptions.add(subscription);
        return subscription;
    }

    /**
     * Removes the subscription whose {@link ConsumerInfo} equals {@code info}, if any.
     *
     * @param info the consumer to unsubscribe
     */
    public void removeConsumer(ConsumerInfo info) {
        TransientTopicSubscription subscription = findMatch(info);
        if (subscription != null) {
            subscriptions.remove(subscription);
        }
    }

    /**
     * Marks the container as started. The dispatcher worker thread is created and
     * started only when the started flag was successfully committed from
     * {@code false} to {@code true} and the manager reports decoupled dispatch.
     */
    public void start() {
        if (started.commit(false, true) && manager.isDecoupledDispatch()) {
            worker = new Thread(this, "TransientTopicDispatcher");
            worker.setPriority(DISPATCHER_PRIORITY);
            worker.start();
        }
    }

    /**
     * Collects the subscriptions that target {@code message} and dispatches a copy
     * of the message to them. Nothing is dispatched when both this container's
     * client and the sender are clustered connections.
     *
     * @param sender the client that produced the message
     * @param message the message to route
     * @return {@code true} if at least one subscription was a target of the message
     * @throws JMSException if matching or dispatching fails
     */
    public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
        if (client.isClusteredConnection() && sender.isClusteredConnection()) {
            return false;
        }
        // Allocated lazily so the common "no match" case creates no garbage.
        List<TransientTopicSubscription> targets = null;
        for (Iterator it = subscriptions.iterator(); it.hasNext();) {
            TransientTopicSubscription subscription = (TransientTopicSubscription) it.next();
            if (subscription.isTarget(message)) {
                if (targets == null) {
                    targets = new ArrayList<TransientTopicSubscription>();
                }
                targets.add(subscription);
            }
        }
        dispatchToQueue(message, targets);
        return targets != null;
    }

    /**
     * Clears the started flag (which ends the worker loop in {@link #run()}) and
     * clears the queue.
     */
    public void stop() {
        started.set(false);
        queue.clear();
    }

    /**
     * Stops the container if it is started, then closes the queue.
     */
    public void close() {
        if (started.get()) {
            stop();
        }
        queue.close();
    }

    /**
     * Worker loop: while the container is started, polls the queue and dispatches
     * each non-expired message to the client, yielding after every
     * {@value #YIELD_INTERVAL} dispatched messages. Expired messages are dropped
     * (logged at debug level). Any exception stops the container.
     */
    public void run() {
        int dispatched = 0;
        while (started.get()) {
            try {
                ActiveMQMessage message = queue.dequeue(DEQUEUE_TIMEOUT);
                if (message == null) {
                    continue;
                }
                if (message.isExpired()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Message: " + message + " has expired");
                    }
                    continue;
                }
                client.dispatch(message);
                if (++dispatched == YIELD_INTERVAL) {
                    dispatched = 0;
                    Thread.yield();
                }
            }
            catch (Exception e) {
                stop();
                log.warn("stop dispatching", e);
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Dispatches a shallow copy of {@code message}, tagged with the consumer
     * numbers of the given subscriptions, either via the queue (decoupled
     * dispatch) or directly to the client. Does nothing for a null or empty list.
     *
     * <p>If enqueueing is interrupted the container is closed and the calling
     * thread's interrupt status is restored.
     */
    private void dispatchToQueue(ActiveMQMessage message, List<TransientTopicSubscription> list) throws JMSException {
        if (list == null || list.isEmpty()) {
            return;
        }
        int[] consumerNos = new int[list.size()];
        for (int i = 0; i < consumerNos.length; ++i) {
            consumerNos[i] = list.get(i).getConsumerInfo().getConsumerNo();
        }
        // Work on a copy so the consumer numbers set here do not leak into the caller's message.
        ActiveMQMessage copy = message.shallowCopy();
        copy.setConsumerNos(consumerNos);
        if (!manager.isDecoupledDispatch()) {
            client.dispatch(copy);
            return;
        }
        try {
            queue.enqueue(copy);
        }
        catch (InterruptedException e) {
            log.warn("queue interuppted, closing", e);
            try {
                close();
            }
            finally {
                // Re-assert the interrupt only after cleanup so close() is not disturbed by it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the subscription whose {@link ConsumerInfo} equals {@code info}, or
     *         {@code null} if there is none
     */
    private TransientTopicSubscription findMatch(ConsumerInfo info) {
        for (Iterator it = subscriptions.iterator(); it.hasNext();) {
            TransientTopicSubscription candidate = (TransientTopicSubscription) it.next();
            if (candidate.getConsumerInfo().equals(info)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * @param destination the destination to test
     * @return {@code true} if any subscription's destination matches {@code destination}
     */
    public boolean hasConsumerFor(ActiveMQDestination destination) {
        for (Iterator it = subscriptions.iterator(); it.hasNext();) {
            TransientTopicSubscription subscription = (TransientTopicSubscription) it.next();
            if (subscription.getConsumerInfo().getDestination().matches(destination)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return always the empty string
     */
    public String getDestinationName() {
        return "";
    }

    /** No-op in this container. */
    public void addMessage(ActiveMQMessage msg) throws JMSException {
    }

    /** No-op in this container. */
    public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
    }

    /**
     * @return always {@code null}
     */
    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        return null;
    }

    /** No-op in this container. */
    public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    /** No-op in this container. */
    public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    /**
     * @return always {@code false}
     */
    public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
        return false;
    }

    /**
     * @return this container, which serves as its own admin view
     */
    public MessageContainerAdmin getMessageContainerAdmin() {
        return this;
    }

    /** No-op in this container. */
    public void empty() throws JMSException {
    }

    /**
     * @return always {@code false}
     */
    public boolean isDeadLetterQueue() {
        return false;
    }
}

