package org.wso2.carbon.event.processor.core.internal.storm;

import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TopologySummary;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import java.io.File;
import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.w3c.dom.Document;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException;
import org.wso2.carbon.event.processor.core.exception.StormDeploymentException;
import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
import org.wso2.carbon.event.processor.core.internal.storm.util.StormQueryPlanBuilder;
import org.wso2.carbon.event.processor.core.internal.storm.util.StormTopologyConstructor;
import org.wso2.carbon.utils.CarbonUtils;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/TopologyManager.class */
/**
 * Static helper that submits, lists and kills Storm topologies through a Nimbus Thrift client.
 *
 * <p>The Storm configuration, the Nimbus client and the location of the topology jar are
 * resolved once, in the static initializer at the bottom of this class, from the
 * {@code cep/storm} directory under the Carbon configuration directory.
 *
 * <p>NOTE(review): a single {@link Nimbus.Client} instance is shared by every caller and by all
 * {@link TopologySubmitter} retry threads; confirm that this is safe for concurrent use.
 */
public class TopologyManager {
    private static final Log log = LogFactory.getLog(TopologyManager.class);

    /** Wait time (seconds) passed to Storm when killing a topology that is about to be re-submitted. */
    private static final int KILL_WAIT_SECS = 10;
    /** Pause (milliseconds) after a kill request before the topology is submitted again. */
    private static final long POST_KILL_PAUSE_MILLIS = 15000L;
    /** Pause (milliseconds) after a submission request has been accepted. */
    private static final long POST_SUBMIT_PAUSE_MILLIS = 10000L;
    /** Delay (milliseconds) before each retry attempt of the background submitter. */
    private static final long RETRY_INTERVAL_MILLIS = 30000L;

    private static final Map stormConfig;
    private static final Nimbus.Client client;
    private static final String jarLocation;

    /**
     * Background task that keeps trying to submit a topology every 30 seconds until the attempt
     * either succeeds or can no longer succeed (invalid topology, thread interrupted).
     */
    public static class TopologySubmitter implements Runnable {
        String executionPlanName;
        String uploadedJarLocation;
        StormTopology topology;
        int tenantId;
        boolean isTopologyAlive;

        /**
         * @param str           execution plan name
         * @param str2          location of the already uploaded topology jar
         * @param stormTopology topology to submit
         * @param i             tenant id
         * @param z             {@code true} if a topology with the same name is already running
         *                      and has to be killed before submitting
         */
        public TopologySubmitter(String str, String str2, StormTopology stormTopology, int i, boolean z) {
            this.executionPlanName = str;
            this.uploadedJarLocation = str2;
            this.topology = stormTopology;
            this.tenantId = i;
            this.isTopologyAlive = z;
        }

        /**
         * Performs one submission attempt, killing the existing topology first when
         * {@code isTopologyAlive} is set.
         *
         * @return {@code true} when the retry loop should stop (the topology was submitted, it
         *         was rejected as invalid, or the thread was interrupted); {@code false} when
         *         another attempt should be made
         */
        private boolean submitTopology() {
            String topologyName = TopologyManager.getTopologyName(this.executionPlanName, this.tenantId);
            String jsonConfig = JSONValue.toJSONString(TopologyManager.stormConfig);
            try {
                if (this.isTopologyAlive) {
                    log.info("Killing already existing storm topology '" + topologyName + "' to re-submit");
                    KillOptions killOptions = new KillOptions();
                    killOptions.set_wait_secs(KILL_WAIT_SECS);
                    client.killTopologyWithOpts(topologyName, killOptions);
                    Thread.sleep(POST_KILL_PAUSE_MILLIS);
                }
                client.submitTopology(topologyName, this.uploadedJarLocation, jsonConfig, this.topology);
            } catch (AlreadyAliveException e) {
                // NOTE(review): isTopologyAlive is left untouched here, so when it is false the
                // next attempts will not kill the existing topology and may keep failing the same way.
                log.warn("Topology '" + topologyName + "' already existing", e);
                return false;
            } catch (InvalidTopologyException e) {
                // Re-submitting the same topology cannot succeed; stop retrying, but do not report success.
                log.error("Storm topology '" + topologyName + "' is invalid, giving up re-submission", e);
                return true;
            } catch (InterruptedException e) {
                // Interrupted while waiting for the kill to take effect: nothing has been submitted.
                Thread.currentThread().interrupt();
                log.warn("Interrupted before storm topology '" + topologyName + "' could be re-submitted, giving up");
                return true;
            } catch (NotAliveException e) {
                log.info("Topology '" + topologyName + "' is not alive to kill");
                this.isTopologyAlive = false;
                return false;
            } catch (TException e) {
                log.warn("Error connecting to storm when trying to submit topology '" + topologyName + "'", e);
                return false;
            }

            try {
                Thread.sleep(POST_SUBMIT_PAUSE_MILLIS);
            } catch (InterruptedException e) {
                // The submission itself already went through; only the pause was cut short.
                Thread.currentThread().interrupt();
            }
            log.info("Successfully submitted storm topology '" + topologyName + "'");
            return true;
        }

        /**
         * Waits 30 seconds and then attempts a submission, repeating until
         * {@link #submitTopology()} signals that no further attempt is needed. Stops immediately
         * when the thread is interrupted while waiting.
         */
        @Override // java.lang.Runnable
        public void run() {
            String topologyName = TopologyManager.getTopologyName(this.executionPlanName, this.tenantId);
            do {
                log.info("Retrying to submit topology '" + topologyName + "' in 30 sec");
                try {
                    Thread.sleep(RETRY_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn("Interrupted while waiting to retry submission of topology '" + topologyName + "', giving up");
                    return;
                }
            } while (!submitTopology());
        }
    }

    /**
     * Returns the topology summaries reported by the cluster.
     *
     * @return topology summaries taken from the Nimbus cluster info
     * @throws StormDeploymentException if the Thrift call to Nimbus fails
     */
    public static List<TopologySummary> getTopologies() throws StormDeploymentException {
        try {
            return client.getClusterInfo().get_topologies();
        } catch (TException e) {
            throw new StormDeploymentException("Cannot get topologies from storm cluster", e);
        }
    }

    /**
     * Builds the Storm query plan for an execution plan, uploads the topology jar and submits the
     * resulting topology under the name given by {@link #getTopologyName(String, int)}.
     *
     * <p>If Nimbus cannot be reached, or a topology with the same name is already running, a
     * {@link TopologySubmitter} thread is started to retry in the background; in the latter case
     * the running topology is killed before re-submission.
     *
     * @param executionPlanConfiguration execution plan to deploy
     * @param list                       stream definitions passed to the query plan builder
     * @param i                          tenant id
     * @throws StormDeploymentException            declared for callers; not raised directly by this method
     * @throws ExecutionPlanConfigurationException if the query plan cannot be serialized, the
     *                                             configuration is invalid, or Storm rejects the topology
     */
    public static void submitTopology(ExecutionPlanConfiguration executionPlanConfiguration, List<StreamDefinition> list, int i) throws StormDeploymentException, ExecutionPlanConfigurationException {
        Document queryPlanXml = StormQueryPlanBuilder.constructStormQueryPlanXML(executionPlanConfiguration, list);
        String name = executionPlanConfiguration.getName();
        String topologyName = getTopologyName(name, i);
        try {
            TopologyBuilder topologyBuilder = StormTopologyConstructor.constructTopologyBuilder(getStringQueryPlan(queryPlanXml), name, i, EventProcessorValueHolder.getStormDeploymentConfig());
            String uploadedJar = StormSubmitter.submitJar(stormConfig, jarLocation);
            try {
                client.submitTopology(topologyName, uploadedJar, JSONValue.toJSONString(stormConfig), topologyBuilder.createTopology());
                Thread.sleep(POST_SUBMIT_PAUSE_MILLIS);
                log.info("Successfully submitted storm topology '" + topologyName + "'");
            } catch (InvalidTopologyException e) {
                throw new ExecutionPlanConfigurationException("Invalid Execution Plan " + name + " for tenant " + i, e);
            } catch (InterruptedException e) {
                // Only the post-submit pause can be interrupted, so the topology has been submitted.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting after submitting storm topology '" + topologyName + "'");
            } catch (TException e) {
                log.warn("Error connecting to storm when trying to submit topology '" + topologyName + "'", e);
                new Thread(new TopologySubmitter(name, uploadedJar, topologyBuilder.createTopology(), i, false)).start();
            } catch (AlreadyAliveException e) {
                log.warn("Topology '" + topologyName + "' already existing", e);
                new Thread(new TopologySubmitter(name, uploadedJar, topologyBuilder.createTopology(), i, true)).start();
            }
        } catch (TransformerException e) {
            throw new ExecutionPlanConfigurationException("Error while converting to storm query plan string. Execution plan: " + name + " Tenant: " + i, e);
        } catch (XMLStreamException e) {
            throw new ExecutionPlanConfigurationException("Invalid Config for Execution Plan " + name + " for tenant " + i, e);
        }
    }

    /**
     * Requests Storm to kill the topology belonging to the given execution plan and tenant.
     * A topology that is not alive is logged and otherwise ignored.
     *
     * @param str execution plan name
     * @param i   tenant id
     * @throws StormDeploymentException if the Thrift call to Nimbus fails
     */
    public static void killTopology(String str, int i) throws StormDeploymentException {
        String topologyName = getTopologyName(str, i);
        try {
            log.info("Killing storm topology '" + topologyName + "'");
            client.killTopologyWithOpts(topologyName, new KillOptions());
        } catch (TException e) {
            throw new StormDeploymentException("Error connecting to Storm", e);
        } catch (NotAliveException e) {
            // Nothing to kill; record it instead of dropping the information silently.
            log.info("Topology '" + topologyName + "' is not alive to kill");
        }
    }

    /**
     * Serializes a DOM document to an indented XML string.
     *
     * @param document document to serialize
     * @return the XML text produced by the transformer
     * @throws TransformerException if the transformer cannot be created or the transformation fails
     */
    private static String getStringQueryPlan(Document document) throws TransformerException {
        Transformer transformer = TransformerFactory.newInstance().newTransformer();
        transformer.setOutputProperty("indent", "yes");
        StringWriter xmlText = new StringWriter();
        transformer.transform(new DOMSource(document), new StreamResult(xmlText));
        return xmlText.toString();
    }

    /**
     * Builds the Storm topology name for an execution plan of a tenant.
     *
     * @param str execution plan name
     * @param i   tenant id
     * @return the plan name followed by the tenant id in square brackets, e.g. {@code plan[1]}
     */
    public static String getTopologyName(String str, int i) {
        return str + "[" + i + "]";
    }

    static {
        // Point Storm at <carbon-config>/cep/storm/storm.yaml before the configuration is read.
        String stormConfigDir = CarbonUtils.getCarbonConfigDirPath() + File.separator + "cep" + File.separator + "storm";
        System.setProperty("storm.yaml", stormConfigDir + File.separator + "storm.yaml");
        stormConfig = Utils.readStormConfig();
        client = NimbusClient.getConfiguredClient(stormConfig).getClient();
        jarLocation = stormConfigDir + File.separator + EventProcessorValueHolder.getStormDeploymentConfig().getJar();
    }
}
