/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq.service.impl;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.DeadLetterPolicy;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.TopicMessageContainer;
import org.codehaus.activemq.service.impl.MessagePointer;
import org.codehaus.activemq.service.impl.SubscriptionImpl;

public class DurableTopicSubscription
extends SubscriptionImpl {
    private static final Log log = LogFactory.getLog((Class)DurableTopicSubscription.class);
    private String persistentKey;

    public DurableTopicSubscription(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        super(dispatcher, client, info, filter, redeliveryPolicy, deadLetterPolicy);
    }

    public synchronized void messageConsumed(MessageAck ack) throws JMSException {
        if (ack.isExpired() || !ack.isMessageRead() && !this.isBrowser()) {
            super.messageConsumed(ack);
        } else {
            HashMap<MessageContainer, MessagePointer> lastMessagePointersPerContainer = new HashMap<MessageContainer, MessagePointer>();
            boolean found = false;
            QueueListEntry queueEntry = this.messagePtrs.getFirstEntry();
            while (queueEntry != null) {
                MessagePointer pointer = (MessagePointer)queueEntry.getElement();
                this.messagePtrs.remove(queueEntry);
                lastMessagePointersPerContainer.put(pointer.getContainer(), pointer);
                this.unconsumedMessagesDispatched.decrement();
                if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
                    found = true;
                    break;
                }
                queueEntry = this.messagePtrs.getNextEntry(queueEntry);
            }
            if (!found) {
                log.warn((Object)("Did not find a matching message for identity: " + ack.getMessageIdentity()));
            }
            Iterator iter = lastMessagePointersPerContainer.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry entry = iter.next();
                TopicMessageContainer container = (TopicMessageContainer)entry.getKey();
                MessagePointer pointer = (MessagePointer)entry.getValue();
                container.setLastAcknowledgedMessageID(this, pointer.getMessageIdentity());
            }
            this.dispatch.wakeup(this);
        }
    }

    public synchronized void redeliverMessage(MessageContainer container, MessageAck ack) throws JMSException {
        MessagePointer pointer = new MessagePointer(container, ack.getMessageIdentity());
        pointer.setRedelivered(true);
        this.messagePtrs.add(pointer);
        this.unconsumedMessagesDispatched.increment();
    }

    public String getPersistentKey() {
        if (this.persistentKey == null) {
            this.persistentKey = "[" + this.getClientId() + ":" + this.getSubscriberName() + "]";
        }
        return this.persistentKey;
    }
}

