package org.wso2.andes.server.cassandra;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.store.util.CassandraDataAccessException;
import org.wso2.andes.server.subscription.SubscriptionImpl;
import org.wso2.andes.server.util.AndesUtils;

/* loaded from: input_file:org/wso2/andes/server/cassandra/DefaultClusteringEnabledSubscriptionManager.class */
/**
 * Default {@link ClusteringEnabledSubscriptionManager} implementation.
 *
 * <p>Keeps the in-memory view of local queue subscriptions, the list of known
 * destination queues, and the per-destination-queue list of node queues that
 * currently hold subscriptions. It also starts one {@link QueueDeliveryWorker}
 * per node queue on the flusher executor created in {@link #init()}.
 *
 * <p>{@link #init()} must be called before subscriptions are added, because it
 * creates the executors and reads the worker wait interval used by
 * {@code handleSubscription}.
 */
public class DefaultClusteringEnabledSubscriptionManager implements ClusteringEnabledSubscriptionManager {
    private static final Log log = LogFactory.getLog(DefaultClusteringEnabledSubscriptionManager.class);

    /** Page size used when scanning the node queue in {@link #handleMessageRemoval(String, String)}. */
    private static final int NODE_QUEUE_READ_BATCH_SIZE = 40;

    /**
     * Destination queue name -> node queue names. Each node queue name appears once
     * per subscription counted for it (see {@link #updateNodeQueuesForDestinationQueueMap()}).
     */
    private final Map<String, List<String>> globalSubscriptionsMap = new ConcurrentHashMap<String, List<String>>();

    /** Destination queue names as last read from the message store. Iteration must hold the set's own lock. */
    private final Set<String> destinationQueues = Collections.synchronizedSet(new HashSet<String>());

    /** Node queue name -> delivery worker started for it. */
    private final Map<String, QueueDeliveryWorker> workMap = new ConcurrentHashMap<String, QueueDeliveryWorker>();

    /** Queue resource name -> (subscription id as string -> subscription). */
    private final Map<String, Map<String, CassandraSubscription>> subscriptionMap =
            new ConcurrentHashMap<String, Map<String, CassandraSubscription>>();

    private ExecutorService messageFlusherExecutor = null;
    private SequentialThreadPoolExecutor messageDeliveryExecutor = null;

    private final Map<AMQChannel, Map<Long, Semaphore>> unAckedMessagelocks =
            new ConcurrentHashMap<AMQChannel, Map<Long, Semaphore>>();
    private final Map<AMQChannel, QueueSubscriptionAcknowledgementHandler> acknowledgementHandlerMap =
            new ConcurrentHashMap<AMQChannel, QueueSubscriptionAcknowledgementHandler>();

    private int queueWorkerWaitInterval;

    /**
     * Creates the flusher and delivery executors from the cluster configuration, reads the
     * queue worker wait interval, and loads the destination queue list from the store.
     */
    @Override
    public void init() {
        this.messageFlusherExecutor = Executors.newFixedThreadPool(
                ClusterResourceHolder.getInstance().getClusterConfiguration().getFlusherPoolSize(),
                new ThreadFactoryBuilder().setNameFormat("QueueDeliveryWorker-%d").build());
        this.messageDeliveryExecutor = new SequentialThreadPoolExecutor(
                ClusterResourceHolder.getInstance().getClusterConfiguration().getPublisherPoolSize(),
                "AsyncQueueDelivery");
        this.queueWorkerWaitInterval =
                ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval();
        clearAndUpdateDestinationQueueList();
    }

    /**
     * Registers a subscription for a queue and propagates the subscription change.
     *
     * <p>Browser subscriptions are served immediately by a {@link QueueBrowserDeliveryWorker}
     * and are not recorded. Any other subscription is stored in the subscription map and the
     * persisted subscription counter is incremented. The first subscription of a queue
     * additionally wires up the node queue and delivery worker (unless the queue is bound to
     * the topic exchange) and resets the persisted counter before incrementing it.
     *
     * @param aMQQueue              queue being subscribed to
     * @param cassandraSubscription subscription to register
     * @throws RuntimeException wrapping any exception raised while registering or notifying
     */
    @Override
    public void addSubscription(AMQQueue aMQQueue, CassandraSubscription cassandraSubscription) {
        try {
            if (cassandraSubscription.getSubscription() instanceof SubscriptionImpl.BrowserSubscription) {
                new QueueBrowserDeliveryWorker(
                        cassandraSubscription.getSubscription(),
                        aMQQueue,
                        cassandraSubscription.getSession(),
                        ClusterResourceHolder.getInstance().getClusterConfiguration().isInMemoryMode().booleanValue())
                        .send();
            } else {
                String queueName = aMQQueue.getResourceName();
                String subscriptionId = String.valueOf(cassandraSubscription.getSubscription().getSubscriptionID());
                Map<String, CassandraSubscription> existing = this.subscriptionMap.get(queueName);
                if (existing != null && !existing.isEmpty()) {
                    existing.put(subscriptionId, cassandraSubscription);
                    incrementSubscriptionCount(false, queueName);
                } else {
                    synchronized (this.subscriptionMap) {
                        // Re-check under the lock: another thread may have registered the first
                        // subscription for this queue in the meantime.
                        Map<String, CassandraSubscription> current = this.subscriptionMap.get(queueName);
                        if (current != null && !current.isEmpty()) {
                            current.put(subscriptionId, cassandraSubscription);
                            // This subscription must be counted too; removeSubscription always
                            // decrements, so skipping the increment here would skew the counter.
                            incrementSubscriptionCount(false, queueName);
                        } else {
                            if (current == null) {
                                // Populate before publishing so readers never see an empty new map.
                                Map<String, CassandraSubscription> created =
                                        new ConcurrentHashMap<String, CassandraSubscription>();
                                created.put(subscriptionId, cassandraSubscription);
                                this.subscriptionMap.put(queueName, created);
                            } else {
                                current.put(subscriptionId, cassandraSubscription);
                            }
                            if (!aMQQueue.checkIfBoundToTopicExchange()) {
                                handleSubscription(aMQQueue);
                            }
                            incrementSubscriptionCount(true, queueName);
                        }
                    }
                }
                log.info("Binding Subscription " + cassandraSubscription.getSubscription().getSubscriptionID()
                        + " to queue " + aMQQueue.getName());
            }

            if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
                ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
            } else {
                synchronized (this.destinationQueues) {
                    resetGlobalQueueWorkers(
                            ClusterResourceHolder.getInstance().getSubscriptionManager()
                                    .updateNodeQueuesForDestinationQueueMap());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Error while adding subscription to queue " + aMQQueue, e);
        }
    }

    /**
     * Replaces the in-memory destination queue list with the list read from the message store.
     *
     * <p>The store is read before the in-memory set is touched and the swap happens under the
     * set's lock, so concurrent readers never observe a half-populated list. If the store read
     * fails the error is logged and the previous list is kept.
     */
    @Override
    public void clearAndUpdateDestinationQueueList() {
        List<String> latest = new ArrayList<String>();
        try {
            for (String queueName : ClusterResourceHolder.getInstance().getCassandraMessageStore().getDestinationQueues()) {
                latest.add(queueName);
            }
        } catch (AMQStoreException e) {
            log.error("Error in updating in-memory list of destination queues", e);
            return;
        }
        synchronized (this.destinationQueues) {
            this.destinationQueues.clear();
            this.destinationQueues.addAll(latest);
        }
    }

    /**
     * Rebuilds the destination-queue to node-queue map from the message store.
     *
     * <p>For every known destination queue the node queues are read from the store, and each
     * node queue name is added once per subscription counted for it. The rebuilt view is only
     * published when the whole rebuild succeeded; on failure the error is logged, the previous
     * view is kept and an empty list is returned.
     *
     * @return destination queues that had no entries before this call and have at least one now
     */
    @Override
    public List<String> updateNodeQueuesForDestinationQueueMap() {
        List<String> newlySubscribedQueues = new ArrayList<String>();
        try {
            // Iterating a synchronizedSet requires holding its lock; work on a snapshot so the
            // lock is not held during store access.
            List<String> queueNames;
            synchronized (this.destinationQueues) {
                queueNames = new ArrayList<String>(this.destinationQueues);
            }

            Map<String, Integer> previousCounts = new HashMap<String, Integer>();
            for (String queueName : queueNames) {
                List<String> previous = this.globalSubscriptionsMap.get(queueName);
                previousCounts.put(queueName, Integer.valueOf(previous == null ? 0 : previous.size()));
            }

            CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
            Map<String, List<String>> refreshed = new HashMap<String, List<String>>();
            for (String queueName : queueNames) {
                List<String> nodeQueues = store.getNodeQueuesForDestinationQueue(queueName);
                if (nodeQueues == null || nodeQueues.isEmpty()) {
                    continue;
                }
                List<String> expanded = new ArrayList<String>();
                for (String nodeQueue : nodeQueues) {
                    // Read the count once per node queue instead of once per loop iteration.
                    long subscriptionCount = store.getSubscriptionCountForQueue(queueName, nodeQueue);
                    for (long i = 0; i < subscriptionCount; i++) {
                        expanded.add(nodeQueue);
                    }
                }
                refreshed.put(queueName, expanded);
            }

            // Publish: add/replace first, then drop stale keys, so a key that is present both
            // before and after is never temporarily missing.
            this.globalSubscriptionsMap.putAll(refreshed);
            this.globalSubscriptionsMap.keySet().retainAll(refreshed.keySet());

            for (String queueName : queueNames) {
                List<String> current = refreshed.get(queueName);
                int currentCount = current == null ? 0 : current.size();
                if (currentCount > 0 && previousCounts.get(queueName).intValue() == 0) {
                    newlySubscribedQueues.add(queueName);
                }
            }
        } catch (CassandraDataAccessException e) {
            log.error("Error in getting the Node Queues as cassandra connection is down", e);
        } catch (Exception e) {
            log.error("Error while updating node queues for destination queues", e);
        }
        return newlySubscribedQueues;
    }

    /**
     * Removes a subscription from a queue, updates the persisted counters and propagates the
     * subscription change. Failures in either phase are logged and not rethrown.
     *
     * @param str  queue name (key of the subscription map)
     * @param str2 subscription id as stored in the subscription map
     * @param z    not used by this implementation
     */
    @Override
    public void removeSubscription(String str, String str2, boolean z) {
        try {
            Map<String, CassandraSubscription> queueSubscriptions = this.subscriptionMap.get(str);
            // Use the result of remove() rather than containsKey()+remove(), so two concurrent
            // removals of the same id cannot both decrement the counter.
            if (queueSubscriptions != null && queueSubscriptions.remove(str2) != null) {
                log.info("Removing Subscription " + str2 + " from queue " + str);
                CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
                store.decrementSubscriptionCount(str, AndesUtils.getMyNodeQueueName(), 1L);
                if (queueSubscriptions.isEmpty()) {
                    log.debug("Executing subscription removal handler to minimize message losses");
                    if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
                        handleMessageRemoval(str, AndesUtils.getGlobalQueueNameForDestinationQueue(str));
                    }
                    store.removeSubscriptionCounterForQueue(str, AndesUtils.getMyNodeQueueName());
                }
            }
        } catch (Exception e) {
            log.error("Error while removing subscription for queue: " + str, e);
        }

        try {
            if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
                ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
            } else {
                resetGlobalQueueWorkers(
                        ClusterResourceHolder.getInstance().getSubscriptionManager()
                                .updateNodeQueuesForDestinationQueueMap());
            }
        } catch (Exception e) {
            log.error("Error while notifying Subscription change", e);
        }
    }

    /**
     * Returns the node queue list recorded for a destination queue.
     *
     * @param str destination queue name
     * @return the recorded list, or {@code null} when the queue has no entry
     */
    @Override
    public List<String> getNodeQueuesHavingSubscriptionsForQueue(String str) {
        return this.globalSubscriptionsMap.get(str);
    }

    /**
     * Refreshes the node queue map and resets the global queue worker of every destination
     * queue that just received its first subscription.
     */
    @Override
    public void handleFreshSubscriptionsJoiningToCluster() {
        resetGlobalQueueWorkers(updateNodeQueuesForDestinationQueueMap());
    }

    /** Resets the global queue worker (if running) for each of the given destination queues. */
    private void resetGlobalQueueWorkers(Iterable<String> destinationQueueNames) {
        for (String destinationQueueName : destinationQueueNames) {
            ClusterResourceHolder.getInstance().getClusterManager().getGlobalQueueManager()
                    .resetGlobalQueueWorkerIfRunning(
                            AndesUtils.getGlobalQueueNameForDestinationQueue(destinationQueueName));
        }
    }

    /**
     * Moves every message addressed to the given destination queue from this node's node queue
     * to the given global queue, reading the node queue page by page.
     *
     * <p>Each matching message is removed from the node queue first and then added to the global
     * queue; a failure of the add is logged and the scan continues. A store failure while reading
     * or removing aborts the scan and is logged.
     *
     * @param destinationQueueName destination queue whose messages are moved
     * @param globalQueueName      global queue receiving the messages
     */
    private void handleMessageRemoval(String destinationQueueName, String globalQueueName) {
        try {
            CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
            String nodeQueueName = AndesUtils.getMyNodeQueueName();
            int movedCount = 0;
            long lastProcessedId = 0L;
            List<CassandraQueueMessage> page =
                    store.getMessagesFromNodeQueue(nodeQueueName, NODE_QUEUE_READ_BATCH_SIZE, lastProcessedId);
            while (!page.isEmpty()) {
                for (CassandraQueueMessage message : page) {
                    if (message.getDestinationQueueName().equals(destinationQueueName)) {
                        store.removeMessageFromNodeQueue(nodeQueueName, message.getMessageId());
                        try {
                            store.addMessageToGlobalQueue(globalQueueName, message.getNodeQueue(),
                                    message.getMessageId(), message.getMessage(), false, 0L, false);
                            // Count only messages that actually reached the global queue.
                            movedCount++;
                        } catch (Exception e) {
                            log.error("Error adding message " + message.getMessageId() + " to global queue "
                                    + globalQueueName + " after removing it from node queue " + nodeQueueName, e);
                        }
                    }
                    lastProcessedId = message.getMessageId();
                }
                page = store.getMessagesFromNodeQueue(nodeQueueName, NODE_QUEUE_READ_BATCH_SIZE, lastProcessedId);
            }
            log.info("Moved " + movedCount + " Number of Messages Addressed to Queue " + destinationQueueName
                    + " from Node Queue " + nodeQueueName + " to Global Queue");
        } catch (AMQStoreException e) {
            log.error("Error removing messages addressed to " + destinationQueueName
                    + " from relevant node queue", e);
        }
    }

    /**
     * Registers this node's node queue for the given queue in the store, adds the queue's message
     * counter, and starts a delivery worker for the node queue if none is recorded yet.
     * Failures are logged and not rethrown.
     *
     * @param queue queue that received its first subscription
     */
    private void handleSubscription(AMQQueue queue) {
        try {
            String globalQueueName = AndesUtils.getGlobalQueueNameForDestinationQueue(queue.getResourceName());
            String nodeQueueName = AndesUtils.getMyNodeQueueName();
            CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
            store.addNodeQueueToGlobalQueue(globalQueueName, nodeQueueName);
            store.addNodeQueueToDestinationQueue(queue.getResourceName(), nodeQueueName);
            store.addMessageCounterForQueue(queue.getName());
            if (this.workMap.get(nodeQueueName) == null) {
                QueueDeliveryWorker worker = new QueueDeliveryWorker(
                        nodeQueueName,
                        queue,
                        this.subscriptionMap,
                        this.messageDeliveryExecutor,
                        this.queueWorkerWaitInterval,
                        ClusterResourceHolder.getInstance().getClusterConfiguration().isInMemoryMode().booleanValue());
                this.workMap.put(nodeQueueName, worker);
                this.messageFlusherExecutor.execute(worker);
            }
        } catch (Exception e) {
            log.error("Error while adding subscription to queue :" + queue, e);
        }
    }

    /**
     * Increments the persisted subscription counter of a queue for this node's node queue.
     *
     * @param z   when {@code true}, the counter is removed and re-created before incrementing
     * @param str queue name
     */
    public void incrementSubscriptionCount(boolean z, String str) {
        String nodeQueueName = AndesUtils.getMyNodeQueueName();
        CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        if (z) {
            store.removeSubscriptionCounterForQueue(str, nodeQueueName);
            store.addSubscriptionCounterForQueue(str, nodeQueueName);
        }
        store.incrementSubscriptionCount(str, nodeQueueName, 1L);
    }

    /**
     * Stops the flusher of the delivery worker recorded under the given key, if any.
     *
     * @param str key into the work map
     */
    public void markSubscriptionForRemovel(String str) {
        QueueDeliveryWorker worker = this.workMap.get(str);
        if (worker != null) {
            worker.stopFlusher();
        }
    }

    /**
     * Returns how many subscriptions are currently recorded for a queue.
     *
     * @param str queue name
     * @return number of recorded subscriptions, {@code 0} when the queue is unknown
     */
    public int getNumberOfSubscriptionsForQueue(String str) {
        Map<String, CassandraSubscription> queueSubscriptions = this.subscriptionMap.get(str);
        return queueSubscriptions == null ? 0 : queueSubscriptions.size();
    }

    /** Stops the flusher of every recorded delivery worker. */
    @Override
    public void stopAllMessageFlushers() {
        for (QueueDeliveryWorker worker : this.workMap.values()) {
            worker.stopFlusher();
        }
    }

    /** Starts the flusher of every recorded delivery worker. */
    @Override
    public void startAllMessageFlushers() {
        for (QueueDeliveryWorker worker : this.workMap.values()) {
            worker.startFlusher();
        }
    }

    /**
     * @return the live (not copied) map of delivery workers
     */
    public Map<String, QueueDeliveryWorker> getWorkMap() {
        return this.workMap;
    }

    /**
     * @return the live (not copied) map of per-channel un-acknowledged message locks
     */
    @Override
    public Map<AMQChannel, Map<Long, Semaphore>> getUnAcknowledgedMessageLocks() {
        return this.unAckedMessagelocks;
    }

    /**
     * @return the live (not copied) map of per-channel acknowledgement handlers
     */
    @Override
    public Map<AMQChannel, QueueSubscriptionAcknowledgementHandler> getAcknowledgementHandlerMap() {
        return this.acknowledgementHandlerMap;
    }
}
