/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.postoffice.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.AddressManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
import org.hornetq.core.postoffice.impl.BindingsImpl;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
import org.hornetq.core.postoffice.impl.SimpleAddressManager;
import org.hornetq.core.postoffice.impl.WildcardAddressManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationListener;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;

public class PostOfficeImpl
implements PostOffice,
NotificationListener,
BindingsFactory {
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
    public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_HQ_RESET_QUEUE_DATA");
    private final AddressManager addressManager;
    private final QueueFactory queueFactory;
    private final StorageManager storageManager;
    private final PagingManager pagingManager;
    private volatile boolean started;
    private final ManagementService managementService;
    private final Reaper reaperRunnable = new Reaper();
    private volatile Thread reaperThread;
    private final long reaperPeriod;
    private final int reaperPriority;
    private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
    private final int idCacheSize;
    private final boolean persistIDCache;
    private final Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
    private final Object notificationLock = new Object();
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
    private final HornetQServer server;

    public PostOfficeImpl(HornetQServer server, StorageManager storageManager, PagingManager pagingManager, QueueFactory bindableFactory, ManagementService managementService, long reaperPeriod, int reaperPriority, boolean enableWildCardRouting, int idCacheSize, boolean persistIDCache, HierarchicalRepository<AddressSettings> addressSettingsRepository) {
        this.storageManager = storageManager;
        this.queueFactory = bindableFactory;
        this.managementService = managementService;
        this.pagingManager = pagingManager;
        this.reaperPeriod = reaperPeriod;
        this.reaperPriority = reaperPriority;
        this.addressManager = enableWildCardRouting ? new WildcardAddressManager(this) : new SimpleAddressManager(this);
        this.idCacheSize = idCacheSize;
        this.persistIDCache = persistIDCache;
        this.addressSettingsRepository = addressSettingsRepository;
        this.server = server;
    }

    @Override
    public synchronized void start() throws Exception {
        this.managementService.addNotificationListener(this);
        if (this.pagingManager != null) {
            this.pagingManager.setPostOffice(this);
        }
        this.queueFactory.setPostOffice(this);
        this.started = true;
        this.startExpiryScanner();
    }

    @Override
    public synchronized void stop() throws Exception {
        this.started = false;
        this.managementService.removeNotificationListener(this);
        this.reaperRunnable.stop();
        if (this.reaperThread != null) {
            this.reaperThread.join();
            this.reaperThread = null;
        }
        this.addressManager.clear();
        this.queueInfos.clear();
    }

    @Override
    public boolean isStarted() {
        return this.started;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onNotification(Notification notification) {
        Object object = this.notificationLock;
        synchronized (object) {
            NotificationType type = notification.getType();
            switch (type) {
                case BINDING_ADDED: {
                    TypedProperties props = notification.getProperties();
                    if (!props.containsProperty(ManagementHelper.HDR_BINDING_TYPE)) {
                        throw new IllegalArgumentException("Binding type not specified");
                    }
                    Integer bindingType = props.getIntProperty(ManagementHelper.HDR_BINDING_TYPE);
                    if (bindingType == 2) {
                        return;
                    }
                    SimpleString routingName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    SimpleString address = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
                    if (!props.containsProperty(ManagementHelper.HDR_BINDING_ID)) {
                        throw new IllegalArgumentException("ID is null");
                    }
                    long id = props.getLongProperty(ManagementHelper.HDR_BINDING_ID);
                    SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
                    if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                        throw new IllegalArgumentException("Distance is null");
                    }
                    int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
                    QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, id, distance);
                    this.queueInfos.put(clusterName, info);
                    break;
                }
                case BINDING_REMOVED: {
                    TypedProperties props = notification.getProperties();
                    if (!props.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                        throw new IllegalStateException("No cluster name");
                    }
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    QueueInfo info = this.queueInfos.remove(clusterName);
                    if (info != null) break;
                    throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
                }
                case CONSUMER_CREATED: {
                    TypedProperties props = notification.getProperties();
                    if (!props.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                        throw new IllegalStateException("No cluster name");
                    }
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
                    QueueInfo info = this.queueInfos.get(clusterName);
                    if (info == null) {
                        throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
                    }
                    info.incrementConsumers();
                    if (filterString != null) {
                        List<SimpleString> filterStrings = info.getFilterStrings();
                        if (filterStrings == null) {
                            filterStrings = new ArrayList<SimpleString>();
                            info.setFilterStrings(filterStrings);
                        }
                        filterStrings.add(filterString);
                    }
                    if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                        throw new IllegalStateException("No distance");
                    }
                    int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
                    if (distance <= 0) break;
                    SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
                    if (queueName == null) {
                        throw new IllegalStateException("No queue name");
                    }
                    Binding binding = this.getBinding(queueName);
                    if (binding == null) break;
                    Queue queue = (Queue)binding.getBindable();
                    AddressSettings addressSettings = this.addressSettingsRepository.getMatch(binding.getAddress().toString());
                    long redistributionDelay = addressSettings.getRedistributionDelay();
                    if (redistributionDelay == -1L) break;
                    queue.addRedistributor(redistributionDelay);
                    break;
                }
                case CONSUMER_CLOSED: {
                    TypedProperties props = notification.getProperties();
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    if (clusterName == null) {
                        throw new IllegalStateException("No cluster name");
                    }
                    SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
                    QueueInfo info = this.queueInfos.get(clusterName);
                    if (info == null) {
                        return;
                    }
                    info.decrementConsumers();
                    if (filterString != null) {
                        List<SimpleString> filterStrings = info.getFilterStrings();
                        filterStrings.remove(filterString);
                    }
                    if (info.getNumberOfConsumers() != 0) break;
                    if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                        throw new IllegalStateException("No cluster name");
                    }
                    int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
                    if (distance != 0) break;
                    SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
                    if (queueName == null) {
                        throw new IllegalStateException("No queue name");
                    }
                    Binding binding = this.getBinding(queueName);
                    if (binding == null) {
                        throw new IllegalStateException("No queue " + queueName);
                    }
                    Queue queue = (Queue)binding.getBindable();
                    AddressSettings addressSettings = this.addressSettingsRepository.getMatch(binding.getAddress().toString());
                    long redistributionDelay = addressSettings.getRedistributionDelay();
                    if (redistributionDelay == -1L) break;
                    queue.addRedistributor(redistributionDelay);
                    break;
                }
            }
        }
    }

    @Override
    public synchronized void addBinding(Binding binding) throws Exception {
        this.addressManager.addBinding(binding);
        TypedProperties props = new TypedProperties();
        props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
        props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
        props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
        props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
        props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
        props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
        Filter filter = binding.getFilter();
        if (filter != null) {
            props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
        }
        String uid = UUIDGenerator.getInstance().generateStringUUID();
        this.managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
    }

    @Override
    public synchronized Binding removeBinding(SimpleString uniqueName) throws Exception {
        Binding binding = this.addressManager.removeBinding(uniqueName);
        if (binding == null) {
            throw new HornetQException(100);
        }
        if (this.addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
            this.pagingManager.deletePageStore(binding.getAddress());
            this.managementService.unregisterAddress(binding.getAddress());
        }
        if (binding.getType() == BindingType.LOCAL_QUEUE) {
            this.managementService.unregisterQueue(uniqueName, binding.getAddress());
        } else if (binding.getType() == BindingType.DIVERT) {
            this.managementService.unregisterDivert(uniqueName);
        }
        TypedProperties props = new TypedProperties();
        props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
        props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
        props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
        props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
        this.managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
        return binding;
    }

    @Override
    public Bindings getBindingsForAddress(SimpleString address) {
        Bindings bindings = this.addressManager.getBindingsForRoutingAddress(address);
        if (bindings == null) {
            bindings = this.createBindings();
        }
        return bindings;
    }

    @Override
    public Binding getBinding(SimpleString name) {
        return this.addressManager.getBinding(name);
    }

    @Override
    public Bindings getMatchingBindings(SimpleString address) {
        return this.addressManager.getMatchingBindings(address);
    }

    @Override
    public void route(ServerMessage message, boolean direct) throws Exception {
        this.route(message, (Transaction)null, direct);
    }

    @Override
    public void route(ServerMessage message, Transaction tx, boolean direct) throws Exception {
        this.route(message, new RoutingContextImpl(tx), direct);
    }

    @Override
    public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception {
        Bindings bindings;
        if (message.getRefCount() > 0) {
            throw new IllegalStateException("Message cannot be routed more than once");
        }
        SimpleString address = message.getAddress();
        this.setPagingStore(message);
        Object duplicateID = message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
        DuplicateIDCache cache = null;
        byte[] duplicateIDBytes = null;
        if (duplicateID != null && (cache = this.getDuplicateIDCache(message.getAddress())).contains(duplicateIDBytes = duplicateID instanceof SimpleString ? ((SimpleString)duplicateID).getData() : (byte[])duplicateID)) {
            if (context.getTransaction() == null) {
                log.warn("Duplicate message detected - message will not be routed");
            } else {
                log.warn("Duplicate message detected - transaction will be rejected");
                context.getTransaction().markAsRollbackOnly(null);
            }
            return;
        }
        boolean startedTx = false;
        if (cache != null) {
            if (context.getTransaction() == null) {
                TransactionImpl newTX = new TransactionImpl(this.storageManager);
                context.setTransaction(newTX);
                startedTx = true;
            }
            cache.addToCache(duplicateIDBytes, context.getTransaction());
        }
        if (context.getTransaction() == null) {
            if (message.page(true)) {
                return;
            }
        } else {
            boolean depage;
            boolean bl = depage = context.getTransaction().getProperty(3) != null;
            if (!depage && message.storeIsPaging()) {
                this.getPageOperation(context.getTransaction()).addMessageToPage(message);
                return;
            }
        }
        if ((bindings = this.addressManager.getBindingsForRoutingAddress(address)) != null) {
            bindings.route(message, context);
        }
        if (context.getQueueCount() == 0) {
            AddressSettings addressSettings = this.addressSettingsRepository.getMatch(address.toString());
            boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
            if (sendToDLA) {
                SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
                if (dlaAddress == null) {
                    log.warn("Did not route to any bindings for address " + address + " and sendToDLAOnNoRoute is true " + "but there is no DLA configured for the address, the message will be ignored.");
                } else {
                    message.setOriginalHeaders(message, false);
                    message.setAddress(dlaAddress);
                    this.route(message, context.getTransaction(), false);
                }
            }
        } else {
            this.processRoute(message, context, direct);
        }
        if (startedTx) {
            context.getTransaction().commit();
        }
    }

    @Override
    public MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception {
        this.setPagingStore(message);
        MessageReference reference = message.createReference(queue);
        if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
            Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
            reference.setScheduledDeliveryTime(scheduledDeliveryTime);
        }
        message.incrementDurableRefCount();
        message.incrementRefCount();
        if (tx == null) {
            queue.addLast(reference, false);
        } else {
            ArrayList<MessageReference> refs = new ArrayList<MessageReference>(1);
            refs.add(reference);
            tx.addOperation(new AddOperation(refs));
        }
        return reference;
    }

    @Override
    public boolean redistribute(ServerMessage message, Queue originatingQueue, Transaction tx) throws Exception {
        RoutingContextImpl context;
        boolean routed;
        Bindings bindings = this.addressManager.getBindingsForRoutingAddress(message.getAddress());
        boolean res = false;
        if (bindings != null && (routed = bindings.redistribute(message, originatingQueue, context = new RoutingContextImpl(tx)))) {
            this.processRoute(message, context, false);
            res = true;
        }
        return res;
    }

    @Override
    public PagingManager getPagingManager() {
        return this.pagingManager;
    }

    @Override
    public DuplicateIDCache getDuplicateIDCache(SimpleString address) {
        DuplicateIDCache oldCache;
        DuplicateIDCache cache = (DuplicateIDCache)this.duplicateIDCaches.get(address);
        if (cache == null && (oldCache = this.duplicateIDCaches.putIfAbsent(address, cache = new DuplicateIDCacheImpl(address, this.idCacheSize, this.storageManager, this.persistIDCache))) != null) {
            cache = oldCache;
        }
        return cache;
    }

    @Override
    public Object getNotificationLock() {
        return this.notificationLock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception {
        Binding binding = this.addressManager.getBinding(queueName);
        if (binding == null) {
            throw new IllegalStateException("Cannot find queue " + queueName);
        }
        Queue queue = (Queue)binding.getBindable();
        Object object = this.notificationLock;
        synchronized (object) {
            ServerMessage message = new ServerMessageImpl(this.storageManager.generateUniqueID(), 50);
            message.setAddress(queueName);
            message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
            this.routeQueueInfo(message, queue, false);
            for (QueueInfo info : this.queueInfos.values()) {
                if (!info.getAddress().startsWith(address)) continue;
                message = this.createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
                message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                message.putLongProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
                message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
                message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
                this.routeQueueInfo(message, queue, true);
                int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
                for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; ++i) {
                    message = this.createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
                    message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                    message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                    message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                    message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
                    this.routeQueueInfo(message, queue, true);
                }
                if (info.getFilterStrings() == null) continue;
                for (SimpleString filterString : info.getFilterStrings()) {
                    message = this.createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
                    message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                    message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                    message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                    message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
                    message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
                    this.routeQueueInfo(message, queue, true);
                }
            }
        }
    }

    private void setPagingStore(ServerMessage message) throws Exception {
        PagingStore store = this.pagingManager.getPageStore(message.getAddress());
        message.setPagingStore(store);
    }

    private void routeQueueInfo(ServerMessage message, Queue queue, boolean applyFilters) throws Exception {
        if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) {
            RoutingContextImpl context = new RoutingContextImpl(null);
            queue.route(message, context);
            this.processRoute(message, context, false);
        }
    }

    private void processRoute(ServerMessage message, RoutingContext context, final boolean direct) throws Exception {
        MessageReference reference;
        final ArrayList<MessageReference> refs = new ArrayList<MessageReference>();
        Transaction tx = context.getTransaction();
        for (Queue queue : context.getNonDurableQueues()) {
            reference = message.createReference(queue);
            refs.add(reference);
            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
                reference.setScheduledDeliveryTime(scheduledDeliveryTime);
            }
            message.incrementRefCount();
        }
        Iterator<Queue> iter = context.getDurableQueues().iterator();
        while (iter.hasNext()) {
            Queue queue;
            queue = iter.next();
            reference = message.createReference(queue);
            refs.add(reference);
            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
                reference.setScheduledDeliveryTime(scheduledDeliveryTime);
            }
            if (message.isDurable()) {
                int durableRefCount = message.incrementDurableRefCount();
                if (durableRefCount == 1) {
                    if (tx != null) {
                        this.storageManager.storeMessageTransactional(tx.getID(), message);
                    } else {
                        this.storageManager.storeMessage(message);
                    }
                }
                if (tx != null) {
                    this.storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
                    tx.setContainsPersistent();
                } else {
                    this.storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
                }
                if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                    if (tx != null) {
                        this.storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
                    } else {
                        this.storageManager.updateScheduledDeliveryTime(reference);
                    }
                }
            }
            message.incrementRefCount();
        }
        if (tx != null) {
            tx.addOperation(new AddOperation(refs));
        } else {
            this.storageManager.afterCompleteOperations(new IOAsyncTask(){

                @Override
                public void onError(int errorCode, String errorMessage) {
                    log.warn("It wasn't possible to add references due to an IO error code " + errorCode + " message = " + errorMessage);
                }

                @Override
                public void done() {
                    PostOfficeImpl.this.addReferences(refs, direct);
                }
            });
        }
    }

    private void addReferences(List<MessageReference> refs, boolean direct) {
        for (MessageReference ref : refs) {
            ref.getQueue().addLast(ref, direct);
        }
    }

    private synchronized void startExpiryScanner() {
        if (this.reaperPeriod > 0L) {
            this.reaperThread = new Thread((Runnable)this.reaperRunnable, "hornetq-expiry-reaper-thread");
            this.reaperThread.setPriority(this.reaperPriority);
            this.reaperThread.start();
        }
    }

    private ServerMessage createQueueInfoMessage(NotificationType type, SimpleString queueName) {
        ServerMessageImpl message = new ServerMessageImpl(this.storageManager.generateUniqueID(), 50);
        message.setAddress(queueName);
        String uid = UUIDGenerator.getInstance().generateStringUUID();
        message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
        message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
        message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
        return message;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private final PageMessageOperation getPageOperation(Transaction tx) {
        Transaction transaction = tx;
        synchronized (transaction) {
            PageMessageOperation oper = (PageMessageOperation)tx.getProperty(7);
            if (oper == null) {
                oper = new PageMessageOperation();
                tx.putProperty(7, oper);
                tx.addOperation(oper);
            }
            return oper;
        }
    }

    @Override
    public Bindings createBindings() {
        return new BindingsImpl(this.server.getGroupingHandler());
    }

    private class AddOperation
    implements TransactionOperation {
        private final List<MessageReference> refs;

        AddOperation(List<MessageReference> refs) {
            this.refs = refs;
        }

        @Override
        public void afterCommit(Transaction tx) {
            for (MessageReference ref : this.refs) {
                ref.getQueue().addLast(ref, false);
            }
        }

        @Override
        public void afterPrepare(Transaction tx) {
        }

        @Override
        public void afterRollback(Transaction tx) {
        }

        @Override
        public void beforeCommit(Transaction tx) throws Exception {
        }

        @Override
        public void beforePrepare(Transaction tx) throws Exception {
        }

        @Override
        public void beforeRollback(Transaction tx) throws Exception {
            for (MessageReference ref : this.refs) {
                ServerMessage message = ref.getMessage();
                if (message.isDurable() && ref.getQueue().isDurable()) {
                    message.decrementDurableRefCount();
                }
                message.decrementRefCount();
            }
        }
    }

    private class PageMessageOperation
    implements TransactionOperation {
        private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();

        private PageMessageOperation() {
        }

        void addMessageToPage(ServerMessage message) {
            this.messagesToPage.add(message);
        }

        public Collection<Queue> getDistinctQueues() {
            return Collections.emptySet();
        }

        @Override
        public void afterCommit(Transaction tx) {
            PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(5);
            if (pageTransaction != null) {
                pageTransaction.commit();
            }
        }

        @Override
        public void afterPrepare(Transaction tx) {
        }

        @Override
        public void afterRollback(Transaction tx) {
            PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(5);
            if (tx.getState() == Transaction.State.PREPARED && pageTransaction != null) {
                pageTransaction.rollback();
            }
        }

        @Override
        public void beforeCommit(Transaction tx) throws Exception {
            if (tx.getState() != Transaction.State.PREPARED) {
                this.pageMessages(tx);
            }
        }

        @Override
        public void beforePrepare(Transaction tx) throws Exception {
            this.pageMessages(tx);
        }

        @Override
        public void beforeRollback(Transaction tx) throws Exception {
        }

        private void pageMessages(Transaction tx) throws Exception {
            if (!this.messagesToPage.isEmpty()) {
                PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(5);
                if (pageTransaction == null) {
                    pageTransaction = new PageTransactionInfoImpl(tx.getID());
                    tx.putProperty(5, pageTransaction);
                    PostOfficeImpl.this.pagingManager.addTransaction(pageTransaction);
                }
                boolean pagingPersistent = false;
                HashSet<PagingStore> pagingStoresToSync = new HashSet<PagingStore>();
                boolean first = true;
                for (ServerMessage message : this.messagesToPage) {
                    if (message.page(tx.getID(), first)) {
                        if (message.isDurable()) {
                            pageTransaction.increment();
                            pagingPersistent = true;
                            pagingStoresToSync.add(message.getPagingStore());
                        }
                    } else {
                        PostOfficeImpl.this.route(message, false);
                    }
                    first = false;
                }
                if (pagingPersistent) {
                    tx.setContainsPersistent();
                    if (!pagingStoresToSync.isEmpty()) {
                        for (PagingStore store : pagingStoresToSync) {
                            store.sync();
                        }
                        PostOfficeImpl.this.storageManager.storePageTransaction(tx.getID(), pageTransaction);
                    }
                }
            }
        }
    }

    private class Reaper
    implements Runnable {
        private volatile boolean closed = false;

        private Reaper() {
        }

        public synchronized void stop() {
            this.closed = true;
            this.notify();
        }

        @Override
        public synchronized void run() {
            if (this.closed) {
                log.warn("Reaper thread being restarted");
                this.closed = false;
            }
            while (PostOfficeImpl.this.isStarted()) {
                long now;
                long start = System.currentTimeMillis();
                for (long toWait = PostOfficeImpl.this.reaperPeriod; !this.closed && toWait > 0L; toWait -= now - start) {
                    try {
                        this.wait(toWait);
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                    now = System.currentTimeMillis();
                    start = now;
                }
                if (this.closed) {
                    return;
                }
                Map<SimpleString, Binding> nameMap = PostOfficeImpl.this.addressManager.getBindings();
                ArrayList<Queue> queues = new ArrayList<Queue>();
                for (Binding binding : nameMap.values()) {
                    if (binding.getType() != BindingType.LOCAL_QUEUE) continue;
                    Queue queue = (Queue)binding.getBindable();
                    queues.add(queue);
                }
                for (Queue queue : queues) {
                    try {
                        queue.expireReferences();
                    }
                    catch (Exception e) {
                        log.error("failed to expire messages for queue " + queue.getName(), e);
                    }
                }
            }
        }
    }
}

