package org.wso2.carbon.event.processor.core.internal.storm;

import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TopologySummary;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.w3c.dom.Document;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException;
import org.wso2.carbon.event.processor.core.exception.StormDeploymentException;
import org.wso2.carbon.event.processor.core.exception.StormQueryConstructionException;
import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
import org.wso2.carbon.event.processor.core.internal.storm.util.StormQueryPlanBuilder;
import org.wso2.carbon.event.processor.core.internal.storm.util.StormTopologyConstructor;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.carbon.utils.CarbonUtils;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/TopologyManager.class */
public class TopologyManager {
    private static Map stormConfig;
    private static Nimbus.Client client;
    private static String jarLocation;
    private static final Log log = LogFactory.getLog(TopologyManager.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/TopologyManager$TopologySubmitter.class */
    public static class TopologySubmitter implements Runnable {
        String executionPlanName;
        String uploadedJarLocation;
        StormTopology topology;
        int tenantId;
        boolean isTopologyAlive;
        int retryInterval;

        public TopologySubmitter(String str, String str2, StormTopology stormTopology, int i, boolean z, int i2) {
            this.executionPlanName = str;
            this.uploadedJarLocation = str2;
            this.topology = stormTopology;
            this.tenantId = i;
            this.isTopologyAlive = z;
            this.retryInterval = i2;
        }

        private boolean submitTopology() {
            String jSONString = JSONValue.toJSONString(TopologyManager.stormConfig);
            try {
                if (this.isTopologyAlive) {
                    TopologyManager.log.info("Killing already existing storm topology '" + TopologyManager.getTopologyName(this.executionPlanName, this.tenantId) + "' to re-submit");
                    KillOptions killOptions = new KillOptions();
                    killOptions.set_wait_secs(10);
                    TopologyManager.client.killTopologyWithOpts(TopologyManager.getTopologyName(this.executionPlanName, this.tenantId), killOptions);
                    TopologyManager.waitForTopologyToBeRemoved(TopologyManager.getTopologyName(this.executionPlanName, this.tenantId));
                }
                TopologyManager.client.submitTopology(TopologyManager.getTopologyName(this.executionPlanName, this.tenantId), this.uploadedJarLocation, jSONString, this.topology);
                TopologyManager.log.info("Successfully submitted storm topology '" + TopologyManager.getTopologyName(this.executionPlanName, this.tenantId) + EventProcessorConstants.SIDDHI_SINGLE_QUOTE);
                TopologyManager.waitForTopologyToBeActive(TopologyManager.getTopologyName(this.executionPlanName, this.tenantId));
                return true;
            } catch (TException e) {
                TopologyManager.log.error("Error connecting to storm when trying to submit topology '" + TopologyManager.getTopologyName(this.executionPlanName, this.tenantId) + EventProcessorConstants.SIDDHI_SINGLE_QUOTE, e);
                return false;
            } catch (AlreadyAliveException e2) {
                TopologyManager.log.warn("Topology '" + TopologyManager.getTopologyName(this.executionPlanName, this.tenantId) + "' already existing. Trying to kill and re-submit", e2);
                return false;
            } catch (InvalidTopologyException e3) {
                return true;
            } catch (NotAliveException e4) {
                TopologyManager.log.info("Topology '" + TopologyManager.getTopologyName(this.executionPlanName, this.tenantId) + "' is not alive to kill");
                this.isTopologyAlive = false;
                return false;
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            do {
                TopologyManager.log.info("Retrying to submit topology '" + TopologyManager.getTopologyName(this.executionPlanName, this.tenantId) + "' in " + this.retryInterval + "ms");
                try {
                    Thread.sleep(this.retryInterval);
                } catch (InterruptedException e) {
                }
            } while (!submitTopology());
        }
    }

    public static List<TopologySummary> getTopologies() throws StormDeploymentException {
        try {
            return client.getClusterInfo().get_topologies();
        } catch (TException e) {
            throw new StormDeploymentException("Cannot get topologies from storm cluster", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void waitForTopologyToBeRemoved(String str) throws TException {
        log.info("Waiting topology '" + str + "' to be removed from Storm cluster");
        boolean z = false;
        while (true) {
            try {
                List list = client.getClusterInfo().get_topologies();
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    if (((TopologySummary) it.next()).get_name().equals(str)) {
                        z = true;
                        Thread.sleep(5000L);
                    }
                }
                if (!z || list.isEmpty()) {
                    break;
                } else {
                    Thread.sleep(2000L);
                }
            } catch (InterruptedException e) {
                return;
            }
        }
        log.info("Topology '" + str + "' removed from Storm cluster");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void waitForTopologyToBeActive(String str) throws TException {
        log.info("Waiting topology '" + str + "' to be ACTIVE");
        while (true) {
            try {
                for (TopologySummary topologySummary : client.getClusterInfo().get_topologies()) {
                    if (topologySummary.get_name().equals(str)) {
                        if (topologySummary.get_status().equals("ACTIVE")) {
                            Thread.sleep(5000L);
                            log.info("Topology '" + str + "' is ACTIVE");
                            return;
                        }
                        Thread.sleep(2000L);
                    }
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    public static void submitTopology(ExecutionPlanConfiguration executionPlanConfiguration, List<String> list, List<String> list2, int i, int i2) throws StormDeploymentException, ExecutionPlanConfigurationException {
        String name = executionPlanConfiguration.getName();
        try {
            String stringQueryPlan = getStringQueryPlan(StormQueryPlanBuilder.constructStormQueryPlanXML(executionPlanConfiguration, list, list2));
            if (log.isDebugEnabled()) {
                log.debug("Following is the generated Storm query plan for execution plan: " + executionPlanConfiguration.getName() + EventProcessorConstants.SIDDHI_LINE_SEPARATER + stringQueryPlan);
            }
            TopologyBuilder constructTopologyBuilder = StormTopologyConstructor.constructTopologyBuilder(stringQueryPlan, name, i, EventProcessorValueHolder.getStormDeploymentConfiguration());
            String submitJar = StormSubmitter.submitJar(stormConfig, jarLocation);
            try {
                client.submitTopology(getTopologyName(name, i), submitJar, JSONValue.toJSONString(stormConfig), constructTopologyBuilder.createTopology());
                log.info("Successfully submitted storm topology '" + getTopologyName(name, i) + EventProcessorConstants.SIDDHI_SINGLE_QUOTE);
                waitForTopologyToBeActive(getTopologyName(name, i));
            } catch (AlreadyAliveException e) {
                log.warn("Topology '" + getTopologyName(name, i) + "' already existing", e);
                new Thread(new TopologySubmitter(name, submitJar, constructTopologyBuilder.createTopology(), i, true, i2)).start();
            } catch (InvalidTopologyException e2) {
                throw new ExecutionPlanConfigurationException("Invalid Execution Plan " + name + " for tenant " + i, e2);
            } catch (TException e3) {
                log.warn("Error connecting to storm when trying to submit topology '" + getTopologyName(name, i) + EventProcessorConstants.SIDDHI_SINGLE_QUOTE, e3);
                new Thread(new TopologySubmitter(name, submitJar, constructTopologyBuilder.createTopology(), i, false, i2)).start();
            }
        } catch (TransformerException e4) {
            throw new StormDeploymentException("Error while converting to storm query plan string. Execution plan: " + name + " Tenant: " + i, e4);
        } catch (StormQueryConstructionException e5) {
            throw new StormDeploymentException("Error while converting to XML storm query plan. Execution plan: " + name + " Tenant: " + i + ". " + e5.getMessage(), e5);
        } catch (XMLStreamException e6) {
            throw new StormDeploymentException("Invalid Config for Execution Plan " + name + " for tenant " + i, e6);
        }
    }

    public static void killTopology(String str, int i) throws StormDeploymentException {
        try {
            log.info("Killing storm topology '" + str + EventProcessorConstants.SIDDHI_SINGLE_QUOTE);
            client.killTopologyWithOpts(getTopologyName(str, i), new KillOptions());
        } catch (TException e) {
            throw new StormDeploymentException("Error connecting to Storm", e);
        } catch (NotAliveException e2) {
        }
    }

    private static String getStringQueryPlan(Document document) throws TransformerException {
        Transformer newTransformer = TransformerFactory.newInstance().newTransformer();
        newTransformer.setOutputProperty("indent", "yes");
        StringWriter stringWriter = new StringWriter();
        newTransformer.transform(new DOMSource(document), new StreamResult(stringWriter));
        return stringWriter.toString();
    }

    public static String getTopologyName(String str, int i) {
        return str + "[" + i + "]";
    }

    static {
        String str = CarbonUtils.getCarbonConfigDirPath() + File.separator + "cep" + File.separator + "storm";
        try {
            Map map = (Map) new Yaml().load(new FileInputStream(new File(str + File.separator + "storm.yaml")));
            if (map != null) {
                stormConfig = Utils.readDefaultConfig();
                stormConfig.putAll(map);
            } else {
                stormConfig = Utils.readStormConfig();
            }
        } catch (FileNotFoundException e) {
            log.warn("Error occurred while reading storm configurations using default configurations", e);
        }
        client = NimbusClient.getConfiguredClient(stormConfig).getClient();
        jarLocation = str + File.separator + EventProcessorValueHolder.getStormDeploymentConfiguration().getJar();
    }
}
