package org.apache.qpid.server.virtualhost;

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;

/* loaded from: input_file:org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.class */
/**
 * Recovery handler that replays the contents of a {@link MessageStore} into a {@link VirtualHost}.
 *
 * <p>The handler receives, in turn, the stored messages, the queue entries that reference them and
 * the prepared distributed-transaction (XA) records. Messages that no queue entry or XA record
 * refers to are removed from the store when DTX recovery completes.
 *
 * <p>NOTE(review): the handler keeps plain, unsynchronized maps; it appears to assume recovery is
 * driven from a single thread - confirm against the store implementations.
 */
public class VirtualHostConfigRecoveryHandler implements MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, TransactionLogRecoveryHandler.QueueEntryRecoveryHandler, TransactionLogRecoveryHandler.DtxRecordRecoveryHandler {
    private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
    private final VirtualHost _virtualHost;
    /** Number of messages re-enqueued per queue name; sorted so the summary is logged in name order. */
    private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
    /** Every message read from the store, keyed by message number. */
    private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
    /** Stored messages not yet referenced by any queue entry or XA record, keyed by message number. */
    private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
    private final ExchangeRegistry _exchangeRegistry;
    private final ExchangeFactory _exchangeFactory;
    private MessageStoreLogSubject _logSubject;
    private MessageStore _store;

    /**
     * Minimal persistent message that carries only a message number. It stands in for a message
     * that could not be recovered so that its orphaned log entry can still be dequeued.
     */
    private static class DummyMessage implements EnqueableMessage {
        private final long _messageId;

        public DummyMessage(long j) {
            this._messageId = j;
        }

        @Override
        public long getMessageNumber() {
            return this._messageId;
        }

        @Override
        public boolean isPersistent() {
            return true;
        }

        @Override
        public StoredMessage getStoredMessage() {
            return null;
        }
    }

    /**
     * @param virtualHost      the virtual host whose queues and DTX registry are populated
     * @param exchangeRegistry retained for the lifetime of the handler (not used during recovery here)
     * @param exchangeFactory  retained for the lifetime of the handler (not used during recovery here)
     */
    public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, ExchangeRegistry exchangeRegistry, ExchangeFactory exchangeFactory) {
        this._virtualHost = virtualHost;
        this._exchangeRegistry = exchangeRegistry;
        this._exchangeFactory = exchangeFactory;
    }

    /**
     * Starts transaction-log recovery: remembers the store, builds the log subject and logs the
     * recovery-start operational message.
     *
     * @param messageStore the store being recovered
     * @return this handler, which also handles the individual queue entries
     */
    @Override
    public VirtualHostConfigRecoveryHandler begin(MessageStore messageStore) {
        this._logSubject = new MessageStoreLogSubject(this._virtualHost.getName(), messageStore.getClass().getSimpleName());
        this._store = messageStore;
        CurrentActor.get().message(this._logSubject, TransactionLogMessages.RECOVERY_START(null, false));
        return this;
    }

    /**
     * Starts message recovery.
     *
     * @return this handler, which also handles the individual stored messages
     */
    @Override
    public MessageStoreRecoveryHandler.StoredMessageRecoveryHandler begin() {
        return this;
    }

    /**
     * Records a message found in the store. The message is tracked as "unused" until a queue entry
     * or XA record claims it.
     *
     * @param storedMessage the message read from the store
     */
    @Override
    public void message(StoredMessage storedMessage) {
        final Long messageId = Long.valueOf(storedMessage.getMessageNumber());
        this._recoveredMessages.put(messageId, storedMessage.getMetaData().getType().createMessage(storedMessage));
        this._unusedMessages.put(messageId, storedMessage);
    }

    /** Nothing to do: recovered messages are consumed by the later recovery phases. */
    @Override
    public void completeMessageRecovery() {
    }

    /**
     * Rebuilds one prepared XA branch from its durable record and puts it back in the
     * {@code PREPARED} state.
     *
     * @param j          the XID format
     * @param bArr       the XID global id
     * @param bArr2      the XID branch id
     * @param recordArr  enqueue records belonging to the branch
     * @param recordArr2 dequeue records belonging to the branch
     * @throws RuntimeException if the store fails while re-preparing the branch
     */
    @Override
    public void dtxRecord(long j, byte[] bArr, byte[] bArr2, Transaction.Record[] recordArr, Transaction.Record[] recordArr2) {
        final Xid xid = new Xid(j, bArr, bArr2);
        final DtxRegistry dtxRegistry = this._virtualHost.getDtxRegistry();
        DtxBranch branch = dtxRegistry.getBranch(xid);
        if (branch == null) {
            branch = new DtxBranch(xid, this._store, this._virtualHost);
            dtxRegistry.registerBranch(branch);
        }
        for (Transaction.Record enqueueRecord : recordArr) {
            recoverDtxEnqueue(branch, xid, enqueueRecord);
        }
        for (Transaction.Record dequeueRecord : recordArr2) {
            recoverDtxDequeue(branch, xid, dequeueRecord);
        }
        try {
            branch.setState(DtxBranch.State.PREPARED);
            branch.prePrepareTransaction();
        } catch (AMQStoreException e) {
            _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " + xidAsString(xid), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Re-applies a single enqueue record of a recovered XA branch. Records that reference an
     * unknown queue or an unrecovered message are logged and skipped.
     */
    private void recoverDtxEnqueue(final DtxBranch branch, final Xid xid, final Transaction.Record record) {
        final AMQQueue queue = this._virtualHost.getQueue(record.getQueue().getId());
        if (queue == null) {
            CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), record.getQueue().getId().toString()));
            return;
        }
        final long messageId = record.getMessage().getMessageNumber();
        final ServerMessage message = claimRecoveredMessage(messageId);
        if (message == null) {
            CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageId)));
            return;
        }
        // Hold a reference until the branch is resolved one way or the other.
        final MessageReference reference = message.newReference();
        branch.enqueue(queue, message);
        branch.addPostTransactionAcion(new ServerTransaction.Action() {
            @Override
            public void postCommit() {
                try {
                    queue.enqueue(message, true, null);
                } catch (AMQException e) {
                    _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into queue " + queue.getName() + " (from XA transaction)", e);
                    throw new RuntimeException(e);
                } finally {
                    // Release even when the enqueue fails, otherwise the reference is leaked.
                    reference.release();
                }
            }

            @Override
            public void onRollback() {
                reference.release();
            }
        });
    }

    /**
     * Re-applies a single dequeue record of a recovered XA branch. Records that reference an
     * unknown queue, an unrecovered message or a message that is not on the queue are logged and
     * skipped.
     */
    private void recoverDtxDequeue(final DtxBranch branch, final Xid xid, final Transaction.Record record) {
        final AMQQueue queue = this._virtualHost.getQueue(record.getQueue().getId());
        if (queue == null) {
            CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), record.getQueue().getId().toString()));
            return;
        }
        final long messageId = record.getMessage().getMessageNumber();
        final ServerMessage message = claimRecoveredMessage(messageId);
        if (message == null) {
            CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageId)));
            return;
        }
        final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
        if (entry == null) {
            // Defensive: without an entry there is nothing to acquire, and dereferencing it would
            // abort the whole recovery with a NullPointerException.
            _logger.warn("Message id " + messageId + " dequeued in XA transaction " + xidAsString(xid) + " is not on queue " + queue.getName() + ", dequeue record will be skipped");
            return;
        }
        // Acquire the entry so it stays reserved for this branch until commit or rollback.
        entry.acquire();
        branch.dequeue(queue, message);
        branch.addPostTransactionAcion(new ServerTransaction.Action() {
            @Override
            public void postCommit() {
                entry.discard();
            }

            @Override
            public void onRollback() {
                entry.release();
            }
        });
    }

    /**
     * Looks up a recovered message and marks it as referenced, so that it is not purged from the
     * store at the end of recovery.
     *
     * @return the recovered message, or {@code null} if no message with that id was recovered
     */
    private ServerMessage claimRecoveredMessage(long messageId) {
        final Long key = Long.valueOf(messageId);
        final ServerMessage message = this._recoveredMessages.get(key);
        this._unusedMessages.remove(key);
        return message;
    }

    /** Formats an XID as {@code (format,globalId,branchId)} for log output. */
    private static StringBuilder xidAsString(Xid xid) {
        return new StringBuilder("(").append(xid.getFormat()).append(',').append(Functions.str(xid.getGlobalId())).append(',').append(Functions.str(xid.getBranchId())).append(')');
    }

    /**
     * Finishes DTX recovery: removes from the store every message that nothing referenced, then
     * logs the recovery-complete operational message.
     */
    @Override
    public void completeDtxRecordRecovery() {
        for (StoredMessage storedMessage : this._unusedMessages.values()) {
            _logger.warn("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
            storedMessage.remove();
        }
        CurrentActor.get().message(this._logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
    }

    /** No-op. */
    public void complete() {
    }

    /**
     * Re-enqueues one recovered queue entry. If the queue or the message is unknown, the entry is
     * dequeued from the store instead so it does not reappear on the next recovery.
     *
     * @param uuid id of the queue the entry belongs to
     * @param j    message number of the entry
     * @throws RuntimeException wrapping any {@link AMQException} raised while enqueuing or discarding
     */
    @Override
    public void queueEntry(final UUID uuid, long j) {
        final AMQQueue queue = this._virtualHost.getQueue(uuid);
        try {
            if (queue == null) {
                _logger.warn("Message id " + j + " in log references queue with id " + uuid + " which is not in the configuration, entry will be discarded");
                // The queue no longer exists, so identify the log resource by id alone.
                discardQueueEntry(new TransactionLogResource() {
                    @Override
                    public UUID getId() {
                        return uuid;
                    }
                }, j);
                return;
            }
            final String queueName = queue.getName();
            final ServerMessage message = claimRecoveredMessage(j);
            if (message == null) {
                _logger.warn("Message id " + j + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
                discardQueueEntry(queue, j);
                return;
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
            }
            final Integer previous = this._queueRecoveries.get(queueName);
            final int recoveredSoFar = (previous == null) ? 0 : previous.intValue();
            queue.enqueue(message);
            // Count only after a successful enqueue.
            this._queueRecoveries.put(queueName, Integer.valueOf(recoveredSoFar + 1));
        } catch (AMQException e) {
            throw new RuntimeException(e);
        }
    }

    /** Dequeues an unrecoverable log entry from the store in its own asynchronously committed transaction. */
    private void discardQueueEntry(TransactionLogResource resource, long messageId) throws AMQException {
        final Transaction txn = this._store.newTransaction();
        txn.dequeueMessage(resource, new DummyMessage(messageId));
        txn.commitTranAsync();
    }

    /**
     * Finishes queue-entry recovery by logging, per queue, how many messages were recovered.
     *
     * @return this handler, which also handles the DTX records
     */
    @Override
    public TransactionLogRecoveryHandler.DtxRecordRecoveryHandler completeQueueEntryRecovery() {
        for (Map.Entry<String, Integer> entry : this._queueRecoveries.entrySet()) {
            CurrentActor.get().message(this._logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
            CurrentActor.get().message(this._logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
        }
        return this;
    }
}
