package org.apache.qpid.server.virtualhost;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.Xid;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.class */
public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SynchronousMessageStoreRecoverer.class);

    /* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer$DistributedTransactionVisitor.class */
    private static class DistributedTransactionVisitor implements DistributedTransactionHandler {
        private final QueueManagingVirtualHost<?> _virtualHost;
        private final EventLogger _eventLogger;
        private final MessageStoreLogSubject _logSubject;
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;

        private DistributedTransactionVisitor(QueueManagingVirtualHost<?> queueManagingVirtualHost, EventLogger eventLogger, MessageStoreLogSubject messageStoreLogSubject, Map<Long, ServerMessage<?>> map, Map<Long, StoredMessage<?>> map2) {
            this._virtualHost = queueManagingVirtualHost;
            this._eventLogger = eventLogger;
            this._logSubject = messageStoreLogSubject;
            this._recoveredMessages = map;
            this._unusedMessages = map2;
        }

        @Override // org.apache.qpid.server.store.handler.DistributedTransactionHandler
        public boolean handle(Transaction.StoredXidRecord storedXidRecord, Transaction.EnqueueRecord[] enqueueRecordArr, Transaction.DequeueRecord[] dequeueRecordArr) {
            Xid xid = new Xid(storedXidRecord.getFormat(), storedXidRecord.getGlobalId(), storedXidRecord.getBranchId());
            DtxRegistry dtxRegistry = this._virtualHost.getDtxRegistry();
            DtxBranch branch = dtxRegistry.getBranch(xid);
            if (branch == null) {
                branch = new DtxBranch(storedXidRecord, dtxRegistry);
                dtxRegistry.registerBranch(branch);
            }
            for (Transaction.EnqueueRecord enqueueRecord : enqueueRecordArr) {
                final Queue<?> attainedQueue = this._virtualHost.getAttainedQueue(enqueueRecord.getResource().getId());
                if (attainedQueue != null) {
                    long messageNumber = enqueueRecord.getMessage().getMessageNumber();
                    final ServerMessage<?> serverMessage = this._recoveredMessages.get(Long.valueOf(messageNumber));
                    this._unusedMessages.remove(Long.valueOf(messageNumber));
                    if (serverMessage != null) {
                        final MessageReference newReference = serverMessage.newReference();
                        final MessageEnqueueRecord[] messageEnqueueRecordArr = new MessageEnqueueRecord[1];
                        branch.enqueue(attainedQueue, serverMessage, messageEnqueueRecord -> {
                            messageEnqueueRecordArr[0] = messageEnqueueRecord;
                        });
                        branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.SynchronousMessageStoreRecoverer.DistributedTransactionVisitor.1
                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void postCommit() {
                                attainedQueue.enqueue(serverMessage, null, messageEnqueueRecordArr[0]);
                                newReference.release();
                            }

                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void onRollback() {
                                newReference.release();
                            }
                        });
                    } else {
                        this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber)));
                    }
                } else {
                    this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), enqueueRecord.getResource().getId().toString()));
                }
            }
            for (Transaction.DequeueRecord dequeueRecord : dequeueRecordArr) {
                Queue<?> attainedQueue2 = this._virtualHost.getAttainedQueue(dequeueRecord.getEnqueueRecord().getQueueId());
                if (attainedQueue2 != null) {
                    long messageNumber2 = dequeueRecord.getEnqueueRecord().getMessageNumber();
                    ServerMessage<?> serverMessage2 = this._recoveredMessages.get(Long.valueOf(messageNumber2));
                    this._unusedMessages.remove(Long.valueOf(messageNumber2));
                    if (serverMessage2 != null) {
                        final QueueEntry messageOnTheQueue = attainedQueue2.getMessageOnTheQueue(messageNumber2);
                        if (!messageOnTheQueue.acquire()) {
                            throw new ServerScopedRuntimeException("Distributed transaction dequeue handler failed to acquire " + messageOnTheQueue + " during recovery of queue " + attainedQueue2);
                        }
                        branch.dequeue(messageOnTheQueue.getEnqueueRecord());
                        branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.SynchronousMessageStoreRecoverer.DistributedTransactionVisitor.2
                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void postCommit() {
                                messageOnTheQueue.delete();
                            }

                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void onRollback() {
                                messageOnTheQueue.release();
                            }
                        });
                    } else {
                        this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber2)));
                    }
                } else {
                    this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), dequeueRecord.getEnqueueRecord().getQueueId().toString()));
                }
            }
            branch.setState(DtxBranch.State.PREPARED);
            branch.prePrepareTransaction();
            return true;
        }

        private StringBuilder xidAsString(Xid xid) {
            return new StringBuilder("(").append(xid.getFormat()).append(',').append(Functions.str(xid.getGlobalId())).append(',').append(Functions.str(xid.getBranchId())).append(')');
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer$MessageInstanceVisitor.class */
    /**
     * Store visitor that hands each persisted enqueue record to its queue.
     *
     * <p>A record is delivered to the queue when both the queue and the
     * message are known. Otherwise the record is counted in the matching
     * "unknown" tally and dequeued from the store.
     */
    private static class MessageInstanceVisitor implements MessageInstanceHandler {
        private final QueueManagingVirtualHost<?> _virtualHost;
        private final MessageStore _store;
        private final Map<Queue<?>, Integer> _queueRecoveries;
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;
        private final Map<UUID, Integer> _unknownQueuesWithMessages;
        private final Map<Queue<?>, Integer> _queuesWithUnknownMessages;

        private MessageInstanceVisitor(QueueManagingVirtualHost<?> virtualHost, MessageStore store, Map<Queue<?>, Integer> queueRecoveries, Map<Long, ServerMessage<?>> recoveredMessages, Map<Long, StoredMessage<?>> unusedMessages, Map<UUID, Integer> unknownQueuesWithMessages, Map<Queue<?>, Integer> queuesWithUnknownMessages) {
            _virtualHost = virtualHost;
            _store = store;
            _queueRecoveries = queueRecoveries;
            _recoveredMessages = recoveredMessages;
            _unusedMessages = unusedMessages;
            _unknownQueuesWithMessages = unknownQueuesWithMessages;
            _queuesWithUnknownMessages = queuesWithUnknownMessages;
        }

        /**
         * Processes one enqueue record read from the store.
         *
         * @param messageEnqueueRecord the record linking a message number to a queue id
         * @return always {@code true}
         */
        @Override // org.apache.qpid.server.store.handler.MessageInstanceHandler
        public boolean handle(MessageEnqueueRecord messageEnqueueRecord) {
            final UUID queueId = messageEnqueueRecord.getQueueId();
            final long messageId = messageEnqueueRecord.getMessageNumber();
            final Queue<?> queue = _virtualHost.getAttainedQueue(queueId);

            if (queue == null) {
                LOGGER.debug("Message id '{}' in log references queue with id '{}' which is not in the configuration, entry will be discarded", messageId, queueId);
                _unknownQueuesWithMessages.merge(queueId, 1, Integer::sum);
                discard(messageEnqueueRecord);
                return true;
            }

            final String queueName = queue.getName();
            final ServerMessage<?> message = _recoveredMessages.get(messageId);
            // Referenced by a known queue, so the stored message is not an orphan.
            _unusedMessages.remove(messageId);

            if (message == null) {
                LOGGER.debug("Message id '{}' referenced in log as enqueued in queue '{}' is unknown, entry will be discarded", messageId, queueName);
                _queuesWithUnknownMessages.merge(queue, 1, Integer::sum);
                discard(messageEnqueueRecord);
                return true;
            }

            LOGGER.debug("Delivering message id '{}' to queue '{}'", message.getMessageNumber(), queueName);
            _queueRecoveries.merge(queue, 1, Integer::sum);
            queue.recover(message, messageEnqueueRecord);
            return true;
        }

        /** Dequeues an undeliverable record from the store in its own asynchronously committed transaction. */
        private void discard(MessageEnqueueRecord record) {
            final Transaction txn = _store.newTransaction();
            txn.dequeueMessage(record);
            txn.commitTranAsync((Void) null);
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer$MessageVisitor.class */
    /**
     * Store visitor that indexes every stored message by message number.
     *
     * <p>Each message is added to two maps: one holding the
     * {@link ServerMessage} created from it, and one holding the raw
     * {@link StoredMessage}.
     */
    private static class MessageVisitor implements MessageHandler {
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;

        MessageVisitor(Map<Long, ServerMessage<?>> recoveredMessages, Map<Long, StoredMessage<?>> unusedMessages) {
            _recoveredMessages = recoveredMessages;
            _unusedMessages = unusedMessages;
        }

        /**
         * Records one stored message in both maps.
         *
         * @param storedMessage the message read from the store
         * @return always {@code true}
         */
        @Override // org.apache.qpid.server.store.handler.MessageHandler
        public boolean handle(StoredMessage<?> storedMessage) {
            final long messageId = storedMessage.getMessageNumber();
            // The metadata type acts as the factory for the in-memory message.
            final ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
            _recoveredMessages.put(messageId, message);
            _unusedMessages.put(messageId, storedMessage);
            return true;
        }
    }

    /**
     * Recovers the virtual host's message store on the calling thread.
     *
     * <p>The steps run in this order:
     * <ol>
     *   <li>visit all stored messages and index them by message number;</li>
     *   <li>visit all enqueue records, delivering each to its queue or
     *       discarding it, then log a summary of the discarded entries;</li>
     *   <li>complete recovery on every queue of the virtual host;</li>
     *   <li>visit stored distributed transactions;</li>
     *   <li>remove every stored message that nothing referenced.</li>
     * </ol>
     *
     * @param queueManagingVirtualHost the virtual host whose store is recovered
     * @return an already-completed future, since all work is done before returning
     */
    @Override // org.apache.qpid.server.virtualhost.MessageStoreRecoverer
    public ListenableFuture<Void> recover(QueueManagingVirtualHost<?> queueManagingVirtualHost) {
        final EventLogger eventLogger = queueManagingVirtualHost.getEventLogger();
        final MessageStore messageStore = queueManagingVirtualHost.getMessageStore();
        final MessageStore.MessageStoreReader storeReader = messageStore.newMessageStoreReader();
        final MessageStoreLogSubject logSubject = new MessageStoreLogSubject(queueManagingVirtualHost.getName(), messageStore.getClass().getSimpleName());

        final Map<Queue<?>, Integer> queueRecoveries = new TreeMap<>();
        final Map<Long, ServerMessage<?>> recoveredMessages = new HashMap<>();
        final Map<Long, StoredMessage<?>> unusedMessages = new TreeMap<>();
        final Map<UUID, Integer> unknownQueuesWithMessages = new HashMap<>();
        final Map<Queue<?>, Integer> queuesWithUnknownMessages = new HashMap<>();

        eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START());
        storeReader.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));

        eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false));
        try {
            storeReader.visitMessageInstances(new MessageInstanceVisitor(queueManagingVirtualHost, messageStore, queueRecoveries, recoveredMessages, unusedMessages, unknownQueuesWithMessages, queuesWithUnknownMessages));
        } finally {
            // Report what was discarded even if the visit failed part-way. The scope is
            // deliberately limited to the instance visit so that a later failure does
            // not emit these summaries a second time.
            unknownQueuesWithMessages.forEach((queueId, count) ->
                    LOGGER.info("Discarded {} entry(s) associated with queue id '{}' as a queue with this id does not appear in the configuration.", count, queueId));
            queuesWithUnknownMessages.forEach((queue, count) ->
                    LOGGER.info("Discarded {} entry(s) associated with queue '{}' as the referenced message does not exist.", count, queue.getName()));
        }

        // Queues that received at least one message: log the tally, then complete recovery.
        for (Map.Entry<Queue<?>, Integer> entry : queueRecoveries.entrySet()) {
            final Queue<?> queue = entry.getKey();
            eventLogger.message(logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), queue.getName()));
            eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));
            queue.completeRecovery();
        }

        // Queues that received nothing still need their recovery completed.
        for (Queue<?> queue : queueManagingVirtualHost.getChildren(Queue.class)) {
            if (!queueRecoveries.containsKey(queue)) {
                queue.completeRecovery();
            }
        }

        storeReader.visitDistributedTransactions(new DistributedTransactionVisitor(queueManagingVirtualHost, eventLogger, logSubject, recoveredMessages, unusedMessages));

        // Anything still in unusedMessages was referenced by neither a queue entry nor a transaction.
        for (StoredMessage<?> orphan : unusedMessages.values()) {
            LOGGER.debug("Message id '{}' is orphaned, removing", orphan.getMessageNumber());
            orphan.remove();
        }
        if (!unusedMessages.isEmpty()) {
            LOGGER.info("Discarded {} orphaned message(s).", unusedMessages.size());
        }

        eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
        eventLogger.message(logSubject, MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size()));
        eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE());

        return Futures.immediateFuture(null);
    }

    /**
     * Does nothing.
     *
     * <p>{@code recover} in this class performs all of its work before it
     * returns an already-completed future, so presumably there is never any
     * in-flight recovery left to cancel.
     */
    @Override // org.apache.qpid.server.virtualhost.MessageStoreRecoverer
    public void cancel() {
    }
}
