package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/jdbc/JDBCMessageStore.class */
public class JDBCMessageStore extends AbstractMessageStore {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) JDBCMessageStore.class);
    protected final WireFormat wireFormat;
    protected final JDBCAdapter adapter;
    protected final JDBCPersistenceAdapter persistenceAdapter;
    protected AtomicLong lastRecoveredSequenceId;
    protected AtomicLong lastRecoveredPriority;
    protected ActiveMQMessageAudit audit;
    protected final LinkedList<Long> pendingAdditions;

    /* loaded from: input_file:org/apache/activemq/store/jdbc/JDBCMessageStore$Duration.class */
    class Duration {
        static final int LIMIT = 100;
        final long start = System.currentTimeMillis();
        final String name;

        Duration(String str) {
            this.name = str;
        }

        void end() {
            end(null);
        }

        void end(Object obj) {
            long currentTimeMillis = System.currentTimeMillis() - this.start;
            if (currentTimeMillis > 100) {
                System.err.println(this.name + " took a long time: " + currentTimeMillis + "ms " + obj);
            }
        }
    }

    public JDBCMessageStore(JDBCPersistenceAdapter jDBCPersistenceAdapter, JDBCAdapter jDBCAdapter, WireFormat wireFormat, ActiveMQDestination activeMQDestination, ActiveMQMessageAudit activeMQMessageAudit) throws IOException {
        super(activeMQDestination);
        this.lastRecoveredSequenceId = new AtomicLong(-1L);
        this.lastRecoveredPriority = new AtomicLong(126L);
        this.pendingAdditions = new LinkedList<>();
        this.persistenceAdapter = jDBCPersistenceAdapter;
        this.adapter = jDBCAdapter;
        this.wireFormat = wireFormat;
        this.audit = activeMQMessageAudit;
        if (activeMQDestination.isQueue() && jDBCPersistenceAdapter.getBrokerService().shouldRecordVirtualDestination(activeMQDestination)) {
            recordDestinationCreation(activeMQDestination);
        }
    }

    private void recordDestinationCreation(ActiveMQDestination activeMQDestination) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                transactionContext = this.persistenceAdapter.getTransactionContext();
                if (this.adapter.doGetLastAckedDurableSubscriberMessageId(transactionContext, activeMQDestination, activeMQDestination.getQualifiedName(), activeMQDestination.getQualifiedName()) < 0) {
                    this.adapter.doRecordDestination(transactionContext, activeMQDestination);
                }
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to record destination: " + activeMQDestination + ". Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void addMessage(ConnectionContext connectionContext, final Message message) throws IOException {
        final long nextSequenceId;
        MessageId messageId = message.getMessageId();
        if (this.audit != null && this.audit.isDuplicate(message)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.destination.getPhysicalName() + " ignoring duplicated (add) message, already stored: " + messageId);
                return;
            }
            return;
        }
        XATransactionId xid = connectionContext != null ? connectionContext.getXid() : null;
        try {
            byte[] byteArray = ByteSequenceData.toByteArray(this.wireFormat.marshal(message));
            TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext(connectionContext);
            synchronized (this.pendingAdditions) {
                nextSequenceId = this.persistenceAdapter.getNextSequenceId();
                message.getMessageId().setEntryLocator(Long.valueOf(nextSequenceId));
                if (xid == null) {
                    this.pendingAdditions.add(Long.valueOf(nextSequenceId));
                    transactionContext.onCompletion(new Runnable() { // from class: org.apache.activemq.store.jdbc.JDBCMessageStore.1
                        @Override // java.lang.Runnable
                        public void run() {
                            message.getMessageId().setFutureOrSequenceLong(Long.valueOf(nextSequenceId));
                        }
                    });
                    if (this.indexListener != null) {
                        this.indexListener.onAdd(new IndexListener.MessageContext(connectionContext, message, new Runnable() { // from class: org.apache.activemq.store.jdbc.JDBCMessageStore.2
                            @Override // java.lang.Runnable
                            public void run() {
                                synchronized (JDBCMessageStore.this.pendingAdditions) {
                                    JDBCMessageStore.this.pendingAdditions.remove(Long.valueOf(nextSequenceId));
                                }
                            }
                        }));
                    } else {
                        this.pendingAdditions.remove(Long.valueOf(nextSequenceId));
                    }
                }
            }
            try {
                try {
                    this.adapter.doAddMessage(transactionContext, nextSequenceId, messageId, this.destination, byteArray, message.getExpiration(), isPrioritizedMessages() ? message.getPriority() : (byte) 0, xid);
                    transactionContext.close();
                    if (xid == null) {
                        onAdd(message, nextSequenceId, message.getPriority());
                    }
                } catch (SQLException e) {
                    JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                    throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, (Exception) e);
                }
            } catch (Throwable th) {
                transactionContext.close();
                throw th;
            }
        } catch (IOException e2) {
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e2, (Exception) e2);
        }
    }

    private long minPendingSequeunceId() {
        synchronized (this.pendingAdditions) {
            if (this.pendingAdditions.isEmpty()) {
                return this.persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1;
            }
            return this.pendingAdditions.get(0).longValue();
        }
    }

    @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
    public void updateMessage(Message message) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                this.adapter.doUpdateMessage(transactionContext, this.destination, message.getMessageId(), ByteSequenceData.toByteArray(this.wireFormat.marshal(message)));
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onAdd(Message message, long j, byte b) {
    }

    public void addMessageReference(ConnectionContext connectionContext, MessageId messageId, long j, String str) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext(connectionContext);
        try {
            try {
                this.adapter.doAddMessageReference(transactionContext, this.persistenceAdapter.getNextSequenceId(), messageId, this.destination, j, str);
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public Message getMessage(MessageId messageId) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                byte[] doGetMessage = this.adapter.doGetMessage(transactionContext, messageId);
                if (doGetMessage == null) {
                    return null;
                }
                Message message = (Message) this.wireFormat.unmarshal(new ByteSequence(doGetMessage));
                transactionContext.close();
                return message;
            } catch (IOException e) {
                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, (Exception) e);
            } catch (SQLException e2) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e2);
                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e2, (Exception) e2);
            }
        } finally {
            transactionContext.close();
        }
    }

    public String getMessageReference(MessageId messageId) throws IOException {
        long brokerSequenceId = messageId.getBrokerSequenceId();
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                try {
                    String doGetMessageReference = this.adapter.doGetMessageReference(transactionContext, brokerSequenceId);
                    transactionContext.close();
                    return doGetMessageReference;
                } catch (IOException e) {
                    throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, (Exception) e);
                }
            } catch (SQLException e2) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e2);
                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e2, (Exception) e2);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
        long longValue = messageAck.getLastMessageId().getFutureOrSequenceLong() != null ? ((Long) messageAck.getLastMessageId().getFutureOrSequenceLong()).longValue() : this.persistenceAdapter.getStoreSequenceIdForMessageId(connectionContext, messageAck.getLastMessageId(), this.destination)[0];
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext(connectionContext);
        try {
            try {
                this.adapter.doRemoveMessage(transactionContext, longValue, connectionContext != null ? connectionContext.getXid() : null);
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to broker message: " + messageAck.getLastMessageId() + " in container: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void recover(final MessageRecoveryListener messageRecoveryListener) throws Exception {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                transactionContext = this.persistenceAdapter.getTransactionContext();
                this.adapter.doRecover(transactionContext, this.destination, new JDBCMessageRecoveryListener() { // from class: org.apache.activemq.store.jdbc.JDBCMessageStore.3
                    @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
                    public boolean recoverMessage(long j, byte[] bArr) throws Exception {
                        Message message = (Message) JDBCMessageStore.this.wireFormat.unmarshal(new ByteSequence(bArr));
                        message.getMessageId().setBrokerSequenceId(j);
                        return messageRecoveryListener.recoverMessage(message);
                    }

                    @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
                    public boolean recoverMessageReference(String str) throws Exception {
                        return messageRecoveryListener.recoverMessageReference(new MessageId(str));
                    }
                });
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeAllMessages(ConnectionContext connectionContext) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext(connectionContext);
        try {
            try {
                this.adapter.doRemoveAllMessages(transactionContext, this.destination);
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public int getMessageCount() throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                int doGetMessageCount = this.adapter.doGetMessageCount(transactionContext, this.destination);
                transactionContext.close();
                return doGetMessageCount;
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to get Message Count: " + this.destination + ". Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void recoverNextMessages(int i, final MessageRecoveryListener messageRecoveryListener) throws Exception {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace(this + " recoverNext lastRecovered:" + this.lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId());
                }
                this.adapter.doRecoverNextMessages(transactionContext, this.destination, minPendingSequeunceId(), this.lastRecoveredSequenceId.get(), this.lastRecoveredPriority.get(), i, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { // from class: org.apache.activemq.store.jdbc.JDBCMessageStore.4
                    @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
                    public boolean recoverMessage(long j, byte[] bArr) throws Exception {
                        Message message = (Message) JDBCMessageStore.this.wireFormat.unmarshal(new ByteSequence(bArr));
                        message.getMessageId().setBrokerSequenceId(j);
                        message.getMessageId().setFutureOrSequenceLong(Long.valueOf(j));
                        messageRecoveryListener.recoverMessage(message);
                        JDBCMessageStore.this.lastRecoveredSequenceId.set(j);
                        JDBCMessageStore.this.lastRecoveredPriority.set(message.getPriority());
                        return true;
                    }

                    @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
                    public boolean recoverMessageReference(String str) throws Exception {
                        if (!messageRecoveryListener.hasSpace()) {
                            return false;
                        }
                        messageRecoveryListener.recoverMessageReference(new MessageId(str));
                        return true;
                    }
                });
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                transactionContext.close();
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void resetBatching() {
        if (LOG.isTraceEnabled()) {
            LOG.trace(this + " resetBatching, existing last recovered seqId: " + this.lastRecoveredSequenceId.get());
        }
        this.lastRecoveredSequenceId.set(-1L);
        this.lastRecoveredPriority.set(126L);
    }

    @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
    public void setBatch(MessageId messageId) {
        try {
            long[] storeSequenceIdForMessageId = this.persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, this.destination);
            this.lastRecoveredSequenceId.set(storeSequenceIdForMessageId[0]);
            this.lastRecoveredPriority.set(storeSequenceIdForMessageId[1]);
        } catch (IOException e) {
            this.lastRecoveredSequenceId.set(-1L);
            this.lastRecoveredPriority.set(126L);
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace(this + " setBatch: new sequenceId: " + this.lastRecoveredSequenceId.get() + ", priority: " + this.lastRecoveredPriority.get());
        }
    }

    @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
    public void setPrioritizedMessages(boolean z) {
        super.setPrioritizedMessages(z);
    }

    public String toString() {
        return this.destination.getPhysicalName() + ",pendingSize:" + this.pendingAdditions.size();
    }
}
