package io.ballerina.messaging.broker.core;

import io.ballerina.messaging.broker.common.BrokerConfigProvider;
import io.ballerina.messaging.broker.common.ResourceNotFoundException;
import io.ballerina.messaging.broker.common.StartupContext;
import io.ballerina.messaging.broker.common.ValidationException;
import io.ballerina.messaging.broker.common.data.types.FieldTable;
import io.ballerina.messaging.broker.coordination.BasicHaListener;
import io.ballerina.messaging.broker.coordination.HaListener;
import io.ballerina.messaging.broker.coordination.HaStrategy;
import io.ballerina.messaging.broker.core.configuration.BrokerConfiguration;
import io.ballerina.messaging.broker.core.metrics.BrokerMetricManager;
import io.ballerina.messaging.broker.core.metrics.DefaultBrokerMetricManager;
import io.ballerina.messaging.broker.core.metrics.NullBrokerMetricManager;
import io.ballerina.messaging.broker.core.rest.api.ExchangesApi;
import io.ballerina.messaging.broker.core.rest.api.QueuesApi;
import io.ballerina.messaging.broker.core.store.StoreFactory;
import io.ballerina.messaging.broker.rest.BrokerServiceRunner;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.metrics.core.MetricService;

/* loaded from: input_file:io/ballerina/messaging/broker/core/Broker.class */
public final class Broker {
    private static final Logger LOGGER = LoggerFactory.getLogger(Broker.class);
    private final MessagingEngine messagingEngine;
    private final BrokerMetricManager metricManager;
    private HaStrategy haStrategy;
    private BrokerHelper brokerHelper;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/ballerina/messaging/broker/core/Broker$BrokerHelper.class */
    public class BrokerHelper {
        private BrokerHelper() {
        }

        public void startMessageDelivery() {
            Broker.LOGGER.info("Starting message delivery threads.");
            Broker.this.messagingEngine.startMessageDelivery();
        }

        public void shutdown() {
            Broker.this.stopMessageDelivery();
        }
    }

    /* loaded from: input_file:io/ballerina/messaging/broker/core/Broker$HaEnabledBrokerHelper.class */
    private class HaEnabledBrokerHelper extends BrokerHelper implements HaListener {
        private BasicHaListener basicHaListener;

        HaEnabledBrokerHelper() {
            super();
            this.basicHaListener = new BasicHaListener(this);
            Broker.this.haStrategy.registerListener(this.basicHaListener, 1);
        }

        @Override // io.ballerina.messaging.broker.core.Broker.BrokerHelper
        public synchronized void startMessageDelivery() {
            this.basicHaListener.setStartCalled();
            if (this.basicHaListener.isActive()) {
                super.startMessageDelivery();
            }
        }

        @Override // io.ballerina.messaging.broker.core.Broker.BrokerHelper
        public void shutdown() {
            Broker.this.haStrategy.unregisterListener(this.basicHaListener);
            super.shutdown();
        }

        public void activate() {
            try {
                Broker.this.messagingEngine.reloadOnBecomingActive();
            } catch (BrokerException e) {
                Broker.LOGGER.error("Error on loading data from the database on becoming active ", e);
            }
            startMessageDeliveryOnBecomingActive();
            Broker.LOGGER.info("Broker mode changed from PASSIVE to ACTIVE");
        }

        public void deactivate() {
            Broker.this.stopMessageDelivery();
            Broker.LOGGER.info("Broker mode changed from ACTIVE to PASSIVE");
        }

        private synchronized void startMessageDeliveryOnBecomingActive() {
            if (this.basicHaListener.isStartCalled()) {
                startMessageDelivery();
            }
        }
    }

    public Broker(StartupContext startupContext) throws Exception {
        MetricService metricService = (MetricService) startupContext.getService(MetricService.class);
        if (Objects.nonNull(metricService)) {
            this.metricManager = new DefaultBrokerMetricManager(metricService);
        } else {
            this.metricManager = new NullBrokerMetricManager();
        }
        this.messagingEngine = new MessagingEngine(new StoreFactory((DataSource) startupContext.getService(DataSource.class), this.metricManager, (BrokerConfiguration) ((BrokerConfigProvider) startupContext.getService(BrokerConfigProvider.class)).getConfigurationObject(BrokerConfiguration.NAMESPACE, BrokerConfiguration.class)), this.metricManager);
        ((BrokerServiceRunner) startupContext.getService(BrokerServiceRunner.class)).deploy(new Object[]{new QueuesApi(this), new ExchangesApi(this)});
        startupContext.registerService(Broker.class, this);
        this.haStrategy = (HaStrategy) startupContext.getService(HaStrategy.class);
        if (this.haStrategy == null) {
            this.brokerHelper = new BrokerHelper();
        } else {
            LOGGER.info("Broker is in PASSIVE mode");
            this.brokerHelper = new HaEnabledBrokerHelper();
        }
    }

    public void publish(Message message) throws BrokerException {
        this.messagingEngine.publish(message);
        this.metricManager.markPublish();
    }

    public void acknowledge(String str, Message message) throws BrokerException {
        this.messagingEngine.acknowledge(str, message);
        this.metricManager.markAcknowledge();
    }

    public void addConsumer(Consumer consumer) throws BrokerException {
        this.messagingEngine.consume(consumer);
    }

    public void removeConsumer(Consumer consumer) {
        this.messagingEngine.closeConsumer(consumer);
    }

    public void declareExchange(String str, String str2, boolean z, boolean z2) throws BrokerException, ValidationException {
        this.messagingEngine.declareExchange(str, str2, z, z2);
    }

    public void createExchange(String str, String str2, boolean z) throws BrokerException, ValidationException {
        this.messagingEngine.createExchange(str, str2, z);
    }

    public boolean deleteExchange(String str, boolean z) throws BrokerException, ValidationException {
        return this.messagingEngine.deleteExchange(str, z);
    }

    public boolean createQueue(String str, boolean z, boolean z2, boolean z3) throws BrokerException, ValidationException {
        return this.messagingEngine.createQueue(str, z, z2, z3);
    }

    public int deleteQueue(String str, boolean z, boolean z2) throws BrokerException, ValidationException, ResourceNotFoundException {
        return this.messagingEngine.deleteQueue(str, z, z2);
    }

    public void bind(String str, String str2, String str3, FieldTable fieldTable) throws BrokerException, ValidationException {
        this.messagingEngine.bind(str, str2, str3, fieldTable);
    }

    public void unbind(String str, String str2, String str3) throws BrokerException, ValidationException {
        this.messagingEngine.unbind(str, str2, str3);
    }

    public void startMessageDelivery() {
        this.brokerHelper.startMessageDelivery();
    }

    public void stopMessageDelivery() {
        LOGGER.info("Stopping message delivery threads.");
        this.messagingEngine.stopMessageDelivery();
    }

    public void shutdown() {
        this.brokerHelper.shutdown();
    }

    public long getNextMessageId() {
        return this.messagingEngine.getNextMessageId();
    }

    public void requeue(String str, Message message) throws BrokerException {
        this.messagingEngine.requeue(str, message);
    }

    public Collection<QueueHandler> getAllQueues() {
        return this.messagingEngine.getAllQueues();
    }

    public QueueHandler getQueue(String str) {
        return this.messagingEngine.getQueue(str);
    }

    public void moveToDlc(String str, Message message) throws BrokerException {
        this.messagingEngine.moveToDlc(str, message);
    }

    public Collection<Exchange> getAllExchanges() {
        return this.messagingEngine.getAllExchanges();
    }

    public Map<String, BindingSet> getAllBindingsForExchange(String str) throws ValidationException {
        return this.messagingEngine.getAllBindingsForExchange(str);
    }

    public Exchange getExchange(String str) {
        return this.messagingEngine.getExchange(str);
    }
}
