package org.wso2.carbon.event.processor.core.internal;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;
import javax.xml.stream.XMLStreamException;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.event.processor.common.storm.config.StormDeploymentConfig;
import org.wso2.carbon.event.processor.core.EventProcessorService;
import org.wso2.carbon.event.processor.core.ExecutionPlan;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfigurationFile;
import org.wso2.carbon.event.processor.core.StreamConfiguration;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanDependencyValidationException;
import org.wso2.carbon.event.processor.core.exception.ServiceDependencyValidationException;
import org.wso2.carbon.event.processor.core.exception.StormDeploymentException;
import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
import org.wso2.carbon.event.processor.core.internal.ha.CEPMembership;
import org.wso2.carbon.event.processor.core.internal.ha.HAManager;
import org.wso2.carbon.event.processor.core.internal.ha.SiddhiHAInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.ha.SiddhiHAOutputStreamListener;
import org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener;
import org.wso2.carbon.event.processor.core.internal.storm.SiddhiStormInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.storm.SiddhiStormOutputEventListener;
import org.wso2.carbon.event.processor.core.internal.storm.TopologyManager;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConfigurationFilesystemInvoker;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorUtil;
import org.wso2.carbon.event.processor.core.internal.util.helper.EventProcessorConfigurationHelper;
import org.wso2.carbon.event.processor.core.internal.util.helper.SiddhiExtensionLoader;
import org.wso2.carbon.event.stream.manager.core.EventProducer;
import org.wso2.carbon.event.stream.manager.core.SiddhiEventConsumer;
import org.wso2.carbon.event.stream.manager.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.ndatasource.common.DataSourceException;
import org.wso2.carbon.ndatasource.core.CarbonDataSource;
import org.wso2.carbon.ndatasource.core.DataSourceManager;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.config.SiddhiConfiguration;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/CarbonEventProcessorService.class */
public class CarbonEventProcessorService implements EventProcessorService {
    /** Logger shared by all instances of this service. */
    private static final Log log = LogFactory.getLog(CarbonEventProcessorService.class);
    /** Active execution plans: tenant id -> (execution plan name -> running plan). */
    private Map<Integer, Map<String, ExecutionPlan>> tenantSpecificExecutionPlans = new ConcurrentHashMap();
    /** Execution plan configuration files known per tenant id (lists are plain, non-concurrent lists). */
    private Map<Integer, List<ExecutionPlanConfigurationFile>> tenantSpecificExecutionPlanFiles = new ConcurrentHashMap();
    /** Membership info passed to each HAManager created for a "RedundantNode" plan. */
    private CEPMembership currentCepMembershipInfo;

    /**
     * Copies data-bridge attributes onto a Siddhi stream definition.
     * Each attribute is converted with {@code EventProcessorUtil.convertToSiddhiAttribute(attribute, str)}
     * and its name and type are added to {@code streamDefinition}. A {@code null} list is a no-op.
     *
     * @param streamDefinition Siddhi stream definition to extend
     * @param list             data-bridge attributes to convert, may be {@code null}
     * @param str              value forwarded to the converter (callers pass "meta_", "correlation_" or "")
     */
    private static void populateAttributes(StreamDefinition streamDefinition, List<Attribute> list, String str) {
        if (list == null) {
            return;
        }
        for (Attribute databridgeAttribute : list) {
            org.wso2.siddhi.query.api.definition.Attribute siddhiAttribute = EventProcessorUtil.convertToSiddhiAttribute(databridgeAttribute, str);
            streamDefinition.attribute(siddhiAttribute.getName(), siddhiAttribute.getType());
        }
    }

    /**
     * Deploys an execution plan given as a configuration object: the object is serialised to an
     * OM element and handed to {@code deployExecutionPlan}.
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void deployExecutionPlanConfiguration(ExecutionPlanConfiguration executionPlanConfiguration, AxisConfiguration axisConfiguration) throws ExecutionPlanDependencyValidationException, ExecutionPlanConfigurationException {
        OMElement planElement = EventProcessorConfigurationHelper.toOM(executionPlanConfiguration);
        deployExecutionPlan(planElement, axisConfiguration);
    }

    /**
     * Deploys an execution plan given as an XML string.
     *
     * @throws ExecutionPlanConfigurationException if the string cannot be parsed as XML, or as raised by
     *                                             {@code deployExecutionPlan}
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void deployExecutionPlanConfiguration(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanDependencyValidationException, ExecutionPlanConfigurationException {
        OMElement planElement;
        try {
            planElement = AXIOMUtil.stringToOM(str);
        } catch (XMLStreamException parseFailure) {
            throw new ExecutionPlanConfigurationException("Cannot parse execution plan configuration XML:" + parseFailure.getMessage(), parseFailure);
        }
        deployExecutionPlan(planElement, axisConfiguration);
    }

    /**
     * Validates an execution plan element and saves it as {@code <planName>.xml} through
     * {@code EventProcessorConfigurationFilesystemInvoker.save}.
     * <p>
     * Before saving, the repository directory and its execution-plan sub-directory are created if missing.
     *
     * @param oMElement         the execution plan configuration
     * @param axisConfiguration supplies the repository path and is forwarded to the filesystem invoker
     * @throws ExecutionPlanConfigurationException        if the plan name is already registered for the tenant or a
     *                                                    required directory cannot be created
     * @throws ExecutionPlanDependencyValidationException as declared by the validation helpers
     */
    private void deployExecutionPlan(OMElement oMElement, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException, ExecutionPlanDependencyValidationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        EventProcessorConfigurationHelper.validateExecutionPlanConfiguration(oMElement, tenantId);
        String executionPlanName = EventProcessorConfigurationHelper.getExecutionPlanName(oMElement);
        if (!checkExecutionPlanValidity(executionPlanName, tenantId)) {
            throw new ExecutionPlanConfigurationException(executionPlanName + " already registered as an execution in this tenant");
        }
        String path = axisConfiguration.getRepository().getPath();
        File file = new File(path);
        if (!file.exists()) {
            synchronized (path.intern()) {
                // mkdir() returns false when another thread or process created the directory in the
                // meantime; that is only an error if the directory is still missing afterwards.
                if (!file.mkdir() && !file.exists()) {
                    throw new ExecutionPlanConfigurationException("Cannot create directory to add tenant specific execution plan : " + executionPlanName);
                }
            }
        }
        String str = file.getAbsolutePath() + File.separator + EventProcessorConstants.EP_ELE_DIRECTORY;
        File file2 = new File(str);
        if (!file2.exists()) {
            synchronized (str.intern()) {
                // Same reasoning as above: tolerate a concurrent creator.
                if (!file2.mkdir() && !file2.exists()) {
                    throw new ExecutionPlanConfigurationException("Cannot create directory executionplans to add tenant specific  execution plan :" + executionPlanName);
                }
            }
        }
        validateToRemoveInactiveExecutionPlanConfiguration(executionPlanName, axisConfiguration);
        EventProcessorConfigurationFilesystemInvoker.save(oMElement, executionPlanName, executionPlanName + ".xml", axisConfiguration);
    }

    /**
     * Undeploys an inactive execution plan by delegating to
     * {@code EventProcessorConfigurationFilesystemInvoker.delete}.
     *
     * @param str               passed straight to the invoker; presumably the configuration file name
     *                          (other callers in this class pass a file name) - confirm against callers
     * @param axisConfiguration forwarded to the filesystem invoker
     * @throws ExecutionPlanConfigurationException as raised by the filesystem invoker
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void undeployInactiveExecutionPlanConfiguration(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        EventProcessorConfigurationFilesystemInvoker.delete(str, axisConfiguration);
    }

    /**
     * Undeploys an active execution plan by deleting the configuration file registered for it.
     *
     * @param str               name of the execution plan
     * @param axisConfiguration forwarded to the filesystem invoker
     * @throws ExecutionPlanConfigurationException if no configuration file is registered for the plan in the
     *                                             current tenant, or as raised by the filesystem invoker
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void undeployActiveExecutionPlanConfiguration(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        ExecutionPlanConfigurationFile configurationFile = getExecutionPlanConfigurationFileByPlanName(str, tenantId);
        // The lookup can come back empty (other callers in this class check for it); fail with a
        // descriptive exception instead of a NullPointerException.
        if (configurationFile == null) {
            throw new ExecutionPlanConfigurationException("Configuration file for " + str + " doesn't exist.");
        }
        EventProcessorConfigurationFilesystemInvoker.delete(configurationFile.getFileName(), axisConfiguration);
    }

    /**
     * Replaces the configuration of an active execution plan.
     * <p>
     * The new XML is parsed and validated, then the plan's existing configuration file (or
     * {@code <name>.xml} when none is registered) is deleted and re-saved with the new content.
     *
     * @param str               new execution plan configuration as XML
     * @param str2              name of the execution plan being edited
     * @param axisConfiguration forwarded to the filesystem invoker
     * @throws ExecutionPlanConfigurationException        if {@code str2} is null or empty, the XML cannot be parsed,
     *                                                    or the new name clashes with another registered plan
     * @throws ExecutionPlanDependencyValidationException as declared by the validation helper
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void editActiveExecutionPlanConfiguration(String str, String str2, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException, ExecutionPlanDependencyValidationException {
        // Reject a missing plan name up front, before it is used in any comparison or lookup.
        if (str2 == null || str2.length() <= 0) {
            throw new ExecutionPlanConfigurationException("Invalid configuration provided, No execution plan name.");
        }
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        try {
            OMElement stringToOM = AXIOMUtil.stringToOM(str);
            EventProcessorConfigurationHelper.validateExecutionPlanConfiguration(stringToOM, tenantId);
            ExecutionPlanConfiguration fromOM = EventProcessorConfigurationHelper.fromOM(stringToOM);
            // A rename is only allowed when the new name is not already taken in this tenant.
            if (!fromOM.getName().equals(str2) && !checkExecutionPlanValidity(fromOM.getName(), tenantId)) {
                throw new ExecutionPlanConfigurationException(fromOM.getName() + " already registered as an execution in this tenant");
            }
            ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(str2, tenantId);
            String fileName = executionPlanConfigurationFileByPlanName == null ? str2 + ".xml" : executionPlanConfigurationFileByPlanName.getFileName();
            EventProcessorConfigurationFilesystemInvoker.delete(fileName, axisConfiguration);
            EventProcessorConfigurationFilesystemInvoker.save(str, str2, fileName, axisConfiguration);
        } catch (XMLStreamException e) {
            log.error("Error while creating the xml object", e);
            throw new ExecutionPlanConfigurationException("Not a valid xml object, ", e);
        }
    }

    /**
     * Replaces the configuration file of an inactive execution plan: the new XML is parsed and
     * validated, the existing file is deleted and the new content is saved under the same file name.
     *
     * @param str               new execution plan configuration as XML
     * @param str2              file name passed to the filesystem invoker for both delete and save
     * @param axisConfiguration forwarded to the filesystem invoker
     * @throws ExecutionPlanConfigurationException if the XML cannot be parsed, or as raised by the helpers
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void editInactiveExecutionPlanConfiguration(String str, String str2, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException, ExecutionPlanDependencyValidationException {
        final int currentTenant = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        try {
            OMElement planElement = AXIOMUtil.stringToOM(str);
            EventProcessorConfigurationHelper.validateExecutionPlanConfiguration(planElement, currentTenant);
            ExecutionPlanConfiguration parsedPlan = EventProcessorConfigurationHelper.fromOM(planElement);
            EventProcessorConfigurationFilesystemInvoker.delete(str2, axisConfiguration);
            EventProcessorConfigurationFilesystemInvoker.save(str, parsedPlan.getName(), str2, axisConfiguration);
        } catch (XMLStreamException parseFailure) {
            log.error("Error while creating the xml object");
            throw new ExecutionPlanConfigurationException("Not a valid xml object ", parseFailure);
        }
    }

    /**
     * Builds a Siddhi runtime for the given execution plan and registers it for the current tenant.
     * <p>
     * Steps, in order: reject a duplicate plan name; verify that every imported and exported stream is
     * known to the event stream service; create the {@code SiddhiManager} and define the streams on it;
     * either add the queries locally or, for "Distributed" plans on the Storm manager node, submit a
     * Storm topology; register the {@code ExecutionPlan}; finally subscribe output listeners and input
     * dispatchers with the event stream service.
     * <p>
     * Failures of the event stream service after the initial validation do not abort the deployment
     * (as before), but they are now logged instead of being silently discarded.
     *
     * @param executionPlanConfiguration the plan to activate
     * @param axisConfiguration          not used by this method
     * @throws ExecutionPlanDependencyValidationException if an imported or exported stream does not exist
     * @throws ExecutionPlanConfigurationException        if the name is already taken, a stream lookup fails during
     *                                                    validation, or the queries / topology are rejected
     * @throws ServiceDependencyValidationException       propagated from {@code getSiddhiConfigurationFor}
     */
    public void addExecutionPlanConfiguration(ExecutionPlanConfiguration executionPlanConfiguration, AxisConfiguration axisConfiguration) throws ExecutionPlanDependencyValidationException, ExecutionPlanConfigurationException, ServiceDependencyValidationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (tenantPlans == null) {
            tenantPlans = new ConcurrentHashMap<String, ExecutionPlan>();
            this.tenantSpecificExecutionPlans.put(Integer.valueOf(tenantId), tenantPlans);
        } else if (tenantPlans.get(executionPlanConfiguration.getName()) != null) {
            throw new ExecutionPlanConfigurationException("Execution plan with the same name already exists. Please remove it and retry.");
        }

        // Dependency validation: every imported and exported stream must already be defined.
        for (StreamConfiguration importedStream : executionPlanConfiguration.getImportedStreams()) {
            try {
                if (EventProcessorValueHolder.getEventStreamService().getStreamDefinition(importedStream.getStreamId(), tenantId) == null) {
                    throw new ExecutionPlanDependencyValidationException(importedStream.getStreamId(), "Imported Stream " + importedStream.getStreamId() + " does not exist");
                }
            } catch (EventStreamConfigurationException e) {
                throw new ExecutionPlanConfigurationException("Error in retrieving stream ID : " + importedStream.getStreamId(), e);
            }
        }
        for (StreamConfiguration exportedStream : executionPlanConfiguration.getExportedStreams()) {
            try {
                if (EventProcessorValueHolder.getEventStreamService().getStreamDefinition(exportedStream.getStreamId(), tenantId) == null) {
                    throw new ExecutionPlanDependencyValidationException(exportedStream.getStreamId(), "Exported Stream " + exportedStream.getStreamId() + " does not exist");
                }
            } catch (EventStreamConfigurationException e) {
                throw new ExecutionPlanConfigurationException("Error in retrieving stream ID : " + exportedStream.getStreamId(), e);
            }
        }

        // Input handlers are keyed by data-bridge stream id; exported definitions by Siddhi stream id.
        Map<String, InputHandler> inputHandlers = new ConcurrentHashMap<String, InputHandler>(executionPlanConfiguration.getImportedStreams().size());
        Map<String, StreamDefinition> exportedDefinitions = new ConcurrentHashMap<String, StreamDefinition>();
        SiddhiManager siddhiManager = getSiddhiManagerFor(executionPlanConfiguration, getSiddhiConfigurationFor(executionPlanConfiguration, tenantId), inputHandlers);

        for (StreamConfiguration importedStream : executionPlanConfiguration.getImportedStreams()) {
            StreamDefinition siddhiDefinition = new StreamDefinition();
            siddhiDefinition.name(importedStream.getSiddhiStreamName());
            try {
                org.wso2.carbon.databridge.commons.StreamDefinition databridgeDefinition = EventProcessorValueHolder.getEventStreamService().getStreamDefinition(importedStream.getStreamId(), tenantId);
                populateAttributes(siddhiDefinition, databridgeDefinition.getMetaData(), "meta_");
                populateAttributes(siddhiDefinition, databridgeDefinition.getCorrelationData(), "correlation_");
                populateAttributes(siddhiDefinition, databridgeDefinition.getPayloadData(), "");
                inputHandlers.put(databridgeDefinition.getStreamId(), siddhiManager.defineStream(siddhiDefinition));
                log.debug("input handler created for " + siddhiDefinition.getStreamId());
            } catch (EventStreamConfigurationException e) {
                // The stream was resolved during validation above, so this is unexpected; record it.
                log.error("Error in retrieving stream definition of imported stream " + importedStream.getStreamId() + ", no input handler was created for it", e);
            }
        }
        for (StreamConfiguration exportedStream : executionPlanConfiguration.getExportedStreams()) {
            StreamDefinition siddhiDefinition = new StreamDefinition();
            siddhiDefinition.name(exportedStream.getSiddhiStreamName());
            try {
                org.wso2.carbon.databridge.commons.StreamDefinition databridgeDefinition = EventProcessorValueHolder.getEventStreamService().getStreamDefinition(exportedStream.getStreamId(), tenantId);
                populateAttributes(siddhiDefinition, databridgeDefinition.getMetaData(), "meta_");
                populateAttributes(siddhiDefinition, databridgeDefinition.getCorrelationData(), "correlation_");
                populateAttributes(siddhiDefinition, databridgeDefinition.getPayloadData(), "");
                siddhiManager.defineStream(siddhiDefinition);
                exportedDefinitions.put(siddhiDefinition.getStreamId(), siddhiDefinition);
                log.debug("Stream defined for " + siddhiDefinition.getStreamId());
            } catch (EventStreamConfigurationException e) {
                log.error("Error in retrieving stream definition of exported stream " + exportedStream.getStreamId() + ", the stream was not defined", e);
            }
        }

        // Processing mode: "RedundantNode" enables the HA manager, "Distributed" delegates to Storm.
        String processingMode = executionPlanConfiguration.getSiddhiConfigurationProperties().get(EventProcessorConstants.SIDDHI_DISTRIBUTED_PROCESSING);
        HAManager haManager = null;
        if (processingMode != null && processingMode.equalsIgnoreCase("RedundantNode")) {
            haManager = new HAManager(EventProcessorValueHolder.getHazelcastInstance(), executionPlanConfiguration.getName(), tenantId, siddhiManager, inputHandlers.size(), this.currentCepMembershipInfo);
        }
        boolean distributed = processingMode != null && processingMode.equalsIgnoreCase("Distributed");
        StormDeploymentConfig stormDeploymentConfig = EventProcessorValueHolder.getStormDeploymentConfig();
        if (!distributed) {
            try {
                siddhiManager.addExecutionPlan(executionPlanConfiguration.getQueryExpressions());
            } catch (Exception e) {
                throw new ExecutionPlanConfigurationException("Invalid query specified, " + e.getMessage(), e);
            }
        } else if (stormDeploymentConfig != null && stormDeploymentConfig.isManagerNode() && EventProcessorValueHolder.getStormManagerServer().isStormManager()) {
            try {
                TopologyManager.submitTopology(executionPlanConfiguration, siddhiManager.getStreamDefinitions(), tenantId, stormDeploymentConfig.getTopologySubmitRetryInterval());
            } catch (StormDeploymentException e) {
                throw new ExecutionPlanConfigurationException("Invalid distributed query specified, " + e.getMessage(), e);
            }
        }

        ExecutionPlan executionPlan = new ExecutionPlan(executionPlanConfiguration.getName(), siddhiManager, executionPlanConfiguration, haManager);
        tenantPlans.put(executionPlanConfiguration.getName(), executionPlan);

        // On a Storm publisher node, output is received from Storm instead of the local Siddhi manager.
        SiddhiStormOutputEventListener stormOutputListener = null;
        if (distributed && stormDeploymentConfig != null && stormDeploymentConfig.isPublisherNode()) {
            stormOutputListener = new SiddhiStormOutputEventListener(executionPlanConfiguration, tenantId, stormDeploymentConfig, stormDeploymentConfig.getHeartbeatInterval());
        }
        for (StreamConfiguration exportedStream : executionPlanConfiguration.getExportedStreams()) {
            SiddhiOutputStreamListener outputListener;
            if (haManager != null) {
                SiddhiHAOutputStreamListener haOutputListener = new SiddhiHAOutputStreamListener(exportedStream.getSiddhiStreamName(), exportedStream.getStreamId(), executionPlanConfiguration, tenantId);
                haManager.addStreamCallback(haOutputListener);
                outputListener = haOutputListener;
            } else {
                outputListener = new SiddhiOutputStreamListener(exportedStream.getSiddhiStreamName(), exportedStream.getStreamId(), executionPlanConfiguration, tenantId);
            }
            if (stormOutputListener != null) {
                stormOutputListener.registerOutputStreamListener(exportedDefinitions.get(exportedStream.getSiddhiStreamName()), outputListener);
            } else {
                siddhiManager.addCallback(exportedStream.getSiddhiStreamName(), outputListener);
            }
            try {
                EventProcessorValueHolder.getEventStreamService().subscribe(outputListener, tenantId);
            } catch (EventStreamConfigurationException e) {
                log.error("Error while subscribing the output listener of stream " + exportedStream.getStreamId() + " of execution plan " + executionPlanConfiguration.getName(), e);
            }
            executionPlan.addProducer(outputListener);
        }
        executionPlan.addStormOutputListener(stormOutputListener);

        for (StreamConfiguration importedStream : executionPlanConfiguration.getImportedStreams()) {
            InputHandler inputHandler = inputHandlers.get(importedStream.getStreamId());
            SiddhiEventConsumer inputDispatcher;
            if (haManager != null) {
                SiddhiHAInputEventDispatcher haInputDispatcher = new SiddhiHAInputEventDispatcher(importedStream.getStreamId(), inputHandler, executionPlanConfiguration, tenantId, haManager.getProcessThreadPoolExecutor(), haManager.getThreadBarrier());
                haManager.addInputEventDispatcher(importedStream.getStreamId(), haInputDispatcher);
                inputDispatcher = haInputDispatcher;
            } else if (distributed && stormDeploymentConfig != null && stormDeploymentConfig.isReceiverNode()) {
                org.wso2.carbon.databridge.commons.StreamDefinition databridgeDefinition = null;
                try {
                    databridgeDefinition = EventProcessorValueHolder.getEventStreamService().getStreamDefinition(importedStream.getStreamId(), tenantId);
                } catch (EventStreamConfigurationException e) {
                    log.error("Error in retrieving stream definition of imported stream " + importedStream.getStreamId() + " for the Storm input dispatcher", e);
                }
                inputDispatcher = new SiddhiStormInputEventDispatcher(databridgeDefinition, importedStream.getStreamId(), inputHandler.getStreamId(), executionPlanConfiguration, tenantId, stormDeploymentConfig);
            } else {
                inputDispatcher = new SiddhiInputEventDispatcher(importedStream.getStreamId(), inputHandler, executionPlanConfiguration, tenantId);
            }
            try {
                EventProcessorValueHolder.getEventStreamService().subscribe(inputDispatcher, tenantId);
                executionPlan.addConsumer(inputDispatcher);
            } catch (EventStreamConfigurationException e) {
                log.error("Error while subscribing the input dispatcher of stream " + importedStream.getStreamId() + " of execution plan " + executionPlanConfiguration.getName(), e);
            }
        }
        if (haManager != null) {
            haManager.init();
        }
    }

    /**
     * Returns, as data-bridge stream definitions, every stream known to a temporary Siddhi manager
     * built from the given stream definitions and query expressions.
     *
     * @param strArr Siddhi stream definition strings
     * @param str    Siddhi query expressions
     * @return one converted definition per stream reported by the temporary manager
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public List<org.wso2.carbon.databridge.commons.StreamDefinition> getSiddhiStreams(String[] strArr, String str) {
        SiddhiManager mockSiddhiManager = createMockSiddhiManager(strArr, str);
        try {
            List<StreamDefinition> siddhiDefinitions = mockSiddhiManager.getStreamDefinitions();
            List<org.wso2.carbon.databridge.commons.StreamDefinition> converted = new ArrayList<org.wso2.carbon.databridge.commons.StreamDefinition>(siddhiDefinitions.size());
            for (StreamDefinition siddhiDefinition : siddhiDefinitions) {
                converted.add(EventProcessorUtil.convertToDatabridgeStreamDefinition(siddhiDefinition, new StreamConfiguration(siddhiDefinition.getStreamId())));
            }
            return converted;
        } finally {
            // Always release the temporary manager, even when a conversion fails.
            mockSiddhiManager.shutdown();
        }
    }

    /**
     * Looks up the deployment status message of a configuration file of the current tenant.
     *
     * @param str file name to look for (compared with the last path segment of each tracked file)
     * @return the file's deployment status message, followed by {@code " [Dependency: ...]"} when a dependency is
     *         recorded; {@code EventProcessorConstants.NO_DEPENDENCY_INFO_MSG} when nothing matches
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getExecutionPlanStatusAsString(String str) {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(tenantId));
        if (tenantFiles == null) {
            return EventProcessorConstants.NO_DEPENDENCY_INFO_MSG;
        }
        for (ExecutionPlanConfigurationFile candidate : tenantFiles) {
            boolean sameFile = str != null && str.equals(new File(candidate.getFileName()).getName());
            if (!sameFile) {
                continue;
            }
            String statusMessage = candidate.getDeploymentStatusMessage();
            return candidate.getDependency() == null
                    ? statusMessage
                    : statusMessage + " [Dependency: " + candidate.getDependency() + "]";
        }
        return EventProcessorConstants.NO_DEPENDENCY_INFO_MSG;
    }

    /**
     * Validates stream definitions and queries by building a temporary Siddhi manager from them and
     * shutting it down again.
     *
     * @return always {@code true}; invalid input surfaces as an exception from {@code createMockSiddhiManager}
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public boolean validateSiddhiQueries(String[] strArr, String str) {
        SiddhiManager validationManager = createMockSiddhiManager(strArr, str);
        validationManager.shutdown();
        return true;
    }

    /**
     * Creates a throw-away Siddhi manager, registers the available data sources on it, defines the
     * given streams and adds the given query expressions.
     * <p>
     * The caller owns the returned manager and must shut it down. If defining a stream or adding the
     * execution plan fails, the manager is shut down here before the exception is rethrown.
     *
     * @param strArr Siddhi stream definition strings; {@code null}, {@code null} elements and blank
     *               elements are skipped
     * @param str    Siddhi query expressions
     * @return the populated manager
     */
    private SiddhiManager createMockSiddhiManager(String[] strArr, String str) {
        SiddhiConfiguration siddhiConfiguration = new SiddhiConfiguration();
        siddhiConfiguration.setSiddhiExtensions(SiddhiExtensionLoader.loadSiddhiExtensions());
        SiddhiManager siddhiManager = new SiddhiManager(siddhiConfiguration);
        try {
            try {
                int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
                if (tenantId > -1) {
                    DataSourceManager.getInstance().initTenant(tenantId);
                }
                for (CarbonDataSource carbonDataSource : EventProcessorValueHolder.getDataSourceService().getAllDataSources()) {
                    try {
                        if (carbonDataSource.getDSObject() instanceof DataSource) {
                            siddhiManager.getSiddhiContext().addDataSource(carbonDataSource.getDSMInfo().getName(), (DataSource) carbonDataSource.getDSObject());
                        }
                    } catch (Exception e) {
                        // One broken data source must not prevent the others from being registered.
                        log.error("Unable to add the datasource" + carbonDataSource.getDSMInfo().getName(), e);
                    }
                }
            } catch (DataSourceException e2) {
                log.error("Unable to access the datasource service", e2);
            }
            if (strArr != null) {
                for (String streamDefinition : strArr) {
                    if (streamDefinition != null && streamDefinition.trim().length() > 0) {
                        siddhiManager.defineStream(streamDefinition);
                    }
                }
            }
            siddhiManager.addExecutionPlan(str);
            return siddhiManager;
        } catch (RuntimeException e) {
            // The caller never receives the manager on failure, so release it here.
            siddhiManager.shutdown();
            throw e;
        }
    }

    /**
     * Creates the Siddhi manager for an execution plan, registers the available data sources on it and,
     * when a positive snapshot interval is configured, attaches the persistence store held by
     * {@code EventProcessorValueHolder}.
     *
     * @param executionPlanConfiguration plan whose Siddhi configuration properties are read
     * @param siddhiConfiguration        configuration used to construct the manager
     * @param map                        not used by this method
     * @return the new manager
     * @throws ExecutionPlanConfigurationException declared for callers; not thrown by the code in this method
     */
    private SiddhiManager getSiddhiManagerFor(ExecutionPlanConfiguration executionPlanConfiguration, SiddhiConfiguration siddhiConfiguration, Map<String, InputHandler> map) throws ExecutionPlanConfigurationException {
        SiddhiManager siddhiManager = new SiddhiManager(siddhiConfiguration);
        try {
            int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
            if (tenantId > -1) {
                DataSourceManager.getInstance().initTenant(tenantId);
            }
            for (CarbonDataSource carbonDataSource : EventProcessorValueHolder.getDataSourceService().getAllDataSources()) {
                try {
                    if (carbonDataSource.getDSObject() instanceof DataSource) {
                        siddhiManager.getSiddhiContext().addDataSource(carbonDataSource.getDSMInfo().getName(), (DataSource) carbonDataSource.getDSObject());
                    }
                } catch (Exception e) {
                    // One broken data source must not prevent the others from being registered.
                    log.error("Unable to add the datasource" + carbonDataSource.getDSMInfo().getName(), e);
                }
            }
        } catch (DataSourceException e2) {
            log.error("Unable to populate the data sources in Siddhi engine.", e2);
        }
        int snapshotInterval = 0;
        String configuredInterval = executionPlanConfiguration.getSiddhiConfigurationProperties().get(EventProcessorConstants.SIDDHI_SNAPSHOT_INTERVAL);
        // An absent property just means "no snapshots"; only a present but malformed value is an error.
        if (configuredInterval != null) {
            try {
                snapshotInterval = Integer.parseInt(configuredInterval);
            } catch (NumberFormatException e3) {
                log.error("Unable to parse snapshot time interval.", e3);
            }
        }
        if (snapshotInterval > 0) {
            if (null == EventProcessorValueHolder.getPersistenceStore()) {
                log.warn("Snapshot interval is configured for execution plan " + executionPlanConfiguration.getName() + " but no persistence store is available.");
            }
            siddhiManager.setPersistStore(EventProcessorValueHolder.getPersistenceStore());
        }
        return siddhiManager;
    }

    /**
     * Builds the Siddhi configuration for an execution plan.
     * <p>
     * Distributed processing is switched on only for the modes "DistributedCache" and "true". Those
     * modes, and "RedundantNode", require a Hazelcast instance whose name then replaces the generated
     * instance identifier.
     *
     * @param executionPlanConfiguration plan whose distributed-processing property is read
     * @param i                          tenant id, embedded in the generated identifiers
     * @return the populated configuration
     * @throws ServiceDependencyValidationException if a Hazelcast instance is required but not available
     */
    private SiddhiConfiguration getSiddhiConfigurationFor(ExecutionPlanConfiguration executionPlanConfiguration, int i) throws ServiceDependencyValidationException {
        SiddhiConfiguration configuration = new SiddhiConfiguration();
        configuration.setAsyncProcessing(false);
        configuration.setInstanceIdentifier("org.wso2.siddhi.instance-" + i + "-" + UUID.randomUUID().toString());

        String mode = executionPlanConfiguration.getSiddhiConfigurationProperties().get(EventProcessorConstants.SIDDHI_DISTRIBUTED_PROCESSING);
        boolean distributedCache = mode != null && (mode.equalsIgnoreCase("DistributedCache") || mode.equalsIgnoreCase("true"));
        boolean redundantNode = !distributedCache && mode != null && mode.equalsIgnoreCase("RedundantNode");

        configuration.setDistributedProcessing(distributedCache);
        if (distributedCache || redundantNode) {
            if (EventProcessorValueHolder.getHazelcastInstance() == null) {
                throw new ServiceDependencyValidationException(EventProcessorConstants.HAZELCAST_INSTANCE, "Hazelcast instance is not initialized.");
            }
            configuration.setInstanceIdentifier(EventProcessorValueHolder.getHazelcastInstance().getName());
        }

        configuration.setQueryPlanIdentifier("org.wso2.siddhi-" + i + "-" + executionPlanConfiguration.getName());
        configuration.setSiddhiExtensions(SiddhiExtensionLoader.loadSiddhiExtensions());
        return configuration;
    }

    /**
     * Retries, for every tenant with tracked configuration files, the activation of execution plans that
     * are waiting for an OSGi service. Each tenant is handled inside its own tenant flow.
     *
     * @param str forwarded to {@code activateInactiveExecutionPlanConfigurations}; presumably the name
     *            of the service that became available - confirm against callers
     */
    public void notifyServiceAvailability(String str) {
        for (Integer tenantId : this.tenantSpecificExecutionPlanFiles.keySet()) {
            try {
                PrivilegedCarbonContext.startTenantFlow();
                PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId.intValue());
                PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
                activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status.WAITING_FOR_OSGI_SERVICE, str, tenantId.intValue());
            } catch (ExecutionPlanConfigurationException e) {
                // A failure for one tenant must not stop the remaining tenants from being processed.
                log.error("Error while redeploying distributed execution plans.", e);
            } finally {
                // End the tenant flow exactly once per iteration, on every exit path.
                PrivilegedCarbonContext.endTenantFlow();
            }
        }
    }

    /**
     * Removes a running execution plan of a tenant: the plan is unregistered and shut down, its Storm
     * topology is killed when this node is the Storm manager of a "Distributed" plan, and all of its
     * consumers and producers are unsubscribed from the event stream service.
     * Does nothing when the tenant has no plan with that name.
     *
     * @param str name of the execution plan
     * @param i   tenant id
     */
    private void removeExecutionPlanConfiguration(String str, int i) {
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (tenantPlans == null) {
            return;
        }
        if (!tenantPlans.containsKey(str)) {
            return;
        }
        ExecutionPlan removedPlan = tenantPlans.remove(str);
        removedPlan.shutdown();

        ExecutionPlanConfiguration configuration = removedPlan.getExecutionPlanConfiguration();
        String processingMode = configuration.getSiddhiConfigurationProperties().get(EventProcessorConstants.SIDDHI_DISTRIBUTED_PROCESSING);
        boolean distributed = processingMode != null && processingMode.equalsIgnoreCase("Distributed");
        StormDeploymentConfig stormConfig = EventProcessorValueHolder.getStormDeploymentConfig();
        boolean ownsTopology = distributed
                && stormConfig != null
                && stormConfig.isManagerNode()
                && EventProcessorValueHolder.getStormManagerServer().isStormManager();
        if (ownsTopology) {
            try {
                TopologyManager.killTopology(configuration.getName(), i);
            } catch (Exception killFailure) {
                log.error(killFailure.getMessage(), killFailure);
            }
        }

        for (SiddhiEventConsumer consumer : removedPlan.getSiddhiEventConsumers()) {
            EventProcessorValueHolder.getEventStreamService().unsubscribe(consumer, i);
        }
        for (EventProducer producer : removedPlan.getEventProducers()) {
            EventProcessorValueHolder.getEventStreamService().unsubscribe(producer, i);
        }
    }

    /**
     * Records a configuration file for a tenant, creating the tenant's list on first use.
     *
     * @param executionPlanConfigurationFile file to track
     * @param i                              tenant id
     */
    public void addExecutionPlanConfigurationFile(ExecutionPlanConfigurationFile executionPlanConfigurationFile, int i) {
        List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        if (tenantFiles != null) {
            tenantFiles.add(executionPlanConfigurationFile);
            return;
        }
        List<ExecutionPlanConfigurationFile> freshList = new ArrayList<ExecutionPlanConfigurationFile>();
        this.tenantSpecificExecutionPlanFiles.put(Integer.valueOf(i), freshList);
        freshList.add(executionPlanConfigurationFile);
    }

    /**
     * Stops tracking the configuration file with the given file name for a tenant. When the file is in
     * {@code DEPLOYED} status, its running execution plan is removed first. Only the first matching file
     * is removed; nothing happens when the tenant has no tracked files or no file matches.
     *
     * @param str file name (compared with the last path segment of each tracked file)
     * @param i   tenant id
     */
    public void removeExecutionPlanConfigurationFile(String str, int i) {
        List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        // A tenant that never registered a file has no list; treat that as "nothing to remove".
        if (tenantFiles == null) {
            return;
        }
        Iterator<ExecutionPlanConfigurationFile> it = tenantFiles.iterator();
        while (it.hasNext()) {
            ExecutionPlanConfigurationFile next = it.next();
            if (new File(next.getFileName()).getName().equals(str)) {
                if (next.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                    removeExecutionPlanConfiguration(next.getExecutionPlanName(), i);
                }
                it.remove();
                return;
            }
        }
    }

    /**
     * Reads the configuration file content of a deployed execution plan belonging to the tenant of
     * the current thread-local Carbon context.
     *
     * @param str               the execution plan name
     * @param axisConfiguration passed through to the filesystem invoker
     * @return whatever {@code EventProcessorConfigurationFilesystemInvoker.readExecutionPlanConfigFile} returns
     * @throws ExecutionPlanConfigurationException if no deployed configuration file is recorded for the plan
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getActiveExecutionPlanConfigurationContent(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(str, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
        if (executionPlanConfigurationFileByPlanName == null) {
            // Message fix: a space was missing between the plan name and "doesn't".
            throw new ExecutionPlanConfigurationException("Configuration file for " + str + " doesn't exist.");
        }
        return EventProcessorConfigurationFilesystemInvoker.readExecutionPlanConfigFile(executionPlanConfigurationFileByPlanName.getFileName(), axisConfiguration);
    }

    /**
     * Reads the content of a configuration file identified directly by its file name.
     * This is a plain delegation to the filesystem invoker; no lookup in the per-tenant
     * file map is performed here.
     *
     * @param str               the file name handed to the filesystem invoker
     * @param axisConfiguration passed through to the filesystem invoker
     * @return the value returned by the filesystem invoker
     * @throws ExecutionPlanConfigurationException propagated from the filesystem invoker
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getInactiveExecutionPlanConfigurationContent(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        final String fileContent = EventProcessorConfigurationFilesystemInvoker.readExecutionPlanConfigFile(str, axisConfiguration);
        return fileContent;
    }

    /**
     * Builds a fresh map from execution plan name to its configuration for every
     * execution plan currently held for the tenant.
     *
     * @param i the tenant id used as the key of the per-tenant plan map
     * @return a new map (never {@code null}); empty when the tenant has no plans
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, ExecutionPlanConfiguration> getAllActiveExecutionConfigurations(int i) {
        Map<String, ExecutionPlanConfiguration> configurationsByName = new HashMap<String, ExecutionPlanConfiguration>();
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (tenantPlans == null) {
            return configurationsByName;
        }
        for (Map.Entry<String, ExecutionPlan> planEntry : tenantPlans.entrySet()) {
            ExecutionPlan plan = planEntry.getValue();
            configurationsByName.put(planEntry.getKey(), plan.getExecutionPlanConfiguration());
        }
        return configurationsByName;
    }

    /**
     * Collects the configurations of the tenant's execution plans that export the given stream.
     * A stream matches when its name, {@code EventProcessorConstants.STREAM_SEPARATOR} and its
     * version, concatenated, equal {@code str}.
     *
     * @param i   the tenant id used as the key of the per-tenant plan map
     * @param str the stream identifier to match
     * @return a new map from plan name to configuration; empty when nothing matches
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, ExecutionPlanConfiguration> getAllExportedStreamSpecificActiveExecutionConfigurations(int i, String str) {
        Map<String, ExecutionPlanConfiguration> matchingConfigurations = new HashMap<String, ExecutionPlanConfiguration>();
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (tenantPlans == null) {
            return matchingConfigurations;
        }
        for (Map.Entry<String, ExecutionPlan> planEntry : tenantPlans.entrySet()) {
            for (StreamConfiguration exportedStream : planEntry.getValue().getExecutionPlanConfiguration().getExportedStreams()) {
                String qualifiedStreamId = exportedStream.getName() + EventProcessorConstants.STREAM_SEPARATOR + exportedStream.getVersion();
                if (!qualifiedStreamId.equals(str)) {
                    continue;
                }
                matchingConfigurations.put(planEntry.getKey(), planEntry.getValue().getExecutionPlanConfiguration());
            }
        }
        return matchingConfigurations;
    }

    /**
     * Collects the configurations of the tenant's execution plans that import the given stream.
     * A stream matches when its name, {@code EventProcessorConstants.STREAM_SEPARATOR} and its
     * version, concatenated, equal {@code str}.
     *
     * @param i   the tenant id used as the key of the per-tenant plan map
     * @param str the stream identifier to match
     * @return a new map from plan name to configuration; empty when nothing matches
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, ExecutionPlanConfiguration> getAllImportedStreamSpecificActiveExecutionConfigurations(int i, String str) {
        Map<String, ExecutionPlanConfiguration> matchingConfigurations = new HashMap<String, ExecutionPlanConfiguration>();
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (tenantPlans == null) {
            return matchingConfigurations;
        }
        for (Map.Entry<String, ExecutionPlan> planEntry : tenantPlans.entrySet()) {
            for (StreamConfiguration importedStream : planEntry.getValue().getExecutionPlanConfiguration().getImportedStreams()) {
                String qualifiedStreamId = importedStream.getName() + EventProcessorConstants.STREAM_SEPARATOR + importedStream.getVersion();
                if (!qualifiedStreamId.equals(str)) {
                    continue;
                }
                matchingConfigurations.put(planEntry.getKey(), planEntry.getValue().getExecutionPlanConfiguration());
            }
        }
        return matchingConfigurations;
    }

    /**
     * Looks up the configuration of a tenant's execution plan by plan name.
     *
     * @param str the execution plan name
     * @param i   the tenant id used as the key of the per-tenant plan map
     * @return the plan's configuration, or {@code null} when the tenant has no plans
     *         or no plan is stored under that name
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public ExecutionPlanConfiguration getActiveExecutionPlanConfiguration(String str, int i) {
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (tenantPlans == null) {
            return null;
        }
        ExecutionPlan plan = tenantPlans.get(str);
        return plan == null ? null : plan.getExecutionPlanConfiguration();
    }

    /**
     * Looks up a tenant's execution plan by plan name.
     *
     * @param str the execution plan name
     * @param i   the tenant id used as the key of the per-tenant plan map
     * @return the stored plan, or {@code null} when the tenant has no plans
     *         or no plan is stored under that name
     */
    public ExecutionPlan getActiveExecutionPlan(String str, int i) {
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        return tenantPlans == null ? null : tenantPlans.get(str);
    }

    /**
     * Lists the tenant's configuration files whose status is {@code ERROR},
     * {@code WAITING_FOR_DEPENDENCY} or {@code WAITING_FOR_OSGI_SERVICE}.
     *
     * @param i the tenant id used as the key of the per-tenant file map
     * @return a new list (never {@code null}); empty when the tenant has no such files
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public List<ExecutionPlanConfigurationFile> getAllInactiveExecutionPlanConfiguration(int i) {
        List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        List<ExecutionPlanConfigurationFile> inactiveFiles = new ArrayList<ExecutionPlanConfigurationFile>();
        if (tenantFiles == null) {
            return inactiveFiles;
        }
        for (ExecutionPlanConfigurationFile candidate : tenantFiles) {
            ExecutionPlanConfigurationFile.Status status = candidate.getStatus();
            boolean inactive = status == ExecutionPlanConfigurationFile.Status.ERROR
                    || status == ExecutionPlanConfigurationFile.Status.WAITING_FOR_DEPENDENCY
                    || status == ExecutionPlanConfigurationFile.Status.WAITING_FOR_OSGI_SERVICE;
            if (inactive) {
                inactiveFiles.add(candidate);
            }
        }
        return inactiveFiles;
    }

    /**
     * Sets the tracing flag on an execution plan of the current thread-local tenant and rewrites
     * its configuration file through {@code editExecutionPlanConfiguration}.
     * <p>
     * When the tenant has no execution plans at all, the call is a no-op (unchanged behaviour).
     *
     * @param str               the execution plan name
     * @param z                 the new tracing flag
     * @param axisConfiguration passed through to the configuration rewrite
     * @throws ExecutionPlanConfigurationException if the tenant has plans but none is stored under
     *                                             {@code str}, or if rewriting the configuration fails
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void setTracingEnabled(String str, boolean z, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (map == null) {
            return;
        }
        ExecutionPlan executionPlan = map.get(str);
        if (executionPlan == null) {
            // Previously this dereferenced null; report the missing plan through the declared exception instead.
            throw new ExecutionPlanConfigurationException("No active execution plan named " + str + " exists for tenant " + tenantId);
        }
        executionPlan.getExecutionPlanConfiguration().setTracingEnabled(z);
        editExecutionPlanConfiguration(executionPlan.getExecutionPlanConfiguration(), str, tenantId, axisConfiguration);
    }

    /**
     * Sets the statistics flag on an execution plan of the current thread-local tenant and rewrites
     * its configuration file through {@code editExecutionPlanConfiguration}.
     * <p>
     * When the tenant has no execution plans at all, the call is a no-op (unchanged behaviour).
     *
     * @param str               the execution plan name
     * @param z                 the new statistics flag
     * @param axisConfiguration passed through to the configuration rewrite
     * @throws ExecutionPlanConfigurationException if the tenant has plans but none is stored under
     *                                             {@code str}, or if rewriting the configuration fails
     */
    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void setStatisticsEnabled(String str, boolean z, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (map == null) {
            return;
        }
        ExecutionPlan executionPlan = map.get(str);
        if (executionPlan == null) {
            // Previously this dereferenced null; report the missing plan through the declared exception instead.
            throw new ExecutionPlanConfigurationException("No active execution plan named " + str + " exists for tenant " + tenantId);
        }
        executionPlan.getExecutionPlanConfiguration().setStatisticsEnabled(z);
        editExecutionPlanConfiguration(executionPlan.getExecutionPlanConfiguration(), str, tenantId, axisConfiguration);
    }

    /**
     * Reloads every configuration file of the tenant that is in the given status and whose recorded
     * dependency equals {@code str} (case-insensitively).
     * <p>
     * Matching files are collected first and reloaded afterwards, so the reload calls never run
     * while the tenant's file list is being iterated. A failing reload is logged and does not stop
     * the remaining reloads.
     *
     * @param status the status a file must currently have to be reloaded
     * @param str    the dependency name to match against each file's dependency
     * @param i      the tenant id used as the key of the per-tenant file map
     * @throws ExecutionPlanConfigurationException declared for callers; reload failures are logged, not rethrown
     */
    public void activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status status, String str, int i) throws ExecutionPlanConfigurationException {
        List<ExecutionPlanConfigurationFile> filesToReload = new ArrayList<ExecutionPlanConfigurationFile>();
        if (this.tenantSpecificExecutionPlanFiles != null && this.tenantSpecificExecutionPlanFiles.size() > 0) {
            List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
            if (tenantFiles != null) {
                for (ExecutionPlanConfigurationFile candidate : tenantFiles) {
                    if (candidate.getStatus().equals(status) && str.equalsIgnoreCase(candidate.getDependency())) {
                        filesToReload.add(candidate);
                    }
                }
            }
        }
        for (ExecutionPlanConfigurationFile fileToReload : filesToReload) {
            try {
                EventProcessorConfigurationFilesystemInvoker.reload(fileToReload.getFilePath(), fileToReload.getAxisConfiguration());
            } catch (Exception e) {
                // Pass the exception to the logger so the cause and stack trace are not lost.
                log.error("Exception occurred while trying to deploy the Execution Plan configuration file : " + new File(fileToReload.getFileName()).getName(), e);
            }
        }
    }

    /**
     * Reloads the configuration file of every execution plan of the tenant that has an event
     * producer or Siddhi event consumer whose stream id equals {@code str}.
     * <p>
     * Affected plan names are collected first and reloaded afterwards, so the reload calls never
     * run while the tenant's plan map is being iterated. A plan without a deployed configuration
     * file is skipped with a warning; a failing reload is logged and does not stop the rest.
     *
     * @param str the stream id to look for
     * @param i   the tenant id used as the key of the per-tenant maps
     */
    public void deactivateActiveExecutionPlanConfigurations(String str, int i) {
        List<String> affectedPlanNames = new ArrayList<String>();
        Map<String, ExecutionPlan> tenantPlans = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (tenantPlans != null) {
            for (ExecutionPlan executionPlan : tenantPlans.values()) {
                if (executionPlanReferencesStream(executionPlan, str)) {
                    affectedPlanNames.add(executionPlan.getName());
                }
            }
        }
        for (String planName : affectedPlanNames) {
            ExecutionPlanConfigurationFile configurationFile = getExecutionPlanConfigurationFileByPlanName(planName, i);
            if (configurationFile == null) {
                // The lookup only returns DEPLOYED files; without this guard a missing file caused a
                // NullPointerException inside the catch block below, which escaped the method.
                log.warn("No deployed Execution Plan configuration file found for execution plan : " + planName);
                continue;
            }
            try {
                EventProcessorConfigurationFilesystemInvoker.reload(configurationFile.getFilePath(), configurationFile.getAxisConfiguration());
            } catch (Exception e) {
                // Pass the exception to the logger so the cause and stack trace are not lost.
                log.error("Exception occurred while trying to deploy the Execution Plan configuration file : " + new File(configurationFile.getFileName()).getName(), e);
            }
        }
    }

    /**
     * Tells whether any event producer or Siddhi event consumer of the plan has the given stream id.
     * Producers are checked before consumers, matching the original evaluation order.
     */
    private static boolean executionPlanReferencesStream(ExecutionPlan executionPlan, String streamId) {
        for (EventProducer producer : executionPlan.getEventProducers()) {
            if (producer.getStreamId().equals(streamId)) {
                return true;
            }
        }
        for (SiddhiEventConsumer consumer : executionPlan.getSiddhiEventConsumers()) {
            if (consumer.getStreamId().equals(streamId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Finds the first configuration file of the tenant that carries the given execution plan name
     * and is in {@code DEPLOYED} status.
     *
     * @param str the execution plan name
     * @param i   the tenant id used as the key of the per-tenant file map
     * @return the matching file, or {@code null} when the tenant has no files or none matches
     */
    private ExecutionPlanConfigurationFile getExecutionPlanConfigurationFileByPlanName(String str, int i) {
        List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        if (tenantFiles != null) {
            for (ExecutionPlanConfigurationFile candidate : tenantFiles) {
                if (!str.equals(candidate.getExecutionPlanName())) {
                    continue;
                }
                // The status is only consulted for files whose plan name already matched.
                if (candidate.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Rewrites the deployed configuration file of an execution plan: the existing file is deleted
     * and the given configuration is saved under the same file name.
     *
     * @param executionPlanConfiguration the configuration to serialize via {@code EventProcessorConfigurationHelper.toOM}
     * @param str                        the execution plan name
     * @param i                          the tenant id used to locate the plan's configuration file
     * @param axisConfiguration          passed through to the filesystem invoker
     * @throws ExecutionPlanConfigurationException if no deployed configuration file is recorded for
     *                                             the plan, or if the filesystem invoker fails
     */
    private void editExecutionPlanConfiguration(ExecutionPlanConfiguration executionPlanConfiguration, String str, int i, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        ExecutionPlanConfigurationFile configurationFile = getExecutionPlanConfigurationFileByPlanName(str, i);
        if (configurationFile == null) {
            // The lookup returns null when nothing is deployed under this name; fail with the
            // declared exception instead of a NullPointerException.
            throw new ExecutionPlanConfigurationException("Configuration file for " + str + " doesn't exist.");
        }
        String fileName = configurationFile.getFileName();
        EventProcessorConfigurationFilesystemInvoker.delete(fileName, axisConfiguration);
        EventProcessorConfigurationFilesystemInvoker.save(EventProcessorConfigurationHelper.toOM(executionPlanConfiguration), str, fileName, axisConfiguration);
    }

    /**
     * Deletes the file {@code str + ".xml"} through the filesystem invoker if the current
     * thread-local tenant has a recorded file with exactly that file name whose status is not
     * {@code DEPLOYED}. Only the first such file triggers a delete; otherwise nothing happens.
     *
     * @param str               the name to which {@code ".xml"} is appended to form the file name
     * @param axisConfiguration passed through to the filesystem invoker
     * @throws ExecutionPlanConfigurationException propagated from the filesystem invoker
     */
    private void validateToRemoveInactiveExecutionPlanConfiguration(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        final String targetFileName = str + ".xml";
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(tenantId));
        if (tenantFiles == null) {
            return;
        }
        for (ExecutionPlanConfigurationFile candidate : tenantFiles) {
            if (!candidate.getFileName().equals(targetFileName)) {
                continue;
            }
            if (candidate.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                continue;
            }
            EventProcessorConfigurationFilesystemInvoker.delete(targetFileName, axisConfiguration);
            return;
        }
    }

    /**
     * Checks that no active execution plan of the tenant already uses the given name,
     * comparing names case-insensitively.
     *
     * @param str the candidate execution plan name
     * @param i   the tenant id
     * @return {@code false} if an active plan with an equal (ignoring case) name exists, {@code true} otherwise
     * @throws ExecutionPlanConfigurationException declared for callers; not thrown by the visible code
     */
    private boolean checkExecutionPlanValidity(String str, int i) throws ExecutionPlanConfigurationException {
        Map<String, ExecutionPlanConfiguration> activeConfigurations = getAllActiveExecutionConfigurations(i);
        if (activeConfigurations != null) {
            for (String activePlanName : activeConfigurations.keySet()) {
                if (str.equalsIgnoreCase(activePlanName)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Tells whether the tenant has any recorded configuration file, in any status,
     * whose execution plan name equals {@code str}.
     *
     * @param str the execution plan name to look for
     * @param i   the tenant id used as the key of the per-tenant file map
     * @return {@code true} if such a file is recorded, {@code false} otherwise
     * @throws ExecutionPlanConfigurationException declared for callers; not thrown by the visible code
     */
    public boolean isExecutionPlanFileAlreadyExist(String str, int i) throws ExecutionPlanConfigurationException {
        if (this.tenantSpecificExecutionPlanFiles.size() <= 0) {
            return false;
        }
        List<ExecutionPlanConfigurationFile> tenantFiles = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        if (tenantFiles == null) {
            return false;
        }
        for (ExecutionPlanConfigurationFile candidate : tenantFiles) {
            if (candidate.getExecutionPlanName().equals(str)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stores the given membership as this service's current CEP membership info,
     * replacing any previously stored value.
     *
     * @param cEPMembership the membership to store
     */
    public void addCurrentCEPMembership(CEPMembership cEPMembership) {
        this.currentCepMembershipInfo = cEPMembership;
    }
}
