package org.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.HashMap;
import java.util.List;
import javax.jms.JMSException;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.Filter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.io.util.MemoryManageable;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainer;
import org.activemq.service.MessageContainerAdmin;
import org.activemq.service.MessageIdentity;
import org.activemq.service.QueueListEntry;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Service;
import org.activemq.service.impl.DefaultQueueList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.iapi.services.daemon.DaemonService;
import org.apache.derby.impl.sql.compile.SQLParserConstants;

/* loaded from: input_file:org/activemq/service/boundedvm/TransientQueueBoundedMessageContainer.class */
public class TransientQueueBoundedMessageContainer implements MessageContainer, Service, Runnable, MessageContainerAdmin {
    /** Source of the memory-bounded queues used by this container and by each subscription it creates. */
    private MemoryBoundedQueueManager queueManager;
    /** Destination this container serves; its physical name is used in the main queue's name. */
    private ActiveMQDestination destination;
    /** Main queue holding messages that have not yet been handed to a subscription. */
    private MemoryBoundedQueue queue;
    /** Executor on which this container schedules itself (it is a {@link Runnable}) to dispatch. */
    private Executor threadPool;
    /** Logger whose name includes the destination. */
    private Log log;
    /** Zero once a consumer has been added; wall-clock millis of the moment the last consumer was removed. */
    private long idleTimestamp;
    /** Receives messages found to be expired by the dispatch loop. */
    private DeadLetterPolicy deadLetterPolicy;
    /** Held for the whole of {@link #run()}, so only one thread dispatches at a time. */
    private final Object dispatchMutex = new Object();
    /** Guards {@link #subscriptions} in the add/remove/find/dispatch/close paths. */
    private final Object subscriptionMutex = new Object();
    /** True between {@link #start()} and {@link #stop()}. */
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    /** True while a dispatch task has been scheduled or is looping; also used as a lock object. */
    private SynchronizedBoolean running = new SynchronizedBoolean(false);
    /** Current subscriptions, in dispatch order. */
    private DefaultQueueList subscriptions = new DefaultQueueList();

    public TransientQueueBoundedMessageContainer(Executor executor, MemoryBoundedQueueManager memoryBoundedQueueManager, ActiveMQDestination activeMQDestination, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.threadPool = executor;
        this.queueManager = memoryBoundedQueueManager;
        this.destination = activeMQDestination;
        this.deadLetterPolicy = deadLetterPolicy;
        this.queue = memoryBoundedQueueManager.getMemoryBoundedQueue(new StringBuffer().append("TRANSIENT_QUEUE:-").append(activeMQDestination.getPhysicalName()).toString());
        this.log = LogFactory.getLog(new StringBuffer().append("TransientQueueBoundedMessageContainer:- ").append(activeMQDestination).toString());
    }

    /**
     * @return {@code true} when at least one subscription is registered
     */
    public boolean isActive() {
        final boolean noSubscribers = subscriptions.isEmpty();
        return !noSubscribers;
    }

    /**
     * @return {@code true} when the main queue reports that it is empty
     */
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /**
     * @return the idle timestamp: {@code 0} after a consumer has been added, or the
     *         {@link System#currentTimeMillis()} value recorded when the last consumer was removed
     */
    public long getIdleTimestamp() {
        return idleTimestamp;
    }

    /**
     * Registers a consumer, creating a subscription for it unless one already
     * exists for an equal {@link ConsumerInfo}.
     *
     * <p>When a new subscription is created and the container is started, the
     * dispatch loop is scheduled on the thread pool if it is not already running.
     *
     * @param filter filter handed to the new subscription
     * @param consumerInfo consumer description; also used to look up an existing subscription
     * @param brokerClient client handed to the new subscription
     * @return the existing or newly created subscription
     * @throws JMSException if the dispatch task could not be handed to the thread pool
     */
    public TransientQueueSubscription addConsumer(Filter filter, ConsumerInfo consumerInfo, BrokerClient brokerClient) throws JMSException {
        synchronized (this.subscriptionMutex) {
            TransientQueueSubscription subscription = findMatch(consumerInfo);
            if (subscription != null) {
                return subscription;
            }

            // Two per-consumer bounded queues, named after the consumer id.
            MemoryBoundedQueue subQueue = this.queueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE_SUB:-" + consumerInfo.getConsumerId());
            MemoryBoundedQueue subAckedQueue = this.queueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE_SUB_ACKED:-" + consumerInfo.getConsumerId());
            subscription = new TransientQueueSubscription(brokerClient, subQueue, subAckedQueue, filter, consumerInfo);

            this.idleTimestamp = 0L;
            this.subscriptions.add(subscription);

            if (this.started.get()) {
                synchronized (this.running) {
                    if (this.running.commit(false, true)) {
                        try {
                            this.threadPool.execute(this);
                        } catch (InterruptedException e) {
                            // No dispatcher was scheduled: roll the flag back, otherwise
                            // startRunning() would see running == true and never schedule one.
                            this.running.set(false);
                            // Preserve the interrupt for callers further up the stack.
                            Thread.currentThread().interrupt();
                            JMSException failure = new JMSException(toString() + " Failed to start running dispatch thread");
                            failure.setLinkedException(e);
                            throw failure;
                        }
                    }
                }
            }
            return subscription;
        }
    }

    /**
     * Removes the subscription matching the given consumer, if any, and returns
     * its outstanding messages to the head of the main queue.
     *
     * <p>If this was the last subscription, the running flag is cleared and the
     * idle timestamp is set to the current time. For a browser subscription the
     * acknowledged messages are put back as well and then removed from the
     * subscription. The subscription is closed last.
     *
     * @param consumerInfo consumer whose subscription should be removed
     * @throws JMSException if looking up or closing the subscription fails
     */
    public void removeConsumer(ConsumerInfo consumerInfo) throws JMSException {
        synchronized (subscriptionMutex) {
            final TransientQueueSubscription departing = findMatch(consumerInfo);
            if (departing == null) {
                return;
            }

            subscriptions.remove(departing);
            if (subscriptions.isEmpty()) {
                running.commit(true, false);
                idleTimestamp = System.currentTimeMillis();
            }

            pushBackToQueueHead(departing.getUndeliveredMessages());

            if (departing.isBrowser()) {
                pushBackToQueueHead(departing.listAckedMessages());
                departing.removeAllAckedMessages();
            }
            departing.close();
        }
    }

    /**
     * Puts the given messages at the head of the main queue, walking the list
     * from last to first so that their relative order is kept.
     */
    private void pushBackToQueueHead(List messages) {
        int index = messages.size() - 1;
        while (index >= 0) {
            queue.enqueueFirstNoBlock((MemoryManageable) messages.get(index));
            index--;
        }
    }

    /**
     * Marks the container as started and, if there is at least one subscription,
     * schedules the dispatch loop on the thread pool.
     *
     * <p>Does nothing if the container was already started.
     *
     * @throws JMSException if the dispatch task could not be handed to the thread pool
     */
    @Override // org.activemq.service.Service
    public void start() throws JMSException {
        if (!this.started.commit(false, true) || this.subscriptions.isEmpty()) {
            return;
        }
        synchronized (this.running) {
            if (this.running.commit(false, true)) {
                try {
                    this.threadPool.execute(this);
                } catch (InterruptedException e) {
                    // No dispatcher was scheduled: roll the flag back, otherwise
                    // startRunning() would see running == true and never schedule one.
                    this.running.set(false);
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    JMSException failure = new JMSException(toString() + " Failed to start");
                    failure.setLinkedException(e);
                    throw failure;
                }
            }
        }
    }

    /**
     * Accepts a message for this destination.
     *
     * <p>Advisory messages bypass the queue and are dispatched immediately;
     * every other message is placed on the main queue and the dispatch loop is
     * started if needed.
     *
     * @param activeMQMessage message to enqueue
     */
    public void enqueue(ActiveMQMessage activeMQMessage) {
        if (!activeMQMessage.isAdvisory()) {
            queue.enqueue(activeMQMessage);
            startRunning();
            return;
        }
        doAdvisoryDispatchMessage(activeMQMessage);
    }

    /**
     * Puts a single message back at the head of the main queue and makes sure
     * the dispatch loop is running.
     *
     * @param activeMQMessage message to redeliver
     */
    public void redeliver(ActiveMQMessage activeMQMessage) {
        queue.enqueueFirstNoBlock(activeMQMessage);
        startRunning();
    }

    /**
     * Puts a batch of messages back at the head of the main queue and makes
     * sure the dispatch loop is running.
     *
     * @param list messages to redeliver
     */
    public void redeliver(List list) {
        queue.enqueueAllFirstNoBlock(list);
        startRunning();
    }

    /**
     * Stops the container: clears the started and running flags (which ends the
     * dispatch loop's condition) and then clears the main queue.
     */
    @Override // org.activemq.service.Service
    public void stop() {
        started.set(false);
        running.set(false);
        queue.clear();
    }

    /**
     * Shuts the container down: stops it if it is started, closes the main
     * queue, then closes and forgets every subscription.
     *
     * @throws JMSException if closing a subscription fails
     */
    public void close() throws JMSException {
        if (started.get()) {
            stop();
        }
        queue.close();
        synchronized (subscriptionMutex) {
            for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                TransientQueueSubscription subscription = (TransientQueueSubscription) entry.getElement();
                subscription.close();
            }
            subscriptions.clear();
        }
    }

    /**
     * Dispatch loop: repeatedly takes a message from the main queue and hands it
     * to a subscription, for as long as the container is both started and running.
     *
     * <p>Per iteration, under the subscription lock:
     * <ul>
     *   <li>an expired message is passed to the dead-letter policy;</li>
     *   <li>a message part goes to the subscription that received earlier parts of
     *       the same parent message (or to the first targeted subscription if none
     *       has yet), regardless of whether that subscription can accept messages;</li>
     *   <li>any other message goes to the first targeted subscription that can
     *       accept messages, after which the subscription list is rotated.</li>
     * </ul>
     * A message that could not be dispatched is put back: at the head of the queue
     * if some subscription targeted it, at the tail otherwise. After more than the
     * configured number of consecutive idle iterations with an empty queue, the
     * running flag is cleared and the loop ends.
     *
     * <p>Any {@link Throwable} other than {@link InterruptedException} is logged
     * and stops the container.
     */
    @Override // java.lang.Runnable
    public void run() {
        // Only one thread at a time may run the dispatch loop.
        synchronized (this.dispatchMutex) {
            int idleIterations = 0;
            // NOTE(review): both constants come from Apache Derby classes that have nothing
            // to do with messaging; they look like decompiler substitutions for integer
            // literals (a total idle budget divided by the per-iteration wait). Confirm the
            // intended values before replacing them.
            final int maxIdleIterations = DaemonService.TIMER_DELAY / SQLParserConstants.PRECISION;
            // Parent message id -> subscription that received the first part, so that all
            // parts of one message end up at the same subscription.
            final HashMap partTargets = new HashMap();
            while (this.started.get() && this.running.get()) {
                try {
                    // Scoped to the iteration so that a message left over from a previous
                    // pass can never be re-enqueued a second time.
                    ActiveMQMessage message = null;
                    boolean dispatched = false;
                    boolean targeted = false;
                    synchronized (this.subscriptionMutex) {
                        if (!this.subscriptions.isEmpty()) {
                            // Waits up to the given timeout for a message; null if none arrived.
                            message = (ActiveMQMessage) this.queue.dequeue(SQLParserConstants.PRECISION);
                        }
                        if (message != null && message.isExpired()) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("expired message: " + message);
                            }
                            this.deadLetterPolicy.sendToDeadLetter(message);
                            message = null;
                        } else if (message != null) {
                            for (QueueListEntry entry = this.subscriptions.getFirstEntry(); entry != null; entry = this.subscriptions.getNextEntry(entry)) {
                                TransientQueueSubscription candidate = (TransientQueueSubscription) entry.getElement();
                                if (!candidate.isTarget(message)) {
                                    continue;
                                }
                                targeted = true;
                                if (message.isMessagePart()) {
                                    TransientQueueSubscription partTarget = (TransientQueueSubscription) partTargets.get(message.getParentMessageID());
                                    if (partTarget == null) {
                                        partTarget = candidate;
                                        partTargets.put(message.getParentMessageID(), partTarget);
                                    }
                                    partTarget.doDispatch(message);
                                    if (message.isLastMessagePart()) {
                                        partTargets.remove(message.getParentMessageID());
                                    }
                                    dispatched = true;
                                    // The message is gone; stop scanning. Without this the
                                    // remaining subscriptions were asked about a null message.
                                    break;
                                }
                                if (candidate.canAcceptMessages()) {
                                    candidate.doDispatch(message);
                                    dispatched = true;
                                    // Presumably moves on to the next subscription for the
                                    // following message (round-robin) — confirm in DefaultQueueList.
                                    this.subscriptions.rotate();
                                    break;
                                }
                            }
                            if (dispatched) {
                                message = null;
                                idleIterations = 0;
                            }
                        }
                    }
                    if (dispatched) {
                        continue;
                    }
                    if (message != null) {
                        // Nobody took it: keep its place at the head if a subscription wanted
                        // it but was not ready, otherwise move it to the tail.
                        if (targeted) {
                            this.queue.enqueueFirstNoBlock(message);
                        } else {
                            this.queue.enqueueNoBlock(message);
                        }
                    }
                    if (!this.running.get()) {
                        continue;
                    }
                    if (idleIterations++ <= maxIdleIterations || !this.queue.isEmpty()) {
                        Thread.sleep(SQLParserConstants.PRECISION);
                    } else {
                        // Idle for too long with nothing queued: let the loop end.
                        synchronized (this.running) {
                            this.running.commit(true, false);
                        }
                    }
                } catch (InterruptedException ignored) {
                    // Deliberately ignored, as in the original code: the loop condition is
                    // re-checked, so the loop ends only once started/running have been cleared.
                } catch (Throwable th) {
                    this.log.warn("stop dispatching", th);
                    stop();
                }
            }
        }
    }

    /**
     * Looks up the subscription whose {@link ConsumerInfo} equals the given one.
     *
     * @param consumerInfo consumer description to search for
     * @return the matching subscription, or {@code null} if there is none
     * @throws JMSException declared for callers; not thrown by the visible code itself
     */
    private TransientQueueSubscription findMatch(ConsumerInfo consumerInfo) throws JMSException {
        synchronized (subscriptionMutex) {
            for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                TransientQueueSubscription candidate = (TransientQueueSubscription) entry.getElement();
                if (candidate.getConsumerInfo().equals(consumerInfo)) {
                    return candidate;
                }
            }
            return null;
        }
    }

    /**
     * @return the destination this container was created for
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * @return the physical name of this container's destination
     */
    @Override // org.activemq.service.MessageContainer, org.activemq.service.MessageContainerAdmin
    public String getDestinationName() {
        final String physicalName = destination.getPhysicalName();
        return physicalName;
    }

    /**
     * No-op in this implementation: the body is empty and the message is not stored.
     *
     * @param activeMQMessage ignored
     * @throws JMSException never thrown by this implementation
     */
    @Override // org.activemq.service.MessageContainer
    public void addMessage(ActiveMQMessage activeMQMessage) throws JMSException {
    }

    /**
     * No-op in this implementation: the body is empty and nothing is deleted.
     *
     * @param messageIdentity ignored
     * @param messageAck ignored
     * @throws JMSException never thrown by this implementation
     */
    @Override // org.activemq.service.MessageContainer
    public void delete(MessageIdentity messageIdentity, MessageAck messageAck) throws JMSException {
    }

    /**
     * Lookup by identity is not supported here; this implementation always
     * answers {@code null}.
     *
     * @param messageIdentity ignored
     * @return always {@code null}
     * @throws JMSException never thrown by this implementation
     */
    @Override // org.activemq.service.MessageContainer
    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        final ActiveMQMessage none = null;
        return none;
    }

    /**
     * No-op in this implementation: the body is empty.
     *
     * @param messageIdentity ignored
     * @throws JMSException never thrown by this implementation
     */
    @Override // org.activemq.service.MessageContainer
    public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    /**
     * No-op in this implementation: the body is empty.
     *
     * @param messageIdentity ignored
     * @throws JMSException never thrown by this implementation
     */
    @Override // org.activemq.service.MessageContainer
    public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    /**
     * Membership by identity is not tracked here; this implementation always
     * answers {@code false}.
     *
     * @param messageIdentity ignored
     * @return always {@code false}
     * @throws JMSException never thrown by this implementation
     */
    @Override // org.activemq.service.MessageContainer
    public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
        final boolean found = false;
        return found;
    }

    /**
     * Discards everything currently held in the main queue.
     */
    protected void clear() {
        queue.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Scans the main queue's contents and removes every message that is expired
     * relative to the time at which the scan started. Each removal is logged at
     * debug level.
     */
    public void removeExpiredMessages() {
        final long now = System.currentTimeMillis();
        final List queued = queue.getContents();
        for (int index = 0; index < queued.size(); index++) {
            final ActiveMQMessage candidate = (ActiveMQMessage) queued.get(index);
            if (!candidate.isExpired(now)) {
                continue;
            }
            queue.remove((MemoryManageable) candidate);
            if (log.isDebugEnabled()) {
                log.debug("expired message: " + candidate);
            }
        }
    }

    /**
     * Schedules the dispatch loop on the thread pool if the container is
     * started, has at least one subscription, and is not already running.
     *
     * <p>A failure to hand the task to the pool is logged rather than thrown.
     */
    protected void startRunning() {
        if (this.running.get() || !this.started.get() || this.subscriptions.isEmpty()) {
            return;
        }
        synchronized (this.running) {
            // Re-check under the lock: only the caller that flips the flag schedules the task.
            if (this.running.commit(false, true)) {
                try {
                    this.threadPool.execute(this);
                } catch (InterruptedException e) {
                    // No dispatcher was scheduled: roll the flag back, otherwise the early
                    // return above would prevent every later attempt to schedule one.
                    this.running.set(false);
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    this.log.error(this + " Couldn't start executing ", e);
                }
            }
        }
    }

    /**
     * @return this container, which implements the admin interface itself
     */
    @Override // org.activemq.service.MessageContainer
    public MessageContainerAdmin getMessageContainerAdmin() {
        final MessageContainerAdmin self = this;
        return self;
    }

    /**
     * No-op in this implementation: the body is empty, so the queue is left untouched.
     *
     * @throws JMSException never thrown by this implementation
     */
    @Override // org.activemq.service.MessageContainerAdmin
    public void empty() throws JMSException {
    }

    /**
     * @return always {@code false}; this container never reports itself as a dead-letter queue
     */
    @Override // org.activemq.service.MessageContainer
    public boolean isDeadLetterQueue() {
        final boolean deadLetter = false;
        return deadLetter;
    }

    /**
     * Delivers an advisory message straight to the first subscription that
     * targets it, without going through the main queue.
     *
     * <p>Null, non-advisory and expired messages are ignored. A
     * {@link JMSException} raised during dispatch is logged as a warning.
     *
     * @param activeMQMessage advisory message to dispatch; may be {@code null}
     */
    private synchronized void doAdvisoryDispatchMessage(ActiveMQMessage activeMQMessage) {
        if (activeMQMessage == null) {
            return;
        }
        try {
            if (!activeMQMessage.isAdvisory() || activeMQMessage.isExpired()) {
                return;
            }
            synchronized (subscriptionMutex) {
                for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                    TransientQueueSubscription candidate = (TransientQueueSubscription) entry.getElement();
                    if (candidate.isTarget(activeMQMessage)) {
                        candidate.doDispatch(activeMQMessage);
                        return;
                    }
                }
            }
        } catch (JMSException e) {
            log.warn("Failed to dispatch advisory", e);
        }
    }
}
