package io.ballerina.messaging.broker.core;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.ballerina.messaging.broker.common.ResourceNotFoundException;
import io.ballerina.messaging.broker.common.StartupContext;
import io.ballerina.messaging.broker.common.ValidationException;
import io.ballerina.messaging.broker.common.config.BrokerCommonConfiguration;
import io.ballerina.messaging.broker.common.config.BrokerConfigProvider;
import io.ballerina.messaging.broker.common.data.types.FieldTable;
import io.ballerina.messaging.broker.coordination.BasicHaListener;
import io.ballerina.messaging.broker.coordination.HaListener;
import io.ballerina.messaging.broker.coordination.HaStrategy;
import io.ballerina.messaging.broker.core.Exchange;
import io.ballerina.messaging.broker.core.configuration.BrokerCoreConfiguration;
import io.ballerina.messaging.broker.core.metrics.BrokerMetricManager;
import io.ballerina.messaging.broker.core.metrics.DefaultBrokerMetricManager;
import io.ballerina.messaging.broker.core.metrics.NullBrokerMetricManager;
import io.ballerina.messaging.broker.core.rest.api.ExchangesApi;
import io.ballerina.messaging.broker.core.rest.api.QueuesApi;
import io.ballerina.messaging.broker.core.store.DbBackedStoreFactory;
import io.ballerina.messaging.broker.core.store.MemBackedStoreFactory;
import io.ballerina.messaging.broker.core.store.MessageStore;
import io.ballerina.messaging.broker.core.store.StoreFactory;
import io.ballerina.messaging.broker.core.task.TaskExecutorService;
import io.ballerina.messaging.broker.core.transaction.BranchFactory;
import io.ballerina.messaging.broker.core.transaction.BrokerTransactionFactory;
import io.ballerina.messaging.broker.core.transaction.LocalTransaction;
import io.ballerina.messaging.broker.core.util.MessageTracer;
import io.ballerina.messaging.broker.rest.BrokerServiceRunner;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.sql.DataSource;
import javax.transaction.xa.Xid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.metrics.core.MetricService;

/* loaded from: input_file:io/ballerina/messaging/broker/core/Broker.class */
public final class Broker {
    /** Logger shared by the broker and its nested helper classes. */
    private static final Logger LOGGER = LoggerFactory.getLogger(Broker.class);
    /** Name of the queue that is created and bound to the default dead letter exchange during construction. */
    public static final String DEFAULT_DEAD_LETTER_QUEUE = "amq.dlq";
    /** Header added by {@code moveToDlc} carrying the name of the queue the message was moved from. */
    public static final String ORIGIN_QUEUE_HEADER = "x-origin-queue";
    /** Header added by {@code moveToDlc} carrying the exchange name of the original message. */
    public static final String ORIGIN_EXCHANGE_HEADER = "x-origin-exchange";
    /** Header added by {@code moveToDlc} carrying the routing key of the original message. */
    public static final String ORIGIN_ROUTING_KEY_HEADER = "x-origin-routing-key";
    /** Metrics sink; a no-op implementation is used when no MetricService is registered. */
    private final BrokerMetricManager metricManager;
    /** HA strategy taken from the startup context; {@code null} when none is registered. */
    private HaStrategy haStrategy;
    /** Start/shutdown delegate; the HA-aware variant is used when {@link #haStrategy} is present. */
    private BrokerHelper brokerHelper;
    /** Factory used to create local transactions. */
    private final BrokerTransactionFactory brokerTransactionFactory;
    /** Registry of queue handlers, obtained from the store factory. */
    private final QueueRegistry queueRegistry;
    /** Executor service running the per-queue message delivery tasks. */
    private final TaskExecutorService<MessageDeliveryTask> deliveryTaskService;
    /** Registry of exchanges, obtained from the store factory. */
    private final ExchangeRegistry exchangeRegistry;
    /**
     * Guards the registries: message-path and lookup operations take the read lock,
     * exchange/queue/binding changes take the write lock.
     */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    /** Store that published messages are added to and flushed from. */
    private final MessageStore messageStore;
    /** Source of the ids returned by {@code getNextMessageId}. */
    private final MessageIdGenerator messageIdGenerator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/ballerina/messaging/broker/core/Broker$BrokerHelper.class */
    /**
     * Start/shutdown delegate used when no HA strategy is registered.
     */
    public class BrokerHelper {
        /** Private: only code inside {@link Broker} can instantiate this helper. */
        private BrokerHelper() {
        }

        /** Logs the start-up and starts the delivery task service. */
        public void startMessageDelivery() {
            LOGGER.info("Starting message delivery threads.");
            deliveryTaskService.start();
        }

        /** Stops message delivery on the enclosing broker. */
        public void shutdown() {
            Broker.this.stopMessageDelivery();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/ballerina/messaging/broker/core/Broker$HaEnabledBrokerHelper.class */
    public class HaEnabledBrokerHelper extends BrokerHelper implements HaListener {
        private BasicHaListener basicHaListener;

        HaEnabledBrokerHelper() {
            super();
            this.basicHaListener = new BasicHaListener(this);
            Broker.this.haStrategy.registerListener(this.basicHaListener, 1);
        }

        @Override // io.ballerina.messaging.broker.core.Broker.BrokerHelper
        public synchronized void startMessageDelivery() {
            this.basicHaListener.setStartCalled();
            if (this.basicHaListener.isActive()) {
                super.startMessageDelivery();
            }
        }

        @Override // io.ballerina.messaging.broker.core.Broker.BrokerHelper
        public void shutdown() {
            Broker.this.haStrategy.unregisterListener(this.basicHaListener);
            super.shutdown();
        }

        public void activate() {
            try {
                Broker.this.queueRegistry.reloadQueuesOnBecomingActive();
                Broker.this.exchangeRegistry.reloadExchangesOnBecomingActive(Broker.this.queueRegistry);
            } catch (BrokerException e) {
                Broker.LOGGER.error("Error on loading data from the database on becoming active ", e);
            }
            startMessageDeliveryOnBecomingActive();
            Broker.LOGGER.info("Broker mode changed from PASSIVE to ACTIVE");
        }

        public void deactivate() {
            Broker.this.stopMessageDelivery();
            Broker.LOGGER.info("Broker mode changed from ACTIVE to PASSIVE");
        }

        private synchronized void startMessageDeliveryOnBecomingActive() {
            if (this.basicHaListener.isStartCalled()) {
                startMessageDelivery();
            }
        }
    }

    /**
     * Builds a broker from the services registered in the given startup context
     * and registers the new instance back into that context.
     *
     * @param startupContext source of the MetricService, BrokerConfigProvider,
     *                       DataSource, BrokerServiceRunner and HaStrategy services
     * @throws Exception if configuration lookup, store setup or creation of the
     *                   default dead letter queue fails
     */
    public Broker(StartupContext startupContext) throws Exception {
        // Metrics must be resolved first: the store factory below receives the metric manager.
        this.metricManager = getMetricManager((MetricService) startupContext.getService(MetricService.class));
        BrokerConfigProvider brokerConfigProvider = (BrokerConfigProvider) startupContext.getService(BrokerConfigProvider.class);
        BrokerCoreConfiguration brokerCoreConfiguration = (BrokerCoreConfiguration) brokerConfigProvider.getConfigurationObject(BrokerCoreConfiguration.NAMESPACE, BrokerCoreConfiguration.class);
        StoreFactory storeFactory = getStoreFactory(startupContext, brokerConfigProvider, brokerCoreConfiguration);
        this.exchangeRegistry = storeFactory.getExchangeRegistry();
        this.messageStore = storeFactory.getMessageStore();
        this.queueRegistry = storeFactory.getQueueRegistry();
        // Exchange retrieval is handed the queue registry, so it runs after both registries are set.
        this.exchangeRegistry.retrieveFromStore(this.queueRegistry);
        this.deliveryTaskService = createTaskExecutorService(brokerCoreConfiguration);
        this.messageIdGenerator = new MessageIdGenerator();
        // Uses createQueue/bind, which need the registries and the lock to be initialised already.
        initDefaultDeadLetterQueue();
        this.brokerTransactionFactory = new BrokerTransactionFactory(new BranchFactory(this, storeFactory));
        initRestApi(startupContext);
        initHaSupport(startupContext);
        // Make the fully constructed broker available to other components.
        startupContext.registerService(Broker.class, this);
    }

    /**
     * Chooses the store factory according to the common broker configuration.
     *
     * @param startupContext          source of the DataSource for the database-backed store
     * @param brokerConfigProvider    provider queried for the "ballerina.broker" configuration
     * @param brokerCoreConfiguration core configuration handed to the chosen factory
     * @return an in-memory factory when in-memory mode is enabled, a database-backed one otherwise
     * @throws Exception if the configuration lookup fails
     */
    private StoreFactory getStoreFactory(StartupContext startupContext, BrokerConfigProvider brokerConfigProvider, BrokerCoreConfiguration brokerCoreConfiguration) throws Exception {
        BrokerCommonConfiguration commonConfig = (BrokerCommonConfiguration) brokerConfigProvider.getConfigurationObject("ballerina.broker", BrokerCommonConfiguration.class);
        if (commonConfig == null) {
            // No common configuration supplied: fall back to a default-constructed one.
            commonConfig = new BrokerCommonConfiguration();
        }
        if (commonConfig.getEnableInMemoryMode()) {
            return new MemBackedStoreFactory(metricManager, brokerCoreConfiguration);
        }
        DataSource dataSource = (DataSource) startupContext.getService(DataSource.class);
        return new DbBackedStoreFactory(dataSource, metricManager, brokerCoreConfiguration);
    }

    /**
     * Deploys the queue and exchange REST APIs when a service runner is registered;
     * does nothing otherwise.
     *
     * @param startupContext context queried for the BrokerServiceRunner
     */
    private void initRestApi(StartupContext startupContext) {
        BrokerServiceRunner serviceRunner = (BrokerServiceRunner) startupContext.getService(BrokerServiceRunner.class);
        if (serviceRunner == null) {
            return;
        }
        serviceRunner.deploy(new Object[]{new QueuesApi(this), new ExchangesApi(this)});
    }

    /**
     * Picks the start/shutdown helper depending on whether an HA strategy is registered.
     *
     * @param startupContext context queried for the HaStrategy
     */
    private void initHaSupport(StartupContext startupContext) {
        haStrategy = (HaStrategy) startupContext.getService(HaStrategy.class);
        if (haStrategy != null) {
            LOGGER.info("Broker is in PASSIVE mode");
            brokerHelper = new HaEnabledBrokerHelper();
            return;
        }
        brokerHelper = new BrokerHelper();
    }

    /**
     * Creates the default dead letter queue and binds it to the default dead
     * letter exchange, using the queue name as the binding key.
     *
     * @throws BrokerException     if queue creation or binding fails
     * @throws ValidationException if the queue or exchange is rejected
     */
    private void initDefaultDeadLetterQueue() throws BrokerException, ValidationException {
        final String deadLetterQueue = DEFAULT_DEAD_LETTER_QUEUE;
        createQueue(deadLetterQueue, false, true, false);
        bind(deadLetterQueue, ExchangeRegistry.DEFAULT_DEAD_LETTER_EXCHANGE, deadLetterQueue, FieldTable.EMPTY_TABLE);
    }

    /**
     * Selects the metric manager implementation.
     *
     * @param metricService metric service, may be {@code null}
     * @return a manager backed by the service, or a no-op manager when the service is {@code null}
     */
    private BrokerMetricManager getMetricManager(MetricService metricService) {
        if (metricService == null) {
            return new NullBrokerMetricManager();
        }
        return new DefaultBrokerMetricManager(metricService);
    }

    /**
     * Creates the executor service for message delivery tasks from the
     * delivery-task section of the core configuration.
     *
     * @param brokerCoreConfiguration configuration supplying worker count and idle task delay as strings
     * @return a task executor whose threads are named "MessageDeliveryTaskThreadPool-N"
     * @throws NumberFormatException if either configured value is not a valid integer
     */
    private TaskExecutorService<MessageDeliveryTask> createTaskExecutorService(BrokerCoreConfiguration brokerCoreConfiguration) {
        int workerCount = Integer.parseInt(brokerCoreConfiguration.getDeliveryTask().getWorkerCount());
        int idleTaskDelay = Integer.parseInt(brokerCoreConfiguration.getDeliveryTask().getIdleTaskDelay());
        return new TaskExecutorService<>(
                workerCount,
                idleTaskDelay,
                new ThreadFactoryBuilder().setNameFormat("MessageDeliveryTaskThreadPool-%d").build());
    }

    /**
     * Routes a message to every queue bound to its exchange for its routing key.
     *
     * <p>The message is dropped (and traced) when no binding matches. Otherwise a
     * shallow copy is added to the message store, copies are enqueued on the
     * matching queues, and the store is flushed for this message id exactly once,
     * whether or not enqueueing succeeded. The passed message is always released
     * before returning.
     *
     * @param message message to publish; released by this method
     * @throws BrokerException if the exchange named in the metadata is unknown or enqueueing fails
     */
    public void publish(Message message) throws BrokerException {
        lock.readLock().lock();
        try {
            Metadata metadata = message.getMetadata();
            Exchange exchange = exchangeRegistry.getExchange(metadata.getExchangeName());
            if (exchange == null) {
                MessageTracer.trace(message, MessageTracer.UNKNOWN_EXCHANGE);
                throw new BrokerException("Message publish failed. Unknown exchange: " + metadata.getExchangeName());
            }
            String routingKey = metadata.getRoutingKey();
            BindingSet bindingsForRoute = exchange.getBindingsForRoute(routingKey);
            if (bindingsForRoute.isEmpty()) {
                LOGGER.info("Dropping message since no queues found for routing key {} in {}", routingKey, exchange);
                MessageTracer.trace(message, MessageTracer.NO_ROUTES);
                return;
            }
            try {
                messageStore.add(message.shallowCopy());
                publishToQueues(message, getUniqueQueueHandlersForBinding(metadata, bindingsForRoute));
            } finally {
                // Single flush on every path; a failing flush is no longer retried by a catch block.
                messageStore.flush(message.getInternalId());
            }
        } finally {
            lock.readLock().unlock();
            message.release();
        }
    }

    /**
     * Collects the distinct queue handlers a message should be delivered to.
     *
     * <p>Handlers of all unfiltered bindings are always included; handlers of
     * filtered bindings are included only when the binding's filter expression
     * evaluates to true for the given metadata.
     *
     * @param metadata   metadata the filter expressions are evaluated against
     * @param bindingSet bindings matched for the message's routing key
     * @return set of matching queue handlers, possibly empty
     */
    private Set<QueueHandler> getUniqueQueueHandlersForBinding(Metadata metadata, BindingSet bindingSet) {
        Set<QueueHandler> handlers = new HashSet<>();
        for (Binding binding : bindingSet.getUnfilteredBindings()) {
            handlers.add(binding.getQueue().getQueueHandler());
        }
        for (Binding binding : bindingSet.getFilteredBindings()) {
            if (binding.getFilterExpression().evaluate(metadata)) {
                handlers.add(binding.getQueue().getQueueHandler());
            }
        }
        return handlers;
    }

    /**
     * Enqueues a shallow copy of the message on each given queue handler and
     * marks one publish in the metrics. When the set is empty the message is
     * only logged and traced as having no routes.
     *
     * @param message message to distribute
     * @param set     target queue handlers
     * @throws BrokerException if enqueueing on any handler fails
     */
    private void publishToQueues(Message message, Set<QueueHandler> set) throws BrokerException {
        if (set.isEmpty()) {
            LOGGER.info("Dropping message since message didn't have any routes to {}", message.getMetadata().getRoutingKey());
            MessageTracer.trace(message, MessageTracer.NO_ROUTES);
            return;
        }
        for (QueueHandler handler : set) {
            handler.enqueue(message.shallowCopy());
        }
        metricManager.markPublish();
    }

    /**
     * Acknowledges a message by dequeuing it from the named queue and marking
     * the acknowledgement in the metrics.
     *
     * @param str     name of the queue the message was delivered from
     * @param message message being acknowledged
     * @throws BrokerException if the queue does not exist or the dequeue fails
     */
    public void acknowledge(String str, Message message) throws BrokerException {
        lock.readLock().lock();
        try {
            QueueHandler queueHandler = queueRegistry.getQueueHandler(str);
            if (queueHandler == null) {
                // The registry returns null for unknown queues; report that instead of failing with an NPE.
                throw new BrokerException("Cannot acknowledge message. Queue [ " + str + " ] not found.");
            }
            queueHandler.dequeue(message);
            metricManager.markAcknowledge();
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Prepares a transactional enqueue of the message on every queue it routes to.
     *
     * <p>When at least one queue matches, a shallow copy of the message is added
     * to the message store under the given transaction id and each matching
     * handler is asked to prepare the enqueue. The read lock is released and the
     * passed message is released exactly once on every path.
     *
     * @param xid     transaction branch id
     * @param message message to enqueue; released by this method
     * @return the queue handlers the message was prepared on, possibly empty
     * @throws BrokerException if the exchange named in the metadata is unknown or preparation fails
     */
    public Set<QueueHandler> prepareEnqueue(Xid xid, Message message) throws BrokerException {
        lock.readLock().lock();
        try {
            Metadata metadata = message.getMetadata();
            Exchange exchange = exchangeRegistry.getExchange(metadata.getExchangeName());
            if (exchange == null) {
                throw new BrokerException("Message published to unknown exchange " + metadata.getExchangeName());
            }
            Set<QueueHandler> queueHandlers = getUniqueQueueHandlersForBinding(metadata, exchange.getBindingsForRoute(metadata.getRoutingKey()));
            if (queueHandlers.isEmpty()) {
                return queueHandlers;
            }
            messageStore.add(xid, message.shallowCopy());
            for (QueueHandler handler : queueHandlers) {
                handler.prepareForEnqueue(xid, message.shallowCopy());
            }
            return queueHandlers;
        } finally {
            // Cleanup lives only here: unlocking in the body as well would unlock a
            // read lock this thread no longer holds and release the message twice.
            lock.readLock().unlock();
            message.release();
        }
    }

    /**
     * Prepares a transactional detach of the message from the named queue.
     *
     * @param xid     transaction branch id
     * @param str     name of the queue holding the message
     * @param message message to detach
     * @return the handler of the named queue
     * @throws BrokerException if the queue does not exist or the preparation fails
     */
    public QueueHandler prepareDequeue(Xid xid, String str, Message message) throws BrokerException {
        lock.readLock().lock();
        try {
            QueueHandler queueHandler = queueRegistry.getQueueHandler(str);
            if (queueHandler == null) {
                // The registry returns null for unknown queues; report that instead of failing with an NPE.
                throw new BrokerException("Cannot prepare dequeue. Queue [ " + str + " ] not found.");
            }
            queueHandler.prepareForDetach(xid, message);
            return queueHandler;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Attaches a consumer to its queue. When the consumer is added and becomes
     * the only consumer of the queue, a delivery task for that queue is
     * scheduled.
     *
     * @param consumer consumer to attach; its queue name selects the queue
     * @throws BrokerException if the queue does not exist
     */
    public void addConsumer(Consumer consumer) throws BrokerException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Consume request received for {}", consumer.getQueueName());
        }
        lock.readLock().lock();
        try {
            QueueHandler handler = queueRegistry.getQueueHandler(consumer.getQueueName());
            if (handler == null) {
                throw new BrokerException("Cannot add consumer. Queue [ " + consumer.getQueueName() + " ] not found. Create the queue before attempting to consume.");
            }
            // Adding the consumer and checking the count must be atomic per queue.
            synchronized (handler) {
                boolean firstConsumer = handler.addConsumer(consumer) && handler.consumerCount() == 1;
                if (firstConsumer) {
                    deliveryTaskService.add(new MessageDeliveryTask(handler));
                }
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Detaches a consumer from its queue. When the last consumer is removed the
     * queue's delivery task is removed as well, and an auto-delete queue is
     * deleted afterwards. Failures of the auto-delete are logged, not propagated.
     *
     * @param consumer consumer to detach; its queue name selects the queue
     */
    public void removeConsumer(Consumer consumer) {
        QueueHandler queueHandler;
        boolean autoDelete = false;
        lock.readLock().lock();
        try {
            queueHandler = queueRegistry.getQueueHandler(consumer.getQueueName());
            if (queueHandler != null) {
                synchronized (queueHandler) {
                    if (queueHandler.removeConsumer(consumer) && queueHandler.consumerCount() == 0) {
                        deliveryTaskService.remove(queueHandler.getQueue().getName());
                        autoDelete = queueHandler.getQueue().isAutoDelete();
                    }
                }
            }
        } finally {
            lock.readLock().unlock();
        }
        if (!autoDelete) {
            return;
        }
        // deleteQueue takes the write lock. ReentrantReadWriteLock cannot upgrade a held
        // read lock to the write lock, so the deletion must run only after the read lock
        // above has been released; otherwise this thread would block forever.
        try {
            deleteQueue(queueHandler.getQueue().getName(), true, false);
        } catch (ValidationException | ResourceNotFoundException | BrokerException e) {
            LOGGER.warn("Exception while auto deleting the queue " + queueHandler.getQueue(), e);
        }
    }

    /**
     * Declares an exchange through the exchange registry while holding the write lock.
     * All arguments are forwarded unchanged to {@code ExchangeRegistry.declareExchange}.
     *
     * @param str  first registry argument (presumably the exchange name; confirm against the registry)
     * @param str2 second registry argument (presumably the exchange type; confirm against the registry)
     * @param z    third registry argument
     * @param z2   fourth registry argument
     * @throws BrokerException     if the registry fails to declare the exchange
     * @throws ValidationException if the registry rejects the request
     */
    public void declareExchange(String str, String str2, boolean z, boolean z2) throws BrokerException, ValidationException {
        lock.writeLock().lock();
        try {
            exchangeRegistry.declareExchange(str, str2, z, z2);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Creates an exchange through the exchange registry while holding the write lock.
     *
     * @param str  exchange name passed to the registry
     * @param str2 textual exchange type, converted with {@code Exchange.Type.from}
     * @param z    flag forwarded unchanged to the registry
     * @throws BrokerException     if the registry fails to create the exchange
     * @throws ValidationException if the registry rejects the request
     */
    public void createExchange(String str, String str2, boolean z) throws BrokerException, ValidationException {
        lock.writeLock().lock();
        try {
            exchangeRegistry.createExchange(str, Exchange.Type.from(str2), z);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Deletes an exchange through the exchange registry while holding the write lock.
     *
     * @param str exchange name passed to the registry
     * @param z   flag forwarded unchanged to the registry
     * @return the registry's result for the deletion
     * @throws BrokerException     if the registry fails to delete the exchange
     * @throws ValidationException if the registry rejects the request
     */
    public boolean deleteExchange(String str, boolean z) throws BrokerException, ValidationException {
        lock.writeLock().lock();
        try {
            return exchangeRegistry.deleteExchange(str, z);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Adds a queue to the queue registry and, when the registry reports it as
     * added, binds it to the default exchange under its own name.
     *
     * @param str queue name
     * @param z   first flag forwarded unchanged to {@code QueueRegistry.addQueue}
     * @param z2  second flag forwarded unchanged to {@code QueueRegistry.addQueue}
     * @param z3  third flag forwarded unchanged to {@code QueueRegistry.addQueue}
     * @return the registry's result: {@code true} when the queue was added
     * @throws BrokerException     if adding or binding the queue fails
     * @throws ValidationException if the registry or exchange rejects the request
     */
    public boolean createQueue(String str, boolean z, boolean z2, boolean z3) throws BrokerException, ValidationException {
        lock.writeLock().lock();
        try {
            if (!queueRegistry.addQueue(str, z, z2, z3)) {
                return false;
            }
            exchangeRegistry.getDefaultExchange().bind(queueRegistry.getQueueHandler(str), str, FieldTable.EMPTY_TABLE);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Removes a queue through the queue registry while holding the write lock.
     *
     * @param str queue name
     * @param z   first flag forwarded unchanged to {@code QueueRegistry.removeQueue}
     * @param z2  second flag forwarded unchanged to {@code QueueRegistry.removeQueue}
     * @return the integer result reported by the registry
     * @throws BrokerException           if the registry fails to remove the queue
     * @throws ValidationException       if the registry rejects the request
     * @throws ResourceNotFoundException if the registry reports the queue as missing
     */
    public int deleteQueue(String str, boolean z, boolean z2) throws BrokerException, ValidationException, ResourceNotFoundException {
        lock.writeLock().lock();
        try {
            return queueRegistry.removeQueue(str, z, z2);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Purges the named queue while holding the write lock.
     *
     * @param str queue name
     * @return the integer result of the handler's purge
     * @throws ResourceNotFoundException if no queue with that name exists
     * @throws ValidationException       if the handler rejects the purge
     */
    public int purgeQueue(String str) throws ResourceNotFoundException, ValidationException {
        lock.writeLock().lock();
        try {
            QueueHandler handler = queueRegistry.getQueueHandler(str);
            if (handler == null) {
                throw new ResourceNotFoundException("Queue [ " + str + " ] Not found");
            }
            return handler.purgeQueue();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Binds a queue to an exchange while holding the write lock. An empty
     * binding key is accepted but results in no binding being created.
     *
     * @param str        queue name
     * @param str2       exchange name
     * @param str3       binding key handed to the exchange; nothing is bound when empty
     * @param fieldTable binding arguments handed to the exchange
     * @throws BrokerException     if the exchange fails to create the binding
     * @throws ValidationException if the exchange or the queue is unknown
     */
    public void bind(String str, String str2, String str3, FieldTable fieldTable) throws BrokerException, ValidationException {
        lock.writeLock().lock();
        try {
            Exchange targetExchange = exchangeRegistry.getExchange(str2);
            QueueHandler handler = queueRegistry.getQueueHandler(str);
            if (Objects.isNull(targetExchange)) {
                throw new ValidationException("Unknown exchange name: " + str2);
            }
            if (Objects.isNull(handler)) {
                throw new ValidationException("Unknown queue name: " + str);
            }
            if (str3.isEmpty()) {
                return;
            }
            targetExchange.bind(handler, str3, fieldTable);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Removes a binding between a queue and an exchange while holding the write lock.
     *
     * @param str  queue name
     * @param str2 exchange name
     * @param str3 binding key handed to the exchange
     * @throws BrokerException     if the exchange fails to remove the binding
     * @throws ValidationException if the exchange or the queue is unknown
     */
    public void unbind(String str, String str2, String str3) throws BrokerException, ValidationException {
        lock.writeLock().lock();
        try {
            Exchange targetExchange = exchangeRegistry.getExchange(str2);
            QueueHandler handler = queueRegistry.getQueueHandler(str);
            if (Objects.isNull(targetExchange)) {
                throw new ValidationException("Unknown exchange name: " + str2);
            }
            if (Objects.isNull(handler)) {
                throw new ValidationException("Unknown queue name: " + str);
            }
            targetExchange.unbind(handler.getQueue(), str3);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Requests the start of message delivery through the configured broker helper. */
    public void startMessageDelivery() {
        brokerHelper.startMessageDelivery();
    }

    /** Logs the stop request and stops the delivery task service. */
    public void stopMessageDelivery() {
        LOGGER.info("Stopping message delivery threads.");
        deliveryTaskService.stop();
    }

    /** Shuts the broker down through the configured broker helper. */
    public void shutdown() {
        brokerHelper.shutdown();
    }

    /**
     * Obtains the next id from the broker's message id generator.
     *
     * @return the next message id
     */
    public long getNextMessageId() {
        return messageIdGenerator.getNextId();
    }

    /**
     * Puts a message back on the named queue while holding the read lock.
     *
     * @param str     queue name
     * @param message message to requeue
     * @throws BrokerException           if the handler fails to requeue the message
     * @throws ResourceNotFoundException if no queue with that name exists
     */
    public void requeue(String str, Message message) throws BrokerException, ResourceNotFoundException {
        lock.readLock().lock();
        try {
            QueueHandler handler = queueRegistry.getQueueHandler(str);
            if (handler == null) {
                throw new ResourceNotFoundException("Queue [ " + str + " ] Not found");
            }
            handler.requeue(message);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Reads all queue handlers from the queue registry under the read lock.
     *
     * @return the collection returned by the registry
     */
    public Collection<QueueHandler> getAllQueues() {
        lock.readLock().lock();
        try {
            Collection<QueueHandler> handlers = queueRegistry.getAllQueues();
            return handlers;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Looks up a queue handler by name under the read lock.
     *
     * @param str queue name
     * @return the registry's handler for that name; other methods in this class treat
     *         a {@code null} result as "queue not found"
     */
    public QueueHandler getQueue(String str) {
        lock.readLock().lock();
        try {
            QueueHandler handler = queueRegistry.getQueueHandler(str);
            return handler;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Moves a message to the default dead letter queue.
     *
     * <p>A shallow copy with a fresh message id is addressed to the dead letter
     * exchange and queue, tagged with headers recording the original queue,
     * exchange and routing key, and published. The original message is then
     * acknowledged on its source queue. The original message is released exactly
     * once, whether or not the move succeeded.
     *
     * @param str     name of the queue the message is moved from
     * @param message message to move; released by this method
     * @throws BrokerException if publishing the copy or acknowledging the original fails
     */
    public void moveToDlc(String str, Message message) throws BrokerException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Moving message to DLC: {}", message);
        }
        try {
            Message dlcMessage = message.shallowCopyWith(getNextMessageId(), DEFAULT_DEAD_LETTER_QUEUE, ExchangeRegistry.DEFAULT_DEAD_LETTER_EXCHANGE);
            dlcMessage.getMetadata().addHeader(ORIGIN_QUEUE_HEADER, str);
            dlcMessage.getMetadata().addHeader(ORIGIN_EXCHANGE_HEADER, message.getMetadata().getExchangeName());
            dlcMessage.getMetadata().addHeader(ORIGIN_ROUTING_KEY_HEADER, message.getMetadata().getRoutingKey());
            publish(dlcMessage);
            acknowledge(str, message);
        } finally {
            // Single release on every path; a failing release is no longer repeated by a catch block.
            message.release();
        }
    }

    /**
     * Reads all exchanges from the exchange registry under the read lock.
     *
     * @return the collection returned by the registry
     */
    public Collection<Exchange> getAllExchanges() {
        lock.readLock().lock();
        try {
            Collection<Exchange> exchanges = exchangeRegistry.getAllExchanges();
            return exchanges;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Returns every binding registered on the named exchange, read under the read lock.
     *
     * @param str exchange name
     * @return the map exposed by the exchange's bindings registry
     * @throws ValidationException if no exchange with that name exists
     */
    public Map<String, BindingSet> getAllBindingsForExchange(String str) throws ValidationException {
        lock.readLock().lock();
        try {
            Exchange targetExchange = exchangeRegistry.getExchange(str);
            if (targetExchange == null) {
                throw new ValidationException("Non existing exchange name " + str);
            }
            return targetExchange.getBindingsRegistry().getAllBindings();
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Looks up an exchange by name under the read lock.
     *
     * @param str exchange name
     * @return the registry's exchange for that name; other methods in this class treat
     *         a {@code null} result as "unknown exchange"
     */
    public Exchange getExchange(String str) {
        lock.readLock().lock();
        try {
            Exchange found = exchangeRegistry.getExchange(str);
            return found;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Creates a new local transaction through the broker's transaction factory.
     *
     * @return the transaction created by the factory
     */
    public LocalTransaction newLocalTransaction() {
        return brokerTransactionFactory.createLocalTransaction();
    }
}
