package org.wso2.carbon.andes.internal;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Dictionary;
import java.util.Iterator;
import java.util.Stack;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.QueryExp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.wso2.andes.configuration.AndesConfigurationManager;
import org.wso2.andes.configuration.enums.AndesConfiguration;
import org.wso2.andes.kernel.AndesContext;
import org.wso2.andes.kernel.AndesException;
import org.wso2.andes.kernel.AndesKernelBoot;
import org.wso2.andes.server.Main;
import org.wso2.andes.server.registry.ApplicationRegistry;
import org.wso2.andes.wso2.service.QpidNotificationService;
import org.wso2.carbon.andes.authentication.service.AuthenticationService;
import org.wso2.carbon.andes.event.core.EventBundleNotificationService;
import org.wso2.carbon.andes.event.core.qpid.QpidServerDetails;
import org.wso2.carbon.andes.listeners.BrokerLifecycleListener;
import org.wso2.carbon.andes.listeners.MessageBrokerTenantManagementListener;
import org.wso2.carbon.andes.service.QpidService;
import org.wso2.carbon.andes.service.QpidServiceImpl;
import org.wso2.carbon.andes.service.exception.ConfigurationException;
import org.wso2.carbon.andes.utils.MessageBrokerDBUtil;
import org.wso2.carbon.base.ServerConfiguration;
import org.wso2.carbon.base.api.ServerConfigurationService;
import org.wso2.carbon.core.ServerRestartHandler;
import org.wso2.carbon.core.ServerShutdownHandler;
import org.wso2.carbon.server.admin.common.IServerAdmin;
import org.wso2.carbon.stratos.common.listeners.TenantMgtListener;
import org.wso2.carbon.utils.ConfigurationContextService;

/**
 * Declarative-services component that boots the embedded Andes/Qpid message broker.
 *
 * <p>On activation it initialises the Andes configuration with the Carbon port offset, loads the
 * Qpid service configuration, registers a tenant-management listener and, depending on the
 * configured deployment mode, starts the broker. It also registers a handler that shuts the
 * Andes kernel down on server shutdown or restart. Every service registration made here is
 * remembered and undone in {@link #deactivate(ComponentContext)}.
 */
@Component(name = "org.wso2.carbon.andes.internal.QpidServiceComponent", immediate = true)
public class QpidServiceComponent {
    private static final Log log = LogFactory.getLog(QpidServiceComponent.class);

    private static final String CARBON_CONFIG_PORT_OFFSET = "Ports.Offset";
    private static final int CARBON_DEFAULT_PORT_OFFSET = 0;
    protected static final String MODE_STANDALONE = "standalone";
    protected static final String MODE_DEFAULT = "default";

    /** Pause between two consecutive start-up checks, in milliseconds. */
    private static final long STARTUP_POLL_INTERVAL_MS = 500L;

    /** JMX name pattern whose presence is used as the "broker is running" signal. */
    private static final String VIRTUAL_HOST_MANAGER_PATTERN =
            "org.wso2.andes:type=VirtualHost.VirtualHostManager,*";

    private static BundleContext bundleContext;

    /** Registrations made by this component; popped and unregistered on deactivation. */
    private static Stack<ServiceRegistration> registrations = new Stack<>();

    private QpidServiceImpl qpidServiceImpl;

    /**
     * Handler registered for both server shutdown and server restart. It notifies the broker
     * lifecycle listeners, shuts the Andes kernel down, and notifies the listeners again.
     */
    private static class MBShutdownHandler implements ServerShutdownHandler, ServerRestartHandler {
        private MBShutdownHandler() {
        }

        /**
         * Runs the shutdown sequence. An {@link AndesException} is logged and not rethrown; in
         * that case the remaining steps of the sequence are skipped.
         */
        @Override
        public void invoke() {
            try {
                for (BrokerLifecycleListener listener
                        : QpidServiceDataHolder.getInstance().getBrokerLifecycleListeners()) {
                    listener.onShuttingdown();
                }
                AndesKernelBoot.shutDownAndesKernel();
                // The listener collection is fetched again, exactly as before the kernel shutdown.
                for (BrokerLifecycleListener listener
                        : QpidServiceDataHolder.getInstance().getBrokerLifecycleListeners()) {
                    listener.onShutdown();
                }
            } catch (AndesException e) {
                QpidServiceComponent.log.error("Error while shutting down Andes kernel. ", e);
            }
        }
    }

    /**
     * Activates the component.
     *
     * <p>In {@code standalone} mode clustering is switched off and the broker is started. In
     * {@code default} mode the broker is started only when clustering is not enabled. Any other
     * mode (including a missing value) is treated as a configuration error.
     *
     * @param componentContext component context supplying the bundle context used for registrations
     * @throws AndesException if the broker fails to start, or if the graceful shutdown triggered
     *                        by an invalid configuration itself fails
     */
    @Activate
    protected void activate(ComponentContext componentContext) throws AndesException {
        try {
            AndesConfigurationManager.initialize(readPortOffset());
            this.qpidServiceImpl = new QpidServiceImpl(QpidServiceDataHolder.getInstance().getAccessKey());
            this.qpidServiceImpl.loadConfigurations();
            bundleContext = componentContext.getBundleContext();
            registrations.push(bundleContext.registerService(
                    TenantMgtListener.class.getName(), new MessageBrokerTenantManagementListener(), null));
            AndesContext.getInstance().constructStoreConfiguration();

            String deploymentMode = (String) AndesConfigurationManager.readValue(AndesConfiguration.DEPLOYMENT_MODE);
            // Constant-first comparison: a missing (null) mode falls through to the
            // "invalid value" branch instead of raising a NullPointerException.
            if (MODE_STANDALONE.equalsIgnoreCase(deploymentMode)) {
                AndesContext.getInstance().setClusteringEnabled(false);
                startAndesBroker();
            } else if (MODE_DEFAULT.equalsIgnoreCase(deploymentMode)) {
                if (!AndesContext.getInstance().isClusteringEnabled()) {
                    startAndesBroker();
                }
            } else {
                throw new ConfigurationException(
                        "Invalid value " + deploymentMode + " for deployment/mode in broker.xml");
            }

            // One handler instance serves both the shutdown and the restart hook.
            MBShutdownHandler shutdownHandler = new MBShutdownHandler();
            registrations.push(bundleContext.registerService(
                    ServerShutdownHandler.class.getName(), shutdownHandler, null));
            registrations.push(bundleContext.registerService(
                    ServerRestartHandler.class.getName(), shutdownHandler, null));
        } catch (ConfigurationException e) {
            log.error("Invalid configuration found in a configuration file", e);
            shutdown();
        }
    }

    /**
     * Unregisters every service registered by this component, most recent first, and clears the
     * cached bundle context.
     *
     * @param componentContext component context (unused)
     */
    @Deactivate
    protected void deactivate(ComponentContext componentContext) {
        while (!registrations.empty()) {
            registrations.pop().unregister();
        }
        bundleContext = null;
    }

    /**
     * Stores the access key exposed by the authentication service in the data holder.
     *
     * @param authenticationService the bound authentication service
     */
    @Reference(name = "org.wso2.carbon.andes.authentication.service.AuthenticationService", service = AuthenticationService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetAccessKey")
    protected void setAccessKey(AuthenticationService authenticationService) {
        QpidServiceDataHolder.getInstance().setAccessKey(authenticationService.getAccessKey());
    }

    /**
     * Clears the access key held in the data holder.
     *
     * @param authenticationService the service being unbound (unused)
     */
    protected void unsetAccessKey(AuthenticationService authenticationService) {
        QpidServiceDataHolder.getInstance().setAccessKey(null);
    }

    /**
     * Bind method for the mandatory Qpid notification service reference; the service instance
     * itself is not used by this component.
     *
     * @param qpidNotificationService the bound service (unused)
     */
    @Reference(name = "org.wso2.andes.wso2.service.QpidNotificationService", service = QpidNotificationService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetQpidNotificationService")
    protected void setQpidNotificationService(QpidNotificationService qpidNotificationService) {
    }

    /**
     * Unbind counterpart of {@link #setQpidNotificationService(QpidNotificationService)}; no-op.
     *
     * @param qpidNotificationService the service being unbound (unused)
     */
    protected void unsetQpidNotificationService(QpidNotificationService qpidNotificationService) {
    }

    /**
     * Stores the Carbon server configuration service in the data holder.
     *
     * @param serverConfigurationService the bound configuration service
     */
    @Reference(name = "server.configuration", service = ServerConfigurationService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetServerConfiguration")
    protected void setServerConfiguration(ServerConfigurationService serverConfigurationService) {
        QpidServiceDataHolder.getInstance().setCarbonConfiguration(serverConfigurationService);
    }

    /**
     * Clears the Carbon server configuration held in the data holder.
     *
     * @param serverConfigurationService the service being unbound (unused)
     */
    protected void unsetServerConfiguration(ServerConfigurationService serverConfigurationService) {
        QpidServiceDataHolder.getInstance().setCarbonConfiguration(null);
    }

    /**
     * Registers the event-bundle notification service with the data holder.
     *
     * @param eventBundleNotificationService the bound notification service
     */
    @Reference(name = "event.broker", service = EventBundleNotificationService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetEventBundleNotificationService")
    protected void setEventBundleNotificationService(EventBundleNotificationService eventBundleNotificationService) {
        QpidServiceDataHolder.getInstance().registerEventBundleNotificationService(eventBundleNotificationService);
    }

    /**
     * Unbind counterpart of
     * {@link #setEventBundleNotificationService(EventBundleNotificationService)}; no-op.
     *
     * @param eventBundleNotificationService the service being unbound (unused)
     */
    protected void unsetEventBundleNotificationService(EventBundleNotificationService eventBundleNotificationService) {
    }

    /**
     * Marks clustering as enabled in the Andes context exactly when the server's Axis
     * configuration has a clustering agent.
     *
     * @param configurationContextService the bound configuration context service
     */
    @Reference(name = "config.context.service", service = ConfigurationContextService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetConfigurationContextService")
    protected void setConfigurationContextService(ConfigurationContextService configurationContextService) {
        boolean clusteringAgentPresent = configurationContextService.getServerConfigContext()
                .getAxisConfiguration().getClusteringAgent() != null;
        AndesContext.getInstance().setClusteringEnabled(clusteringAgentPresent);
    }

    /**
     * Unbind counterpart of {@link #setConfigurationContextService(ConfigurationContextService)};
     * no-op.
     *
     * @param configurationContextService the service being unbound (unused)
     */
    protected void unsetConfigurationContextService(ConfigurationContextService configurationContextService) {
    }

    /**
     * Stores the server-admin service in the data holder; it is used by {@link #shutdown()}.
     *
     * @param iServerAdmin the bound server-admin service
     */
    @Reference(name = "org.wso2.carbon.server.admin.common.IServerAdmin", service = IServerAdmin.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetIServerAdmin")
    protected void setIServerAdmin(IServerAdmin iServerAdmin) {
        QpidServiceDataHolder.getInstance().setService(iServerAdmin);
    }

    /**
     * Clears the server-admin service held in the data holder.
     *
     * @param iServerAdmin the service being unbound (unused)
     */
    protected void unsetIServerAdmin(IServerAdmin iServerAdmin) {
        QpidServiceDataHolder.getInstance().setService(null);
    }

    /**
     * Asks the server-admin service to shut the server down gracefully.
     *
     * @throws AndesException wrapping any exception raised while requesting the shutdown
     */
    private void shutdown() throws AndesException {
        try {
            QpidServiceDataHolder.getInstance().getService().shutdownGracefully();
        } catch (Exception e) {
            log.error("Error occurred while shutting down", e);
            throw new AndesException("Error occurred while shutting down", e);
        }
    }

    /**
     * Checks whether at least one MBean matching the virtual-host-manager name pattern is
     * registered with the platform MBean server.
     *
     * @return {@code true} if a matching MBean exists; {@code false} if none exists or the
     *         pattern could not be parsed
     */
    private boolean isBrokerRunning() {
        try {
            ObjectName pattern = new ObjectName(VIRTUAL_HOST_MANAGER_PATTERN);
            return !ManagementFactory.getPlatformMBeanServer().queryNames(pattern, null).isEmpty();
        } catch (MalformedObjectNameException e) {
            log.error("Error checking if broker is running.", e);
            return false;
        }
    }

    /**
     * Resolves the port offset: the {@code portOffset} system property wins, otherwise the
     * {@code Ports.Offset} entry of the Carbon server configuration is used.
     *
     * @return the parsed offset, or {@code 0} when no value is set or it is not an integer
     */
    private int readPortOffset() {
        String configuredOffset = System.getProperty("portOffset",
                ServerConfiguration.getInstance().getFirstProperty(CARBON_CONFIG_PORT_OFFSET));
        if (configuredOffset == null) {
            return CARBON_DEFAULT_PORT_OFFSET;
        }
        try {
            return Integer.parseInt(configuredOffset.trim());
        } catch (NumberFormatException e) {
            // Fall back to the default, but leave a trace so a typo in the offset is noticed.
            log.warn("Invalid port offset '" + configuredOffset + "', using default "
                    + CARBON_DEFAULT_PORT_OFFSET, e);
            return CARBON_DEFAULT_PORT_OFFSET;
        }
    }

    /** @return the configured general transport bind address */
    private String getTransportBindAddress() {
        return (String) AndesConfigurationManager.readValue(AndesConfiguration.TRANSPORTS_BIND_ADDRESS);
    }

    /** @return the configured MQTT transport bind address */
    private String getMQTTTransportBindAddress() {
        return (String) AndesConfigurationManager.readValue(AndesConfiguration.TRANSPORTS_MQTT_BIND_ADDRESS);
    }

    /** @return the configured AMQP transport bind address */
    private String getAMQPTransportBindAddress() {
        return (String) AndesConfigurationManager.readValue(AndesConfiguration.TRANSPORTS_AMQP_BIND_ADDRESS);
    }

    /**
     * Starts the broker and publishes it.
     *
     * <p>Steps: optionally initialise the broker database (when the {@code setup} system property
     * is set), launch the broker main class with the AMQP, AMQP-SSL and MQTT ports, remove the
     * application registry's JVM shutdown hook, wait until the broker's MBean is visible, wait
     * for the enabled transports to accept connections, register {@link QpidService}, and
     * finally send the start notification with the server details.
     *
     * @throws ConfigurationException if a required configuration value cannot be read
     * @throws AndesException         if the broker fails to start
     */
    private void startAndesBroker() throws ConfigurationException, AndesException {
        if (System.getProperty("setup") != null) {
            new MessageBrokerDBUtil().initialize();
        }
        log.info("Activating Andes Message Broker Engine...");
        System.setProperty("ANDES_HOME", this.qpidServiceImpl.getQpidHome());
        String[] brokerArguments = {
            "-p" + this.qpidServiceImpl.getAMQPPort(),
            "-s" + this.qpidServiceImpl.getAMQPSSLPort(),
            "-q" + this.qpidServiceImpl.getMqttPort()
        };
        Main.main(brokerArguments);
        Runtime.getRuntime().removeShutdownHook(ApplicationRegistry.getShutdownHook());

        awaitBrokerRunning();
        startAMQPServer();
        startMQTTServer();
        log.info("WSO2 Message Broker is started.");

        registrations.push(bundleContext.registerService(QpidService.class.getName(), this.qpidServiceImpl, null));

        boolean sslOnly = this.qpidServiceImpl.getIfSSLOnly();
        String advertisedPort =
                (sslOnly ? this.qpidServiceImpl.getAMQPSSLPort() : this.qpidServiceImpl.getAMQPPort()).toString();
        QpidServiceDataHolder.getInstance().getEventBundleNotificationService().notifyStart(
                new QpidServerDetails(
                        this.qpidServiceImpl.getAccessKey(),
                        this.qpidServiceImpl.getClientID(),
                        this.qpidServiceImpl.getVirtualHostName(),
                        this.qpidServiceImpl.getHostname(),
                        advertisedPort,
                        sslOnly));
    }

    /**
     * Blocks until {@link #isBrokerRunning()} reports {@code true}, polling at a fixed interval.
     * An interrupt does not end the wait; it is remembered and the thread's interrupt status is
     * restored before returning.
     */
    private void awaitBrokerRunning() {
        boolean interrupted = false;
        try {
            while (!isBrokerRunning()) {
                try {
                    Thread.sleep(STARTUP_POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Waits until the AMQP transport accepts connections, unless AMQP is disabled in the
     * configuration. The SSL port is probed when the broker is SSL-only, the plain port otherwise.
     *
     * @throws ConfigurationException if the port configuration cannot be read
     */
    private void startAMQPServer() throws ConfigurationException {
        int port = this.qpidServiceImpl.getIfSSLOnly()
                ? this.qpidServiceImpl.getAMQPSSLPort().intValue()
                : this.qpidServiceImpl.getAMQPPort().intValue();
        if (!((Boolean) AndesConfigurationManager.readValue(AndesConfiguration.TRANSPORTS_AMQP_ENABLED)).booleanValue()) {
            log.warn("AMQP Transport is disabled as per configuration.");
            return;
        }
        awaitTransport("AMQP", getAMQPTransportBindAddress(), port, "Wait until Qpid server starts on port ");
    }

    /**
     * Waits until the MQTT transport accepts connections, unless MQTT is disabled in the
     * configuration. The SSL port is probed when MQTT is SSL-only, the plain port otherwise.
     *
     * @throws ConfigurationException if the port configuration cannot be read
     */
    private void startMQTTServer() throws ConfigurationException {
        int port = this.qpidServiceImpl.getMQTTSSLOnly()
                ? this.qpidServiceImpl.getMqttSSLPort().intValue()
                : this.qpidServiceImpl.getMqttPort().intValue();
        if (!((Boolean) AndesConfigurationManager.readValue(AndesConfiguration.TRANSPORTS_MQTT_ENABLED)).booleanValue()) {
            if (log.isDebugEnabled()) {
                log.debug("MQTT Transport is disabled as per configuration.");
            }
            return;
        }
        awaitTransport("MQTT", getMQTTTransportBindAddress(), port, "Wait until server starts on port ");
    }

    /**
     * Repeatedly opens a probe socket to {@code bindAddress:port} until a connection succeeds.
     * Each failed attempt is logged and followed by a fixed pause. An interrupt during the pause
     * does not end the wait; the interrupt status is restored before returning.
     *
     * @param protocol           transport name used in log messages, e.g. {@code "AMQP"}
     * @param bindAddress        host name or address the transport is bound to
     * @param port               port to probe
     * @param retryMessagePrefix log message prefix for a failed attempt; the port is appended
     */
    private void awaitTransport(String protocol, String bindAddress, int port, String retryMessagePrefix) {
        boolean connected = false;
        boolean interrupted = false;
        try {
            while (!connected) {
                Socket probe = null;
                try {
                    InetAddress address = InetAddress.getByName(bindAddress);
                    probe = new Socket(address, port);
                    log.info(protocol + " Host Address : " + address.getHostAddress() + " Port : " + port);
                    connected = probe.isConnected();
                    if (connected) {
                        log.info("Successfully connected to " + protocol + " server on port " + port);
                    }
                } catch (IOException e) {
                    log.error(retryMessagePrefix + port, e);
                    try {
                        Thread.sleep(STARTUP_POLL_INTERVAL_MS);
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                } finally {
                    closeProbe(probe);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes a probe socket, logging (not propagating) any failure.
     *
     * @param probe the socket to close; may be {@code null} when the connection never opened
     */
    private static void closeProbe(Socket probe) {
        if (probe == null) {
            return;
        }
        try {
            probe.close();
        } catch (IOException e) {
            log.error("Can not close the socket which is used to check the server status ", e);
        }
    }
}
