package org.wso2.andes.server.cassandra;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.protocol.AMQProtocolSession;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.stats.PerformanceCounter;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.subscription.Subscription;
import org.wso2.andes.server.subscription.SubscriptionImpl;
import org.wso2.andes.server.util.AndesUtils;

/* loaded from: input_file:org/wso2/andes/server/cassandra/InOrderMessageFlusher.class */
/**
 * Sends messages of one queue to one subscription, one message at a time.
 *
 * <p>For each message fetched from the global queue, {@link #send()} registers a
 * semaphore under the channel's next delivery tag, sends the message, and then
 * blocks (up to the configured maximum ack wait time) on that semaphore before
 * removing the message from the store and sending the next one. Only
 * {@code SubscriptionImpl.AckSubscription} subscriptions are served; for any
 * other subscription type {@link #send()} fetches the messages but delivers
 * nothing.
 */
public class InOrderMessageFlusher {
    private static final Log log = LogFactory.getLog(InOrderMessageFlusher.class);

    private final Subscription subscription;
    private final AMQQueue queue;
    private final AMQProtocolSession session;
    /** String form of the subscription ID, captured at construction time. */
    private final String id;
    private final int defaultMessageCount = 1;
    // Not read anywhere in this class; kept so the field layout stays as it was.
    private final long maxWaitTimePerMessage = 120000;
    /** Number of messages requested from the store per {@link #send()} call. */
    private final int messageCount = this.defaultMessageCount;
    private final ClusteringEnabledSubscriptionManager subscriptionManager = ClusterResourceHolder.getInstance().getSubscriptionManager();

    /**
     * Creates a flusher bound to the given subscription, queue and protocol session.
     *
     * @param subscription       subscription the messages are sent to; must not be null
     * @param aMQQueue           queue whose messages are read from the global queue
     * @param aMQProtocolSession session passed to the message store when fetching messages
     */
    public InOrderMessageFlusher(Subscription subscription, AMQQueue aMQQueue, AMQProtocolSession aMQProtocolSession) {
        this.subscription = subscription;
        this.queue = aMQQueue;
        this.session = aMQProtocolSession;
        this.id = String.valueOf(subscription.getSubscriptionID());
    }

    /**
     * Fetches up to {@code messageCount} messages for the queue from the global
     * queue and delivers them sequentially to the subscription.
     *
     * <p>Store errors while fetching are logged and end the call. An error while
     * delivering a single message is logged and the next message is attempted.
     * If the calling thread is interrupted while waiting, the interrupt status is
     * restored and the remaining messages of this batch are not sent.
     */
    public void send() {
        CassandraMessageStore messageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        try {
            List<QueueEntry> entries = messageStore.getQueueFilteredMessagesFromGlobalQueue(this.queue, this.session, this.messageCount);
            if (entries == null || entries.isEmpty()) {
                return;
            }
            // The subscription type cannot change between messages, so check it once.
            if (!(this.subscription instanceof SubscriptionImpl.AckSubscription)) {
                return;
            }
            for (QueueEntry entry : entries) {
                try {
                    deliverAndAwaitAck(messageStore, entry);
                } catch (InterruptedException e) {
                    // Restore the flag so the owner of this thread can observe the interrupt.
                    // Continuing would be pointless: the next Semaphore.acquire() would throw
                    // immediately while the flag is set.
                    Thread.currentThread().interrupt();
                    log.error("Unexpected Error in Message Flusher Task while delivering the message : ", e);
                    return;
                } catch (Exception e) {
                    log.error("Unexpected Error in Message Flusher Task while delivering the message : ", e);
                }
            }
        } catch (AMQStoreException e) {
            log.error("Error while sending messages ", e);
        }
    }

    /**
     * Sends one message, waits for its semaphore to be released (or for the
     * configured wait time to elapse), then removes the message from the store.
     *
     * @param messageStore store used to delete the message and its bookkeeping entries
     * @param entry        the queue entry to deliver
     * @throws InterruptedException if the thread is interrupted while acquiring the semaphore
     * @throws Exception            any failure raised by the subscription or the message store;
     *                              declared broadly because the callee signatures live outside this file
     */
    private void deliverAndAwaitAck(CassandraMessageStore messageStore, QueueEntry entry) throws Exception {
        AMQChannel channel = ((SubscriptionImpl) this.subscription).getChannel();
        synchronized (channel) {
            if (!this.subscriptionManager.getUnAcknowledgedMessageLocks().containsKey(channel)) {
                this.subscriptionManager.getUnAcknowledgedMessageLocks().put(channel, new ConcurrentHashMap());
            }

            // Start with zero permits available so the tryAcquire below blocks
            // until something else releases the semaphore.
            Semaphore ackLock = new Semaphore(1);
            ackLock.acquire();
            // Keyed by current tag + 1, i.e. the tag the upcoming send is expected to use
            // (presumably incremented by the channel on delivery; verify against AMQChannel).
            this.subscriptionManager.getUnAcknowledgedMessageLocks().get(channel).put(Long.valueOf(channel.getCurrentDeliveryTag() + 1), ackLock);

            this.subscription.send(entry);

            long maxAckWaitSeconds = ClusterResourceHolder.getInstance().getClusterConfiguration().getMaxAckWaitTime();
            boolean released = ackLock.tryAcquire(maxAckWaitSeconds, TimeUnit.SECONDS);

            String globalQueueName = AndesUtils.getGlobalQueueNameForDestinationQueue(this.queue.getName());
            long messageNumber = entry.getMessage().getMessageNumber().longValue();
            if (!released) {
                // NOTE(review): the message is still removed below, exactly as before this
                // change; the timeout is now at least visible in the log. Confirm whether a
                // timed-out message should instead be kept for redelivery.
                log.warn("Timed out after " + maxAckWaitSeconds + " seconds waiting for ack of message "
                        + messageNumber + " on queue " + this.queue.getName());
            }

            messageStore.removeMessageFromGlobalQueue(globalQueueName, messageNumber);
            messageStore.decrementQueueCount(this.queue.getName(), 1L);
            messageStore.addContentDeletionTask(messageNumber);
            messageStore.addMessageQueueMappingDeletionTask(this.queue.getName(), messageNumber);
            if (log.isDebugEnabled()) {
                log.debug("Ack:" + messageNumber);
            }
            PerformanceCounter.recordMessageDelivered(this.queue.getName());
        }
    }

    /**
     * @return the subscription ID as a string, as captured when this flusher was created
     */
    public String getSubscriptionId() {
        return this.id;
    }

    /**
     * @return the subscription this flusher delivers to
     */
    public Subscription getSubscription() {
        return this.subscription;
    }

    /**
     * @return the queue this flusher reads messages for
     */
    public AMQQueue getQueue() {
        return this.queue;
    }
}
