package org.apache.qpid.server.virtualhost;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.Xid;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.class */
public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer {
    /** Logger shared by this class and the nested recoverer, visitors and tasks. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AsynchronousMessageStoreRecoverer.class);
    /** Recoverer created by the most recent {@code recover(...)} call; {@code null} until then. {@code cancel()} delegates to it. */
    private AsynchronousRecoverer _asynchronousRecoverer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer.class */
    public static class AsynchronousRecoverer {
        /** Timeout for shutting down the recovery thread pool (presumably milliseconds - confirm against its use in cancel()). */
        public static final int THREAD_POOL_SHUTDOWN_TIMEOUT = 5000;
        /** Virtual host whose queues and distributed transactions are being recovered. */
        private final QueueManagingVirtualHost<?> _virtualHost;
        /** Event logger obtained from the virtual host at construction. */
        private final EventLogger _eventLogger;
        /** Message store obtained from the virtual host at construction. */
        private final MessageStore _store;
        /** Log subject built from the virtual host name and the store's simple class name. */
        private final MessageStoreLogSubject _logSubject;
        /** The store's next message id as read at construction; recovery ignores message numbers at or above it. */
        private final long _maxMessageId;
        /** Queues not yet recovered: seeded from the virtual host's Queue children, entries removed by recoverQueue(). */
        private final Set<Queue<?>> _recoveringQueues;
        /** Flipped with compareAndSet in recoverQueue() so that it triggers completeRecovery() at most once. */
        private final AtomicBoolean _recoveryComplete;
        /** Message number to held message reference; filled lazily by getRecoveredMessage(), released and cleared by completeRecovery(). */
        private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages;
        /** Executor that runs the per-queue recovery tasks and the orphaned-message removal task. */
        private final ListeningExecutorService _queueRecoveryExecutor;
        /** Reader created from the store at construction; closed by completeRecovery() and by cancel(). */
        private final MessageStore.MessageStoreReader _storeReader;
        /** Starts true; cancel() sets it false, and the handlers return it so visiting stops. */
        private AtomicBoolean _continueRecovery;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer$DistributedTransactionVisitor.class */
        /**
         * Handler passed to the store reader's {@code visitDistributedTransactions}: for each stored
         * distributed transaction it obtains (or registers) the matching {@link DtxBranch}, replays the
         * recorded enqueues and dequeues into it, and leaves the branch in the PREPARED state.
         */
        public class DistributedTransactionVisitor implements DistributedTransactionHandler {
            private DistributedTransactionVisitor() {
            }

            /**
             * Replays one stored distributed transaction into its branch.
             *
             * @param storedXidRecord   identifies the transaction (format, global id, branch id)
             * @param enqueueRecordArr  enqueues recorded for the transaction
             * @param dequeueRecordArr  dequeues recorded for the transaction
             * @return the current value of the enclosing recoverer's continue flag
             * @throws ServerScopedRuntimeException if a queue entry named by a dequeue record cannot be acquired
             */
            @Override // org.apache.qpid.server.store.handler.DistributedTransactionHandler
            public boolean handle(Transaction.StoredXidRecord storedXidRecord, Transaction.EnqueueRecord[] enqueueRecordArr, Transaction.DequeueRecord[] dequeueRecordArr) {
                Xid xid = new Xid(storedXidRecord.getFormat(), storedXidRecord.getGlobalId(), storedXidRecord.getBranchId());
                DtxRegistry dtxRegistry = AsynchronousRecoverer.this.getVirtualHost().getDtxRegistry();
                DtxBranch branch = dtxRegistry.getBranch(xid);
                // Register a fresh branch only when the registry does not already know this Xid.
                if (branch == null) {
                    branch = new DtxBranch(storedXidRecord, dtxRegistry);
                    dtxRegistry.registerBranch(branch);
                }
                // Enqueues: need both the target queue and the message to be resolvable; otherwise just log.
                for (Transaction.EnqueueRecord enqueueRecord : enqueueRecordArr) {
                    final Queue<?> attainedQueue = AsynchronousRecoverer.this.getVirtualHost().getAttainedQueue(enqueueRecord.getResource().getId());
                    if (attainedQueue != null) {
                        long messageNumber = enqueueRecord.getMessage().getMessageNumber();
                        final ServerMessage recoveredMessage = AsynchronousRecoverer.this.getRecoveredMessage(messageNumber);
                        if (recoveredMessage != null) {
                            // Extra reference taken here; released by the post-transaction action below on commit or rollback.
                            final MessageReference newReference = recoveredMessage.newReference();
                            // One-element array acts as a mutable holder so the callback can hand the record to the commit action.
                            final MessageEnqueueRecord[] messageEnqueueRecordArr = new MessageEnqueueRecord[1];
                            branch.enqueue(attainedQueue, recoveredMessage, new Action<MessageEnqueueRecord>() { // from class: org.apache.qpid.server.virtualhost.AsynchronousMessageStoreRecoverer.AsynchronousRecoverer.DistributedTransactionVisitor.1
                                @Override // org.apache.qpid.server.util.Action, org.apache.qpid.server.util.BaseAction
                                public void performAction(MessageEnqueueRecord messageEnqueueRecord) {
                                    messageEnqueueRecordArr[0] = messageEnqueueRecord;
                                }
                            });
                            branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.AsynchronousMessageStoreRecoverer.AsynchronousRecoverer.DistributedTransactionVisitor.2
                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void postCommit() {
                                    // Put the message on the queue using the captured enqueue record, then drop our reference.
                                    attainedQueue.enqueue(recoveredMessage, null, messageEnqueueRecordArr[0]);
                                    newReference.release();
                                }

                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void onRollback() {
                                    newReference.release();
                                }
                            });
                        } else {
                            AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber)));
                        }
                    } else {
                        AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), enqueueRecord.getResource().getId().toString()));
                    }
                }
                // Dequeues: the referenced entry must be on the queue, so a still-recovering queue is recovered inline first.
                for (Transaction.DequeueRecord dequeueRecord : dequeueRecordArr) {
                    Queue<?> attainedQueue2 = AsynchronousRecoverer.this.getVirtualHost().getAttainedQueue(dequeueRecord.getEnqueueRecord().getQueueId());
                    if (attainedQueue2 != null) {
                        if (AsynchronousRecoverer.this.isRecovering(attainedQueue2)) {
                            AsynchronousRecoverer.this.recoverQueue(attainedQueue2);
                        }
                        long messageNumber2 = dequeueRecord.getEnqueueRecord().getMessageNumber();
                        if (AsynchronousRecoverer.this.getRecoveredMessage(messageNumber2) != null) {
                            // NOTE(review): the lookup result is dereferenced without a null check - confirm
                            // getMessageOnTheQueue cannot return null once the message itself was recovered.
                            final QueueEntry messageOnTheQueue = attainedQueue2.getMessageOnTheQueue(messageNumber2);
                            if (!messageOnTheQueue.acquire()) {
                                throw new ServerScopedRuntimeException("Distributed transaction dequeue handler failed to acquire " + messageOnTheQueue + " during recovery of queue " + attainedQueue2);
                            }
                            branch.dequeue(messageOnTheQueue.getEnqueueRecord());
                            branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.AsynchronousMessageStoreRecoverer.AsynchronousRecoverer.DistributedTransactionVisitor.3
                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void postCommit() {
                                    messageOnTheQueue.delete();
                                }

                                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                                public void onRollback() {
                                    // Undo the acquire performed during recovery.
                                    messageOnTheQueue.release();
                                }
                            });
                        } else {
                            AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber2)));
                        }
                    } else {
                        AsynchronousRecoverer.this.getEventLogger().message(AsynchronousRecoverer.this.getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), dequeueRecord.getEnqueueRecord().getQueueId().toString()));
                    }
                }
                branch.setState(DtxBranch.State.PREPARED);
                branch.prePrepareTransaction();
                // false once cancel() has been called on the enclosing recoverer.
                return AsynchronousRecoverer.this._continueRecovery.get();
            }

            /**
             * Builds the text "(format,globalId,branchId)" for log messages; the two ids are
             * rendered through {@code Functions.str}.
             */
            private StringBuilder xidAsString(Xid xid) {
                return new StringBuilder("(").append(xid.getFormat()).append(',').append(Functions.str(xid.getGlobalId())).append(',').append(Functions.str(xid.getBranchId())).append(')');
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer$MessageInstanceVisitor.class */
        /**
         * Handler for the enqueue records of a single queue. Each record either has its message
         * delivered back to the queue, or - when the message cannot be found - is dequeued from the
         * store. Both outcomes are counted.
         */
        public class MessageInstanceVisitor implements MessageInstanceHandler {
            private final Queue<?> _queue;
            long _recoveredCount;
            private int _numberOfUnknownMessageInstances;

            private MessageInstanceVisitor(Queue<?> queue) {
                _queue = queue;
                _numberOfUnknownMessageInstances = 0;
            }

            /**
             * Processes one enqueue record of the queue.
             *
             * @param messageEnqueueRecord the stored enqueue record
             * @return {@code false} when the record's message number is at or beyond the id limit
             *         captured by the recoverer; otherwise the recoverer's continue flag
             */
            @Override // org.apache.qpid.server.store.handler.MessageInstanceHandler
            public boolean handle(MessageEnqueueRecord messageEnqueueRecord) {
                final long messageId = messageEnqueueRecord.getMessageNumber();
                final String queueName = _queue.getName();
                if (messageId >= _maxMessageId) {
                    return false;
                }

                final ServerMessage<?> message = getRecoveredMessage(messageId);
                if (message == null) {
                    // The record points at a message the store no longer has: drop the record itself.
                    LOGGER.debug("Message id '{}' referenced in log as enqueued in queue '{}' is unknown, entry will be discarded", messageId, queueName);
                    final Transaction discardTxn = _store.newTransaction();
                    discardTxn.dequeueMessage(messageEnqueueRecord);
                    discardTxn.commitTranAsync((Void) null);
                    _numberOfUnknownMessageInstances++;
                } else {
                    LOGGER.debug("Delivering message id '{}' to queue '{}'", message.getMessageNumber(), queueName);
                    _queue.recover(message, messageEnqueueRecord);
                    _recoveredCount++;
                }
                return _continueRecovery.get();
            }

            /** Number of records whose message was delivered to the queue. */
            long getRecoveredCount() {
                return _recoveredCount;
            }

            /** Number of records discarded because their message was unknown. */
            int getNumberOfUnknownMessageInstances() {
                return _numberOfUnknownMessageInstances;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer$QueueRecoveringTask.class */
        /**
         * Runnable that recovers one queue, renaming the executing thread for the duration of the
         * work and restoring the previous name afterwards.
         */
        public class QueueRecoveringTask implements Runnable {
            private final Queue<?> _queue;

            QueueRecoveringTask(Queue<?> queue) {
                _queue = queue;
            }

            @Override // java.lang.Runnable
            public void run() {
                final Thread worker = Thread.currentThread();
                final String previousName = worker.getName();
                worker.setName("Queue Recoverer : " + _queue.getName() + " (vh: " + getVirtualHost().getName() + ")");
                try {
                    recoverQueue(_queue);
                } finally {
                    // Restore the original name whether recovery succeeded or threw.
                    worker.setName(previousName);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer$AsynchronousRecoverer$RemoveOrphanedMessagesTask.class */
        /**
         * Runnable that invokes {@code completeRecovery()} under a descriptive thread name,
         * restoring the previous name afterwards.
         */
        public class RemoveOrphanedMessagesTask implements Runnable {
            RemoveOrphanedMessagesTask() {
            }

            @Override // java.lang.Runnable
            public void run() {
                final Thread worker = Thread.currentThread();
                final String previousName = worker.getName();
                worker.setName("Orphaned message removal");
                try {
                    completeRecovery();
                } finally {
                    // Restore the original name whether the clean-up succeeded or threw.
                    worker.setName(previousName);
                }
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private AsynchronousRecoverer(QueueManagingVirtualHost<?> queueManagingVirtualHost) {
            this._recoveringQueues = new CopyOnWriteArraySet();
            this._recoveryComplete = new AtomicBoolean();
            this._recoveredMessages = new HashMap();
            this._queueRecoveryExecutor = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(Executors.defaultThreadFactory())));
            this._continueRecovery = new AtomicBoolean(true);
            this._virtualHost = queueManagingVirtualHost;
            this._eventLogger = queueManagingVirtualHost.getEventLogger();
            this._store = queueManagingVirtualHost.getMessageStore();
            this._storeReader = this._store.newMessageStoreReader();
            this._logSubject = new MessageStoreLogSubject(queueManagingVirtualHost.getName(), this._store.getClass().getSimpleName());
            this._maxMessageId = this._store.getNextMessageId();
            this._recoveringQueues.addAll(this._virtualHost.getChildren(Queue.class));
        }

        /**
         * Starts recovery. Distributed transactions are replayed first on the calling thread; then
         * one recovery task per still-unrecovered queue is submitted to the executor. When there
         * are no queues to recover, a single orphaned-message removal task is submitted instead.
         *
         * @return a future that completes when the submitted work has finished; it is already
         *         complete if transaction replay finished recovery inline
         */
        public ListenableFuture<Void> recover() {
            getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());

            // Transaction replay recovers queues with prepared dequeues inline. If that emptied the
            // recovering set, completeRecovery() has already run and shut the executor down, so a
            // further submit would be rejected - there is nothing left to do.
            if (this._recoveryComplete.get()) {
                return Futures.<Void>immediateFuture(null);
            }

            if (this._recoveringQueues.isEmpty()) {
                return this._queueRecoveryExecutor.<Void>submit(new RemoveOrphanedMessagesTask(), null);
            }

            final List<ListenableFuture<Void>> queueRecoveryFutures = new ArrayList<>();
            for (Queue<?> queue : this._recoveringQueues) {
                queueRecoveryFutures.add(this._queueRecoveryExecutor.<Void>submit(new QueueRecoveringTask(queue), null));
            }
            final ListenableFuture<List<Void>> allQueuesRecovered = Futures.allAsList(queueRecoveryFutures);
            // Collapse the list result to Void; callers only care about completion.
            return Futures.transform(allQueuesRecovered, results -> null, MoreExecutors.directExecutor());
        }

        /** @return the virtual host this recoverer was constructed for */
        public QueueManagingVirtualHost<?> getVirtualHost() {
            return _virtualHost;
        }

        /** @return the event logger taken from the virtual host at construction */
        public EventLogger getEventLogger() {
            return _eventLogger;
        }

        /** @return the store reader opened at construction */
        public MessageStore.MessageStoreReader getStoreReader() {
            return _storeReader;
        }

        /** @return the log subject built from the virtual host name and store class name */
        public MessageStoreLogSubject getLogSubject() {
            return _logSubject;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * @param queue the queue to check
         * @return {@code true} while the queue is still in the set of queues awaiting recovery
         */
        public boolean isRecovering(Queue<?> queue) {
            return _recoveringQueues.contains(queue);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Recovers a single queue: visits its stored message instances, logs the outcome, marks
         * the queue's recovery complete and removes it from the set of pending queues. The caller
         * that empties the set (and wins the compare-and-set) also runs {@code completeRecovery()}.
         *
         * @param queue the queue to recover
         */
        public void recoverQueue(Queue<?> queue) {
            final MessageInstanceVisitor visitor = new MessageInstanceVisitor(queue);
            _storeReader.visitMessageInstances(queue, visitor);

            final int discardedEntries = visitor.getNumberOfUnknownMessageInstances();
            if (discardedEntries > 0) {
                LOGGER.info("Discarded {} entry(s) associated with queue '{}' as the referenced message does not exist.", discardedEntries, queue.getName());
            }

            getEventLogger().message(getLogSubject(),
                    TransactionLogMessages.RECOVERED(Long.valueOf(visitor.getRecoveredCount()), queue.getName()));
            getEventLogger().message(getLogSubject(),
                    TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));

            queue.completeRecovery();
            _recoveringQueues.remove(queue);

            // Only the first caller to observe an empty set performs the final clean-up.
            final boolean noQueuesLeft = _recoveringQueues.isEmpty();
            if (noQueuesLeft && _recoveryComplete.compareAndSet(false, true)) {
                completeRecovery();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Final clean-up step: releases every held message reference, removes from the store all
         * messages below the id limit that were never recovered, then closes the store reader and
         * shuts the executor down. Removal stops taking effect once recovery has been cancelled.
         */
        public synchronized void completeRecovery() {
            for (Map.Entry<Long, MessageReference<? extends ServerMessage<?>>> recovered : _recoveredMessages.entrySet()) {
                recovered.getValue().release();
                // Null the value but keep the key: the key set is still consulted below.
                recovered.setValue(null);
            }

            final ArrayList<StoredMessage<?>> orphans = new ArrayList<>();
            getStoreReader().visitMessages(new MessageHandler() {
                @Override // org.apache.qpid.server.store.handler.MessageHandler
                public boolean handle(StoredMessage<?> storedMessage) {
                    final long messageId = storedMessage.getMessageNumber();
                    if (_continueRecovery.get() && messageId < _maxMessageId) {
                        if (!_recoveredMessages.containsKey(messageId)) {
                            orphans.add(storedMessage);
                        }
                        return true;
                    }
                    return false;
                }
            });

            int removed = 0;
            for (StoredMessage<?> orphan : orphans) {
                if (!_continueRecovery.get()) {
                    continue;
                }
                LOGGER.debug("Message id '{}' is orphaned, removing", orphan.getMessageNumber());
                orphan.remove();
                removed++;
            }
            if (removed > 0) {
                LOGGER.info("Discarded {} orphaned message(s).", removed);
            }

            orphans.clear();
            _recoveredMessages.clear();
            _storeReader.close();
            _queueRecoveryExecutor.shutdown();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.qpid.server.store.StorableMessageMetaData] */
        /**
         * Returns the message with the given number, loading it from the store reader and caching
         * a reference to it on first use.
         *
         * @param j the message number
         * @return the message, or {@code null} when the store reader has no such message
         */
        public synchronized ServerMessage<?> getRecoveredMessage(long j) {
            final Long key = Long.valueOf(j);
            MessageReference reference = _recoveredMessages.get(key);
            if (reference == null) {
                final StoredMessage<?> stored = _storeReader.getMessage(j);
                if (stored == null) {
                    return null;
                }
                // Build the server message from the stored form and keep a reference so it stays cached.
                reference = stored.getMetaData().getType().createMessage(stored).newReference();
                _recoveredMessages.put(key, reference);
            }
            return reference.getMessage();
        }

        /**
         * Cancels recovery: clears the continue flag so handlers stop, shuts the executor down and
         * waits up to {@link #THREAD_POOL_SHUTDOWN_TIMEOUT} milliseconds for running tasks to end,
         * forcing shutdown if they do not. The store reader is always closed afterwards. If the
         * waiting thread is interrupted, its interrupt status is restored.
         */
        public void cancel() {
            this._continueRecovery.set(false);
            this._queueRecoveryExecutor.shutdown();
            try {
                // Use this class's own shutdown timeout rather than an unrelated broker constant.
                final boolean terminated = this._queueRecoveryExecutor.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
                if (!terminated) {
                    AsynchronousMessageStoreRecoverer.LOGGER.warn("Failed to gracefully shutdown queue recovery executor within permitted time period");
                    this._queueRecoveryExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                this._storeReader.close();
            }
        }
    }

    /**
     * Creates a new recoverer for the virtual host, remembers it so it can be cancelled, and
     * starts it.
     *
     * @param queueManagingVirtualHost the virtual host to recover
     * @return the future returned by the recoverer
     */
    @Override // org.apache.qpid.server.virtualhost.MessageStoreRecoverer
    public ListenableFuture<Void> recover(QueueManagingVirtualHost<?> queueManagingVirtualHost) {
        final AsynchronousRecoverer recoverer = new AsynchronousRecoverer(queueManagingVirtualHost);
        this._asynchronousRecoverer = recoverer;
        return recoverer.recover();
    }

    /** Cancels the current recoverer, if one has been created; otherwise does nothing. */
    @Override // org.apache.qpid.server.virtualhost.MessageStoreRecoverer
    public void cancel() {
        final AsynchronousRecoverer recoverer = this._asynchronousRecoverer;
        if (recoverer == null) {
            return;
        }
        recoverer.cancel();
    }
}
