/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.store.journal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import javax.jms.JMSException;
import org.activeio.journal.RecordLocation;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.MessageAck;
import org.activemq.service.MessageIdentity;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.MessageStore;
import org.activemq.store.RecoveryListener;
import org.activemq.store.cache.CacheMessageStore;
import org.activemq.store.cache.CacheMessageStoreAware;
import org.activemq.store.journal.JournalPersistenceAdapter;
import org.activemq.store.journal.JournalTransactionStore;
import org.activemq.util.Callback;
import org.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link MessageStore} that records message adds and acknowledgements in a journal first and
 * copies them into a long term store when {@link #checkpoint()} runs.
 *
 * <p>Between checkpoints the store keeps, in memory, the journal location of every message added
 * since the last checkpoint, the acknowledgements that must still be applied to the long term
 * store, and the journal locations written on behalf of transactions that have neither committed
 * nor rolled back yet.
 *
 * <p>All of that bookkeeping state ({@code addedMessageIds}, {@code removedMessageLocations},
 * {@code inFlightTxLocations}, {@code lastLocation}, {@code cpAddedMessageIds} and
 * {@code removedFromJournal}) is read and written while holding the monitor of this instance.
 */
public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {
    private static final Log log = LogFactory.getLog(JournalMessageStore.class);

    /** Adapter used to write packets to, and read packets from, the journal. */
    protected final JournalPersistenceAdapter peristenceAdapter;
    /** Store that receives the journalled operations at checkpoint time. */
    protected final MessageStore longTermStore;
    /** Destination name passed to the journal with every written packet. */
    protected final String destinationName;
    /** Runs the checkpoint work against the long term store. */
    protected final TransactionTemplate transactionTemplate;
    /** MessageIdentity -> RecordLocation of messages added since the last checkpoint (insertion ordered). */
    private LinkedHashMap addedMessageIds = new LinkedHashMap();
    /** MessageAck objects whose message was not found in {@link #addedMessageIds}; applied to the long term store at checkpoint. */
    private ArrayList removedMessageLocations = new ArrayList();
    /** Journal locations written inside transactions that have not completed yet. */
    protected HashSet inFlightTxLocations = new HashSet();
    /** Location of the most recent add/remove recorded by this store. */
    protected RecordLocation lastLocation;
    /** Store consulted first when a checkpoint needs the message body; defaults to this store. */
    private MessageStore cacheMessageStore = this;
    /** Transaction store obtained from the persistence adapter. */
    protected final JournalTransactionStore transactionStore;
    /** The map being checkpointed; non-null only while {@link #checkpoint()} is running. */
    private LinkedHashMap cpAddedMessageIds;
    /** Number of acknowledgements that cancelled out a not-yet-checkpointed add since the last checkpoint. */
    int removedFromJournal;

    /**
     * Creates a journal backed store for one destination.
     *
     * @param adapter         journal adapter; also supplies the transaction store
     * @param checkpointStore long term store that receives the data at checkpoint time
     * @param destinationName name of the destination this store serves
     */
    public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName) {
        this.peristenceAdapter = adapter;
        this.transactionStore = this.peristenceAdapter.getTransactionStore();
        this.longTermStore = checkpointStore;
        this.destinationName = destinationName;
        this.transactionTemplate = new TransactionTemplate(adapter);
    }

    /**
     * Writes the message to the journal and records it for the next checkpoint.
     *
     * <p>Outside a transaction the message is recorded immediately. Inside a transaction the
     * journal location is tracked as in flight, the operation is handed to the transaction
     * store, and the message is only recorded once the transaction's post-commit task runs;
     * the post-rollback task merely drops the in-flight location.
     *
     * @param message the message to add
     * @throws JMSException if the journal write or the transaction store fails
     */
    public void addMessage(final ActiveMQMessage message) throws JMSException {
        final boolean debug = log.isDebugEnabled();
        final RecordLocation location =
                peristenceAdapter.writePacket(destinationName, message, message.isReceiptRequired());

        if (!TransactionManager.isCurrentTransaction()) {
            if (debug) {
                log.debug("Journalled message add: " + message.getJMSMessageID() + " at " + location);
            }
            addMessage(message, location);
            return;
        }

        if (debug) {
            log.debug("Journalled in flight message add: " + message.getJMSMessageID() + " at " + location);
        }
        synchronized (this) {
            inFlightTxLocations.add(location);
        }
        Transaction tx = TransactionManager.getContexTransaction();
        transactionStore.addMessage(this, message, location);
        tx.addPostCommitTask(new TransactionTask() {
            public void execute() throws Throwable {
                if (debug) {
                    log.debug("In flight message add commit: " + message.getJMSMessageID() + " at " + location);
                }
                // Leave the in-flight set and become a pending add in one atomic step.
                synchronized (JournalMessageStore.this) {
                    JournalMessageStore.this.inFlightTxLocations.remove(location);
                    JournalMessageStore.this.addMessage(message, location);
                }
            }
        });
        tx.addPostRollbackTask(new TransactionTask() {
            public void execute() throws Throwable {
                if (debug) {
                    log.debug("In flight message add rollback: " + message.getJMSMessageID() + " at " + location);
                }
                synchronized (JournalMessageStore.this) {
                    JournalMessageStore.this.inFlightTxLocations.remove(location);
                }
            }
        });
    }

    /**
     * Records a journalled add so that the next checkpoint copies it to the long term store.
     *
     * @param message  the message that was journalled
     * @param location where the message was written in the journal
     */
    private void addMessage(ActiveMQMessage message, RecordLocation location) {
        synchronized (this) {
            lastLocation = location;
            MessageIdentity id = message.getJMSMessageIdentity();
            addedMessageIds.put(id, location);
        }
    }

    /**
     * Writes the acknowledgement to the journal and records the removal for the next checkpoint.
     *
     * <p>Transaction handling mirrors {@link #addMessage(ActiveMQMessage)}: outside a transaction
     * the removal is recorded immediately, inside one it is recorded by the post-commit task.
     *
     * @param ack the acknowledgement identifying the message to remove
     * @throws JMSException if the journal write or the transaction store fails
     */
    public void removeMessage(final MessageAck ack) throws JMSException {
        final boolean debug = log.isDebugEnabled();
        final RecordLocation location =
                peristenceAdapter.writePacket(destinationName, ack, ack.isReceiptRequired());

        if (!TransactionManager.isCurrentTransaction()) {
            if (debug) {
                log.debug("Journalled message remove: " + ack.getMessageID() + " at " + location);
            }
            removeMessage(ack, location);
            return;
        }

        if (debug) {
            log.debug("Journalled in flight message remove: " + ack.getMessageID() + " at " + location);
        }
        synchronized (this) {
            inFlightTxLocations.add(location);
        }
        Transaction tx = TransactionManager.getContexTransaction();
        transactionStore.removeMessage(this, ack, location);
        tx.addPostCommitTask(new TransactionTask() {
            public void execute() throws Throwable {
                if (debug) {
                    log.debug("In flight message remove commit: " + ack.getMessageID() + " at " + location);
                }
                synchronized (JournalMessageStore.this) {
                    JournalMessageStore.this.inFlightTxLocations.remove(location);
                    JournalMessageStore.this.removeMessage(ack, location);
                }
            }
        });
        tx.addPostRollbackTask(new TransactionTask() {
            public void execute() throws Throwable {
                if (debug) {
                    log.debug("In flight message remove rollback: " + ack.getMessageID() + " at " + location);
                }
                synchronized (JournalMessageStore.this) {
                    JournalMessageStore.this.inFlightTxLocations.remove(location);
                }
            }
        });
    }

    /**
     * Records a journalled acknowledgement.
     *
     * <p>If the acknowledged message is still waiting for its first checkpoint, the pending add is
     * simply dropped and counted in {@link #removedFromJournal}. Otherwise the acknowledgement is
     * queued so that the next checkpoint applies it to the long term store.
     *
     * @param ack      the acknowledgement that was journalled
     * @param location where the acknowledgement was written in the journal
     */
    private void removeMessage(MessageAck ack, RecordLocation location) {
        synchronized (this) {
            lastLocation = location;
            MessageIdentity id = ack.getMessageIdentity();
            RecordLocation msgLocation = (RecordLocation) addedMessageIds.remove(id);
            if (msgLocation == null) {
                removedMessageLocations.add(ack);
            } else {
                ++removedFromJournal;
            }
        }
    }

    /**
     * Copies everything recorded since the previous checkpoint into the long term store.
     *
     * <p>The pending adds and removals are swapped out under the lock, then applied to the long
     * term store through the transaction template. Failures on individual messages are logged
     * and skipped so that one bad entry does not abort the whole checkpoint.
     *
     * @return the earliest journal location still referenced by an in-flight transaction, or,
     *         when there is none, the last location recorded at the moment the pending state was
     *         swapped out (may be {@code null} if nothing was ever recorded)
     * @throws JMSException if the transaction template fails
     */
    public RecordLocation checkpoint() throws JMSException {
        final boolean debug = log.isDebugEnabled();
        final LinkedHashMap cpAdded;
        final ArrayList cpRemovedMessageLocations;
        final ArrayList cpActiveJournalLocations;
        final RecordLocation cpLastLocation;
        final int cpRemovedFromJournal;

        // Swap out the pending state so writers can keep going while we talk to the long term store.
        synchronized (this) {
            cpAdded = addedMessageIds;
            cpAddedMessageIds = cpAdded;
            cpRemovedMessageLocations = removedMessageLocations;

            // NOTE(review): removedMessageLocations holds MessageAck objects (see removeMessage),
            // so this first removeAll looks like a no-op against a set of RecordLocations - confirm.
            inFlightTxLocations.removeAll(removedMessageLocations);
            inFlightTxLocations.removeAll(addedMessageIds.values());
            cpActiveJournalLocations = new ArrayList(inFlightTxLocations);

            // Snapshot these together with the maps: anything journalled after this point belongs
            // to the NEXT checkpoint, so the location we report must not move past it.
            cpLastLocation = lastLocation;
            cpRemovedFromJournal = removedFromJournal;

            addedMessageIds = new LinkedHashMap();
            removedMessageLocations = new ArrayList();
            removedFromJournal = 0;
        }

        if (debug) {
            log.debug("removedFromJournal=" + cpRemovedFromJournal);
            log.debug("Checkpoint: " + destinationName);
        }

        // Single element arrays so the anonymous callback can update the counters.
        final int[] messagesAdded = new int[]{0};
        final int[] messagesRemoved = new int[]{0};
        transactionTemplate.run(new Callback() {
            public void execute() throws Throwable {
                // Apply the adds in the order they were journalled.
                for (Iterator it = cpAdded.keySet().iterator(); it.hasNext();) {
                    MessageIdentity identity = (MessageIdentity) it.next();
                    if (debug) {
                        log.debug("Adding: " + identity.getMessageID());
                    }
                    ActiveMQMessage msg = JournalMessageStore.this.getCacheMessage(identity);
                    if (msg == null) {
                        // Not available from the cache store: reload it from the journal.
                        RecordLocation location = (RecordLocation) cpAdded.get(identity);
                        msg = (ActiveMQMessage) JournalMessageStore.this.peristenceAdapter.readPacket(location);
                    }
                    if (msg == null) {
                        log.warn("Journal could not reload message: " + identity);
                        continue;
                    }
                    try {
                        JournalMessageStore.this.longTermStore.addMessage(msg);
                        messagesAdded[0]++;
                    } catch (Throwable e) {
                        log.warn("Message could not be added to long term store: " + e.getMessage(), e);
                    }
                }

                // Then apply the acknowledgements.
                for (Iterator it = cpRemovedMessageLocations.iterator(); it.hasNext();) {
                    try {
                        MessageAck ack = (MessageAck) it.next();
                        if (debug) {
                            log.debug("Removing: " + ack.getMessageID());
                        }
                        JournalMessageStore.this.longTermStore.removeMessage(ack);
                        messagesRemoved[0]++;
                    } catch (Throwable e) {
                        log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
                    }
                }
            }
        });

        if (debug) {
            log.debug("Added " + messagesAdded[0] + " message(s) and removed " + messagesRemoved[0]
                    + " message(s). removedFromJournal=" + cpRemovedFromJournal);
        }

        synchronized (this) {
            cpAddedMessageIds = null;
        }

        Collections.sort(cpActiveJournalLocations);
        if (debug) {
            log.debug("In flight journal locations: " + cpActiveJournalLocations);
        }
        if (cpActiveJournalLocations.size() > 0) {
            // The oldest in-flight record must stay reachable in the journal.
            return (RecordLocation) cpActiveJournalLocations.get(0);
        }
        return cpLastLocation;
    }

    /**
     * Looks a message up through the configured cache message store (this store by default).
     *
     * @param identity identity of the message
     * @return whatever the cache message store returns for the identity
     * @throws JMSException if the lookup fails
     */
    private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException {
        return cacheMessageStore.getMessage(identity);
    }

    /**
     * Loads a message, preferring the journal over the long term store.
     *
     * <p>If the identity has a known journal location (pending, or currently being checkpointed)
     * the packet is read from the journal. When that read fails or yields nothing, the long term
     * store is asked instead.
     *
     * @param identity identity of the message
     * @return the message from the journal, or the result of the long term store lookup
     * @throws JMSException if the long term store lookup fails
     */
    public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
        Object location;
        synchronized (this) {
            location = addedMessageIds.get(identity);
            if (location == null && cpAddedMessageIds != null) {
                location = cpAddedMessageIds.get(identity);
            }
        }

        if (location != null) {
            try {
                ActiveMQMessage answer = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation) location);
                if (answer != null) {
                    return answer;
                }
            } catch (Throwable e) {
                // Deliberately best effort: a failed journal read falls through to the long term
                // store (presumably the record can be gone after a concurrent checkpoint - confirm).
                if (log.isDebugEnabled()) {
                    log.debug("Could not read message " + identity + " from journal at " + location
                            + ", trying long term store: " + e, e);
                }
            }
        }
        return longTermStore.getMessage(identity);
    }

    /**
     * Requests a checkpoint from the persistence adapter and then lets the long term store
     * drive the recovery.
     *
     * @param listener receives the recovered messages from the long term store
     * @throws JMSException if the checkpoint or the recovery fails
     */
    public void recover(RecoveryListener listener) throws JMSException {
        peristenceAdapter.checkpoint(true);
        longTermStore.recover(listener);
    }

    /**
     * Starts the long term store.
     *
     * @throws JMSException if the long term store fails to start
     */
    public void start() throws JMSException {
        longTermStore.start();
    }

    /**
     * Stops the long term store.
     *
     * @throws JMSException if the long term store fails to stop
     */
    public void stop() throws JMSException {
        longTermStore.stop();
    }

    /**
     * @return the long term store this journal store checkpoints into
     */
    public MessageStore getLongTermMessageStore() {
        return longTermStore;
    }

    /**
     * Sets the store consulted first when a checkpoint needs a message body, and forwards it to
     * the long term store when that store is {@link CacheMessageStoreAware} too.
     *
     * @param store the cache message store to use
     */
    public void setCacheMessageStore(CacheMessageStore store) {
        this.cacheMessageStore = store;
        if (longTermStore instanceof CacheMessageStoreAware) {
            ((CacheMessageStoreAware) longTermStore).setCacheMessageStore(store);
        }
    }

    /**
     * Requests a checkpoint from the persistence adapter and then removes every message from
     * the long term store.
     *
     * @throws JMSException if the checkpoint or the removal fails
     */
    public void removeAllMessages() throws JMSException {
        peristenceAdapter.checkpoint(true);
        longTermStore.removeAllMessages();
    }

    /**
     * Replays a journalled add against the long term store, skipping messages that are already
     * there. Failures are logged at debug level and otherwise ignored.
     *
     * @param msg the message read back from the journal
     */
    public void replayAddMessage(ActiveMQMessage msg) {
        try {
            ActiveMQMessage existing = longTermStore.getMessage(msg.getJMSMessageIdentity());
            if (existing == null) {
                longTermStore.addMessage(msg);
            }
        } catch (Throwable e) {
            log.debug("Could not replay add for message '" + msg.getJMSMessageIdentity().getMessageID() + "'.  Message may have already been added. reason: " + e);
        }
    }

    /**
     * Replays a journalled acknowledgement against the long term store, skipping messages that
     * are no longer there. Failures are logged at debug level and otherwise ignored.
     *
     * @param ack the acknowledgement read back from the journal
     */
    public void replayRemoveMessage(MessageAck ack) {
        try {
            ActiveMQMessage existing = longTermStore.getMessage(ack.getMessageIdentity());
            if (existing != null) {
                longTermStore.removeMessage(ack);
            }
        } catch (Throwable e) {
            log.debug("Could not replay acknowledge for message '" + ack.getMessageID() + "'.  Message may have already been acknowledged. reason: " + e);
        }
    }
}

