package org.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.HashMap;
import java.util.List;
import javax.jms.JMSException;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.Filter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.io.util.MemoryManageable;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainerAdmin;
import org.activemq.service.MessageIdentity;
import org.activemq.service.QueueListEntry;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Service;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.service.impl.DefaultQueueList;
import org.activemq.store.MessageStore;
import org.activemq.store.RecoveryListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.iapi.services.daemon.DaemonService;
import org.apache.derby.impl.sql.compile.SQLParserConstants;

/* loaded from: input_file:activemq-core-3.2.jar:org/activemq/service/boundedvm/DurableQueueBoundedMessageContainer.class */
/**
 * Message container for a single durable queue destination.
 *
 * <p>Non-advisory messages are added to the {@link MessageStore} and, once the
 * surrounding transaction commits, placed as {@link DurableMessagePointer}s on
 * an in-memory {@link MemoryBoundedQueue}. A dispatch task ({@link #run()}),
 * scheduled on the supplied {@link Executor}, takes pointers off that queue and
 * hands each one to a matching {@link DurableQueueSubscription}.
 *
 * <p>Locking as visible in this class: {@code dispatchMutex} serialises
 * executions of {@link #run()}; {@code subscriptionsMutex} guards structural
 * changes to, and iteration over, the subscription list. Some emptiness checks
 * ({@link #isActive()}, {@link #checkRunning()}, {@link #empty()}) read the
 * list without taking that mutex.
 */
public class DurableQueueBoundedMessageContainer implements Service, Runnable, MessageContainerAdmin {
    private final MessageStore messageStore;
    private final MemoryBoundedQueueManager queueManager;
    private final ActiveMQDestination destination;
    private final Executor threadPool;
    private final DeadLetterPolicy deadLetterPolicy;
    private final Log log;
    /** In-memory queue of {@link DurableMessagePointer}s awaiting dispatch. */
    private final MemoryBoundedQueue queue;
    private final DefaultQueueList subscriptions = new DefaultQueueList();
    /** True between a successful {@link #start()} and {@link #stop()}. */
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);
    /** True while a dispatch task is scheduled or executing; also used as a lock object. */
    private final SynchronizedBoolean running = new SynchronizedBoolean(false);
    private final Object dispatchMutex = new Object();
    private final Object subscriptionsMutex = new Object();
    /** 0 once a consumer has been added; wall-clock millis when the last subscription was removed. */
    private long idleTimestamp;

    /**
     * Creates the container and obtains its in-memory queue from the queue manager.
     *
     * @param messageStore store that messages are added to and recovered from
     * @param executor executor on which the dispatch task is scheduled
     * @param memoryBoundedQueueManager factory for the container and subscription queues
     * @param activeMQDestination destination served by this container
     * @param redeliveryPolicy accepted for signature compatibility; not used by this class
     * @param deadLetterPolicy receives expired messages found during dispatch; may be {@code null}
     */
    public DurableQueueBoundedMessageContainer(MessageStore messageStore, Executor executor, MemoryBoundedQueueManager memoryBoundedQueueManager, ActiveMQDestination activeMQDestination, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.messageStore = messageStore;
        this.threadPool = executor;
        this.queueManager = memoryBoundedQueueManager;
        this.destination = activeMQDestination;
        this.deadLetterPolicy = deadLetterPolicy;
        this.queue = memoryBoundedQueueManager.getMemoryBoundedQueue("DURABLE_QUEUE:-" + activeMQDestination.getPhysicalName());
        this.log = LogFactory.getLog("DurableQueueBoundedMessageContainer:- " + activeMQDestination);
    }

    /** @return {@code true} when at least one subscription is registered */
    public boolean isActive() {
        return !this.subscriptions.isEmpty();
    }

    /** @return {@code true} when the in-memory queue holds no messages */
    public boolean isEmpty() {
        return this.queue.isEmpty();
    }

    /**
     * @return 0 after a consumer has been added, otherwise the
     *         {@link System#currentTimeMillis()} value recorded when the last
     *         subscription was removed (0 if that never happened)
     */
    public long getIdleTimestamp() {
        return this.idleTimestamp;
    }

    /**
     * Returns the subscription whose {@link ConsumerInfo} equals
     * {@code consumerInfo}, creating and registering a new one if none exists.
     * Registering a new subscription resets the idle timestamp and makes sure
     * the dispatch task is scheduled.
     *
     * @param filter filter handed to a newly created subscription
     * @param consumerInfo identifies the consumer
     * @param brokerClient client handed to a newly created subscription
     * @return the existing or newly created subscription
     * @throws JMSException if looking up or creating the subscription fails
     */
    public DurableQueueSubscription addConsumer(Filter filter, ConsumerInfo consumerInfo, BrokerClient brokerClient) throws JMSException {
        DurableQueueSubscription subscription = findMatch(consumerInfo);
        if (subscription != null) {
            return subscription;
        }
        MemoryBoundedQueue subQueue = this.queueManager.getMemoryBoundedQueue("DURABLE_SUB:-" + consumerInfo.getConsumerId());
        MemoryBoundedQueue ackedQueue = this.queueManager.getMemoryBoundedQueue("DURABLE_SUB_ACKED:-" + consumerInfo.getConsumerId());
        subscription = new DurableQueueSubscription(brokerClient, subQueue, ackedQueue, filter, consumerInfo);
        synchronized (this.subscriptionsMutex) {
            this.idleTimestamp = 0L;
            this.subscriptions.add(subscription);
            checkRunning();
        }
        return subscription;
    }

    /**
     * Unregisters the subscription matching {@code consumerInfo}, if any, puts
     * its undelivered messages (and, for a browser, its acked messages) back at
     * the head of the container queue in their original order, and closes it.
     * When the last subscription goes away the dispatch task is told to stop
     * and the idle timestamp is recorded.
     *
     * @param consumerInfo identifies the consumer to remove
     * @throws JMSException if the subscription lookup or cleanup fails
     */
    public void removeConsumer(ConsumerInfo consumerInfo) throws JMSException {
        DurableQueueSubscription subscription;
        synchronized (this.subscriptionsMutex) {
            subscription = findMatch(consumerInfo);
            if (subscription != null) {
                this.subscriptions.remove(subscription);
                if (this.subscriptions.isEmpty()) {
                    this.running.commit(true, false);
                    this.idleTimestamp = System.currentTimeMillis();
                }
            }
        }
        if (subscription == null) {
            return;
        }
        // Walk backwards while inserting at the head so the original order is preserved.
        List undelivered = subscription.getUndeliveredMessages();
        for (int idx = undelivered.size() - 1; idx >= 0; idx--) {
            this.queue.enqueueFirstNoBlock((MemoryManageable) undelivered.get(idx));
        }
        if (subscription.isBrowser()) {
            List acked = subscription.listAckedMessages();
            for (int idx = acked.size() - 1; idx >= 0; idx--) {
                this.queue.enqueueFirstNoBlock((MemoryManageable) acked.get(idx));
            }
            subscription.removeAllAckedMessages();
        }
        subscription.close();
    }

    /**
     * Starts the container if it is not already started: starts the message
     * store, replays every stored message onto the in-memory queue with memory
     * limit enforcement switched off, re-enables enforcement (also when
     * recovery fails), and then schedules dispatching if appropriate.
     *
     * @throws JMSException if the store fails to start or recover
     */
    @Override // org.activemq.service.Service
    public void start() throws JMSException {
        if (!this.started.commit(false, true)) {
            return;
        }
        this.messageStore.start();
        this.queueManager.setMemoryLimitEnforced(false);
        try {
            this.messageStore.recover(new RecoveryListener() {
                @Override // org.activemq.store.RecoveryListener
                public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
                    recoverMessageToBeDelivered(messageIdentity);
                }
            });
        } finally {
            this.queueManager.setMemoryLimitEnforced(true);
        }
        checkRunning();
    }

    /**
     * Loads the identified message from the store and enqueues a pointer to it.
     * Called by the recovery listener installed in {@link #start()}.
     *
     * @param messageIdentity identity of the stored message
     * @throws JMSException if the message cannot be loaded
     */
    public void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
        this.queue.enqueue(new DurableMessagePointer(this.messageStore, getDestination(), this.messageStore.getMessage(messageIdentity)));
    }

    /**
     * Accepts a message for this destination. Advisory messages are dispatched
     * immediately and are not stored. Any other message is added to the message
     * store, and a post-commit task is registered on the context transaction
     * that enqueues it for dispatch.
     *
     * @param activeMQMessage the message to accept
     * @throws JMSException if the store rejects the message
     */
    public void enqueue(ActiveMQMessage activeMQMessage) throws JMSException {
        final DurableMessagePointer pointer = new DurableMessagePointer(this.messageStore, getDestination(), activeMQMessage);
        if (activeMQMessage.isAdvisory()) {
            doAdvisoryDispatchMessage(pointer);
            return;
        }
        this.messageStore.addMessage(activeMQMessage);
        TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
            @Override // org.activemq.service.TransactionTask
            public void execute() throws Throwable {
                DurableQueueBoundedMessageContainer.this.queue.enqueue(pointer);
                DurableQueueBoundedMessageContainer.this.checkRunning();
            }
        });
    }

    /**
     * Puts a single message back at the head of the queue and makes sure
     * dispatching is scheduled.
     *
     * @param durableMessagePointer the message to redeliver
     */
    public void redeliver(DurableMessagePointer durableMessagePointer) {
        this.queue.enqueueFirstNoBlock(durableMessagePointer);
        checkRunning();
    }

    /**
     * Puts a list of messages back at the head of the queue and makes sure
     * dispatching is scheduled.
     *
     * @param list the messages to redeliver
     */
    public void redeliver(List list) {
        this.queue.enqueueAllFirstNoBlock(list);
        checkRunning();
    }

    /**
     * Marks the container as stopped, which ends the dispatch loop, and clears
     * the in-memory queue. The message store is not touched here.
     */
    @Override // org.activemq.service.Service
    public void stop() {
        this.started.set(false);
        this.running.set(false);
        this.queue.clear();
    }

    /**
     * Stops the container if it is started, then closes and removes every
     * subscription.
     *
     * @throws JMSException if closing a subscription fails
     */
    public void close() throws JMSException {
        if (this.started.get()) {
            stop();
        }
        synchronized (this.subscriptionsMutex) {
            for (QueueListEntry entry = this.subscriptions.getFirstEntry(); entry != null; entry = this.subscriptions.getNextEntry(entry)) {
                ((DurableQueueSubscription) entry.getElement()).close();
            }
            this.subscriptions.clear();
        }
    }

    /**
     * Dispatch loop. Runs while the container is both started and running;
     * only one invocation executes at a time.
     *
     * <p>Each pass takes at most one message off the queue and:
     * <ul>
     *   <li>if it is expired, hands it to the dead-letter policy (when one is set);</li>
     *   <li>if it is a message part, dispatches it to the subscription already
     *       chosen for its parent message id, or to the first targeted
     *       subscription, which is then remembered until the last part;</li>
     *   <li>otherwise dispatches it to the first targeted subscription that can
     *       accept messages and rotates the subscription list;</li>
     *   <li>if nothing took it, puts it back: at the head when some subscription
     *       was a target, at the tail when none was.</li>
     * </ul>
     * After more than {@code maxIdlePolls} consecutive passes without a
     * dispatch, and with the queue empty, the loop clears the running flag and
     * exits.
     */
    @Override // java.lang.Runnable
    public void run() {
        synchronized (this.dispatchMutex) {
            // NOTE(review): these Derby constants look like decompiler constant
            // substitution for plain numeric literals (a poll interval and an
            // idle budget); confirm their values and replace with literals.
            final int pollInterval = SQLParserConstants.PLI;
            final int maxIdlePolls = DaemonService.TIMER_DELAY / pollInterval;
            int idlePolls = 0;
            DurableMessagePointer pointer = null;
            // Parent message id -> subscription that receives all parts of that message.
            HashMap partSubscribers = new HashMap();
            while (this.started.get() && this.running.get()) {
                try {
                    boolean dispatched = false;
                    boolean targeted = false;
                    synchronized (this.subscriptionsMutex) {
                        if (!this.subscriptions.isEmpty()) {
                            pointer = (DurableMessagePointer) this.queue.dequeue(pollInterval);
                            if (pointer != null) {
                                ActiveMQMessage message = pointer.getMessage();
                                if (message.isExpired()) {
                                    if (this.log.isDebugEnabled()) {
                                        this.log.debug("expired message: " + pointer);
                                    }
                                    if (this.deadLetterPolicy != null) {
                                        this.deadLetterPolicy.sendToDeadLetter(pointer.getMessage());
                                    }
                                    pointer = null;
                                } else {
                                    for (QueueListEntry entry = this.subscriptions.getFirstEntry(); entry != null; entry = this.subscriptions.getNextEntry(entry)) {
                                        DurableQueueSubscription candidate = (DurableQueueSubscription) entry.getElement();
                                        if (!candidate.isTarget(message)) {
                                            continue;
                                        }
                                        targeted = true;
                                        if (message.isMessagePart()) {
                                            DurableQueueSubscription partTarget = (DurableQueueSubscription) partSubscribers.get(message.getParentMessageID());
                                            if (partTarget == null) {
                                                partTarget = candidate;
                                                partSubscribers.put(message.getParentMessageID(), partTarget);
                                            }
                                            partTarget.doDispatch(pointer);
                                            if (message.isLastMessagePart()) {
                                                partSubscribers.remove(message.getParentMessageID());
                                            }
                                            pointer = null;
                                            dispatched = true;
                                            idlePolls = 0;
                                            // Stop scanning: the pointer has been handed over and is
                                            // now null, so another matching subscription would
                                            // otherwise be given a null pointer.
                                            break;
                                        }
                                        if (candidate.canAcceptMessages()) {
                                            candidate.doDispatch(pointer);
                                            pointer = null;
                                            dispatched = true;
                                            idlePolls = 0;
                                            this.subscriptions.rotate();
                                            break;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    if (!dispatched) {
                        if (pointer != null) {
                            if (targeted) {
                                this.queue.enqueueFirstNoBlock(pointer);
                            } else {
                                this.queue.enqueueNoBlock(pointer);
                            }
                            // Forget the pointer once it is back on the queue; otherwise a
                            // later pass that skips the dequeue (no subscriptions) would
                            // enqueue the same message a second time.
                            pointer = null;
                        }
                        if (this.running.get()) {
                            int pollsSoFar = idlePolls;
                            idlePolls++;
                            if (pollsSoFar <= maxIdlePolls || !this.queue.isEmpty()) {
                                Thread.sleep(pollInterval);
                            } else {
                                synchronized (this.running) {
                                    this.running.commit(true, false);
                                }
                            }
                        }
                    }
                } catch (InterruptedException ignored) {
                    // Deliberately not propagated: the loop condition re-checks
                    // started/running. The interrupt flag is not re-asserted here
                    // because the next Thread.sleep would then throw immediately.
                } catch (Throwable th) {
                    this.log.warn("stop dispatching", th);
                    stop();
                }
            }
        }
    }

    /**
     * Finds the registered subscription whose {@link ConsumerInfo} equals the
     * given one.
     *
     * @param consumerInfo consumer to look for
     * @return the matching subscription, or {@code null} if none is registered
     * @throws JMSException declared for callers; see the subscription accessors
     */
    private DurableQueueSubscription findMatch(ConsumerInfo consumerInfo) throws JMSException {
        synchronized (this.subscriptionsMutex) {
            for (QueueListEntry entry = this.subscriptions.getFirstEntry(); entry != null; entry = this.subscriptions.getNextEntry(entry)) {
                DurableQueueSubscription candidate = (DurableQueueSubscription) entry.getElement();
                if (candidate.getConsumerInfo().equals(consumerInfo)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /** @return the destination served by this container */
    public ActiveMQDestination getDestination() {
        return this.destination;
    }

    /** @return the physical name of the destination */
    @Override // org.activemq.service.MessageContainerAdmin
    public String getDestinationName() {
        return this.destination.getPhysicalName();
    }

    /** Clears the in-memory queue; the message store is left untouched. */
    protected void clear() {
        this.queue.clear();
    }

    /**
     * Removes from the in-memory queue every message that is expired as of the
     * time this method starts. Removed messages are only logged at debug level;
     * this method does not forward them to the dead-letter policy.
     */
    public void removeExpiredMessages() {
        long now = System.currentTimeMillis();
        List contents = this.queue.getContents();
        for (int i = 0; i < contents.size(); i++) {
            DurableMessagePointer pointer = (DurableMessagePointer) contents.get(i);
            if (pointer.getMessage().isExpired(now)) {
                this.queue.remove((MemoryManageable) pointer);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("expired message: " + pointer);
                }
            }
        }
    }

    /**
     * Schedules the dispatch task on the thread pool when the container is
     * started, has at least one subscription, and is not already running.
     *
     * <p>If the executor is interrupted before accepting the task, the running
     * flag is cleared again so that a later call can retry, and the calling
     * thread's interrupt status is restored.
     */
    protected void checkRunning() {
        if (this.running.get() || !this.started.get() || this.subscriptions.isEmpty()) {
            return;
        }
        synchronized (this.running) {
            if (this.running.commit(false, true)) {
                try {
                    this.threadPool.execute(this);
                } catch (InterruptedException e) {
                    // No task was handed to the pool: undo the flag, otherwise every
                    // later call would return early and dispatching would never resume.
                    this.running.set(false);
                    this.log.error(this + " Couldn't start executing ", e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** @return this container, which implements {@link MessageContainerAdmin} itself */
    public MessageContainerAdmin getMessageContainerAdmin() {
        return this;
    }

    /**
     * Removes every message from the message store and from the in-memory queue.
     *
     * @throws JMSException if any subscription is registered, or if the store fails
     */
    @Override // org.activemq.service.MessageContainerAdmin
    public void empty() throws JMSException {
        if (!this.subscriptions.isEmpty()) {
            throw new JMSException("Cannot empty a queue while it is use.");
        }
        this.messageStore.removeAllMessages();
        this.queue.clear();
    }

    /**
     * Dispatches a non-expired advisory message to the first subscription that
     * is a target for it. A {@link JMSException} raised while doing so is
     * logged and not propagated.
     *
     * @param durableMessagePointer pointer wrapping the advisory message
     */
    private synchronized void doAdvisoryDispatchMessage(DurableMessagePointer durableMessagePointer) {
        ActiveMQMessage message = durableMessagePointer.getMessage();
        try {
            if (!message.isAdvisory() || message.isExpired()) {
                return;
            }
            synchronized (this.subscriptionsMutex) {
                for (QueueListEntry entry = this.subscriptions.getFirstEntry(); entry != null; entry = this.subscriptions.getNextEntry(entry)) {
                    DurableQueueSubscription candidate = (DurableQueueSubscription) entry.getElement();
                    if (candidate.isTarget(message)) {
                        candidate.doDispatch(durableMessagePointer);
                        break;
                    }
                }
            }
        } catch (JMSException e) {
            this.log.warn("Failed to dispatch advisory", e);
        }
    }
}
