package org.wso2.carbon.event.processor.core.internal.storm;

import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TopologyInitialStatus;
import backtype.storm.generated.TopologySummary;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.w3c.dom.Document;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException;
import org.wso2.carbon.event.processor.core.exception.ServerUnavailableException;
import org.wso2.carbon.event.processor.core.exception.StormDeploymentException;
import org.wso2.carbon.event.processor.core.exception.StormQueryConstructionException;
import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
import org.wso2.carbon.event.processor.core.internal.storm.util.StormQueryPlanBuilder;
import org.wso2.carbon.event.processor.core.internal.storm.util.StormTopologyConstructor;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.carbon.event.processor.core.util.DistributedModeConstants;
import org.wso2.carbon.event.processor.core.util.ExecutionPlanStatusHolder;
import org.wso2.carbon.utils.CarbonUtils;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/StormTopologyManager.class */
public class StormTopologyManager {
    private Map stormConfig;
    private String jarLocation;
    private static final Log log = LogFactory.getLog(StormTopologyManager.class);
    private final ConcurrentHashMap<String, TopologySubmitter> toDeployTopologies = new ConcurrentHashMap<>();
    private TopologyManagerThreadFactory topologyManagerThreadFactory = new TopologyManagerThreadFactory("Storm Deployment");
    private final int lockTimeout;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/StormTopologyManager$TopologyManagerThreadFactory.class */
    public class TopologyManagerThreadFactory implements ThreadFactory {
        final ThreadGroup group;
        final String namePrefix;
        final AtomicInteger poolNumber = new AtomicInteger(1);
        final AtomicInteger threadNumber = new AtomicInteger(1);

        public TopologyManagerThreadFactory(String str) {
            SecurityManager securityManager = System.getSecurityManager();
            this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = "TopologyManager-" + str + "-pool-" + this.poolNumber.getAndIncrement() + "-thread-";
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            if (thread.getPriority() != 5) {
                thread.setPriority(5);
            }
            return thread;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/StormTopologyManager$TopologySubmitter.class */
    public class TopologySubmitter implements Runnable {
        private final String topologyName;
        StormTopology topology;
        int retryInterval;

        public TopologySubmitter(String str, StormTopology stormTopology, int i, int i2) {
            this.topologyName = StormTopologyManager.getTopologyName(str, i);
            this.topology = stormTopology;
            this.retryInterval = i2;
        }

        @Override // java.lang.Runnable
        public void run() {
            String str = "TopologySubmitterJob:" + Thread.currentThread().getId() + EventProcessorConstants.COMMA;
            StormTopologyManager.log.info(str + "Job started to submit storm topology '" + this.topologyName + "'.");
            while (isToBeDeployed()) {
                try {
                    try {
                    } catch (ServerUnavailableException e) {
                        StormTopologyManager.log.error(str + e.getMessage(), e);
                        StormTopologyManager.log.info(str + "Retrying to submit topology '" + this.topologyName + "' in " + this.retryInterval + " ms");
                        Thread.sleep(this.retryInterval);
                    }
                } catch (InterruptedException e2) {
                }
                if (isTopologyExist()) {
                    updateExecutionPlanStatusInStorm(this.topologyName, DistributedModeConstants.TopologyState.CLEANING);
                    StormTopologyManager.log.info(str + "Killing already existing storm topology '" + this.topologyName + "' to re-submit");
                    KillOptions killOptions = new KillOptions();
                    killOptions.set_wait_secs(10);
                    try {
                        NimbusClient.getConfiguredClient(StormTopologyManager.this.stormConfig).getClient().killTopologyWithOpts(this.topologyName, killOptions);
                        waitForTopologyToBeRemoved(str);
                    } catch (NotAliveException e3) {
                        StormTopologyManager.log.info(str + "Topology '" + this.topologyName + "' is not alive to kill");
                    } catch (TException e4) {
                        StormTopologyManager.log.error(str + "Error connecting to storm when trying to kill topology '" + this.topologyName + EventProcessorConstants.SIDDHI_SINGLE_QUOTE, e4);
                        StormTopologyManager.log.info(str + "Retrying to kill topology '" + this.topologyName + "' in " + this.retryInterval + " ms");
                        try {
                            Thread.sleep(this.retryInterval);
                        } catch (InterruptedException e5) {
                        }
                    }
                } else {
                    updateExecutionPlanStatusInStorm(this.topologyName, DistributedModeConstants.TopologyState.DEPLOYING);
                    try {
                        String jSONString = JSONValue.toJSONString(StormTopologyManager.this.stormConfig);
                        synchronized (StormTopologyManager.this.toDeployTopologies) {
                            if (!isToBeDeployed()) {
                                StormTopologyManager.log.info(str + "Aborting Storm deployment of '" + this.topologyName + "', as current job is outdated.");
                                return;
                            }
                            String submitJar = StormSubmitter.submitJar(StormTopologyManager.this.stormConfig, StormTopologyManager.this.jarLocation);
                            Nimbus.Client client = NimbusClient.getConfiguredClient(StormTopologyManager.this.stormConfig).getClient();
                            client.submitTopology(this.topologyName, submitJar, jSONString, this.topology);
                            StormTopologyManager.this.toDeployTopologies.remove(this.topologyName);
                            StormTopologyManager.log.info(str + "Successfully submitted storm topology '" + this.topologyName + EventProcessorConstants.SIDDHI_SINGLE_QUOTE);
                            waitForTopologyToBeActive(client, str, this.topologyName);
                            return;
                        }
                    } catch (AlreadyAliveException e6) {
                        StormTopologyManager.log.warn(str + "Topology '" + this.topologyName + "' already existing. Trying to kill and re-submit", e6);
                    } catch (TException e7) {
                        StormTopologyManager.log.error(str + "Error connecting to storm when trying to submit topology '" + this.topologyName + EventProcessorConstants.SIDDHI_SINGLE_QUOTE, e7);
                        StormTopologyManager.log.info(str + "Retrying to submit topology '" + this.topologyName + "' in " + this.retryInterval + " ms");
                        try {
                            Thread.sleep(this.retryInterval);
                        } catch (InterruptedException e8) {
                        }
                    } catch (InvalidTopologyException e9) {
                        StormTopologyManager.log.error(str + "Cannot deploy, Invalid Storm topology '" + this.topologyName + "' found.", e9);
                        return;
                    }
                }
                StormTopologyManager.log.error(str + e.getMessage(), e);
                StormTopologyManager.log.info(str + "Retrying to submit topology '" + this.topologyName + "' in " + this.retryInterval + " ms");
                Thread.sleep(this.retryInterval);
            }
            StormTopologyManager.log.info(str + "Aborting Storm deployment of '" + this.topologyName + "', as current job is outdated.");
        }

        private boolean isToBeDeployed() {
            boolean z;
            synchronized (StormTopologyManager.this.toDeployTopologies) {
                TopologySubmitter topologySubmitter = (TopologySubmitter) StormTopologyManager.this.toDeployTopologies.get(this.topologyName);
                z = topologySubmitter != null && topologySubmitter.equals(this);
            }
            return z;
        }

        private boolean isTopologyExist() throws ServerUnavailableException {
            try {
                Iterator it = NimbusClient.getConfiguredClient(StormTopologyManager.this.stormConfig).getClient().getClusterInfo().get_topologies().iterator();
                while (it.hasNext()) {
                    if (((TopologySummary) it.next()).get_name().equals(this.topologyName)) {
                        return true;
                    }
                }
                return false;
            } catch (TException e) {
                throw new ServerUnavailableException("Error connecting to storm when trying to check whether topology '" + this.topologyName + "' exist", e);
            } catch (RuntimeException e2) {
                throw new ServerUnavailableException("Runtime Exception connecting to storm when trying to check whether topology '" + this.topologyName + "' exist", e2);
            }
        }

        private void waitForTopologyToBeActive(Nimbus.Client client, String str, String str2) throws TException {
            TopologySummary topologySummary = null;
            while (topologySummary == null) {
                for (TopologySummary topologySummary2 : client.getClusterInfo().get_topologies()) {
                    if (topologySummary2.get_name().equals(str2)) {
                        topologySummary = topologySummary2;
                    }
                }
                if (topologySummary == null) {
                    try {
                        Thread.sleep(2000L);
                        StormTopologyManager.log.info(str + "Waiting until '" + str2 + "' has been submitted to Storm cluster");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        updateExecutionPlanStatusInStorm(str2, DistributedModeConstants.TopologyState.UNKNOWN);
                        StormTopologyManager.log.error("Could not verify whether " + str2 + "' has been submitted to Storm cluster or not as the verifier got interrupted. Setting distributed deployment status as UNKNOWN");
                        return;
                    }
                }
            }
            while (!topologySummary.get_status().equals(TopologyInitialStatus.ACTIVE.toString())) {
                try {
                    StormTopologyManager.log.info(str + "Waiting until '" + str2 + "' becomes active in Storm cluster");
                    Thread.sleep(2000L);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
            }
            updateExecutionPlanStatusInStorm(str2, DistributedModeConstants.TopologyState.ACTIVE);
            StormTopologyManager.log.info(str + "Topology '" + str2 + "' found to be active in Storm cluster");
        }

        private void waitForTopologyToBeRemoved(String str) throws TException, ServerUnavailableException {
            StormTopologyManager.log.info(str + "Waiting for topology '" + this.topologyName + "' to be removed from Storm cluster");
            while (isTopologyExist()) {
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    return;
                }
            }
            Thread.sleep(2000L);
            StormTopologyManager.log.info(str + "Topology '" + this.topologyName + "' removed from Storm cluster");
        }

        /* JADX WARN: Finally extract failed */
        private void updateExecutionPlanStatusInStorm(String str, DistributedModeConstants.TopologyState topologyState) {
            String str2 = "org.wso2.cep.org.wso2.carbon.event.processor.core.storm.status.execution.plan.ui." + str;
            HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
            if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
                StormTopologyManager.log.error("Couldn't update topology status for topology:" + this.topologyName + " as the hazelcast instance is not active or not available.");
                return;
            }
            IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
            try {
                if (map.tryLock(str2, StormTopologyManager.this.lockTimeout, TimeUnit.MILLISECONDS)) {
                    try {
                        ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) map.get(str);
                        if (executionPlanStatusHolder == null) {
                            StormTopologyManager.log.error("Couldn't update topology status for topology:" + this.topologyName + " as status object not initialized by manager.");
                        } else {
                            executionPlanStatusHolder.setStormTopologyStatus(topologyState);
                            map.replace(str, executionPlanStatusHolder);
                        }
                        map.unlock(str2);
                    } catch (Throwable th) {
                        map.unlock(str2);
                        throw th;
                    }
                } else {
                    StormTopologyManager.log.error("Couldn't update topology status for topology:" + this.topologyName + " as the hazelcast lock acquisition failed.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                StormTopologyManager.log.error("Couldn't update topology status for topology:" + this.topologyName + " as the hazelcast lock acquisition was interrupted.", e);
            }
        }
    }

    public StormTopologyManager() {
        String str = CarbonUtils.getCarbonConfigDirPath() + File.separator + "cep" + File.separator + "storm";
        try {
            Map map = (Map) new Yaml().load(new FileInputStream(new File(str + File.separator + "storm.yaml")));
            if (map != null) {
                this.stormConfig = Utils.readDefaultConfig();
                this.stormConfig.putAll(map);
            } else {
                this.stormConfig = Utils.readStormConfig();
            }
        } catch (FileNotFoundException e) {
            log.warn("Error occurred while reading storm configurations using default configurations", e);
        }
        this.lockTimeout = EventProcessorValueHolder.getStormDeploymentConfiguration().getStatusLockTimeout();
        this.jarLocation = str + File.separator + EventProcessorValueHolder.getStormDeploymentConfiguration().getJar();
    }

    public List<TopologySummary> getTopologies() throws StormDeploymentException {
        try {
            return NimbusClient.getConfiguredClient(this.stormConfig).getClient().getClusterInfo().get_topologies();
        } catch (TException e) {
            throw new StormDeploymentException("Cannot get topologies from storm cluster", e);
        }
    }

    public void submitTopology(ExecutionPlanConfiguration executionPlanConfiguration, List<String> list, List<String> list2, int i, int i2) throws StormDeploymentException, ExecutionPlanConfigurationException {
        String name = executionPlanConfiguration.getName();
        String topologyName = getTopologyName(name, i);
        try {
            String stringQueryPlan = getStringQueryPlan(StormQueryPlanBuilder.constructStormQueryPlanXML(executionPlanConfiguration, list, list2));
            if (log.isDebugEnabled()) {
                log.debug("Following is the generated Storm query plan for execution plan: " + executionPlanConfiguration.getName() + EventProcessorConstants.SIDDHI_LINE_SEPARATER + stringQueryPlan);
            }
            TopologySubmitter topologySubmitter = new TopologySubmitter(name, StormTopologyConstructor.constructTopologyBuilder(stringQueryPlan, name, i, EventProcessorValueHolder.getStormDeploymentConfiguration()).createTopology(), i, i2);
            synchronized (this.toDeployTopologies) {
                this.toDeployTopologies.put(topologyName, topologySubmitter);
            }
            this.topologyManagerThreadFactory.newThread(topologySubmitter).start();
        } catch (TransformerException e) {
            throw new StormDeploymentException("Error while converting to storm query plan string. Execution plan: " + name + " Tenant: " + i, e);
        } catch (StormQueryConstructionException e2) {
            throw new StormDeploymentException("Error while converting to XML storm query plan. Execution plan: " + name + " Tenant: " + i + ". " + e2.getMessage(), e2);
        } catch (XMLStreamException e3) {
            throw new StormDeploymentException("Invalid Config for Execution Plan " + name + " for tenant " + i, e3);
        }
    }

    public void killTopology(String str, int i) throws StormDeploymentException {
        try {
            synchronized (this.toDeployTopologies) {
                this.toDeployTopologies.remove(getTopologyName(str, i));
            }
            log.info("Killing storm topology '" + str + "' of tenant '" + i + EventProcessorConstants.SIDDHI_SINGLE_QUOTE);
            NimbusClient.getConfiguredClient(this.stormConfig).getClient().killTopologyWithOpts(getTopologyName(str, i), new KillOptions());
        } catch (TException e) {
            throw new StormDeploymentException("Error connecting to Storm", e);
        } catch (NotAliveException e2) {
        }
    }

    private String getStringQueryPlan(Document document) throws TransformerException {
        Transformer newTransformer = TransformerFactory.newInstance().newTransformer();
        newTransformer.setOutputProperty("indent", "yes");
        StringWriter stringWriter = new StringWriter();
        newTransformer.transform(new DOMSource(document), new StreamResult(stringWriter));
        return stringWriter.toString();
    }

    public static String getTopologyName(String str, int i) {
        return str + "[" + i + "]";
    }
}
