package org.wso2.carbon.event.processor.core.internal;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.processor.core.EventProcessorService;
import org.wso2.carbon.event.processor.core.ExecutionPlan;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfigurationFile;
import org.wso2.carbon.event.processor.core.StreamConfiguration;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanDependencyValidationException;
import org.wso2.carbon.event.processor.core.exception.StormDeploymentException;
import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
import org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener;
import org.wso2.carbon.event.processor.core.internal.storm.SiddhiStormInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.storm.SiddhiStormOutputEventListener;
import org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager;
import org.wso2.carbon.event.processor.core.internal.storm.status.monitor.StormStatusMapListener;
import org.wso2.carbon.event.processor.core.internal.storm.status.monitor.StormStatusMonitor;
import org.wso2.carbon.event.processor.core.internal.storm.status.monitor.exception.DeploymentStatusMonitorException;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConfigurationFilesystemInvoker;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorUtil;
import org.wso2.carbon.event.processor.core.internal.util.helper.EventProcessorHelper;
import org.wso2.carbon.event.processor.core.util.DistributedModeConstants;
import org.wso2.carbon.event.processor.core.util.ExecutionPlanStatusHolder;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.carbon.event.processor.manager.core.config.ManagementModeInfo;
import org.wso2.carbon.event.processor.manager.core.config.Mode;
import org.wso2.carbon.event.stream.core.EventProducer;
import org.wso2.carbon.event.stream.core.SiddhiEventConsumer;
import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.annotation.Element;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.util.AnnotationHelper;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;
import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/CarbonEventProcessorService.class */
public class CarbonEventProcessorService implements EventProcessorService {
    private static final Log log = LogFactory.getLog(CarbonEventProcessorService.class);
    private Map<Integer, ConcurrentHashMap<String, ExecutionPlan>> tenantSpecificExecutionPlans = new ConcurrentHashMap();
    private Map<Integer, List<ExecutionPlanConfigurationFile>> tenantSpecificExecutionPlanFiles = new ConcurrentHashMap();
    private ManagementModeInfo managementInfo;

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void deployExecutionPlan(String str) throws ExecutionPlanDependencyValidationException, ExecutionPlanConfigurationException {
        try {
            String value = AnnotationHelper.getAnnotationElement("name", (String) null, SiddhiCompiler.parse(str).getAnnotations()).getValue();
            if (!isExecutionPlanAlreadyExist(value)) {
                throw new ExecutionPlanConfigurationException(value + " already registered as an execution in this tenant");
            }
            String path = EventProcessorUtil.getAxisConfiguration().getRepository().getPath();
            File file = new File(path);
            if (!file.exists()) {
                synchronized (path.intern()) {
                    if (!file.mkdir()) {
                        throw new ExecutionPlanConfigurationException("Cannot create directory to add tenant specific execution plan : " + value);
                    }
                }
            }
            String str2 = file.getAbsolutePath() + File.separator + EventProcessorConstants.EP_ELE_DIRECTORY;
            File file2 = new File(str2);
            if (!file2.exists()) {
                synchronized (str2.intern()) {
                    if (!file2.mkdir()) {
                        throw new ExecutionPlanConfigurationException("Cannot create directory executionplans to add tenant specific  execution plan :" + value);
                    }
                }
            }
            validateToRemoveInactiveExecutionPlanConfiguration(value);
            EventProcessorConfigurationFilesystemInvoker.save(str, value, value + ".siddhiql");
        } catch (SiddhiParserException e) {
            throw new ExecutionPlanConfigurationException("Couldn't parse execution plan: \n" + str + EventProcessorConstants.SIDDHI_LINE_SEPARATER);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void undeployInactiveExecutionPlan(String str) throws ExecutionPlanConfigurationException {
        EventProcessorConfigurationFilesystemInvoker.delete(str);
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void undeployActiveExecutionPlan(String str) throws ExecutionPlanConfigurationException {
        EventProcessorConfigurationFilesystemInvoker.delete(getExecutionPlanConfigurationFileByPlanName(str).getFileName());
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void editActiveExecutionPlan(String str, String str2) throws ExecutionPlanConfigurationException, ExecutionPlanDependencyValidationException {
        EventProcessorHelper.validateExecutionPlan(str);
        String value = AnnotationHelper.getAnnotationElement("name", (String) null, SiddhiCompiler.parse(str).getAnnotations()).getValue();
        if (!value.equals(str2) && !isExecutionPlanAlreadyExist(value)) {
            throw new ExecutionPlanConfigurationException(value + EventProcessorConstants.SPACE + "already registered as an execution in this tenant");
        }
        if (str2 == null || str2.length() <= 0) {
            throw new ExecutionPlanConfigurationException("Invalid configuration provided, No execution plan name.");
        }
        ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(str2);
        EventProcessorConfigurationFilesystemInvoker.delete(executionPlanConfigurationFileByPlanName == null ? str2 + ".siddhiql" : executionPlanConfigurationFileByPlanName.getFileName());
        EventProcessorConfigurationFilesystemInvoker.save(str, value, value + ".siddhiql");
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void editInactiveExecutionPlan(String str, String str2) throws ExecutionPlanConfigurationException, ExecutionPlanDependencyValidationException {
        EventProcessorHelper.validateExecutionPlan(str);
        String value = AnnotationHelper.getAnnotationElement("name", (String) null, SiddhiCompiler.parse(str).getAnnotations()).getValue();
        EventProcessorConfigurationFilesystemInvoker.delete(str2);
        EventProcessorConfigurationFilesystemInvoker.save(str, value, str2);
    }

    public void addExecutionPlan(String str, boolean z) throws ExecutionPlanConfigurationException {
        AbstractSiddhiInputEventDispatcher siddhiInputEventDispatcher;
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        SiddhiManager siddhiManager = EventProcessorValueHolder.getSiddhiManager();
        EventProcessorHelper.loadDataSourceConfiguration(siddhiManager);
        org.wso2.siddhi.query.api.ExecutionPlan parse = SiddhiCompiler.parse(str);
        ExecutionPlanConfiguration executionPlanConfiguration = new ExecutionPlanConfiguration();
        executionPlanConfiguration.setExecutionPlan(str);
        String value = AnnotationHelper.getAnnotationElement("name", (String) null, parse.getAnnotations()).getValue();
        executionPlanConfiguration.setName(value);
        Element annotationElement = AnnotationHelper.getAnnotationElement("description", (String) null, parse.getAnnotations());
        if (annotationElement != null) {
            executionPlanConfiguration.setDescription(annotationElement.getValue());
        } else {
            executionPlanConfiguration.setDescription("");
        }
        Element annotationElement2 = AnnotationHelper.getAnnotationElement("trace", (String) null, parse.getAnnotations());
        if (annotationElement2 != null) {
            executionPlanConfiguration.setTracingEnabled(Boolean.valueOf(annotationElement2.getValue()).booleanValue());
        } else {
            executionPlanConfiguration.setTracingEnabled(false);
        }
        Element annotationElement3 = AnnotationHelper.getAnnotationElement("statistics", (String) null, parse.getAnnotations());
        if (annotationElement3 != null) {
            executionPlanConfiguration.setStatisticsEnabled(Boolean.valueOf(annotationElement3.getValue()).booleanValue());
        } else {
            executionPlanConfiguration.setStatisticsEnabled(false);
        }
        executionPlanConfiguration.setEditable(z);
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (concurrentHashMap == null) {
            concurrentHashMap = new ConcurrentHashMap<>();
            this.tenantSpecificExecutionPlans.put(Integer.valueOf(tenantId), concurrentHashMap);
        } else if (concurrentHashMap.get(value) != null) {
            throw new ExecutionPlanConfigurationException("Execution plan with the same name already exists. Please remove it and retry.");
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : parse.getStreamDefinitionMap().entrySet()) {
            String str2 = (String) entry.getKey();
            Element annotationElement4 = AnnotationHelper.getAnnotationElement(EventProcessorConstants.ANNOTATION_IMPORT, (String) null, ((StreamDefinition) entry.getValue()).getAnnotations());
            Element annotationElement5 = AnnotationHelper.getAnnotationElement(EventProcessorConstants.ANNOTATION_EXPORT, (String) null, ((StreamDefinition) entry.getValue()).getAnnotations());
            if (annotationElement4 != null) {
                String value2 = annotationElement4.getValue();
                hashMap.put(str2, value2);
                String[] split = value2.split(":");
                executionPlanConfiguration.addImportedStream(new StreamConfiguration(split[0], split[1], str2));
            }
            if (annotationElement5 != null) {
                String value3 = annotationElement5.getValue();
                hashMap2.put(str2, value3);
                String[] split2 = value3.split(":");
                executionPlanConfiguration.addExportedStream(new StreamConfiguration(split2[0], split2[1], str2));
            }
        }
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap(hashMap.size());
        ArrayList arrayList = new ArrayList(executionPlanConfiguration.getImportedStreams().size());
        for (StreamConfiguration streamConfiguration : executionPlanConfiguration.getImportedStreams()) {
            try {
                arrayList.add(EventProcessorUtil.getDefinitionString(EventProcessorValueHolder.getEventStreamService().getStreamDefinition(streamConfiguration.getStreamId()), streamConfiguration.getSiddhiStreamName()));
            } catch (EventStreamConfigurationException e) {
            }
        }
        ArrayList arrayList2 = new ArrayList(executionPlanConfiguration.getExportedStreams().size());
        for (StreamConfiguration streamConfiguration2 : executionPlanConfiguration.getExportedStreams()) {
            try {
                arrayList2.add(EventProcessorUtil.getDefinitionString(EventProcessorValueHolder.getEventStreamService().getStreamDefinition(streamConfiguration2.getStreamId()), streamConfiguration2.getSiddhiStreamName()));
            } catch (EventStreamConfigurationException e2) {
            }
        }
        DistributedConfiguration stormDeploymentConfiguration = EventProcessorValueHolder.getStormDeploymentConfiguration();
        try {
            ExecutionPlanRuntime createExecutionPlanRuntime = siddhiManager.createExecutionPlanRuntime(str);
            if (this.managementInfo.getMode() == Mode.Distributed && stormDeploymentConfiguration != null && stormDeploymentConfiguration.isManagerNode() && EventProcessorValueHolder.getStormManagerServer().isStormCoordinator()) {
                try {
                    EventProcessorValueHolder.getStormTopologyManager().submitTopology(executionPlanConfiguration, arrayList, arrayList2, tenantId, stormDeploymentConfiguration.getTopologySubmitRetryInterval());
                } catch (StormDeploymentException e3) {
                    throw new ExecutionPlanConfigurationException("Invalid distributed query specified, " + e3.getMessage(), e3);
                }
            }
            for (Map.Entry entry2 : hashMap.entrySet()) {
                concurrentHashMap2.put(entry2.getValue(), createExecutionPlanRuntime.getInputHandler((String) entry2.getKey()));
            }
            ExecutionPlan executionPlan = new ExecutionPlan(value, createExecutionPlanRuntime, executionPlanConfiguration);
            concurrentHashMap.put(value, executionPlan);
            boolean z2 = this.managementInfo.getMode() == Mode.Distributed && stormDeploymentConfiguration != null && stormDeploymentConfiguration.isWorkerNode();
            StormStatusMonitor stormStatusMonitor = null;
            if (z2) {
                StormStatusMapListener stormStatusMapListener = null;
                try {
                    stormStatusMonitor = new StormStatusMonitor(tenantId, value, hashMap.size());
                    stormStatusMapListener = new StormStatusMapListener(value, tenantId, stormStatusMonitor);
                } catch (DeploymentStatusMonitorException e4) {
                    log.error("Failed to initialize map listener. Reason: " + e4.getMessage(), e4);
                }
                executionPlan.setStormStatusMonitor(stormStatusMonitor);
                executionPlan.setStormStatusMapListener(stormStatusMapListener);
            }
            SiddhiStormOutputEventListener siddhiStormOutputEventListener = null;
            if (this.managementInfo.getMode() == Mode.Distributed && this.managementInfo.getDistributedConfiguration().isWorkerNode()) {
                siddhiStormOutputEventListener = new SiddhiStormOutputEventListener(executionPlanConfiguration, tenantId, stormDeploymentConfiguration, stormStatusMonitor);
                executionPlan.addStormOutputListener(siddhiStormOutputEventListener);
            }
            for (Map.Entry entry3 : hashMap2.entrySet()) {
                SiddhiOutputStreamListener siddhiOutputStreamListener = new SiddhiOutputStreamListener((String) entry3.getKey(), (String) entry3.getValue(), executionPlanConfiguration, tenantId);
                if (this.managementInfo.getMode() == Mode.Distributed && stormDeploymentConfiguration != null && stormDeploymentConfiguration.isWorkerNode()) {
                    try {
                        siddhiStormOutputEventListener.registerOutputStreamListener(EventProcessorUtil.convertToSiddhiStreamDefinition(EventProcessorValueHolder.getEventStreamService().getStreamDefinition((String) entry3.getValue()), (String) entry3.getKey()), siddhiOutputStreamListener);
                    } catch (EventStreamConfigurationException e5) {
                    }
                } else {
                    createExecutionPlanRuntime.addCallback((String) entry3.getKey(), siddhiOutputStreamListener);
                }
                try {
                    EventProcessorValueHolder.getEventStreamService().subscribe(siddhiOutputStreamListener);
                } catch (EventStreamConfigurationException e6) {
                }
                executionPlan.addProducer(siddhiOutputStreamListener);
            }
            ArrayList<AbstractSiddhiInputEventDispatcher> arrayList3 = new ArrayList();
            for (Map.Entry entry4 : hashMap.entrySet()) {
                InputHandler inputHandler = (InputHandler) concurrentHashMap2.get(entry4.getValue());
                if (z2) {
                    org.wso2.carbon.databridge.commons.StreamDefinition streamDefinition = null;
                    try {
                        streamDefinition = EventProcessorValueHolder.getEventStreamService().getStreamDefinition((String) entry4.getValue());
                    } catch (EventStreamConfigurationException e7) {
                    }
                    siddhiInputEventDispatcher = new SiddhiStormInputEventDispatcher(streamDefinition, (String) entry4.getKey(), executionPlanConfiguration, tenantId, stormDeploymentConfiguration, stormStatusMonitor);
                } else {
                    siddhiInputEventDispatcher = new SiddhiInputEventDispatcher((String) entry4.getValue(), inputHandler, executionPlanConfiguration, tenantId);
                }
                arrayList3.add(siddhiInputEventDispatcher);
            }
            if (createExecutionPlanRuntime != null) {
                createExecutionPlanRuntime.start();
            }
            for (AbstractSiddhiInputEventDispatcher abstractSiddhiInputEventDispatcher : arrayList3) {
                try {
                    EventProcessorValueHolder.getEventStreamService().subscribe(abstractSiddhiInputEventDispatcher);
                    executionPlan.addConsumer(abstractSiddhiInputEventDispatcher);
                } catch (EventStreamConfigurationException e8) {
                }
            }
            if (EventProcessorValueHolder.getPersistenceConfiguration() != null) {
                createExecutionPlanRuntime.restoreLastRevision();
            }
        } catch (Exception e9) {
            throw new ExecutionPlanConfigurationException("Invalid query specified, " + e9.getMessage(), e9);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public List<org.wso2.carbon.databridge.commons.StreamDefinition> getSiddhiStreams(String str) {
        SiddhiManager siddhiManager = EventProcessorValueHolder.getSiddhiManager();
        EventProcessorHelper.loadDataSourceConfiguration(siddhiManager);
        ExecutionPlanRuntime createExecutionPlanRuntime = siddhiManager.createExecutionPlanRuntime(str);
        Collection<StreamDefinition> values = createExecutionPlanRuntime.getStreamDefinitionMap().values();
        ArrayList arrayList = new ArrayList(values.size());
        for (StreamDefinition streamDefinition : values) {
            arrayList.add(EventProcessorUtil.convertToDatabridgeStreamDefinition(streamDefinition, new StreamConfiguration(streamDefinition.getId())));
        }
        createExecutionPlanRuntime.shutdown();
        return arrayList;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public boolean isDistributedProcessingEnabled() {
        return this.managementInfo.getMode() == Mode.Distributed;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, String> getAllExecutionPlanStatusesInStorm() {
        String executionPlanStatus;
        HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Set<String> keySet = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId)).keySet();
        HashMap hashMap = new HashMap();
        for (String str : keySet) {
            if (hazelcastInstance == null) {
                executionPlanStatus = "No status info available. \nTo get status info, enable clustering.";
            } else {
                ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP).get(StormTopologyManager.getTopologyName(str, tenantId));
                executionPlanStatus = executionPlanStatusHolder == null ? "Execution plan not deployed to a manager. Hence no status info available." : executionPlanStatusHolder.getExecutionPlanStatus();
            }
            hashMap.put(str, executionPlanStatus);
        }
        return hashMap;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void validateExecutionPlan(String str) throws ExecutionPlanConfigurationException, ExecutionPlanDependencyValidationException {
        EventProcessorHelper.validateExecutionPlan(str);
    }

    public ManagementModeInfo getManagementInfo() {
        return this.managementInfo;
    }

    public void setManagementInfo(ManagementModeInfo managementModeInfo) {
        this.managementInfo = managementModeInfo;
    }

    public void notifyServiceAvailability(String str) {
        for (Integer num : this.tenantSpecificExecutionPlanFiles.keySet()) {
            try {
                try {
                    PrivilegedCarbonContext.startTenantFlow();
                    PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(num.intValue());
                    PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
                    activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status.WAITING_FOR_OSGI_SERVICE, str);
                    PrivilegedCarbonContext.endTenantFlow();
                } catch (ExecutionPlanConfigurationException e) {
                    log.error("Error while redeploying distributed execution plans.", e);
                    PrivilegedCarbonContext.endTenantFlow();
                }
            } catch (Throwable th) {
                PrivilegedCarbonContext.endTenantFlow();
                throw th;
            }
        }
    }

    private void removeExecutionPlanConfiguration(String str) {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (concurrentHashMap == null || !concurrentHashMap.containsKey(str)) {
            return;
        }
        ExecutionPlan remove = concurrentHashMap.remove(str);
        remove.shutdown();
        ExecutionPlanConfiguration executionPlanConfiguration = remove.getExecutionPlanConfiguration();
        DistributedConfiguration stormDeploymentConfiguration = EventProcessorValueHolder.getStormDeploymentConfiguration();
        if (this.managementInfo.getMode() == Mode.Distributed && stormDeploymentConfiguration != null && stormDeploymentConfiguration.isManagerNode() && EventProcessorValueHolder.getStormManagerServer().isStormCoordinator()) {
            try {
                removeExecutionPlanStatusHolder(executionPlanConfiguration.getName(), tenantId);
                EventProcessorValueHolder.getStormTopologyManager().killTopology(executionPlanConfiguration.getName(), tenantId);
                EventProcessorValueHolder.getStormManagerServer().onExecutionPlanRemove(str, tenantId);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        Iterator<SiddhiEventConsumer> it = remove.getSiddhiEventConsumers().iterator();
        while (it.hasNext()) {
            EventProcessorValueHolder.getEventStreamService().unsubscribe(it.next());
        }
        Iterator<EventProducer> it2 = remove.getEventProducers().iterator();
        while (it2.hasNext()) {
            EventProcessorValueHolder.getEventStreamService().unsubscribe(it2.next());
        }
    }

    void removeExecutionPlanStatusHolder(String str, int i) {
        HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
        if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
            log.error("Couldn't clean status info for execution plan: " + str + ", for tenant ID : " + i + " as the hazelcast instance is not active or not available.");
            return;
        }
        IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
        String topologyName = StormTopologyManager.getTopologyName(str, i);
        map.remove(topologyName, (ExecutionPlanStatusHolder) map.get(topologyName));
    }

    public void addExecutionPlanConfigurationFile(ExecutionPlanConfigurationFile executionPlanConfigurationFile) {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(tenantId));
        if (list == null) {
            list = new CopyOnWriteArrayList();
            this.tenantSpecificExecutionPlanFiles.put(Integer.valueOf(tenantId), list);
        }
        list.add(executionPlanConfigurationFile);
    }

    public void removeExecutionPlanConfigurationFile(String str) {
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
            if (new File(executionPlanConfigurationFile.getFileName()).getName().equals(str)) {
                if (executionPlanConfigurationFile.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                    removeExecutionPlanConfiguration(executionPlanConfigurationFile.getExecutionPlanName());
                }
                list.remove(executionPlanConfigurationFile);
                return;
            }
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getActiveExecutionPlan(String str) throws ExecutionPlanConfigurationException {
        ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(str);
        if (executionPlanConfigurationFileByPlanName == null) {
            throw new ExecutionPlanConfigurationException("Configuration file for " + str + "doesn't exist.");
        }
        return EventProcessorConfigurationFilesystemInvoker.readExecutionPlanConfigFile(executionPlanConfigurationFileByPlanName.getFileName());
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getInactiveExecutionPlan(String str) throws ExecutionPlanConfigurationException {
        return EventProcessorConfigurationFilesystemInvoker.readExecutionPlanConfigFile(str);
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, ExecutionPlanConfiguration> getAllActiveExecutionConfigurations() {
        HashMap hashMap = new HashMap();
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap != null) {
            for (Map.Entry<String, ExecutionPlan> entry : concurrentHashMap.entrySet()) {
                hashMap.put(entry.getKey(), entry.getValue().getExecutionPlanConfiguration());
            }
        }
        return hashMap;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, ExecutionPlanConfiguration> getAllExportedStreamSpecificActiveExecutionConfigurations(String str) {
        HashMap hashMap = new HashMap();
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap != null) {
            for (Map.Entry<String, ExecutionPlan> entry : concurrentHashMap.entrySet()) {
                for (StreamConfiguration streamConfiguration : entry.getValue().getExecutionPlanConfiguration().getExportedStreams()) {
                    if ((streamConfiguration.getName() + ":" + streamConfiguration.getVersion()).equals(str)) {
                        hashMap.put(entry.getKey(), entry.getValue().getExecutionPlanConfiguration());
                    }
                }
            }
        }
        return hashMap;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, ExecutionPlanConfiguration> getAllImportedStreamSpecificActiveExecutionConfigurations(String str) {
        HashMap hashMap = new HashMap();
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap != null) {
            for (Map.Entry<String, ExecutionPlan> entry : concurrentHashMap.entrySet()) {
                for (StreamConfiguration streamConfiguration : entry.getValue().getExecutionPlanConfiguration().getImportedStreams()) {
                    if ((streamConfiguration.getName() + ":" + streamConfiguration.getVersion()).equals(str)) {
                        hashMap.put(entry.getKey(), entry.getValue().getExecutionPlanConfiguration());
                    }
                }
            }
        }
        return hashMap;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public ExecutionPlanConfiguration getActiveExecutionPlanConfiguration(String str) {
        ExecutionPlan executionPlan;
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap == null || (executionPlan = concurrentHashMap.get(str)) == null) {
            return null;
        }
        return executionPlan.getExecutionPlanConfiguration();
    }

    public ExecutionPlan getActiveExecutionPlan(String str, int i) {
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (concurrentHashMap != null) {
            return concurrentHashMap.get(str);
        }
        return null;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public List<ExecutionPlanConfigurationFile> getAllInactiveExecutionPlanConfiguration() {
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        ArrayList arrayList = new ArrayList();
        if (list != null) {
            for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
                if (executionPlanConfigurationFile.getStatus() == ExecutionPlanConfigurationFile.Status.ERROR || executionPlanConfigurationFile.getStatus() == ExecutionPlanConfigurationFile.Status.WAITING_FOR_DEPENDENCY || executionPlanConfigurationFile.getStatus() == ExecutionPlanConfigurationFile.Status.WAITING_FOR_OSGI_SERVICE) {
                    arrayList.add(executionPlanConfigurationFile);
                }
            }
        }
        return arrayList;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void setTracingEnabled(String str, boolean z) throws ExecutionPlanConfigurationException {
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap != null) {
            ExecutionPlanConfiguration executionPlanConfiguration = concurrentHashMap.get(str).getExecutionPlanConfiguration();
            executionPlanConfiguration.setTracingEnabled(z);
            String executionPlanAnnotationName = EventProcessorHelper.setExecutionPlanAnnotationName(executionPlanConfiguration.getExecutionPlan(), "trace", z);
            executionPlanConfiguration.setExecutionPlan(executionPlanAnnotationName);
            String fileName = getExecutionPlanConfigurationFileByPlanName(str).getFileName();
            EventProcessorConfigurationFilesystemInvoker.delete(fileName);
            EventProcessorConfigurationFilesystemInvoker.save(executionPlanAnnotationName, str, fileName);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void setStatisticsEnabled(String str, boolean z) throws ExecutionPlanConfigurationException {
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap != null) {
            ExecutionPlanConfiguration executionPlanConfiguration = concurrentHashMap.get(str).getExecutionPlanConfiguration();
            executionPlanConfiguration.setStatisticsEnabled(z);
            String executionPlanAnnotationName = EventProcessorHelper.setExecutionPlanAnnotationName(executionPlanConfiguration.getExecutionPlan(), "statistics", z);
            executionPlanConfiguration.setExecutionPlan(executionPlanAnnotationName);
            String fileName = getExecutionPlanConfigurationFileByPlanName(str).getFileName();
            EventProcessorConfigurationFilesystemInvoker.delete(fileName);
            EventProcessorConfigurationFilesystemInvoker.save(executionPlanAnnotationName, str, fileName);
        }
    }

    public void activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status status, String str) throws ExecutionPlanConfigurationException {
        List<ExecutionPlanConfigurationFile> list;
        ArrayList<ExecutionPlanConfigurationFile> arrayList = new ArrayList();
        if (this.tenantSpecificExecutionPlanFiles != null && this.tenantSpecificExecutionPlanFiles.size() > 0 && (list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()))) != null) {
            for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
                if (executionPlanConfigurationFile.getStatus().equals(status) && str.equalsIgnoreCase(executionPlanConfigurationFile.getDependency())) {
                    arrayList.add(executionPlanConfigurationFile);
                }
            }
        }
        for (ExecutionPlanConfigurationFile executionPlanConfigurationFile2 : arrayList) {
            try {
                EventProcessorConfigurationFilesystemInvoker.reload(executionPlanConfigurationFile2.getFilePath());
            } catch (ExecutionPlanConfigurationException e) {
                log.error("Exception occurred while trying to deploy the Execution Plan configuration file : " + new File(executionPlanConfigurationFile2.getFileName()).getName() + "," + e.getMessage(), e);
            }
        }
    }

    public void deactivateActiveExecutionPlanConfigurations(String str) {
        ArrayList arrayList = new ArrayList();
        ConcurrentHashMap<String, ExecutionPlan> concurrentHashMap = this.tenantSpecificExecutionPlans.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap != null) {
            for (ExecutionPlan executionPlan : concurrentHashMap.values()) {
                boolean z = false;
                Iterator<EventProducer> it = executionPlan.getEventProducers().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    if (it.next().getStreamId().equals(str)) {
                        arrayList.add(executionPlan.getName());
                        z = true;
                        break;
                    }
                }
                if (!z) {
                    Iterator<SiddhiEventConsumer> it2 = executionPlan.getSiddhiEventConsumers().iterator();
                    while (true) {
                        if (it2.hasNext()) {
                            if (it2.next().getStreamId().equals(str)) {
                                arrayList.add(executionPlan.getName());
                                break;
                            }
                        } else {
                            break;
                        }
                    }
                }
            }
        }
        if (arrayList.size() > 0) {
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName((String) it3.next());
                try {
                    EventProcessorConfigurationFilesystemInvoker.reload(executionPlanConfigurationFileByPlanName.getFilePath());
                } catch (Exception e) {
                    log.error("Exception occurred while trying to deploy the Execution Plan configuration file : " + new File(executionPlanConfigurationFileByPlanName.getFileName()).getName());
                }
            }
        }
    }

    private ExecutionPlanConfigurationFile getExecutionPlanConfigurationFileByPlanName(String str) {
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (list == null) {
            return null;
        }
        for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
            if (str.equals(executionPlanConfigurationFile.getExecutionPlanName()) && executionPlanConfigurationFile.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                return executionPlanConfigurationFile;
            }
        }
        return null;
    }

    private void validateToRemoveInactiveExecutionPlanConfiguration(String str) throws ExecutionPlanConfigurationException {
        String str2 = str + ".siddhiql";
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (list != null) {
            for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
                if (executionPlanConfigurationFile.getFileName().equals(str2) && !executionPlanConfigurationFile.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                    EventProcessorConfigurationFilesystemInvoker.delete(str2);
                    return;
                }
            }
        }
    }

    private boolean isExecutionPlanAlreadyExist(String str) throws ExecutionPlanConfigurationException {
        Map<String, ExecutionPlanConfiguration> allActiveExecutionConfigurations = getAllActiveExecutionConfigurations();
        if (allActiveExecutionConfigurations == null) {
            return true;
        }
        Iterator<String> it = allActiveExecutionConfigurations.keySet().iterator();
        while (it.hasNext()) {
            if (str.equalsIgnoreCase(it.next())) {
                return false;
            }
        }
        return true;
    }

    public boolean isExecutionPlanFileAlreadyExist(String str) throws ExecutionPlanConfigurationException {
        List<ExecutionPlanConfigurationFile> list;
        if (this.tenantSpecificExecutionPlanFiles.size() <= 0 || (list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()))) == null) {
            return false;
        }
        Iterator<ExecutionPlanConfigurationFile> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().getFileName().equals(str)) {
                return true;
            }
        }
        return false;
    }

    public Map<Integer, ConcurrentHashMap<String, ExecutionPlan>> getTenantSpecificExecutionPlans() {
        return this.tenantSpecificExecutionPlans;
    }

    public void shutdown() {
        for (Map.Entry<Integer, ConcurrentHashMap<String, ExecutionPlan>> entry : this.tenantSpecificExecutionPlans.entrySet()) {
            for (ExecutionPlan executionPlan : entry.getValue().values()) {
                try {
                    executionPlan.shutdown();
                } catch (RuntimeException e) {
                    log.error("Error in shutting down ExecutionPlan '" + executionPlan.getName() + "' of tenant '" + entry.getKey() + "'," + e.getMessage(), e);
                }
            }
        }
        log.info("Successfully shutdown ExecutionPlans");
    }
}
