package org.wso2.carbon.stream.processor.core.internal;

import java.io.File;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.analytics.permissions.PermissionManager;
import org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator;
import org.wso2.carbon.config.ConfigurationException;
import org.wso2.carbon.config.provider.ConfigProvider;
import org.wso2.carbon.databridge.commons.ServerEventListener;
import org.wso2.carbon.datasource.core.api.DataSourceService;
import org.wso2.carbon.kernel.CarbonRuntime;
import org.wso2.carbon.kernel.config.model.CarbonConfiguration;
import org.wso2.carbon.sp.metrics.core.SPMetricsFactory;
import org.wso2.carbon.sp.metrics.core.internal.SPMetricsDataHolder;
import org.wso2.carbon.stream.processor.common.EventStreamService;
import org.wso2.carbon.stream.processor.common.HAStateChangeListener;
import org.wso2.carbon.stream.processor.common.SiddhiAppRuntimeService;
import org.wso2.carbon.stream.processor.common.utils.config.FileConfigManager;
import org.wso2.carbon.stream.processor.core.DeploymentMode;
import org.wso2.carbon.stream.processor.core.NodeInfo;
import org.wso2.carbon.stream.processor.core.distribution.DistributionService;
import org.wso2.carbon.stream.processor.core.ha.HAManager;
import org.wso2.carbon.stream.processor.core.ha.exception.HAModeException;
import org.wso2.carbon.stream.processor.core.ha.util.CoordinationConstants;
import org.wso2.carbon.stream.processor.core.internal.beans.DeploymentConfig;
import org.wso2.carbon.stream.processor.core.internal.util.SiddhiAppProcessorConstants;
import org.wso2.carbon.stream.processor.core.persistence.PersistenceManager;
import org.wso2.carbon.stream.processor.core.persistence.beans.PersistenceConfigurations;
import org.wso2.carbon.stream.processor.core.persistence.exception.PersistenceStoreConfigurationException;
import org.wso2.carbon.stream.processor.core.persistence.util.PersistenceConstants;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.config.StatisticsConfiguration;
import org.wso2.siddhi.core.util.SiddhiComponentActivator;
import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
import org.wso2.siddhi.core.util.persistence.PersistenceStore;

@Component(name = "stream-processor-core-service", immediate = true)
/* loaded from: input_file:org/wso2/carbon/stream/processor/core/internal/ServiceComponent.class */
public class ServiceComponent {
    private static final Logger log = LoggerFactory.getLogger(ServiceComponent.class);
    private ServiceRegistration streamServiceRegistration;
    private ServiceRegistration siddhiAppRuntimeServiceRegistration;
    private ScheduledFuture<?> scheduledFuture = null;
    private ScheduledExecutorService scheduledExecutorService = null;
    private boolean clusterComponentActivated;
    private boolean serviceComponentActivated;

    @Activate
    protected void start(BundleContext bundleContext) throws Exception {
        log.debug("Service Component is activated");
        String property = System.getProperty(SiddhiAppProcessorConstants.SYSTEM_PROP_RUN_FILE);
        ConfigProvider configProvider = StreamProcessorDataHolder.getInstance().getConfigProvider();
        StreamProcessorDataHolder.setStreamProcessorService(new StreamProcessorService());
        SiddhiManager siddhiManager = new SiddhiManager();
        siddhiManager.setConfigManager(new FileConfigManager(configProvider));
        PersistenceConfigurations persistenceConfigurations = (PersistenceConfigurations) configProvider.getConfigurationObject(PersistenceConfigurations.class);
        if (persistenceConfigurations != null && persistenceConfigurations.isEnabled()) {
            String persistenceStore = persistenceConfigurations.getPersistenceStore();
            try {
                if (Class.forName(persistenceStore).newInstance() instanceof PersistenceStore) {
                    PersistenceStore persistenceStore2 = (PersistenceStore) Class.forName(persistenceStore).newInstance();
                    persistenceStore2.setProperties((Map) configProvider.getConfigurationObject(PersistenceConstants.STATE_PERSISTENCE_NS));
                    siddhiManager.setPersistenceStore(persistenceStore2);
                } else {
                    if (!(Class.forName(persistenceStore).newInstance() instanceof IncrementalPersistenceStore)) {
                        throw new PersistenceStoreConfigurationException("Persistence Store class with name " + persistenceStore + " is invalid. The given class has to implement either org.wso2.siddhi.core.util.persistence.PersistenceStore or org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore.");
                    }
                    IncrementalPersistenceStore incrementalPersistenceStore = (IncrementalPersistenceStore) Class.forName(persistenceStore).newInstance();
                    incrementalPersistenceStore.setProperties((Map) configProvider.getConfigurationObject(PersistenceConstants.STATE_PERSISTENCE_NS));
                    siddhiManager.setIncrementalPersistenceStore(incrementalPersistenceStore);
                }
                if (log.isDebugEnabled()) {
                    log.debug(persistenceStore + " chosen as persistence store");
                }
                int intervalInMin = persistenceConfigurations.getIntervalInMin();
                this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
                if (intervalInMin > 0) {
                    this.scheduledFuture = this.scheduledExecutorService.scheduleAtFixedRate(new PersistenceManager(), intervalInMin, intervalInMin, TimeUnit.MINUTES);
                }
                StreamProcessorDataHolder.setIsPersistenceEnabled(true);
                log.info("Periodic state persistence started with an interval of " + String.valueOf(intervalInMin) + " using " + persistenceStore);
            } catch (ClassNotFoundException e) {
                throw new PersistenceStoreConfigurationException("Persistence Store class with name " + persistenceStore + " is invalid. ", e);
            }
        } else if (log.isDebugEnabled()) {
            log.debug("Periodic persistence is disabled");
        }
        StatisticsConfiguration statisticsConfiguration = new StatisticsConfiguration(new SPMetricsFactory());
        siddhiManager.setStatisticsConfiguration(statisticsConfiguration);
        StreamProcessorDataHolder.setSiddhiManager(siddhiManager);
        StreamProcessorDataHolder.setStatisticsConfiguration(statisticsConfiguration);
        if (property != null) {
            StreamProcessorDataHolder.getInstance().setRuntimeMode(SiddhiAppProcessorConstants.RuntimeMode.RUN_FILE);
            if (property.trim().equals("")) {
                log.error("Error: Can't get target file to run. System property {} is not set.", SiddhiAppProcessorConstants.SYSTEM_PROP_RUN_FILE);
                StreamProcessorDataHolder.getInstance().setRuntimeMode(SiddhiAppProcessorConstants.RuntimeMode.ERROR);
                return;
            }
            File file = new File(property);
            if (!file.exists()) {
                log.error("Error: File " + file.getName() + " not found in the given location.");
                StreamProcessorDataHolder.getInstance().setRuntimeMode(SiddhiAppProcessorConstants.RuntimeMode.ERROR);
                return;
            } else {
                try {
                    StreamProcessorDeployer.deploySiddhiQLFile(file);
                } catch (Exception e2) {
                    StreamProcessorDataHolder.getInstance().setRuntimeMode(SiddhiAppProcessorConstants.RuntimeMode.ERROR);
                    log.error(e2.getMessage(), e2);
                    return;
                }
            }
        } else {
            StreamProcessorDataHolder.getInstance().setRuntimeMode(SiddhiAppProcessorConstants.RuntimeMode.SERVER);
        }
        if (log.isDebugEnabled()) {
            log.debug("Runtime mode is set to : " + StreamProcessorDataHolder.getInstance().getRuntimeMode());
        }
        this.streamServiceRegistration = bundleContext.registerService(EventStreamService.class.getName(), new CarbonEventStreamService(), (Dictionary) null);
        this.siddhiAppRuntimeServiceRegistration = bundleContext.registerService(SiddhiAppRuntimeService.class.getCanonicalName(), new CarbonSiddhiAppRuntimeService(), (Dictionary) null);
        NodeInfo nodeInfo = new NodeInfo(DeploymentMode.SINGLE_NODE, ((CarbonConfiguration) configProvider.getConfigurationObject(CarbonConfiguration.class)).getId());
        bundleContext.registerService(NodeInfo.class.getName(), nodeInfo, (Dictionary) null);
        StreamProcessorDataHolder.setNodeInfo(nodeInfo);
        StreamProcessorDataHolder.getInstance().setBundleContext(bundleContext);
        this.serviceComponentActivated = true;
        if (this.clusterComponentActivated) {
            setUpClustering(StreamProcessorDataHolder.getClusterCoordinator());
        }
    }

    @Deactivate
    protected void stop() throws Exception {
        log.debug("Service Component is deactivated");
        for (SiddhiAppData siddhiAppData : StreamProcessorDataHolder.getStreamProcessorService().getSiddhiAppMap().values()) {
            if (siddhiAppData.getSiddhiAppRuntime() != null) {
                siddhiAppData.getSiddhiAppRuntime().shutdown();
            }
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
        }
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdown();
        }
        this.streamServiceRegistration.unregister();
        this.siddhiAppRuntimeServiceRegistration.unregister();
    }

    @Reference(name = "carbon.runtime.service", service = CarbonRuntime.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetCarbonRuntime")
    protected void setCarbonRuntime(CarbonRuntime carbonRuntime) {
        StreamProcessorDataHolder.getInstance().setCarbonRuntime(carbonRuntime);
    }

    protected void unsetCarbonRuntime(CarbonRuntime carbonRuntime) {
        StreamProcessorDataHolder.getInstance().setCarbonRuntime(null);
    }

    @Reference(name = "siddhi.component.activator.service", service = SiddhiComponentActivator.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetSiddhiComponentActivator")
    protected void setSiddhiComponentActivator(SiddhiComponentActivator siddhiComponentActivator) {
    }

    protected void unsetSiddhiComponentActivator(SiddhiComponentActivator siddhiComponentActivator) {
    }

    @Reference(name = "carbon.config.provider", service = ConfigProvider.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unregisterConfigProvider")
    protected void registerConfigProvider(ConfigProvider configProvider) {
        StreamProcessorDataHolder.getInstance().setConfigProvider(configProvider);
    }

    protected void unregisterConfigProvider(ConfigProvider configProvider) {
        StreamProcessorDataHolder.getInstance().setConfigProvider(null);
    }

    @Reference(name = "org.wso2.carbon.datasource.DataSourceService", service = DataSourceService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unregisterDataSourceListener")
    protected void registerDataSourceListener(DataSourceService dataSourceService) {
        StreamProcessorDataHolder.setDataSourceService(dataSourceService);
    }

    protected void unregisterDataSourceListener(DataSourceService dataSourceService) {
        StreamProcessorDataHolder.setDataSourceService(null);
    }

    @Reference(name = "org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator", service = ClusterCoordinator.class, cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC, unbind = "unregisterClusterCoordinator")
    protected void registerClusterCoordinator(ClusterCoordinator clusterCoordinator) throws ConfigurationException {
        if (clusterCoordinator != null) {
            this.clusterComponentActivated = true;
            StreamProcessorDataHolder.setClusterCoordinator(clusterCoordinator);
            if (this.serviceComponentActivated) {
                setUpClustering(clusterCoordinator);
            }
        }
    }

    protected void unregisterClusterCoordinator(ClusterCoordinator clusterCoordinator) {
        StreamProcessorDataHolder.setClusterCoordinator(null);
    }

    private void setUpClustering(ClusterCoordinator clusterCoordinator) throws ConfigurationException {
        ConfigProvider configProvider = StreamProcessorDataHolder.getInstance().getConfigProvider();
        if (configProvider.getConfigurationObject(CoordinationConstants.DEPLOYMENT_CONFIG_NS) == null || !CoordinationConstants.MODE_HA.equalsIgnoreCase((String) ((Map) configProvider.getConfigurationObject(CoordinationConstants.DEPLOYMENT_CONFIG_NS)).get(SiddhiAppProcessorConstants.WSO2_SERVER_TYPE))) {
            return;
        }
        StreamProcessorDataHolder.setDeploymentConfig((DeploymentConfig) configProvider.getConfigurationObject(DeploymentConfig.class));
        if (clusterCoordinator.getAllNodeDetails().size() > 2) {
            throw new HAModeException("More than two nodes can not be used in the minimum HA mode. Use another clustering mode, change the groupId or disable clustering.");
        }
        log.info("WSO2 Stream Processor Starting in Two Node Minimum HA Deployment");
        StreamProcessorDataHolder.setIsStatisticsEnabled(SPMetricsDataHolder.getInstance().getMetricManagementService().isEnabled());
        StreamProcessorDataHolder.setStatisticsManager(StreamProcessorDataHolder.getStatisticsConfiguration().getFactory().createStatisticsManager((String) null, "HA." + StreamProcessorDataHolder.getNodeInfo().getNodeId(), (List) null));
        HAManager hAManager = new HAManager(clusterCoordinator, ((CarbonConfiguration) configProvider.getConfigurationObject(CarbonConfiguration.class)).getId(), (String) ((Map) configProvider.getConfigurationObject(CoordinationConstants.CLUSTER_CONFIG_NS)).get(CoordinationConstants.GROUP_ID), StreamProcessorDataHolder.getDeploymentConfig());
        StreamProcessorDataHolder.setHaManager(hAManager);
        hAManager.start();
    }

    @Reference(name = "org.wso2.carbon.stream.processor.core.distribution.DistributionService", service = DistributionService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unregisterDistributionService")
    protected void registerDistributionService(DistributionService distributionService) {
        StreamProcessorDataHolder.setDistributionService(distributionService);
    }

    protected void unregisterDistributionService(DistributionService distributionService) {
        StreamProcessorDataHolder.setDistributionService(null);
    }

    @Reference(name = "permission-manager", service = PermissionManager.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetPermissionManager")
    protected void setPermissionManager(PermissionManager permissionManager) {
        StreamProcessorDataHolder.setPermissionProvider(permissionManager.getProvider());
    }

    protected void unsetPermissionManager(PermissionManager permissionManager) {
        StreamProcessorDataHolder.setPermissionProvider(null);
    }

    @Reference(name = "org.wso2.carbon.databridge.commons.ServerEventListener", service = ServerEventListener.class, cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC, unbind = "unregisterServerListener")
    protected void registerServerListener(ServerEventListener serverEventListener) {
        StreamProcessorDataHolder.setServerListener(serverEventListener);
        if (StreamProcessorDataHolder.getHAManager() == null) {
            serverEventListener.start();
        } else if (StreamProcessorDataHolder.getHAManager().isActiveNode()) {
            serverEventListener.start();
        }
    }

    protected void unregisterServerListener(ServerEventListener serverEventListener) {
        StreamProcessorDataHolder.removeServerListener(serverEventListener);
    }

    @Reference(name = "org.wso2.carbon.stream.processor.common.HAStateChangeListener", service = HAStateChangeListener.class, cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC, unbind = "unregisterHAStateChangeListener")
    protected void registerHAStateChangeListener(HAStateChangeListener hAStateChangeListener) {
        StreamProcessorDataHolder.setHAStateChangeListener(hAStateChangeListener);
    }

    protected void unregisterHAStateChangeListener(HAStateChangeListener hAStateChangeListener) {
        StreamProcessorDataHolder.removeHAStateChangeListener(hAStateChangeListener);
    }
}
