package org.apache.qpid.server.virtualhost;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;

/* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.class */
public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer {
    private static final Logger _logger = Logger.getLogger(AsynchronousMessageStoreRecoverer.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer.class */
    public static class AsynchronousRecoverer {
        private final VirtualHostImpl<?, ?, ?> _virtualHost;
        private final EventLogger _eventLogger;
        private final MessageStore _store;
        private final MessageStoreLogSubject _logSubject;
        private final long _maxMessageId;
        private final Set<AMQQueue<?>> _recoveringQueues;
        private final AtomicBoolean _recoveryComplete;
        private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer$DistributedTransactionVisitor.class */
        public class DistributedTransactionVisitor implements DistributedTransactionHandler {
            private DistributedTransactionVisitor() {
            }

            /* JADX WARN: Multi-variable type inference failed */
            /* JADX WARN: Type inference failed for: r0v26, types: [org.apache.qpid.server.store.TransactionLogResource, org.apache.qpid.server.queue.AMQQueue] */
            /* JADX WARN: Type inference failed for: r0v61, types: [org.apache.qpid.server.store.TransactionLogResource, org.apache.qpid.server.queue.AMQQueue] */
            @Override // org.apache.qpid.server.store.handler.DistributedTransactionHandler
            public boolean handle(long j, byte[] bArr, byte[] bArr2, Transaction.Record[] recordArr, Transaction.Record[] recordArr2) {
                Xid xid = new Xid(j, bArr, bArr2);
                DtxRegistry dtxRegistry = AsynchronousRecoverer.this.getVirtualHost().getDtxRegistry();
                DtxBranch branch = dtxRegistry.getBranch(xid);
                if (branch == null) {
                    branch = new DtxBranch(xid, AsynchronousRecoverer.this.getStore(), AsynchronousRecoverer.this.getVirtualHost());
                    dtxRegistry.registerBranch(branch);
                }
                for (Transaction.Record record : recordArr) {
                    final ?? queue = AsynchronousRecoverer.this.getVirtualHost().getQueue(record.getResource().getId());
                    if (queue != 0) {
                        long messageNumber = record.getMessage().getMessageNumber();
                        final ServerMessage recoveredMessage = AsynchronousRecoverer.this.getRecoveredMessage(messageNumber);
                        if (recoveredMessage != null) {
                            final MessageReference newReference = recoveredMessage.newReference();
                            branch.enqueue(queue, recoveredMessage);
                            branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.AsynchronousMessageStoreRecoverer.AsynchronousRecoverer.DistributedTransactionVisitor.1
                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void postCommit() {
                                    queue.enqueue(recoveredMessage, null);
                                    newReference.release();
                                }

                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void onRollback() {
                                    newReference.release();
                                }
                            });
                        } else {
                            AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber)));
                        }
                    } else {
                        AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), record.getResource().getId().toString()));
                    }
                }
                for (Transaction.Record record2 : recordArr2) {
                    ?? queue2 = AsynchronousRecoverer.this.getVirtualHost().getQueue(record2.getResource().getId());
                    if (queue2 != 0) {
                        if (AsynchronousRecoverer.this.isRecovering(queue2)) {
                            AsynchronousRecoverer.this.recoverQueue(queue2);
                        }
                        long messageNumber2 = record2.getMessage().getMessageNumber();
                        ServerMessage recoveredMessage2 = AsynchronousRecoverer.this.getRecoveredMessage(messageNumber2);
                        if (recoveredMessage2 != null) {
                            final QueueEntry messageOnTheQueue = queue2.getMessageOnTheQueue(messageNumber2);
                            messageOnTheQueue.acquire();
                            branch.dequeue(queue2, recoveredMessage2);
                            branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.AsynchronousMessageStoreRecoverer.AsynchronousRecoverer.DistributedTransactionVisitor.2
                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void postCommit() {
                                    messageOnTheQueue.delete();
                                }

                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void onRollback() {
                                    messageOnTheQueue.release();
                                }
                            });
                        } else {
                            AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber2)));
                        }
                    } else {
                        AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), record2.getResource().getId().toString()));
                    }
                }
                branch.setState(DtxBranch.State.PREPARED);
                branch.prePrepareTransaction();
                return true;
            }

            private StringBuilder xidAsString(Xid xid) {
                return new StringBuilder("(").append(xid.getFormat()).append(',').append(Functions.str(xid.getGlobalId())).append(',').append(Functions.str(xid.getBranchId())).append(')');
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer$MessageInstanceVisitor.class */
        public class MessageInstanceVisitor implements MessageInstanceHandler {
            private final AMQQueue<?> _queue;
            long _recoveredCount;

            private MessageInstanceVisitor(AMQQueue<?> aMQQueue) {
                this._queue = aMQQueue;
            }

            @Override // org.apache.qpid.server.store.handler.MessageInstanceHandler
            public boolean handle(UUID uuid, long j) {
                String name = this._queue.getName();
                if (j >= AsynchronousRecoverer.this._maxMessageId) {
                    return false;
                }
                ServerMessage<?> recoveredMessage = AsynchronousRecoverer.this.getRecoveredMessage(j);
                if (recoveredMessage != null) {
                    if (AsynchronousMessageStoreRecoverer._logger.isDebugEnabled()) {
                        AsynchronousMessageStoreRecoverer._logger.debug("On recovery, delivering " + recoveredMessage.getMessageNumber() + " to " + name);
                    }
                    this._queue.recover(recoveredMessage);
                    this._recoveredCount++;
                    return true;
                }
                AsynchronousMessageStoreRecoverer._logger.warn("Message id " + j + " referenced in log as enqueued in queue " + name + " is unknown, entry will be discarded");
                Transaction newTransaction = AsynchronousRecoverer.this.getStore().newTransaction();
                newTransaction.dequeueMessage(this._queue, new DummyMessage(j));
                newTransaction.commitTranAsync();
                return true;
            }

            public long getRecoveredCount() {
                return this._recoveredCount;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer$QueueRecoveringTask.class */
        public class QueueRecoveringTask implements Runnable {
            private final AMQQueue<?> _queue;

            public QueueRecoveringTask(AMQQueue<?> aMQQueue) {
                this._queue = aMQQueue;
            }

            @Override // java.lang.Runnable
            public void run() {
                AsynchronousRecoverer.this.recoverQueue(this._queue);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private AsynchronousRecoverer(VirtualHostImpl<?, ?, ?> virtualHostImpl) {
            this._recoveringQueues = new CopyOnWriteArraySet();
            this._recoveryComplete = new AtomicBoolean();
            this._recoveredMessages = new HashMap();
            this._virtualHost = virtualHostImpl;
            this._eventLogger = virtualHostImpl.getEventLogger();
            this._store = virtualHostImpl.getMessageStore();
            this._logSubject = new MessageStoreLogSubject(virtualHostImpl.getName(), this._store.getClass().getSimpleName());
            this._maxMessageId = this._store.getNextMessageId();
            this._recoveringQueues.addAll(this._virtualHost.getQueues());
        }

        public void recover() {
            getStore().visitDistributedTransactions(new DistributedTransactionVisitor());
            for (AMQQueue<?> aMQQueue : this._recoveringQueues) {
                new Thread(new QueueRecoveringTask(aMQQueue), "Queue Recoverer : " + aMQQueue.getName() + " (vh: " + getVirtualHost().getName() + ")").start();
            }
        }

        public VirtualHostImpl<?, ?, ?> getVirtualHost() {
            return this._virtualHost;
        }

        public EventLogger getEventLogger() {
            return this._eventLogger;
        }

        public MessageStore getStore() {
            return this._store;
        }

        public MessageStoreLogSubject getLogSubject() {
            return this._logSubject;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isRecovering(AMQQueue<?> aMQQueue) {
            return this._recoveringQueues.contains(aMQQueue);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recoverQueue(AMQQueue<?> aMQQueue) {
            MessageInstanceVisitor messageInstanceVisitor = new MessageInstanceVisitor(aMQQueue);
            this._store.visitMessageInstances(aMQQueue, messageInstanceVisitor);
            getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERED(Long.valueOf(messageInstanceVisitor.getRecoveredCount()), aMQQueue.getName()));
            getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERY_COMPLETE(aMQQueue.getName(), true));
            aMQQueue.completeRecovery();
            this._recoveringQueues.remove(aMQQueue);
            if (this._recoveringQueues.isEmpty() && this._recoveryComplete.compareAndSet(false, true)) {
                completeRecovery();
            }
        }

        private synchronized void completeRecovery() {
            for (Map.Entry<Long, MessageReference<? extends ServerMessage<?>>> entry : this._recoveredMessages.entrySet()) {
                entry.getValue().release();
                entry.setValue(null);
            }
            final ArrayList<StoredMessage> arrayList = new ArrayList();
            getStore().visitMessages(new MessageHandler() { // from class: org.apache.qpid.server.virtualhost.AsynchronousMessageStoreRecoverer.AsynchronousRecoverer.1
                @Override // org.apache.qpid.server.store.handler.MessageHandler
                public boolean handle(StoredMessage<?> storedMessage) {
                    long messageNumber = storedMessage.getMessageNumber();
                    if (!AsynchronousRecoverer.this._recoveredMessages.containsKey(Long.valueOf(messageNumber))) {
                        arrayList.add(storedMessage);
                    }
                    return messageNumber < AsynchronousRecoverer.this._maxMessageId - 1;
                }
            });
            for (StoredMessage storedMessage : arrayList) {
                AsynchronousMessageStoreRecoverer._logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
                storedMessage.remove();
            }
            arrayList.clear();
            this._recoveredMessages.clear();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.qpid.server.store.StorableMessageMetaData] */
        public synchronized ServerMessage<?> getRecoveredMessage(long j) {
            StoredMessage<?> message;
            MessageReference messageReference = this._recoveredMessages.get(Long.valueOf(j));
            if (messageReference == null && (message = this._store.getMessage(j)) != null) {
                messageReference = message.getMetaData().getType().createMessage(message).newReference();
                this._recoveredMessages.put(Long.valueOf(j), messageReference);
            }
            if (messageReference == null) {
                return null;
            }
            return messageReference.getMessage();
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$DummyMessage.class */
    private static class DummyMessage implements EnqueueableMessage {
        private final long _messageId;

        public DummyMessage(long j) {
            this._messageId = j;
        }

        @Override // org.apache.qpid.server.message.EnqueueableMessage
        public long getMessageNumber() {
            return this._messageId;
        }

        @Override // org.apache.qpid.server.message.EnqueueableMessage
        public boolean isPersistent() {
            return true;
        }

        @Override // org.apache.qpid.server.message.EnqueueableMessage
        public StoredMessage getStoredMessage() {
            return null;
        }
    }

    @Override // org.apache.qpid.server.virtualhost.MessageStoreRecoverer
    public void recover(VirtualHostImpl virtualHostImpl) {
        new AsynchronousRecoverer(virtualHostImpl).recover();
    }
}
