package org.apache.activemq.store.amq;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.groovy.tools.shell.util.ANSI;

/* loaded from: input_file:artifacts/ESB/jar/activemq-core-5.2.0.jar:org/apache/activemq/store/amq/AMQMessageStore.class */
public class AMQMessageStore implements MessageStore {
    private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);
    protected final AMQPersistenceAdapter peristenceAdapter;
    protected final AMQTransactionStore transactionStore;
    protected final ReferenceStore referenceStore;
    protected final ActiveMQDestination destination;
    protected final TransactionTemplate transactionTemplate;
    protected Location lastLocation;
    protected Location lastWrittenLocation;
    protected final TaskRunner asyncWriteTask;
    protected CountDownLatch flushLatch;
    private Map<MessageId, ReferenceStore.ReferenceData> cpAddedMessageIds;
    protected final Lock lock;
    protected Set<Location> inFlightTxLocations = new HashSet();
    private Map<MessageId, ReferenceStore.ReferenceData> messages = new LinkedHashMap();
    private List<MessageAck> messageAcks = new ArrayList();
    private final boolean debug = LOG.isDebugEnabled();
    private final AtomicReference<Location> mark = new AtomicReference<>();

    public AMQMessageStore(AMQPersistenceAdapter aMQPersistenceAdapter, ReferenceStore referenceStore, ActiveMQDestination activeMQDestination) {
        this.peristenceAdapter = aMQPersistenceAdapter;
        this.lock = referenceStore.getStoreLock();
        this.transactionStore = aMQPersistenceAdapter.getTransactionStore();
        this.referenceStore = referenceStore;
        this.destination = activeMQDestination;
        this.transactionTemplate = new TransactionTemplate(aMQPersistenceAdapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
        this.asyncWriteTask = aMQPersistenceAdapter.getTaskRunnerFactory().createTaskRunner(new Task() { // from class: org.apache.activemq.store.amq.AMQMessageStore.1
            @Override // org.apache.activemq.thread.Task
            public boolean iterate() {
                AMQMessageStore.this.asyncWrite();
                return false;
            }
        }, "Checkpoint: " + activeMQDestination);
    }

    @Override // org.apache.activemq.store.MessageStore
    public void setMemoryUsage(MemoryUsage memoryUsage) {
        this.referenceStore.setMemoryUsage(memoryUsage);
    }

    @Override // org.apache.activemq.store.MessageStore
    public final void addMessage(ConnectionContext connectionContext, final Message message) throws IOException {
        final MessageId messageId = message.getMessageId();
        final Location writeCommand = this.peristenceAdapter.writeCommand(message, message.isResponseRequired());
        if (!connectionContext.isInTransaction()) {
            if (this.debug) {
                LOG.debug("Journalled message add for: " + messageId + ", at: " + writeCommand);
            }
            this.peristenceAdapter.addInProgressDataFile(this, writeCommand.getDataFileId());
            addMessage(message, writeCommand);
            return;
        }
        if (this.debug) {
            LOG.debug("Journalled transacted message add for: " + messageId + ", at: " + writeCommand);
        }
        this.lock.lock();
        try {
            this.inFlightTxLocations.add(writeCommand);
            this.lock.unlock();
            this.transactionStore.addMessage(this, message, writeCommand);
            connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.store.amq.AMQMessageStore.2
                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    if (AMQMessageStore.this.debug) {
                        AMQMessageStore.LOG.debug("Transacted message add commit for: " + messageId + ", at: " + writeCommand);
                    }
                    AMQMessageStore.this.lock.lock();
                    try {
                        AMQMessageStore.this.inFlightTxLocations.remove(writeCommand);
                        AMQMessageStore.this.lock.unlock();
                        AMQMessageStore.this.addMessage(message, writeCommand);
                    } catch (Throwable th) {
                        AMQMessageStore.this.lock.unlock();
                        throw th;
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    if (AMQMessageStore.this.debug) {
                        AMQMessageStore.LOG.debug("Transacted message add rollback for: " + messageId + ", at: " + writeCommand);
                    }
                    AMQMessageStore.this.lock.lock();
                    try {
                        AMQMessageStore.this.inFlightTxLocations.remove(writeCommand);
                        AMQMessageStore.this.lock.unlock();
                    } catch (Throwable th) {
                        AMQMessageStore.this.lock.unlock();
                        throw th;
                    }
                }
            });
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    final void addMessage(Message message, Location location) throws InterruptedIOException {
        ReferenceStore.ReferenceData referenceData = new ReferenceStore.ReferenceData();
        referenceData.setExpiration(message.getExpiration());
        referenceData.setFileId(location.getDataFileId());
        referenceData.setOffset(location.getOffset());
        this.lock.lock();
        try {
            this.lastLocation = location;
            this.messages.put(message.getMessageId(), referenceData);
            this.lock.unlock();
            if (this.messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
                flush();
            } else {
                try {
                    this.asyncWriteTask.wakeup();
                } catch (InterruptedException e) {
                    throw new InterruptedIOException();
                }
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public boolean replayAddMessage(ConnectionContext connectionContext, Message message, Location location) {
        MessageId messageId = message.getMessageId();
        try {
            if (this.referenceStore.getMessageReference(messageId) != null) {
                return false;
            }
            ReferenceStore.ReferenceData referenceData = new ReferenceStore.ReferenceData();
            referenceData.setExpiration(message.getExpiration());
            referenceData.setFileId(location.getDataFileId());
            referenceData.setOffset(location.getOffset());
            this.referenceStore.addMessageReference(connectionContext, messageId, referenceData);
            return true;
        } catch (Throwable th) {
            LOG.warn("Could not replay add for message '" + messageId + "'.  Message may have already been added. reason: " + th, th);
            return false;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeMessage(ConnectionContext connectionContext, final MessageAck messageAck) throws IOException {
        JournalQueueAck journalQueueAck = new JournalQueueAck();
        journalQueueAck.setDestination(this.destination);
        journalQueueAck.setMessageAck(messageAck);
        final Location writeCommand = this.peristenceAdapter.writeCommand(journalQueueAck, messageAck.isResponseRequired());
        if (!connectionContext.isInTransaction()) {
            if (this.debug) {
                LOG.debug("Journalled message remove for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
            }
            removeMessage(messageAck, writeCommand);
            return;
        }
        if (this.debug) {
            LOG.debug("Journalled transacted message remove for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
        }
        this.lock.lock();
        try {
            this.inFlightTxLocations.add(writeCommand);
            this.lock.unlock();
            this.transactionStore.removeMessage(this, messageAck, writeCommand);
            connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.store.amq.AMQMessageStore.3
                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    if (AMQMessageStore.this.debug) {
                        AMQMessageStore.LOG.debug("Transacted message remove commit for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
                    }
                    AMQMessageStore.this.lock.lock();
                    try {
                        AMQMessageStore.this.inFlightTxLocations.remove(writeCommand);
                        AMQMessageStore.this.lock.unlock();
                        AMQMessageStore.this.removeMessage(messageAck, writeCommand);
                    } catch (Throwable th) {
                        AMQMessageStore.this.lock.unlock();
                        throw th;
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    if (AMQMessageStore.this.debug) {
                        AMQMessageStore.LOG.debug("Transacted message remove rollback for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
                    }
                    AMQMessageStore.this.lock.lock();
                    try {
                        AMQMessageStore.this.inFlightTxLocations.remove(writeCommand);
                        AMQMessageStore.this.lock.unlock();
                    } catch (Throwable th) {
                        AMQMessageStore.this.lock.unlock();
                        throw th;
                    }
                }
            });
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    final void removeMessage(MessageAck messageAck, Location location) throws InterruptedIOException {
        this.lock.lock();
        try {
            this.lastLocation = location;
            ReferenceStore.ReferenceData remove = this.messages.remove(messageAck.getLastMessageId());
            if (remove == null) {
                this.messageAcks.add(messageAck);
            } else {
                this.peristenceAdapter.removeInProgressDataFile(this, remove.getFileId());
            }
            if (this.messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
                flush();
            } else if (remove == null) {
                try {
                    this.asyncWriteTask.wakeup();
                } catch (InterruptedException e) {
                    throw new InterruptedIOException();
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    public boolean replayRemoveMessage(ConnectionContext connectionContext, MessageAck messageAck) {
        try {
            if (this.referenceStore.getMessageReference(messageAck.getLastMessageId()) == null) {
                return false;
            }
            this.referenceStore.removeMessage(connectionContext, messageAck);
            return true;
        } catch (Throwable th) {
            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + th);
            return false;
        }
    }

    public void flush() throws InterruptedIOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("flush starting ...");
        }
        this.lock.lock();
        try {
            if (this.lastWrittenLocation == this.lastLocation) {
                return;
            }
            if (this.flushLatch == null) {
                this.flushLatch = new CountDownLatch(1);
            }
            CountDownLatch countDownLatch = this.flushLatch;
            this.lock.unlock();
            try {
                this.asyncWriteTask.wakeup();
                countDownLatch.await();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("flush finished");
                }
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
        } finally {
            this.lock.unlock();
        }
    }

    synchronized void asyncWrite() {
        try {
            this.lock.lock();
            try {
                CountDownLatch countDownLatch = this.flushLatch;
                this.flushLatch = null;
                this.lock.unlock();
                this.mark.set(doAsyncWrite());
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        } catch (IOException e) {
            LOG.error("Checkpoint failed: " + e, e);
        }
    }

    protected Location doAsyncWrite() throws IOException {
        final int maxCheckpointMessageAddSize = this.peristenceAdapter.getMaxCheckpointMessageAddSize();
        this.lock.lock();
        try {
            this.cpAddedMessageIds = this.messages;
            final List<MessageAck> list = this.messageAcks;
            ArrayList arrayList = new ArrayList(this.inFlightTxLocations);
            this.messages = new LinkedHashMap();
            this.messageAcks = new ArrayList();
            Location location = this.lastLocation;
            this.lock.unlock();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Doing batch update... adding: " + this.cpAddedMessageIds.size() + " removing: " + list.size() + ANSI.Renderer.CODE_TEXT_SEPARATOR);
            }
            this.transactionTemplate.run(new Callback() { // from class: org.apache.activemq.store.amq.AMQMessageStore.4
                @Override // org.apache.activemq.util.Callback
                public void execute() throws Exception {
                    int i = 0;
                    PersistenceAdapter persistenceAdapter = AMQMessageStore.this.transactionTemplate.getPersistenceAdapter();
                    ConnectionContext context = AMQMessageStore.this.transactionTemplate.getContext();
                    for (Map.Entry entry : AMQMessageStore.this.cpAddedMessageIds.entrySet()) {
                        try {
                            AMQMessageStore.this.referenceStore.addMessageReference(context, (MessageId) entry.getKey(), (ReferenceStore.ReferenceData) entry.getValue());
                            AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, ((ReferenceStore.ReferenceData) entry.getValue()).getFileId());
                        } catch (Throwable th) {
                            AMQMessageStore.LOG.warn("Message could not be added to long term store: " + th.getMessage(), th);
                        }
                        i++;
                        if (i >= maxCheckpointMessageAddSize) {
                            persistenceAdapter.commitTransaction(context);
                            persistenceAdapter.beginTransaction(context);
                            i = 0;
                        }
                    }
                    persistenceAdapter.commitTransaction(context);
                    persistenceAdapter.beginTransaction(context);
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        try {
                            AMQMessageStore.this.referenceStore.removeMessage(AMQMessageStore.this.transactionTemplate.getContext(), (MessageAck) it.next());
                        } catch (Throwable th2) {
                            AMQMessageStore.LOG.warn("Message could not be removed from long term store: " + th2.getMessage(), th2);
                        }
                    }
                }
            });
            LOG.debug("Batch update done.");
            this.lock.lock();
            try {
                this.cpAddedMessageIds = null;
                this.lastWrittenLocation = location;
                this.lock.unlock();
                if (arrayList.size() <= 0) {
                    return location;
                }
                Collections.sort(arrayList);
                return (Location) arrayList.get(0);
            } finally {
            }
        } finally {
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public Message getMessage(MessageId messageId) throws IOException {
        Location location = getLocation(messageId);
        if (location == null) {
            return null;
        }
        DataStructure readCommand = this.peristenceAdapter.readCommand(location);
        try {
            return (Message) readCommand;
        } catch (ClassCastException e) {
            throw new IOException("Could not read message " + messageId + " at location " + location + ", expected a message, but got: " + readCommand);
        }
    }

    protected Location getLocation(MessageId messageId) throws IOException {
        this.lock.lock();
        try {
            ReferenceStore.ReferenceData referenceData = this.messages.get(messageId);
            if (referenceData == null && this.cpAddedMessageIds != null) {
                referenceData = this.cpAddedMessageIds.get(messageId);
            }
            if (referenceData == null) {
                referenceData = this.referenceStore.getMessageReference(messageId);
                if (referenceData == null) {
                    return null;
                }
            }
            Location location = new Location();
            location.setDataFileId(referenceData.getFileId());
            location.setOffset(referenceData.getOffset());
            return location;
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void recover(MessageRecoveryListener messageRecoveryListener) throws Exception {
        flush();
        this.referenceStore.recover(new RecoveryListenerAdapter(this, messageRecoveryListener));
    }

    @Override // org.apache.activemq.Service
    public void start() throws Exception {
        this.referenceStore.start();
    }

    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        flush();
        this.asyncWriteTask.shutdown();
        this.referenceStore.stop();
    }

    public ReferenceStore getReferenceStore() {
        return this.referenceStore;
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeAllMessages(ConnectionContext connectionContext) throws IOException {
        flush();
        this.referenceStore.removeAllMessages(connectionContext);
    }

    @Override // org.apache.activemq.store.MessageStore
    public ActiveMQDestination getDestination() {
        return this.destination;
    }

    public void addMessageReference(ConnectionContext connectionContext, MessageId messageId, long j, String str) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    public String getMessageReference(MessageId messageId) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    @Override // org.apache.activemq.store.MessageStore
    public int getMessageCount() throws IOException {
        flush();
        return this.referenceStore.getMessageCount();
    }

    @Override // org.apache.activemq.store.MessageStore
    public void recoverNextMessages(int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        RecoveryListenerAdapter recoveryListenerAdapter = new RecoveryListenerAdapter(this, messageRecoveryListener);
        this.referenceStore.recoverNextMessages(i, recoveryListenerAdapter);
        if (recoveryListenerAdapter.size() == 0 && recoveryListenerAdapter.hasSpace()) {
            flush();
            this.referenceStore.recoverNextMessages(i, recoveryListenerAdapter);
        }
    }

    Message getMessage(ReferenceStore.ReferenceData referenceData) throws IOException {
        Location location = new Location();
        location.setDataFileId(referenceData.getFileId());
        location.setOffset(referenceData.getOffset());
        DataStructure readCommand = this.peristenceAdapter.readCommand(location);
        try {
            return (Message) readCommand;
        } catch (ClassCastException e) {
            throw new IOException("Could not read message  at location " + location + ", expected a message, but got: " + readCommand);
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void resetBatching() {
        this.referenceStore.resetBatching();
    }

    public Location getMark() {
        return this.mark.get();
    }
}
