package org.wso2.broker.core;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.broker.common.data.types.FieldTable;
import org.wso2.broker.core.Exchange;
import org.wso2.broker.core.store.dao.BindingDao;
import org.wso2.broker.core.store.dao.DaoFactory;
import org.wso2.broker.core.store.dao.ExchangeDao;
import org.wso2.broker.core.store.dao.MessageDao;
import org.wso2.broker.core.store.dao.QueueDao;
import org.wso2.broker.core.store.dao.SharedMessageStore;
import org.wso2.broker.core.task.TaskExecutorService;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/wso2/broker/core/MessagingEngine.class */
public final class MessagingEngine {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessagingEngine.class);
    private static final long IDLE_TASK_DELAY_MILLIS = 100;
    private static final int WORKER_COUNT = 5;
    public static final String DEFAULT_DEAD_LETTER_QUEUE = "amq.dlq";
    public static final String ORIGIN_QUEUE_HEADER = "x-origin-queue";
    public static final String ORIGIN_EXCHANGE_HEADER = "x-origin-exchange";
    public static final String ORIGIN_ROUTING_KEY_HEADER = "x-origin-routing-key";
    private final QueueRegistry queueRegistry;
    private final TaskExecutorService<MessageDeliveryTask> deliveryTaskService;
    private final ExchangeRegistry exchangeRegistry;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final SharedMessageStore sharedMessageStore;
    private final MessageIdGenerator messageIdGenerator;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessagingEngine(DataSource dataSource) throws BrokerException {
        DaoFactory daoFactory = new DaoFactory(dataSource);
        QueueDao createQueueDao = daoFactory.createQueueDao();
        ExchangeDao createExchangeDao = daoFactory.createExchangeDao();
        BindingDao createBindingDao = daoFactory.createBindingDao();
        MessageDao createMessageDao = daoFactory.createMessageDao();
        this.exchangeRegistry = new ExchangeRegistry(createExchangeDao, createBindingDao);
        this.sharedMessageStore = new SharedMessageStore(createMessageDao, 32768, 1024);
        this.queueRegistry = new QueueRegistry(createQueueDao, this.sharedMessageStore);
        this.exchangeRegistry.retrieveFromStore(this.queueRegistry);
        this.deliveryTaskService = new TaskExecutorService<>(5, IDLE_TASK_DELAY_MILLIS, new ThreadFactoryBuilder().setNameFormat("MessageDeliveryTaskThreadPool-%d").build());
        this.messageIdGenerator = new MessageIdGenerator();
        initDefaultDeadLetterQueue();
    }

    private void initDefaultDeadLetterQueue() throws BrokerException {
        createQueue(DEFAULT_DEAD_LETTER_QUEUE, false, true, false);
        bind(DEFAULT_DEAD_LETTER_QUEUE, ExchangeRegistry.DEFAULT_DEAD_LETTER_EXCHANGE, DEFAULT_DEAD_LETTER_QUEUE, FieldTable.EMPTY_TABLE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void bind(String str, String str2, String str3, FieldTable fieldTable) throws BrokerException {
        this.lock.writeLock().lock();
        try {
            Exchange exchange = this.exchangeRegistry.getExchange(str2);
            QueueHandler queueHandler = this.queueRegistry.getQueueHandler(str);
            if (exchange == null) {
                throw new BrokerException("Unknown exchange name: " + str2);
            }
            if (queueHandler == null) {
                throw new BrokerException("Unknown queue name: " + str);
            }
            if (!str3.isEmpty()) {
                exchange.bind(queueHandler.getQueue(), str3, fieldTable);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unbind(String str, String str2, String str3) throws BrokerException {
        this.lock.writeLock().lock();
        try {
            Exchange exchange = this.exchangeRegistry.getExchange(str2);
            QueueHandler queueHandler = this.queueRegistry.getQueueHandler(str);
            if (exchange == null) {
                throw new BrokerException("Unknown exchange name: " + str2);
            }
            if (queueHandler == null) {
                throw new BrokerException("Unknown queue name: " + str);
            }
            exchange.unbind(queueHandler.getQueue(), str3);
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean createQueue(String str, boolean z, boolean z2, boolean z3) throws BrokerException {
        this.lock.writeLock().lock();
        try {
            boolean addQueue = this.queueRegistry.addQueue(str, z, z2, z3);
            if (addQueue) {
                this.exchangeRegistry.getDefaultExchange().bind(this.queueRegistry.getQueueHandler(str).getQueue(), str, FieldTable.EMPTY_TABLE);
            }
            return addQueue;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(Message message) throws BrokerException {
        this.lock.readLock().lock();
        try {
            Metadata metadata = message.getMetadata();
            Exchange exchange = this.exchangeRegistry.getExchange(metadata.getExchangeName());
            if (exchange == null) {
                throw new BrokerException("Message publish failed. Unknown exchange: " + metadata.getExchangeName());
            }
            String routingKey = metadata.getRoutingKey();
            BindingSet bindingsForRoute = exchange.getBindingsForRoute(routingKey);
            if (bindingsForRoute.isEmpty()) {
                LOGGER.info("Dropping message since no queues found for routing key " + routingKey + " in " + exchange);
                message.release();
            } else {
                try {
                    this.sharedMessageStore.add(message);
                    HashSet hashSet = new HashSet();
                    Iterator<Binding> it = bindingsForRoute.getUnfilteredBindings().iterator();
                    while (it.hasNext()) {
                        hashSet.add(it.next().getQueue().getName());
                    }
                    for (Binding binding : bindingsForRoute.getFilteredBindings()) {
                        if (binding.getFilterExpression().evaluate(metadata)) {
                            hashSet.add(binding.getQueue().getName());
                        }
                    }
                    publishToQueues(message, hashSet);
                    this.sharedMessageStore.flush(metadata.getInternalId());
                    message.release();
                } catch (Throwable th) {
                    this.sharedMessageStore.flush(metadata.getInternalId());
                    message.release();
                    throw th;
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    private void publishToQueues(Message message, Set<String> set) throws BrokerException {
        if (set.isEmpty()) {
            LOGGER.info("Dropping message since message didn't have any routes to {}", message.getMetadata().getRoutingKey());
            return;
        }
        for (String str : set) {
            QueueHandler queueHandler = this.queueRegistry.getQueueHandler(str);
            Message shallowCopy = message.shallowCopy();
            if (!queueHandler.enqueue(shallowCopy)) {
                LOGGER.info("Failed to publish message {} to the queue {}", message, str);
                shallowCopy.release();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void acknowledge(String str, Message message) throws BrokerException {
        this.lock.readLock().lock();
        try {
            this.queueRegistry.getQueueHandler(str).acknowledge(message);
            this.lock.readLock().unlock();
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean deleteQueue(String str, boolean z, boolean z2) throws BrokerException {
        this.lock.writeLock().lock();
        try {
            boolean removeQueue = this.queueRegistry.removeQueue(str, z, z2);
            this.lock.writeLock().unlock();
            return removeQueue;
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void consume(Consumer consumer) throws BrokerException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Consume request received for {}", consumer.getQueueName());
        }
        this.lock.readLock().lock();
        try {
            QueueHandler queueHandler = this.queueRegistry.getQueueHandler(consumer.getQueueName());
            if (queueHandler == null) {
                throw new BrokerException("Cannot add consumer. Queue [ " + consumer.getQueueName() + " ] not found. Create the queue before attempting to consume.");
            }
            synchronized (queueHandler) {
                if (queueHandler.addConsumer(consumer) && queueHandler.consumerCount() == 1) {
                    this.deliveryTaskService.add(new MessageDeliveryTask(queueHandler));
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startMessageDelivery() {
        this.deliveryTaskService.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopMessageDelivery() {
        this.deliveryTaskService.stop();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void createExchange(String str, String str2, boolean z, boolean z2) throws BrokerException {
        this.lock.writeLock().lock();
        try {
            this.exchangeRegistry.declareExchange(str, Exchange.Type.from(str2), z, z2);
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deleteExchange(String str, boolean z) throws BrokerException {
        this.lock.writeLock().lock();
        try {
            this.exchangeRegistry.deleteExchange(str, z);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeConsumer(Consumer consumer) {
        this.lock.readLock().lock();
        try {
            QueueHandler queueHandler = this.queueRegistry.getQueueHandler(consumer.getQueueName());
            if (queueHandler != null) {
                synchronized (queueHandler) {
                    if (queueHandler.removeConsumer(consumer) && queueHandler.consumerCount() == 0) {
                        this.deliveryTaskService.remove(queueHandler.getQueue().getName());
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getNextMessageId() {
        return this.messageIdGenerator.getNextId();
    }

    public void requeue(String str, Message message) throws BrokerException {
        this.lock.readLock().lock();
        try {
            this.queueRegistry.getQueueHandler(str).requeue(message);
            this.lock.readLock().unlock();
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            throw th;
        }
    }

    public void moveToDlc(String str, Message message) throws BrokerException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Moving message to DLC: {}", message);
        }
        try {
            Message shallowCopyWith = message.shallowCopyWith(getNextMessageId(), DEFAULT_DEAD_LETTER_QUEUE, ExchangeRegistry.DEFAULT_DEAD_LETTER_EXCHANGE);
            shallowCopyWith.getMetadata().addHeader(ORIGIN_QUEUE_HEADER, str);
            shallowCopyWith.getMetadata().addHeader(ORIGIN_EXCHANGE_HEADER, message.getMetadata().getExchangeName());
            shallowCopyWith.getMetadata().addHeader(ORIGIN_ROUTING_KEY_HEADER, message.getMetadata().getRoutingKey());
            publish(shallowCopyWith);
            acknowledge(str, message);
            message.release();
        } catch (Throwable th) {
            message.release();
            throw th;
        }
    }

    public Collection<QueueHandler> getAllQueues() {
        return this.queueRegistry.getAllQueues();
    }

    public QueueHandler getQueue(String str) {
        return this.queueRegistry.getQueueHandler(str);
    }
}
