/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.server.impl;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.FilterUtils;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.jboss.logging.Logger;

public class PostOfficeJournalLoader
implements JournalLoader {
    private static final Logger logger = Logger.getLogger(PostOfficeJournalLoader.class);
    protected final PostOffice postOffice;
    protected final PagingManager pagingManager;
    private final StorageManager storageManager;
    private final QueueFactory queueFactory;
    protected final NodeManager nodeManager;
    private final ManagementService managementService;
    private final GroupingHandler groupingHandler;
    private final Configuration configuration;
    private Map<Long, Queue> queues;

    public PostOfficeJournalLoader(PostOffice postOffice, PagingManager pagingManager, StorageManager storageManager, QueueFactory queueFactory, NodeManager nodeManager, ManagementService managementService, GroupingHandler groupingHandler, Configuration configuration) {
        this.postOffice = postOffice;
        this.pagingManager = pagingManager;
        this.storageManager = storageManager;
        this.queueFactory = queueFactory;
        this.nodeManager = nodeManager;
        this.managementService = managementService;
        this.groupingHandler = groupingHandler;
        this.configuration = configuration;
        this.queues = new HashMap<Long, Queue>();
    }

    public PostOfficeJournalLoader(PostOffice postOffice, PagingManager pagingManager, StorageManager storageManager, QueueFactory queueFactory, NodeManager nodeManager, ManagementService managementService, GroupingHandler groupingHandler, Configuration configuration, Map<Long, Queue> queues) {
        this(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration);
        this.queues = queues;
    }

    @Override
    public void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap, List<QueueBindingInfo> queueBindingInfos) throws Exception {
        int duplicateID = 0;
        for (QueueBindingInfo queueBindingInfo : queueBindingInfos) {
            queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
            if (this.postOffice.getBinding(queueBindingInfo.getQueueName()) != null) {
                if (FilterUtils.isTopicIdentification(FilterImpl.createFilter(queueBindingInfo.getFilterString()))) {
                    long tx = this.storageManager.generateID();
                    this.storageManager.deleteQueueBinding(tx, queueBindingInfo.getId());
                    this.storageManager.commitBindings(tx);
                    continue;
                }
                SimpleString newName = queueBindingInfo.getQueueName().concat("-" + duplicateID++);
                ActiveMQServerLogger.LOGGER.queueDuplicatedRenaming(queueBindingInfo.getQueueName().toString(), newName.toString());
                queueBindingInfo.replaceQueueName(newName);
            }
            Queue queue = this.queueFactory.createQueueWith(new QueueConfiguration(queueBindingInfo.getQueueName()).setId(Long.valueOf(queueBindingInfo.getId())).setAddress(queueBindingInfo.getAddress()).setFilterString(queueBindingInfo.getFilterString()).setUser(queueBindingInfo.getUser()).setDurable(Boolean.valueOf(true)).setTemporary(Boolean.valueOf(false)).setAutoCreated(Boolean.valueOf(queueBindingInfo.isAutoCreated())).setPurgeOnNoConsumers(Boolean.valueOf(queueBindingInfo.isPurgeOnNoConsumers())).setMaxConsumers(Integer.valueOf(queueBindingInfo.getMaxConsumers())).setExclusive(Boolean.valueOf(queueBindingInfo.isExclusive())).setGroupRebalance(Boolean.valueOf(queueBindingInfo.isGroupRebalance())).setGroupBuckets(Integer.valueOf(queueBindingInfo.getGroupBuckets())).setGroupFirstKey(queueBindingInfo.getGroupFirstKey()).setLastValue(Boolean.valueOf(queueBindingInfo.isLastValue())).setLastValueKey(queueBindingInfo.getLastValueKey()).setNonDestructive(Boolean.valueOf(queueBindingInfo.isNonDestructive())).setConsumersBeforeDispatch(Integer.valueOf(queueBindingInfo.getConsumersBeforeDispatch())).setDelayBeforeDispatch(Long.valueOf(queueBindingInfo.getDelayBeforeDispatch())).setAutoDelete(Boolean.valueOf(queueBindingInfo.isAutoDelete())).setAutoDeleteDelay(Long.valueOf(queueBindingInfo.getAutoDeleteDelay())).setAutoDeleteMessageCount(Long.valueOf(queueBindingInfo.getAutoDeleteMessageCount())).setRoutingType(RoutingType.getType((byte)queueBindingInfo.getRoutingType())).setConfigurationManaged(Boolean.valueOf(queueBindingInfo.isConfigurationManaged())).setRingSize(Long.valueOf(queueBindingInfo.getRingSize())), this.pagingManager);
            queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)this.postOffice).getServer(), queueBindingInfo.getQueueName()));
            if (queueBindingInfo.getQueueStatusEncodings() != null) {
                for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
                    if (encoding.getStatus() != AddressQueueStatus.PAUSED) continue;
                    queue.reloadPause(encoding.getId());
                }
            }
            LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, this.nodeManager.getNodeId());
            this.queues.put(queue.getID(), queue);
            this.postOffice.addBinding(binding);
            this.managementService.registerQueue(queue, queue.getAddress(), this.storageManager);
        }
    }

    @Override
    public void initAddresses(List<AddressBindingInfo> addressBindingInfos) throws Exception {
        for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
            AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
            addressInfo.setId(addressBindingInfo.getId());
            if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) {
                addressInfo.setStorageManager(this.storageManager);
                addressInfo.setPostOffice(this.postOffice);
                addressInfo.reloadPause(addressBindingInfo.getAddressStatusEncoding().getId());
            }
            this.postOffice.reloadAddressInfo(addressInfo);
        }
    }

    @Override
    public void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception {
        for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet()) {
            long queueID = entry.getKey();
            Map<Long, AddMessageRecord> queueRecords = entry.getValue();
            Queue queue = this.queues.get(queueID);
            if (queue == null) {
                if (queueRecords.values().size() == 0) continue;
                ActiveMQServerLogger.LOGGER.journalCannotFindQueueForMessage(queueID);
                continue;
            }
            queue.pause();
            Collection<AddMessageRecord> valueRecords = queueRecords.values();
            long currentTime = System.currentTimeMillis();
            for (AddMessageRecord record : valueRecords) {
                long scheduledDeliveryTime = record.getScheduledDeliveryTime();
                if (scheduledDeliveryTime != 0L && scheduledDeliveryTime <= currentTime) {
                    scheduledDeliveryTime = 0L;
                    record.getMessage().setScheduledDeliveryTime(Long.valueOf(0L));
                }
                if (scheduledDeliveryTime != 0L) {
                    record.getMessage().setScheduledDeliveryTime(Long.valueOf(scheduledDeliveryTime));
                }
                MessageReference ref = this.postOffice.reload(record.getMessage(), queue, null);
                ref.setDeliveryCount(record.getDeliveryCount());
                if (scheduledDeliveryTime == 0L) continue;
                record.getMessage().setScheduledDeliveryTime(Long.valueOf(0L));
            }
        }
    }

    @Override
    public void handleNoMessageReferences(Map<Long, Message> messages) {
        for (Message msg : messages.values()) {
            if (msg.getRefCount() != 0 || msg.getDurableCount() != 0) continue;
            ActiveMQServerLogger.LOGGER.journalUnreferencedMessage(msg.getMessageID());
            try {
                this.storageManager.deleteMessage(msg.getMessageID());
            }
            catch (Exception ignored) {
                ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(ignored, msg.getMessageID());
            }
        }
    }

    @Override
    public void handleGroupingBindings(List<GroupingInfo> groupingInfos) {
        for (GroupingInfo groupingInfo : groupingInfos) {
            if (this.groupingHandler == null) continue;
            this.groupingHandler.addGroupBinding(new GroupBinding(groupingInfo.getId(), groupingInfo.getGroupId(), groupingInfo.getClusterName()));
        }
    }

    @Override
    public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
        for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) {
            SimpleString address = entry.getKey();
            DuplicateIDCache cache = this.postOffice.getDuplicateIDCache(address);
            if (!this.configuration.isPersistIDCache()) continue;
            cache.load(entry.getValue());
        }
    }

    @Override
    public void postLoad(Journal messageJournal, ResourceManager resourceManager, Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
        for (Queue queue : this.queues.values()) {
            if (queue.isPersistedPause()) continue;
            queue.resume();
        }
        if (System.getProperty("org.apache.activemq.opt.directblast") != null) {
            messageJournal.runDirectJournalBlast();
        }
    }

    @Override
    public void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception {
        Queue queue = this.queues.get(queueID);
        if (queue == null) {
            ActiveMQServerLogger.LOGGER.journalMessageInPreparedTX(queueID);
            return;
        }
        this.postOffice.reload(message, queue, tx);
    }

    @Override
    public void handlePreparedAcknowledge(long messageID, List<MessageReference> referencesToAck, long queueID) throws Exception {
        Queue queue = this.queues.get(queueID);
        if (queue == null) {
            ActiveMQServerLogger.LOGGER.journalMessageAckMissingQueueInPreparedTX(queueID);
        } else {
            MessageReference removed = queue.removeReferenceWithID(messageID);
            if (removed == null) {
                ActiveMQServerLogger.LOGGER.journalErrorRemovingRef(messageID);
            } else {
                referencesToAck.add(removed);
            }
        }
    }

    @Override
    public void handlePreparedTransaction(Transaction tx, List<MessageReference> referencesToAck, Xid xid, ResourceManager resourceManager) throws Exception {
        for (MessageReference ack : referencesToAck) {
            ack.getQueue().reacknowledge(tx, ack);
        }
        tx.setState(Transaction.State.PREPARED);
        resourceManager.putTransaction(xid, tx);
    }

    @Override
    public void recoverPendingPageCounters(List<PageCountPending> pendingNonTXPageCounter) throws Exception {
        TransactionImpl txRecoverCounter = new TransactionImpl(this.storageManager);
        Map<SimpleString, Map<Long, Map<Long, List<PageCountPending>>>> perAddressMap = this.generateMapsOnPendingCount(this.queues, pendingNonTXPageCounter, txRecoverCounter);
        for (Map.Entry<SimpleString, Map<Long, Map<Long, List<PageCountPending>>>> addressPageMapEntry : perAddressMap.entrySet()) {
            PagingStore store = this.pagingManager.getPageStore(addressPageMapEntry.getKey());
            Map<Long, Map<Long, List<PageCountPending>>> perPageMap = addressPageMapEntry.getValue();
            assert (perPageMap != null);
            for (Long pageId : perPageMap.keySet()) {
                Map<Long, List<PageCountPending>> perQueue = perPageMap.get(pageId);
                assert (perQueue != null);
                if (store != null && store.checkPageFileExists(pageId.intValue())) {
                    Page pg = store.createPage(pageId.intValue());
                    pg.open();
                    List<PagedMessage> pgMessages = pg.read(this.storageManager);
                    pg.close(false, false);
                    HashMap<Long, AtomicInteger> countsPerQueueOnPage = new HashMap<Long, AtomicInteger>();
                    HashMap<Long, AtomicLong> sizePerQueueOnPage = new HashMap<Long, AtomicLong>();
                    for (PagedMessage pagedMessage : pgMessages) {
                        if (pagedMessage.getTransactionID() > 0L) continue;
                        for (long q : pagedMessage.getQueueIDs()) {
                            AtomicInteger countQ = (AtomicInteger)countsPerQueueOnPage.get(q);
                            AtomicLong sizeQ = (AtomicLong)sizePerQueueOnPage.get(q);
                            if (countQ == null) {
                                countQ = new AtomicInteger(0);
                                countsPerQueueOnPage.put(q, countQ);
                            }
                            if (sizeQ == null) {
                                sizeQ = new AtomicLong(0L);
                                sizePerQueueOnPage.put(q, sizeQ);
                            }
                            countQ.incrementAndGet();
                            if (pagedMessage.getPersistentSize() <= 0L) continue;
                            sizeQ.addAndGet(pagedMessage.getPersistentSize());
                        }
                    }
                    for (Map.Entry entry : perQueue.entrySet()) {
                        Object object = ((List)entry.getValue()).iterator();
                        while (object.hasNext()) {
                            PageCountPending record = (PageCountPending)object.next();
                            logger.debug((Object)("Deleting pg tempCount " + record.getID()));
                            this.storageManager.deletePendingPageCounter(txRecoverCounter.getID(), record.getID());
                        }
                        PageSubscriptionCounter counter = store.getCursorProvider().getSubscription((Long)entry.getKey()).getCounter();
                        AtomicInteger value = (AtomicInteger)countsPerQueueOnPage.get(entry.getKey());
                        AtomicLong sizeValue = (AtomicLong)sizePerQueueOnPage.get(entry.getKey());
                        if (value == null) {
                            logger.debug((Object)("Page " + entry.getKey() + " wasn't open, so we will just ignore"));
                            continue;
                        }
                        logger.debug((Object)("Replacing counter " + value.get()));
                        counter.increment(txRecoverCounter, value.get(), sizeValue.get());
                    }
                    continue;
                }
                logger.debug((Object)("Page " + pageId + " didn't exist on address " + addressPageMapEntry.getKey() + ", so we are just removing records"));
                for (List<PageCountPending> records : perQueue.values()) {
                    for (PageCountPending record : records) {
                        logger.debug((Object)("Removing pending page counter " + record.getID()));
                        this.storageManager.deletePendingPageCounter(txRecoverCounter.getID(), record.getID());
                        txRecoverCounter.setContainsPersistent();
                    }
                }
            }
        }
        txRecoverCounter.commit();
    }

    @Override
    public void cleanUp() {
        this.queues.clear();
    }

    private Map<SimpleString, Map<Long, Map<Long, List<PageCountPending>>>> generateMapsOnPendingCount(Map<Long, Queue> queues, List<PageCountPending> pendingNonTXPageCounter, Transaction txRecoverCounter) throws Exception {
        HashMap<SimpleString, Map<Long, Map<Long, List<PageCountPending>>>> perAddressMap = new HashMap<SimpleString, Map<Long, Map<Long, List<PageCountPending>>>>();
        for (PageCountPending pgCount : pendingNonTXPageCounter) {
            LinkedList<PageCountPending> pendingCounters;
            HashMap<Long, LinkedList<PageCountPending>> perQueueMap;
            long queueID = pgCount.getQueueID();
            long pageID = pgCount.getPageID();
            Queue queue = queues.get(queueID);
            if (queue == null) {
                logger.debug((Object)("removing pending page counter id = " + pgCount.getID() + " as queueID=" + pgCount.getID() + " no longer exists"));
                this.storageManager.deletePendingPageCounter(txRecoverCounter.getID(), pgCount.getID());
                txRecoverCounter.setContainsPersistent();
                continue;
            }
            SimpleString address = queue.getAddress();
            HashMap perPageMap = (HashMap)perAddressMap.get(address);
            if (perPageMap == null) {
                perPageMap = new HashMap();
                perAddressMap.put(address, perPageMap);
            }
            if ((perQueueMap = (HashMap<Long, LinkedList<PageCountPending>>)perPageMap.get(pageID)) == null) {
                perQueueMap = new HashMap<Long, LinkedList<PageCountPending>>();
                perPageMap.put(pageID, perQueueMap);
            }
            if ((pendingCounters = (LinkedList<PageCountPending>)perQueueMap.get(queueID)) == null) {
                pendingCounters = new LinkedList<PageCountPending>();
                perQueueMap.put(queueID, pendingCounters);
            }
            pendingCounters.add(pgCount);
            perQueueMap.put(queueID, pendingCounters);
        }
        return perAddressMap;
    }
}

