/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.store.journal;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.activeio.journal.RecordLocation;
import org.activemq.message.ConsumerInfo;
import org.activemq.service.MessageContainer;
import org.activemq.service.MessageIdentity;
import org.activemq.service.SubscriberEntry;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.RecoveryListener;
import org.activemq.store.TopicMessageStore;
import org.activemq.store.journal.JournalAck;
import org.activemq.store.journal.JournalMessageStore;
import org.activemq.store.journal.JournalPersistenceAdapter;
import org.activemq.util.Callback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Topic message store that writes subscriber acknowledgements to the journal
 * and forwards them to a long-term {@link TopicMessageStore} only when
 * {@link #checkpoint()} runs.
 *
 * <p>All other subscription bookkeeping (recovery, subscriber entries, message
 * counts, subscription deletion) is delegated straight to the long-term store.
 *
 * <p>Mutable state of this class ({@code ackedLastAckLocations} and the
 * inherited {@code lastLocation} / {@code inFlightTxLocations}) is only
 * touched here while holding this instance's monitor.
 */
public class JournalTopicMessageStore
extends JournalMessageStore
implements TopicMessageStore {
    private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class);

    /** Not read or written anywhere in this class. */
    private MessageContainer messageContainer;

    /** Store that ultimately receives acknowledgements and serves subscription queries. */
    private final TopicMessageStore longTermStore;

    /**
     * Latest acknowledged message per subscription id that has been journaled
     * but not yet handed to {@link #longTermStore}. Replaced with a fresh map
     * on every {@link #checkpoint()}.
     */
    private Map<String, MessageIdentity> ackedLastAckLocations = new HashMap<String, MessageIdentity>();

    /**
     * @param adapter         journal persistence adapter, passed to the superclass
     * @param checkpointStore long-term store; passed to the superclass and also
     *                        kept here as the delegate for topic-specific calls
     * @param destinationName destination name, passed to the superclass
     */
    public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, String destinationName) {
        super(adapter, checkpointStore, destinationName);
        this.longTermStore = checkpointStore;
    }

    /** Delegates to the long-term store. */
    public void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, RecoveryListener listener) throws JMSException {
        this.longTermStore.recoverSubscription(subscriptionId, lastDispatchedMessage, listener);
    }

    /** Delegates to the long-term store. */
    public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
        return this.longTermStore.getSubscriberEntry(info);
    }

    /** Delegates to the long-term store. */
    public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
        this.longTermStore.setSubscriberEntry(info, subscriberEntry);
    }

    /** Delegates to the long-term store. */
    public MessageIdentity getLastestMessageIdentity() throws JMSException {
        return this.longTermStore.getLastestMessageIdentity();
    }

    /** Delegates to the long-term store. */
    public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
        this.longTermStore.incrementMessageCount(messageId);
    }

    /** Delegates to the long-term store. */
    public void decrementMessageCountAndMaybeDelete(MessageIdentity messageId) throws JMSException {
        this.longTermStore.decrementMessageCountAndMaybeDelete(messageId);
    }

    /**
     * Journals an acknowledgement for {@code subscription} and records it for
     * the next checkpoint.
     *
     * <p>Outside a transaction the acknowledgement is recorded immediately.
     * Inside a transaction the journal location is tracked as in-flight, the
     * ack is registered with the transaction store, and the acknowledgement is
     * recorded by a post-commit task.
     *
     * @param subscription    subscription id the acknowledgement belongs to
     * @param messageIdentity identity of the acknowledged message
     * @throws JMSException declared by the interface; raised by the calls made here
     */
    public void setLastAcknowledgedMessageIdentity(final String subscription, final MessageIdentity messageIdentity) throws JMSException {
        final RecordLocation location = this.peristenceAdapter.writePacket(this.destinationName, subscription, messageIdentity, false);

        if (!TransactionManager.isCurrentTransaction()) {
            this.acknowledge(subscription, messageIdentity, location);
            return;
        }

        synchronized (this) {
            this.inFlightTxLocations.add(location);
        }
        final Transaction tx = TransactionManager.getContexTransaction();
        JournalAck ack = new JournalAck(this.destinationName, subscription, messageIdentity.getMessageID(), tx.getTransactionId());
        this.transactionStore.acknowledge(this, ack, location);

        // NOTE(review): only a post-commit task is registered here. Nothing in
        // this class removes `location` from inFlightTxLocations when the
        // transaction does not commit - confirm rollback cleanup happens elsewhere.
        tx.addPostCommitTask(new TransactionTask() {
            public void execute() throws Throwable {
                synchronized (JournalTopicMessageStore.this) {
                    JournalTopicMessageStore.this.inFlightTxLocations.remove(location);
                    JournalTopicMessageStore.this.acknowledge(subscription, messageIdentity, location);
                }
            }
        });
    }

    /**
     * Records {@code location} as the last journal location and remembers
     * {@code messageIdentity} as the pending acknowledgement for
     * {@code subscription}, replacing any earlier pending entry for it.
     */
    private void acknowledge(String subscription, MessageIdentity messageIdentity, RecordLocation location) {
        synchronized (this) {
            this.lastLocation = location;
            this.ackedLastAckLocations.put(subscription, messageIdentity);
        }
    }

    /**
     * Runs the superclass checkpoint, then pushes every pending acknowledgement
     * to the long-term store through {@code transactionTemplate}.
     *
     * <p>The pending map is swapped for an empty one under the lock, so the
     * callback iterates a map that this class no longer mutates.
     *
     * @return the location returned by {@code super.checkpoint()}
     * @throws JMSException declared by the interface; raised by the calls made here
     */
    @Override
    public RecordLocation checkpoint() throws JMSException {
        RecordLocation rc = super.checkpoint();

        final Map<String, MessageIdentity> pendingAcks;
        synchronized (this) {
            pendingAcks = this.ackedLastAckLocations;
            this.ackedLastAckLocations = new HashMap<String, MessageIdentity>();
        }

        this.transactionTemplate.run(new Callback() {
            public void execute() throws Throwable {
                for (Map.Entry<String, MessageIdentity> pending : pendingAcks.entrySet()) {
                    JournalTopicMessageStore.this.longTermStore.setLastAcknowledgedMessageIdentity(pending.getKey(), pending.getValue());
                }
            }
        });
        return rc;
    }

    /** @return the long-term store supplied at construction */
    public TopicMessageStore getLongTermTopicMessageStore() {
        return this.longTermStore;
    }

    /** Delegates to the long-term store. */
    public void deleteSubscription(String subscription) throws JMSException {
        this.longTermStore.deleteSubscription(subscription);
    }

    /**
     * Applies an acknowledgement directly to the long-term store, bypassing
     * the journal. Best effort: any failure is logged at debug level (with its
     * stack trace) and not propagated.
     *
     * @param subscription subscription id the acknowledgement belongs to
     * @param identity     identity of the acknowledged message
     */
    public void replayAcknowledge(String subscription, MessageIdentity identity) {
        try {
            this.longTermStore.setLastAcknowledgedMessageIdentity(subscription, identity);
        }
        catch (Throwable e) {
            // Deliberately swallowed; pass the throwable so the stack trace is not lost.
            log.debug("Could not replay acknowledge for message '" + identity.getMessageID() + "'.  Message may have already been acknowledged. reason: " + e, e);
        }
    }
}

