package org.wso2.andes.server.cassandra;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.exchange.ExchangeDefaults;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.binding.Binding;
import org.wso2.andes.server.exchange.AbstractExchange;
import org.wso2.andes.server.exchange.Exchange;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.protocol.AMQProtocolSession;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.queue.SimpleAMQQueue;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.util.AndesUtils;
import org.wso2.andes.server.virtualhost.VirtualHost;

/* loaded from: input_file:org/wso2/andes/server/cassandra/TopicDeliveryWorker.class */
/**
 * Worker that performs a single topic delivery pass each time {@link #run()} is invoked.
 *
 * <p>A pass fetches pending topic messages from the {@link CassandraMessageStore}, hands every
 * message to the queue of each binding on the topic exchange whose binding key matches the
 * message routing key, and finally asks the store to remove the ids that were handed over.
 * When nothing is pending the pass sleeps for the configured queue worker interval instead.
 *
 * <p>Constructing a worker registers the given queue as a subscriber for the binding key in the
 * message store.
 */
public class TopicDeliveryWorker extends Thread {
    private static final Log log = LogFactory.getLog(TopicDeliveryWorker.class);

    // NOTE(review): session and executor are never assigned or read in this class as shown.
    private AMQProtocolSession session;
    private SequentialThreadPoolExecutor executor;

    private final Binding binding;
    private final SimpleAMQQueue queue;
    private final AbstractExchange exchange;
    private final VirtualHost virtualHost;
    private final CassandraMessageStore messageStore;
    private final boolean isInMemoryMode;
    private final String topicNodeQueueName = AndesUtils.getTopicNodeQueueName();
    private final String id = this.topicNodeQueueName;

    // Exposed through public accessors; presumably polled from a managing thread, hence volatile.
    private volatile boolean markedForRemoval;
    private volatile boolean working = false;

    private long lastDeliveredMessageID = 0;

    /**
     * Creates the worker and registers the queue as a subscriber of the binding's key.
     *
     * @param binding     binding whose key is registered with the message store
     * @param aMQQueue    subscriber queue; must be a {@link SimpleAMQQueue}
     * @param exchange    exchange of the binding; must be an {@link AbstractExchange}
     * @param virtualHost virtual host used to look up the topic exchange during delivery
     * @param z           {@code true} to run delivery passes in in-memory mode
     */
    public TopicDeliveryWorker(Binding binding, AMQQueue aMQQueue, Exchange exchange, VirtualHost virtualHost, boolean z) {
        this.binding = binding;
        this.exchange = (AbstractExchange) exchange;
        this.queue = (SimpleAMQQueue) aMQQueue;
        this.virtualHost = virtualHost;
        this.messageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        this.isInMemoryMode = z;
        this.messageStore.registerSubscriberForTopic(binding.getBindingKey(), this.topicNodeQueueName, aMQQueue.getResourceName());
    }

    /**
     * Runs one delivery pass. The {@code working} flag is raised for the duration of the pass and
     * is always cleared afterwards, including when the pass fails with an unexpected error.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.working = true;
        try {
            if (this.isInMemoryMode) {
                deliverInMemoryMessages();
            } else {
                deliverStoredMessages();
            }
        } finally {
            this.working = false;
        }
    }

    /**
     * In-memory pass: delivers the next batch returned by the store and removes the delivered ids
     * from the incoming messages table. Any failure is logged and ends the pass.
     */
    private void deliverInMemoryMessages() {
        try {
            List<AMQMessage> pending = this.messageStore.getNextTopicMessageToDeliver();
            if (pending == null) {
                sleepForWorkerInterval();
                return;
            }
            List<Long> deliveredIds = new ArrayList<Long>();
            try {
                for (AMQMessage message : pending) {
                    enqueueMessage(message);
                    deliveredIds.add(message.getMessageNumber());
                    this.lastDeliveredMessageID = message.getMessageNumber().longValue();
                    if (log.isDebugEnabled()) {
                        log.debug("Sending message  " + this.lastDeliveredMessageID + "from cassandra topic publisher");
                    }
                }
            } catch (Exception e) {
                // The first failure stops the batch; ids delivered so far are still removed below.
                log.error("Error on enqueue messages to relevant queue:" + e.getMessage(), e);
            }
            this.messageStore.removeDeliveredTopicMessageIdsFromIncomingMessagesTable(deliveredIds);
        } catch (Exception e) {
            log.error("Error in sending message out in in memory mode ", e);
        }
    }

    /**
     * Store-backed pass: reads this node's subscriber messages starting from the last delivered
     * id, delivers them one by one (a failure on one message does not stop the others) and
     * removes the delivered ids from the store.
     */
    private void deliverStoredMessages() {
        try {
            // NOTE(review): the id is post-incremented on every poll, even when nothing is
            // returned; preserved from the original code - confirm this is intended.
            long fromId = this.lastDeliveredMessageID++;
            List<AMQMessage> pending = this.messageStore.getSubscriberMessages(this.topicNodeQueueName, fromId);
            if (pending == null || pending.isEmpty()) {
                sleepForWorkerInterval();
                return;
            }
            List<Long> deliveredIds = new ArrayList<Long>();
            for (AMQMessage message : pending) {
                try {
                    enqueueMessage(message);
                    deliveredIds.add(message.getMessageNumber());
                    this.lastDeliveredMessageID = message.getMessageNumber().longValue();
                    if (log.isDebugEnabled()) {
                        log.debug("Sending message  " + this.lastDeliveredMessageID + "from cassandra topic publisher" + this.queue.getName());
                    }
                } catch (Exception e) {
                    log.error("Error on enqueue messages to relevant queue:" + e.getMessage(), e);
                }
            }
            this.messageStore.removeDeliveredTopicMessageIds(deliveredIds, this.topicNodeQueueName);
        } catch (AMQStoreException e) {
            log.error("Error removing delivered Message Ids from Message store ", e);
        }
    }

    /** Sleeps for the configured queue worker interval, restoring the interrupt flag if interrupted. */
    private void sleepForWorkerInterval() {
        try {
            Thread.sleep(ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval());
        } catch (InterruptedException e) {
            // Keep the interruption visible to whoever owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Hands the message to the queue of every binding on the topic exchange whose binding key
     * matches the message routing key. Does nothing when the topic exchange is not registered.
     */
    /* JADX WARN: Multi-variable type inference failed */
    private void enqueueMessage(AMQMessage aMQMessage) {
        Exchange topicExchange = this.virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
        if (topicExchange == null) {
            return;
        }
        String routingKey = aMQMessage.getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString();
        for (Binding candidate : topicExchange.getBindings()) {
            if (isMatching(candidate.getBindingKey(), routingKey)) {
                aMQMessage.setTopicMessage(true);
                deliverAsynchronously(candidate, aMQMessage);
                if (log.isDebugEnabled()) {
                    log.debug("sent1 (" + aMQMessage.getMessageNumber() + ")" + AndesUtils.printAMQMessage((QueueEntry) aMQMessage));
                }
            }
        }
    }

    /**
     * Tests whether a routing key is selected by a binding key.
     *
     * <p>Rules, checked in this order:
     * <ul>
     *   <li>the two strings are equal;</li>
     *   <li>the binding key contains {@code ".#"} after a non-empty prefix and the routing key
     *       starts with that prefix;</li>
     *   <li>the binding key contains {@code ".*"} after a non-empty prefix and the routing key is
     *       that prefix followed by {@code '.'} and exactly one further dot-free segment.</li>
     * </ul>
     * The prefix is compared literally, so characters such as {@code '.'} in the binding key are
     * not interpreted as regular-expression syntax.
     *
     * @param str  binding key, possibly containing a {@code .#} or {@code .*} wildcard
     * @param str2 routing key of the message
     * @return {@code true} if the routing key matches the binding key
     */
    public boolean isMatching(String str, String str2) {
        if (str.equals(str2)) {
            return true;
        }
        int multiLevel = str.indexOf(".#");
        if (multiLevel > 0) {
            String prefix = Pattern.quote(str.substring(0, multiLevel));
            return Pattern.compile(prefix + ".*").matcher(str2).matches();
        }
        int singleLevel = str.indexOf(".*");
        if (singleLevel > 0) {
            String prefix = Pattern.quote(str.substring(0, singleLevel));
            return Pattern.compile("^" + prefix + "[.][^.]+$").matcher(str2).matches();
        }
        return false;
    }

    /** @return {@code true} while a delivery pass started by {@link #run()} is in progress */
    public boolean isWorking() {
        return this.working;
    }

    /** @return the removal flag last set through {@link #setMarkedForRemoval(boolean)} */
    public boolean isMarkedForRemoval() {
        return this.markedForRemoval;
    }

    /** @param z new value of the removal flag */
    public void setMarkedForRemoval(boolean z) {
        this.markedForRemoval = z;
    }

    /** @return the topic node queue name obtained when this worker was created */
    public String getQueueId() {
        return this.id;
    }

    /**
     * Submits the enqueue of the message on the binding's queue to the message publishing
     * executor, keyed by a non-negative value derived from the binding id's hash code.
     * Failures of the enqueue itself are logged by the submitted task.
     */
    private void deliverAsynchronously(final Binding binding, final AMQMessage aMQMessage) {
        int idHash = binding.getId().hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative; map that single value to 0 and leave
        // every other key exactly as Math.abs would produce it.
        int executorKey = (idHash == Integer.MIN_VALUE) ? 0 : Math.abs(idHash);
        ClusterResourceHolder.getInstance().getCassandraTopicPublisherManager().getMessagePublishingExecutor().submit(new Runnable() { // from class: org.wso2.andes.server.cassandra.TopicDeliveryWorker.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    binding.getQueue().enqueue(aMQMessage);
                } catch (Throwable th) {
                    TopicDeliveryWorker.log.error("Error while delivering message ", th);
                }
            }
        }, executorKey);
    }
}
