package org.wso2.andes.server.cluster;

import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.cassandra.CassandraQueueMessage;
import org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager;
import org.wso2.andes.server.stats.PerformanceCounter;
import org.wso2.andes.server.store.CassandraMessageStore;

/* loaded from: input_file:org/wso2/andes/server/cluster/GlobalQueueWorker.class */
public class GlobalQueueWorker implements Runnable {
    private static Log log = LogFactory.getLog(GlobalQueueWorker.class);
    private String globalQueueName;
    private boolean running;
    private int messageCountToReadFromCasssandra;
    private CassandraMessageStore cassandraMessageStore;
    private long totMsgMoved = 0;
    private long lastProcessedMessageId = 0;

    public GlobalQueueWorker(String str, CassandraMessageStore cassandraMessageStore, int i) {
        this.cassandraMessageStore = cassandraMessageStore;
        this.globalQueueName = str;
        this.messageCountToReadFromCasssandra = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        int queueWorkerInterval = ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval();
        ClusteringEnabledSubscriptionManager subscriptionManager = ClusterResourceHolder.getInstance().getSubscriptionManager();
        int i = 0;
        while (this.running) {
            try {
                List<CassandraQueueMessage> messagesFromGlobalQueue = this.cassandraMessageStore.getMessagesFromGlobalQueue(this.globalQueueName, this.lastProcessedMessageId, this.messageCountToReadFromCasssandra);
                PerformanceCounter.recordGlobalQueueMsgMove(messagesFromGlobalQueue.size());
                if (subscriptionManager == null) {
                    subscriptionManager = ClusterResourceHolder.getInstance().getSubscriptionManager();
                }
                if (messagesFromGlobalQueue == null || messagesFromGlobalQueue.size() <= 0) {
                    try {
                        Thread.sleep(queueWorkerInterval);
                        i++;
                        if (i > 1) {
                            resetMessageReading();
                        }
                    } catch (InterruptedException e) {
                    }
                } else {
                    i = 0;
                    Iterator<CassandraQueueMessage> it = messagesFromGlobalQueue.iterator();
                    while (it.hasNext()) {
                        CassandraQueueMessage next = it.next();
                        String destinationQueueName = next.getDestinationQueueName();
                        Random random = new Random();
                        if (subscriptionManager.getNodeQueuesHavingSubscriptionsForQueue(destinationQueueName) == null || subscriptionManager.getNodeQueuesHavingSubscriptionsForQueue(destinationQueueName).size() <= 0) {
                            it.remove();
                        } else {
                            next.setNodeQueue(subscriptionManager.getNodeQueuesHavingSubscriptionsForQueue(destinationQueueName).get(random.nextInt(subscriptionManager.getNodeQueuesHavingSubscriptionsForQueue(destinationQueueName).size())));
                        }
                        this.lastProcessedMessageId = next.getMessageId();
                    }
                    this.cassandraMessageStore.transferMessageBatchFromGlobalQueueToNodeQueue(messagesFromGlobalQueue, this.globalQueueName);
                    this.totMsgMoved += messagesFromGlobalQueue.size();
                    if (log.isDebugEnabled()) {
                        log.debug("[Global, " + this.globalQueueName + "] moved " + messagesFromGlobalQueue.size() + " to node queues, tot = " + this.totMsgMoved + " ,Last ID:" + this.lastProcessedMessageId);
                    }
                }
            } catch (Exception e2) {
                log.error("Error in moving messages from global queue to node queue", e2);
            }
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public void setRunning(boolean z) {
        this.running = z;
    }

    public void resetMessageReading() {
        this.lastProcessedMessageId = 0L;
        if (log.isDebugEnabled()) {
            log.debug("Worker for Global Queue " + this.globalQueueName + " is reset");
        }
    }
}
