/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.HashMap;
import java.util.List;
import javax.jms.JMSException;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.Filter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.io.util.MemoryManageable;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainerAdmin;
import org.activemq.service.MessageIdentity;
import org.activemq.service.QueueListEntry;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Service;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.service.boundedvm.DurableMessagePointer;
import org.activemq.service.boundedvm.DurableQueueSubscription;
import org.activemq.service.impl.DefaultQueueList;
import org.activemq.store.MessageStore;
import org.activemq.store.RecoveryListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A message container for a durable queue.
 * <p>
 * Non-advisory messages are written to the {@link MessageStore} and a
 * {@link DurableMessagePointer} to each one is placed on a
 * {@link MemoryBoundedQueue}. A dispatch task ({@link #run()}), scheduled on
 * the supplied {@link Executor}, takes pointers off that queue and hands them
 * to the registered {@link DurableQueueSubscription}s.
 * <p>
 * Locking: {@code dispatchMutex} ensures a single dispatch loop at a time,
 * {@code subscriptionsMutex} guards the subscription list, and the
 * {@code running} flag object itself is used as the lock when the dispatch
 * task is scheduled or retired.
 */
public class DurableQueueBoundedMessageContainer
implements Service,
Runnable,
MessageContainerAdmin {
    private final MessageStore messageStore;
    private final MemoryBoundedQueueManager queueManager;
    private final ActiveMQDestination destination;
    private final Executor threadPool;
    private final RedeliveryPolicy redeliveryPolicy;
    private final DeadLetterPolicy deadLetterPolicy;
    private final Log log;
    private final MemoryBoundedQueue queue;
    private final DefaultQueueList subscriptions = new DefaultQueueList();
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);
    private final SynchronizedBoolean running = new SynchronizedBoolean(false);
    private final Object dispatchMutex = new Object();
    private final Object subscriptionsMutex = new Object();
    // Written under subscriptionsMutex but read without it by getIdleTimestamp();
    // volatile keeps the unsynchronised read of this long atomic and visible.
    private volatile long idleTimestamp;

    /**
     * Creates the container and obtains its in-memory queue from the queue manager.
     *
     * @param messageStore     store used to persist and recover messages
     * @param threadPool       executor on which the dispatch task is scheduled
     * @param queueManager     source of the bounded in-memory queues
     * @param destination      the destination this container serves
     * @param redeliveryPolicy redelivery policy (stored; not otherwise used in this class)
     * @param deadLetterPolicy policy receiving expired messages found while dispatching; may be null
     */
    public DurableQueueBoundedMessageContainer(MessageStore messageStore, Executor threadPool, MemoryBoundedQueueManager queueManager, ActiveMQDestination destination, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.messageStore = messageStore;
        this.threadPool = threadPool;
        this.queueManager = queueManager;
        this.destination = destination;
        this.redeliveryPolicy = redeliveryPolicy;
        this.deadLetterPolicy = deadLetterPolicy;
        this.queue = queueManager.getMemoryBoundedQueue("DURABLE_QUEUE:-" + destination.getPhysicalName());
        this.log = LogFactory.getLog("DurableQueueBoundedMessageContainer:- " + destination);
    }

    /**
     * @return true if at least one subscription is registered
     */
    public boolean isActive() {
        return !subscriptions.isEmpty();
    }

    /**
     * @return true if the in-memory queue holds no message pointers
     */
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /**
     * @return the time (milliseconds since the epoch) at which the last
     *         subscription was removed, or 0 once a subscription has been
     *         added since then (also 0 before any consumer activity)
     */
    public long getIdleTimestamp() {
        return idleTimestamp;
    }

    /**
     * Registers a consumer, or returns its existing subscription if one with
     * an equal {@link ConsumerInfo} is already registered.
     * <p>
     * The lookup and the insertion happen under a single hold of the
     * subscriptions mutex so that two concurrent calls for the same consumer
     * cannot both register a subscription.
     *
     * @param filter the filter handed to a newly created subscription
     * @param info   the consumer description
     * @param client the broker client that owns the consumer
     * @return the new or already existing subscription
     * @throws JMSException if the lookup fails
     */
    public DurableQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client) throws JMSException {
        synchronized (subscriptionsMutex) {
            DurableQueueSubscription subscription = findMatch(info);
            if (subscription == null) {
                MemoryBoundedQueue subscriptionQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB:-" + info.getConsumerId());
                MemoryBoundedQueue ackQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB_ACKED:-" + info.getConsumerId());
                subscription = new DurableQueueSubscription(client, subscriptionQueue, ackQueue, filter, info);
                idleTimestamp = 0L;
                subscriptions.add(subscription);
                checkRunning();
            }
            return subscription;
        }
    }

    /**
     * Unregisters a consumer and returns its messages to the container queue.
     * <p>
     * Undelivered messages are put back at the head of the queue in their
     * original order. For a browser the acknowledged messages are put back as
     * well. When the last subscription goes away the dispatch loop is asked to
     * stop and the idle timestamp is recorded.
     *
     * @param info the consumer description
     * @throws JMSException if the lookup or the subscription close fails
     */
    public void removeConsumer(ConsumerInfo info) throws JMSException {
        synchronized (subscriptionsMutex) {
            DurableQueueSubscription subscription = findMatch(info);
            if (subscription == null) {
                return;
            }
            subscriptions.remove(subscription);
            if (subscriptions.isEmpty()) {
                running.commit(true, false);
                idleTimestamp = System.currentTimeMillis();
            }
            requeueAtFront(subscription.getUndeliveredMessages());
            if (subscription.isBrowser()) {
                requeueAtFront(subscription.listAckedMessages());
                subscription.removeAllAckedMessages();
            }
            subscription.close();
        }
    }

    /**
     * Puts the given messages back at the head of the queue, preserving their
     * relative order (the list is walked backwards because each element is
     * inserted in front of the previous one).
     */
    private void requeueAtFront(List messages) {
        for (int i = messages.size() - 1; i >= 0; --i) {
            queue.enqueueFirstNoBlock((MemoryManageable)messages.get(i));
        }
    }

    /**
     * Starts the container if it is not already started: starts the message
     * store, re-queues every message the store reports during recovery, and
     * schedules the dispatch task if there are subscriptions.
     *
     * @throws JMSException if the store fails to start or recover
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            messageStore.start();
            messageStore.recover(new RecoveryListener(){

                public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
                    DurableQueueBoundedMessageContainer.this.recoverMessageToBeDelivered(messageIdentity);
                }
            });
            checkRunning();
        }
    }

    /**
     * Loads a recovered message from the store and queues a pointer to it.
     */
    private void recoverMessageToBeDelivered(MessageIdentity msgId) throws JMSException {
        ActiveMQMessage message = messageStore.getMessage(msgId);
        if (message == null) {
            // NOTE(review): assumes the store may return null for an identity it
            // cannot load - confirm against the MessageStore contract. Queuing a
            // pointer without a message would make the dispatch loop fail later
            // and stop the whole container, so the entry is skipped instead.
            log.warn("Message could not be loaded during recovery, skipping: " + msgId);
            return;
        }
        DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), message);
        queue.enqueue(pointer);
    }

    /**
     * Accepts a message for this destination.
     * <p>
     * Advisory messages are dispatched immediately and are not stored. Any
     * other message is added to the message store, and a post-commit task is
     * registered on the context transaction that queues the message for
     * dispatch.
     *
     * @param message the message to enqueue
     * @throws JMSException if the message cannot be stored
     */
    public void enqueue(ActiveMQMessage message) throws JMSException {
        final DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), message);
        if (message.isAdvisory()) {
            doAdvisoryDispatchMessage(pointer);
            return;
        }
        messageStore.addMessage(message);
        TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){

            public void execute() throws Throwable {
                DurableQueueBoundedMessageContainer.this.queue.enqueue(pointer);
                DurableQueueBoundedMessageContainer.this.checkRunning();
            }
        });
    }

    /**
     * Puts a single message back at the head of the queue and makes sure the
     * dispatch task is scheduled.
     *
     * @param message the message pointer to redeliver
     */
    public void redeliver(DurableMessagePointer message) {
        queue.enqueueFirstNoBlock(message);
        checkRunning();
    }

    /**
     * Puts a list of messages back at the head of the queue and makes sure the
     * dispatch task is scheduled.
     *
     * @param messages the message pointers to redeliver
     */
    public void redeliver(List messages) {
        queue.enqueueAllFirstNoBlock(messages);
        checkRunning();
    }

    /**
     * Stops dispatching and clears the in-memory queue.
     */
    public void stop() {
        started.set(false);
        running.set(false);
        queue.clear();
    }

    /**
     * Stops the container if it is started, then closes and forgets every
     * subscription.
     *
     * @throws JMSException if closing a subscription fails
     */
    public void close() throws JMSException {
        if (started.get()) {
            stop();
        }
        synchronized (subscriptionsMutex) {
            for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                DurableQueueSubscription subscription = (DurableQueueSubscription)entry.getElement();
                subscription.close();
            }
            subscriptions.clear();
        }
    }

    /**
     * The dispatch loop. Runs while the container is started and the
     * {@code running} flag is set.
     * <p>
     * Each pass takes one pointer off the queue (only when there are
     * subscriptions) and offers it to the subscriptions in list order:
     * <ul>
     * <li>an expired message is passed to the dead letter policy, if any, and dropped;</li>
     * <li>a message part goes to the subscription that received the earlier
     * parts with the same parent id (or to the first targeted subscription if
     * there is none yet), without consulting {@code canAcceptMessages()};</li>
     * <li>any other message goes to the first targeted subscription that can
     * accept messages, after which the subscription list is rotated.</li>
     * </ul>
     * A message nobody took is put back: at the head of the queue if some
     * subscription was a target, otherwise at the tail. After more than
     * {@code 10000 / 250} consecutive passes without a dispatch and with an
     * empty queue, the loop clears the {@code running} flag and ends.
     */
    public void run() {
        // Only one thread at a time may run the dispatch loop.
        synchronized (dispatchMutex) {
            final int sleepTime = 250;
            final int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;
            int notDispatchedCount = 0;
            // Parent message id -> subscription that is receiving the parts of that message.
            HashMap<String, DurableQueueSubscription> messageParts = new HashMap<String, DurableQueueSubscription>();
            try {
                while (started.get() && running.get()) {
                    boolean dispatched = false;
                    boolean targeted = false;
                    // Must start out null on every pass: a pointer left over from the
                    // previous pass has already been put back on the queue and would
                    // otherwise be re-enqueued a second time when nothing is dequeued.
                    DurableMessagePointer messagePointer = null;
                    synchronized (subscriptionsMutex) {
                        if (!subscriptions.isEmpty()) {
                            // presumably sleepTime acts as a wait timeout in ms - TODO confirm
                            messagePointer = (DurableMessagePointer)queue.dequeue(sleepTime);
                        }
                        if (messagePointer != null) {
                            ActiveMQMessage message = messagePointer.getMessage();
                            if (!message.isExpired()) {
                                for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                                    DurableQueueSubscription candidate = (DurableQueueSubscription)entry.getElement();
                                    if (!candidate.isTarget(message)) {
                                        continue;
                                    }
                                    targeted = true;
                                    if (message.isMessagePart()) {
                                        // All parts of one message go to the same subscription.
                                        DurableQueueSubscription sameTarget = (DurableQueueSubscription)messageParts.get(message.getParentMessageID());
                                        if (sameTarget == null) {
                                            sameTarget = candidate;
                                            messageParts.put(message.getParentMessageID(), sameTarget);
                                        }
                                        sameTarget.doDispatch(messagePointer);
                                        if (message.isLastMessagePart()) {
                                            messageParts.remove(message.getParentMessageID());
                                        }
                                        messagePointer = null;
                                        dispatched = true;
                                        notDispatchedCount = 0;
                                        break;
                                    }
                                    if (candidate.canAcceptMessages()) {
                                        candidate.doDispatch(messagePointer);
                                        messagePointer = null;
                                        dispatched = true;
                                        notDispatchedCount = 0;
                                        subscriptions.rotate();
                                        break;
                                    }
                                }
                            } else {
                                if (log.isDebugEnabled()) {
                                    log.debug("expired message: " + messagePointer);
                                }
                                if (deadLetterPolicy != null) {
                                    deadLetterPolicy.sendToDeadLetter(messagePointer.getMessage());
                                }
                                messagePointer = null;
                            }
                        }
                    }
                    if (dispatched) {
                        continue;
                    }
                    if (messagePointer != null) {
                        if (targeted) {
                            // A target exists but could not take it now: retry it first.
                            queue.enqueueFirstNoBlock(messagePointer);
                        } else {
                            // No subscription wants it: move it to the back of the queue.
                            queue.enqueueNoBlock(messagePointer);
                        }
                    }
                    if (running.get()) {
                        if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping && queue.isEmpty()) {
                            // Idle for long enough: retire the dispatch task.
                            synchronized (running) {
                                running.commit(true, false);
                            }
                        } else {
                            Thread.sleep(sleepTime);
                        }
                    }
                }
            }
            catch (InterruptedException ie) {
                // The loop is ending, so the flag must not keep claiming that a
                // dispatcher is active - otherwise checkRunning() could never
                // schedule a new one. Restore the interrupt for the executor.
                running.set(false);
                Thread.currentThread().interrupt();
            }
            catch (Throwable e) {
                log.warn("stop dispatching", e);
                stop();
            }
        }
    }

    /**
     * Finds the subscription whose {@link ConsumerInfo} equals the given one.
     *
     * @return the matching subscription, or null if there is none
     */
    private DurableQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
        synchronized (subscriptionsMutex) {
            for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                DurableQueueSubscription subscription = (DurableQueueSubscription)entry.getElement();
                if (subscription.getConsumerInfo().equals(info)) {
                    return subscription;
                }
            }
        }
        return null;
    }

    /**
     * @return the destination this container serves
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * @return the physical name of the destination
     */
    public String getDestinationName() {
        return destination.getPhysicalName();
    }

    /**
     * Clears the in-memory queue. The message store is not touched.
     */
    protected void clear() {
        queue.clear();
    }

    /**
     * Removes from the in-memory queue every message that is expired as of
     * the moment this method is called.
     */
    protected void removeExpiredMessages() {
        long currentTime = System.currentTimeMillis();
        List contents = queue.getContents();
        for (int i = 0; i < contents.size(); ++i) {
            DurableMessagePointer pointer = (DurableMessagePointer)contents.get(i);
            if (pointer.getMessage().isExpired(currentTime)) {
                queue.remove(pointer);
                if (log.isDebugEnabled()) {
                    log.debug("expired message: " + pointer);
                }
            }
        }
    }

    /**
     * Schedules the dispatch task on the thread pool if the container is
     * started, has subscriptions, and no dispatch task is currently marked as
     * running.
     */
    protected void checkRunning() {
        if (!running.get() && started.get() && !subscriptions.isEmpty()) {
            synchronized (running) {
                if (running.commit(false, true)) {
                    try {
                        threadPool.execute(this);
                    }
                    catch (InterruptedException e) {
                        // Nothing was scheduled: roll the flag back so that a later
                        // call can try again, and keep the interrupt visible to the caller.
                        running.set(false);
                        Thread.currentThread().interrupt();
                        log.error(this + " Couldn't start executing ", e);
                    }
                }
            }
        }
    }

    /**
     * @return this container, which is its own admin interface
     */
    public MessageContainerAdmin getMessageContainerAdmin() {
        return this;
    }

    /**
     * Removes all messages from the message store and clears the in-memory queue.
     *
     * @throws JMSException if the store cannot remove its messages
     */
    public void empty() throws JMSException {
        messageStore.removeAllMessages();
        queue.clear();
    }

    /**
     * Dispatches an advisory message directly to the first subscription that
     * is a target for it. Messages that are not advisory, or that are expired,
     * are ignored. A {@link JMSException} raised while dispatching is logged
     * and not propagated.
     */
    private synchronized void doAdvisoryDispatchMessage(DurableMessagePointer messagePointer) {
        ActiveMQMessage message = messagePointer.getMessage();
        try {
            if (!message.isAdvisory() || message.isExpired()) {
                return;
            }
            synchronized (subscriptionsMutex) {
                for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                    DurableQueueSubscription subscription = (DurableQueueSubscription)entry.getElement();
                    if (subscription.isTarget(message)) {
                        subscription.doDispatch(messagePointer);
                        break;
                    }
                }
            }
        }
        catch (JMSException jmsEx) {
            log.warn("Failed to dispatch advisory", jmsEx);
        }
    }
}

