package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.JMSException;
import net.jxta.util.TimeConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.BoundedPacketQueue;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageContainerAdmin;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.Service;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/optional/activemq-ra-1.1-G1M3.rar:activemq-core-1.1-G1M3.jar:org/codehaus/activemq/service/boundedvm/TransientTopicBoundedMessageContainer.class
 */
/* loaded from: input_file:lib/activemq-1.1-G1M3.jar:org/codehaus/activemq/service/boundedvm/TransientTopicBoundedMessageContainer.class */
/**
 * Message container that queues topic messages for a single {@link BrokerClient}
 * and hands them to that client from a dedicated dispatcher thread.
 *
 * <p>Messages enter through {@link #targetAndDispatch(BrokerClient, ActiveMQMessage)},
 * which enqueues a shallow copy tagged with the consumer numbers of every matching
 * subscription. The dispatcher thread ({@link #run()}) drains the queue and calls
 * {@link BrokerClient#dispatch} for each message that has not expired.
 *
 * <p>The persistence-oriented {@link MessageContainer} operations
 * ({@code addMessage}, {@code delete}, {@code getMessage}, ...) are implemented as
 * no-ops because this container keeps nothing beyond the in-memory queue.
 */
public class TransientTopicBoundedMessageContainer implements MessageContainer, Service, Runnable, MessageContainerAdmin {
    /** Priority assigned to the dispatcher thread when it is created. */
    private static final int WORKER_PRIORITY = 6;

    /** Number of dispatched messages after which the dispatcher thread yields. */
    private static final int DISPATCHES_PER_YIELD = 250;

    private final BrokerClient client;
    private final BoundedPacketQueue queue;
    private final Log log;
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);
    private final CopyOnWriteArrayList subscriptions = new CopyOnWriteArrayList();
    private Thread worker;

    /**
     * Creates a container bound to one client and one queue.
     *
     * @param brokerClient       the client that dequeued messages are dispatched to
     * @param boundedPacketQueue the queue used to buffer messages awaiting dispatch
     */
    public TransientTopicBoundedMessageContainer(BrokerClient brokerClient, BoundedPacketQueue boundedPacketQueue) {
        this.client = brokerClient;
        this.queue = boundedPacketQueue;
        // The logger name embeds the client's string form so log lines identify the client.
        this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + brokerClient);
    }

    /**
     * @return {@code true} when no subscriptions are registered
     */
    public boolean isInactive() {
        return this.subscriptions.isEmpty();
    }

    /**
     * @return the client this container dispatches to
     */
    public BrokerClient getBrokerClient() {
        return this.client;
    }

    /**
     * Registers a subscription for the consumer unless one with an equal
     * {@link ConsumerInfo} is already registered.
     *
     * @param filter       the filter handed to the new subscription
     * @param consumerInfo the consumer to register
     */
    public void addConsumer(Filter filter, ConsumerInfo consumerInfo) {
        if (findMatch(consumerInfo) == null) {
            this.subscriptions.add(new TransientTopicSubscription(filter, consumerInfo));
        }
    }

    /**
     * Removes the subscription registered for the consumer, if there is one.
     *
     * @param consumerInfo the consumer to remove
     */
    public void removeConsumer(ConsumerInfo consumerInfo) {
        TransientTopicSubscription existing = findMatch(consumerInfo);
        if (existing != null) {
            this.subscriptions.remove(existing);
        }
    }

    /**
     * Starts the dispatcher thread. Calling this while already started does nothing.
     */
    @Override // org.codehaus.activemq.service.Service
    public void start() {
        // Only the caller that flips the flag from false to true creates the thread.
        if (this.started.commit(false, true)) {
            this.worker = new Thread(this, "TransientTopicDispatcher");
            this.worker.setPriority(WORKER_PRIORITY);
            this.worker.start();
        }
    }

    /**
     * Queues the message for this container's client if at least one subscription
     * targets it.
     *
     * <p>Nothing is queued when both this container's client and the sending client
     * are clustered connections.
     *
     * @param brokerClient    the client the message came from
     * @param activeMQMessage the message to route
     * @return {@code true} if at least one subscription matched the message
     * @throws JMSException if matching or copying the message fails
     */
    public boolean targetAndDispatch(BrokerClient brokerClient, ActiveMQMessage activeMQMessage) throws JMSException {
        if (this.client.isClusteredConnection() && brokerClient.isClusteredConnection()) {
            return false;
        }
        // Allocated lazily so that a message nobody wants costs no allocation.
        List<TransientTopicSubscription> matches = null;
        for (Iterator it = this.subscriptions.iterator(); it.hasNext();) {
            TransientTopicSubscription subscription = (TransientTopicSubscription) it.next();
            if (subscription.isTarget(activeMQMessage)) {
                if (matches == null) {
                    matches = new ArrayList<TransientTopicSubscription>();
                }
                matches.add(subscription);
            }
        }
        dispatchToQueue(activeMQMessage, matches);
        return matches != null;
    }

    /**
     * Clears the started flag, which ends the dispatcher loop on its next check,
     * and clears the queue.
     */
    @Override // org.codehaus.activemq.service.Service
    public void stop() {
        this.started.set(false);
        this.queue.clear();
    }

    /**
     * Stops the container if it is running and then closes the queue.
     */
    public void close() {
        if (this.started.get()) {
            stop();
        }
        this.queue.close();
    }

    /**
     * Dispatcher loop: while the container is started, dequeues messages (waiting up
     * to {@code TimeConstants.TWO_SECONDS} per attempt) and dispatches each
     * non-expired message to the client. Any exception stops the container.
     */
    @Override // java.lang.Runnable
    public void run() {
        int dispatchedSinceYield = 0;
        while (this.started.get()) {
            try {
                ActiveMQMessage message = (ActiveMQMessage) this.queue.dequeue(TimeConstants.TWO_SECONDS);
                if (message != null && !message.isExpired()) {
                    this.client.dispatch(message);
                    dispatchedSinceYield++;
                    // Periodically give other threads a chance to run.
                    if (dispatchedSinceYield == DISPATCHES_PER_YIELD) {
                        dispatchedSinceYield = 0;
                        Thread.yield();
                    }
                }
            } catch (Exception e) {
                stop();
                this.log.warn("stop dispatching", e);
                if (e instanceof InterruptedException) {
                    // Catching the exception cleared the interrupt flag; restore it
                    // so whoever owns this thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Enqueues a shallow copy of the message tagged with the consumer numbers of the
     * given subscriptions. Does nothing when the list is {@code null} or empty.
     *
     * <p>If the enqueue is interrupted the container is closed and the calling
     * thread's interrupt status is restored.
     *
     * @param activeMQMessage the message to copy and enqueue
     * @param list            the subscriptions that matched the message; may be {@code null}
     * @throws JMSException if copying or enqueueing the message fails
     */
    private void dispatchToQueue(ActiveMQMessage activeMQMessage, List<TransientTopicSubscription> list) throws JMSException {
        if (list == null || list.isEmpty()) {
            return;
        }
        int[] consumerNos = new int[list.size()];
        for (int i = 0; i < consumerNos.length; i++) {
            consumerNos[i] = list.get(i).getConsumerInfo().getConsumerNo();
        }
        // Copy so the consumer numbers are set on this container's own instance
        // rather than on the message object passed in by the caller.
        ActiveMQMessage copy = activeMQMessage.shallowCopy();
        copy.setConsumerNos(consumerNos);
        try {
            this.queue.enqueue(copy);
        } catch (InterruptedException e) {
            this.log.warn("queue interuppted, closing", e);
            try {
                close();
            } finally {
                // Re-assert the interrupt so the caller's thread is not left
                // unaware that it was asked to stop.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Finds the subscription whose {@link ConsumerInfo} equals the given one.
     *
     * @param consumerInfo the consumer to look up
     * @return the matching subscription, or {@code null} if none is registered
     */
    private TransientTopicSubscription findMatch(ConsumerInfo consumerInfo) {
        for (Iterator it = this.subscriptions.iterator(); it.hasNext();) {
            TransientTopicSubscription candidate = (TransientTopicSubscription) it.next();
            if (candidate.getConsumerInfo().equals(consumerInfo)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * @param activeMQDestination the destination to test
     * @return {@code true} if any registered consumer's destination matches the given one
     */
    public boolean hasConsumerFor(ActiveMQDestination activeMQDestination) {
        for (Iterator it = this.subscriptions.iterator(); it.hasNext();) {
            TransientTopicSubscription subscription = (TransientTopicSubscription) it.next();
            if (subscription.getConsumerInfo().getDestination().matches(activeMQDestination)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return always the empty string
     */
    @Override // org.codehaus.activemq.service.MessageContainer, org.codehaus.activemq.service.MessageContainerAdmin
    public String getDestinationName() {
        return "";
    }

    /**
     * No-op.
     *
     * @return always {@code null}
     */
    @Override // org.codehaus.activemq.service.MessageContainer
    public MessageIdentity addMessage(ActiveMQMessage activeMQMessage) throws JMSException {
        return null;
    }

    /** No-op. */
    @Override // org.codehaus.activemq.service.MessageContainer
    public void delete(MessageIdentity messageIdentity, MessageAck messageAck) throws JMSException {
    }

    /**
     * No-op lookup.
     *
     * @return always {@code null}
     */
    @Override // org.codehaus.activemq.service.MessageContainer
    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        return null;
    }

    /** No-op. */
    @Override // org.codehaus.activemq.service.MessageContainer
    public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    /** No-op. */
    @Override // org.codehaus.activemq.service.MessageContainer
    public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck messageAck) throws JMSException {
    }

    /**
     * @return always {@code false}
     */
    @Override // org.codehaus.activemq.service.MessageContainer
    public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
        return false;
    }

    /**
     * @return this container, which serves as its own admin view
     */
    @Override // org.codehaus.activemq.service.MessageContainer
    public MessageContainerAdmin getMessageContainerAdmin() {
        return this;
    }

    /** No-op. */
    @Override // org.codehaus.activemq.service.MessageContainerAdmin
    public void empty() throws JMSException {
    }
}
