/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.AndFilter;
import org.activemq.filter.DestinationMap;
import org.activemq.filter.Filter;
import org.activemq.filter.FilterFactory;
import org.activemq.filter.FilterFactoryImpl;
import org.activemq.filter.NoLocalFilter;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.Dispatcher;
import org.activemq.service.MessageContainer;
import org.activemq.service.QueueList;
import org.activemq.service.QueueListEntry;
import org.activemq.service.QueueMessageContainer;
import org.activemq.service.QueueMessageContainerManager;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Subscription;
import org.activemq.service.SubscriptionContainer;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.service.impl.DefaultQueueList;
import org.activemq.service.impl.DispatcherImpl;
import org.activemq.service.impl.DurableQueueMessageContainer;
import org.activemq.service.impl.MessageContainerManagerSupport;
import org.activemq.service.impl.SubscriptionContainerImpl;
import org.activemq.service.impl.SubscriptionImpl;
import org.activemq.store.PersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DurableQueueMessageContainerManager
extends MessageContainerManagerSupport
implements QueueMessageContainerManager {
    private static final Log log = LogFactory.getLog((Class)DurableQueueMessageContainerManager.class);
    private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;
    private PersistenceAdapter persistenceAdapter;
    protected SubscriptionContainer subscriptionContainer;
    protected FilterFactory filterFactory;
    protected Map activeSubscriptions = new ConcurrentHashMap();
    protected Map browsers = new ConcurrentHashMap();
    protected Map messagePartSubscribers = new ConcurrentHashMap();
    protected DestinationMap destinationMap = new DestinationMap();
    private Object subscriptionMutex = new Object();

    public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy, deadLetterPolicy), new FilterFactoryImpl(), new DispatcherImpl());
    }

    public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
        super(dispatcher);
        this.persistenceAdapter = persistenceAdapter;
        this.subscriptionContainer = subscriptionContainer;
        this.filterFactory = filterFactory;
    }

    private boolean isManagerFor(ActiveMQDestination destination) {
        return destination != null && destination.isQueue() && !destination.isTemporary();
    }

    private boolean isManagerFor(ActiveMQDestination destination, boolean persistentOp) {
        return this.isManagerFor(destination) && persistentOp;
    }

    public Map getLocalDestinations() {
        HashMap<String, ActiveMQDestination> localDestinations = new HashMap<String, ActiveMQDestination>();
        Iterator iter = this.subscriptionContainer.subscriptionIterator();
        while (iter.hasNext()) {
            Subscription sub = (Subscription)iter.next();
            if (!sub.isLocalSubscription()) continue;
            ActiveMQDestination dest = sub.getDestination();
            localDestinations.put(dest.getPhysicalName(), dest);
        }
        return Collections.unmodifiableMap(localDestinations);
    }

    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        if (!this.isManagerFor(info.getDestination())) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Adding consumer: " + info));
        }
        this.getContainer(info.getDestination().getPhysicalName());
        Subscription sub = this.subscriptionContainer.makeSubscription(this.dispatcher, client, info, this.createFilter(info));
        this.dispatcher.addActiveSubscription(client, sub);
        this.updateActiveSubscriptions(sub);
        sub.setActive(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Removing consumer: " + info));
        }
        if (info.getDestination() != null && info.getDestination().isQueue()) {
            Object object = this.subscriptionMutex;
            synchronized (object) {
                Subscription sub = this.subscriptionContainer.removeSubscription(info.getConsumerId());
                if (sub != null) {
                    sub.setActive(false);
                    sub.clear();
                    this.dispatcher.removeActiveSubscription(client, sub);
                    Iterator iter = this.messageContainers.values().iterator();
                    while (iter.hasNext()) {
                        QueueMessageContainer container = (QueueMessageContainer)iter.next();
                        if (!container.getDestinationName().equals(sub.getDestination().getPhysicalName())) continue;
                        QueueList list = this.getSubscriptionList(container);
                        list.remove(sub);
                        if (list.isEmpty()) {
                            this.activeSubscriptions.remove(sub.getDestination().getPhysicalName());
                        }
                        list = this.getBrowserList(container);
                        list.remove(sub);
                        if (!list.isEmpty()) continue;
                        this.browsers.remove(sub.getDestination().getPhysicalName());
                    }
                }
            }
        }
    }

    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
    }

    public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
        ActiveMQDestination dest = (ActiveMQDestination)message.getJMSDestination();
        if (!this.isManagerFor(dest, message.getJMSDeliveryMode() == 2)) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Dispaching message: " + message));
        }
        this.getContainer(((ActiveMQDestination)message.getJMSDestination()).getPhysicalName());
        Set set = this.destinationMap.get(message.getJMSActiveMQDestination());
        Iterator i = set.iterator();
        while (i.hasNext()) {
            QueueMessageContainer container = (QueueMessageContainer)i.next();
            container.addMessage(message);
            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){

                public void execute() throws Throwable {
                    DurableQueueMessageContainerManager.this.dispatcher.wakeup();
                    DurableQueueMessageContainerManager.this.updateSendStats(client, message);
                }
            });
        }
    }

    public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
        if (!this.isManagerFor(ack.getDestination(), ack.isPersistent())) {
            return;
        }
        Subscription sub = this.subscriptionContainer.getSubscription(ack.getConsumerId());
        if (sub == null) {
            return;
        }
        sub.messageConsumed(ack);
        if (ack.isMessageRead()) {
            this.updateAcknowledgeStats(client, sub);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void poll() throws JMSException {
        Object object = this.subscriptionMutex;
        synchronized (object) {
            Iterator iter = this.activeSubscriptions.keySet().iterator();
            while (iter.hasNext()) {
                QueueMessageContainer container = (QueueMessageContainer)iter.next();
                QueueList browserList = (QueueList)this.browsers.get(container);
                this.doPeek(container, browserList);
                QueueList list = (QueueList)this.activeSubscriptions.get(container);
                this.doPoll(container, list);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public MessageContainer getContainer(String destinationName) throws JMSException {
        MessageContainer container = (MessageContainer)this.messageContainers.get(destinationName);
        if (container == null) {
            Object object = this.subscriptionMutex;
            synchronized (object) {
                container = super.getContainer(destinationName);
            }
        }
        return container;
    }

    protected MessageContainer createContainer(String destinationName) throws JMSException {
        DurableQueueMessageContainer container = new DurableQueueMessageContainer(this.persistenceAdapter, this.persistenceAdapter.createQueueMessageStore(destinationName), destinationName);
        Iterator iter = this.subscriptionContainer.subscriptionIterator();
        while (iter.hasNext()) {
            Subscription sub = (Subscription)iter.next();
            if (sub.isBrowser()) {
                this.updateBrowsers(container, sub);
                continue;
            }
            this.updateActiveSubscriptions(container, sub);
        }
        ActiveMQQueue key = new ActiveMQQueue(destinationName);
        this.destinationMap.put(key, container);
        return container;
    }

    protected Destination createDestination(String destinationName) {
        return new ActiveMQQueue(destinationName);
    }

    private void doPeek(QueueMessageContainer container, QueueList browsers) throws JMSException {
        if (browsers != null && browsers.size() > 0) {
            for (int i = 0; i < browsers.size(); ++i) {
                SubscriptionImpl sub = (SubscriptionImpl)browsers.get(i);
                int count = 0;
                ActiveMQMessage msg = null;
                do {
                    if ((msg = container.peekNext(sub.getLastMessageIdentity())) == null) continue;
                    if (sub.isTarget(msg)) {
                        System.out.println("browser dispatch: " + msg.getJMSMessageID());
                        sub.addMessage(container, msg);
                        this.dispatcher.wakeup(sub);
                        continue;
                    }
                    sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
                } while (msg != null && !sub.isAtPrefetchLimit() && count++ < 50);
            }
        }
    }

    private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException {
        int count = 0;
        ActiveMQMessage msg = null;
        if (subList != null && subList.size() > 0) {
            do {
                boolean dispatched = false;
                msg = container.poll();
                if (msg == null) continue;
                QueueListEntry entry = subList.getFirstEntry();
                boolean targeted = false;
                while (entry != null) {
                    SubscriptionImpl sub = (SubscriptionImpl)entry.getElement();
                    if (sub.isTarget(msg)) {
                        targeted = true;
                        if (msg.isMessagePart()) {
                            SubscriptionImpl sameTarget = (SubscriptionImpl)this.messagePartSubscribers.get(msg.getParentMessageID());
                            if (sameTarget == null) {
                                sameTarget = sub;
                                this.messagePartSubscribers.put(msg.getParentMessageID(), sameTarget);
                            }
                            sameTarget.addMessage(container, msg);
                            if (msg.isLastMessagePart()) {
                                this.messagePartSubscribers.remove(msg.getParentMessageID());
                            }
                            dispatched = true;
                            this.dispatcher.wakeup(sameTarget);
                            break;
                        }
                        if (!sub.isAtPrefetchLimit()) {
                            System.out.println("dispatching: " + msg.getJMSMessageID());
                            sub.addMessage(container, msg);
                            dispatched = true;
                            this.dispatcher.wakeup(sub);
                            subList.rotate();
                            break;
                        }
                    }
                    entry = subList.getNextEntry(entry);
                }
                if (dispatched) continue;
                if (!targeted) break;
                container.returnMessage(msg.getJMSMessageIdentity());
                break;
            } while (msg != null && count++ < 50);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
        Object object = this.subscriptionMutex;
        synchronized (object) {
            boolean processedSubscriptionContainer = false;
            String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
            Iterator iter = this.messageContainers.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry entry = iter.next();
                String destinationName = (String)entry.getKey();
                QueueMessageContainer container = (QueueMessageContainer)entry.getValue();
                if (destinationName.equals(subscriptionPhysicalName)) {
                    processedSubscriptionContainer = true;
                }
                this.processSubscription(subscription, container);
            }
            if (!processedSubscriptionContainer) {
                this.processSubscription(subscription, (QueueMessageContainer)this.getContainer(subscriptionPhysicalName));
            }
        }
    }

    protected void processSubscription(Subscription subscription, QueueMessageContainer container) throws JMSException {
        if (subscription.isBrowser()) {
            this.updateBrowsers(container, subscription);
        } else {
            this.updateActiveSubscriptions(container, subscription);
        }
    }

    private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub) throws JMSException {
        if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
            container.reset();
            QueueList list = this.getSubscriptionList(container);
            if (!list.contains(sub)) {
                list.add(sub);
            }
        }
    }

    private QueueList getSubscriptionList(QueueMessageContainer container) {
        QueueList list = (QueueList)this.activeSubscriptions.get(container);
        if (list == null) {
            list = new DefaultQueueList();
            this.activeSubscriptions.put(container, list);
        }
        return list;
    }

    private void updateBrowsers(QueueMessageContainer container, Subscription sub) throws JMSException {
        if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
            container.reset();
            QueueList list = this.getBrowserList(container);
            if (!list.contains(sub)) {
                list.add(sub);
            }
        }
    }

    private QueueList getBrowserList(QueueMessageContainer container) {
        QueueList list = (QueueList)this.browsers.get(container);
        if (list == null) {
            list = new DefaultQueueList();
            this.browsers.put(container, list);
        }
        return list;
    }

    protected Filter createFilter(ConsumerInfo info) throws JMSException {
        Filter filter = this.filterFactory.createFilter(info.getDestination(), info.getSelector());
        if (info.isNoLocal()) {
            filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
        }
        return filter;
    }

    public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
        if (!dest.isQueue()) {
            return;
        }
        super.createMessageContainer(dest);
    }

    public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
        if (!dest.isQueue()) {
            return;
        }
        super.destroyMessageContainer(dest);
        this.destinationMap.removeAll(dest);
    }

    public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage message) throws JMSException {
        QueueMessageContainer container = (QueueMessageContainer)this.getContainer(deadLetterName);
        container.setDeadLetterQueue(true);
        container.addMessage(message);
        this.dispatcher.wakeup();
    }
}

