package org.wso2.carbon.analytics.spark.core.internal;

import com.hazelcast.core.IMap;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.deploy.master.LeaderElectable;
import org.apache.spark.deploy.master.Master;
import org.apache.spark.deploy.worker.Worker;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.jdbc.carbon.AnalyticsJDBCRelationProvider;
import org.apache.spark.util.Utils;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceUtils;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsServiceHolder;
import org.wso2.carbon.analytics.dataservice.core.clustering.AnalyticsClusterException;
import org.wso2.carbon.analytics.dataservice.core.clustering.AnalyticsClusterManager;
import org.wso2.carbon.analytics.dataservice.core.clustering.GroupEventListener;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.datasource.core.util.GenericUtils;
import org.wso2.carbon.analytics.spark.core.AnalyticsExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.AnalyticsRecoveryModeFactory;
import org.wso2.carbon.analytics.spark.core.deploy.CheckElectedLeaderExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.ElectLeaderExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.InitClientExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.StartWorkerExecutionCall;
import org.wso2.carbon.analytics.spark.core.exception.AnalyticsExecutionException;
import org.wso2.carbon.analytics.spark.core.exception.AnalyticsUDFException;
import org.wso2.carbon.analytics.spark.core.sources.AnalyticsRelationProvider;
import org.wso2.carbon.analytics.spark.core.sources.CompressedEventAnalyticsRelationProvider;
import org.wso2.carbon.analytics.spark.core.udf.AnalyticsUDFsRegister;
import org.wso2.carbon.analytics.spark.core.udf.CarbonUDF;
import org.wso2.carbon.analytics.spark.core.udf.config.UDFConfiguration;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsCommonUtils;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsConstants;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsQueryResult;
import org.wso2.carbon.analytics.spark.core.util.SparkTableNamesHolder;
import org.wso2.carbon.analytics.spark.utils.ComputeClasspath;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.utils.CarbonUtils;
import scala.None$;
import scala.Tuple2;

/* loaded from: input_file:org/wso2/carbon/analytics/spark/core/internal/SparkAnalyticsExecutor.class */
public class SparkAnalyticsExecutor implements GroupEventListener {
    private static final String CLUSTER_GROUP_NAME = "CARBON_ANALYTICS_EXECUTION";
    private static final String DEFAULT_SPARK_APP_NAME = "DefaultCarbonAnalyticsApp";
    private static final Log log = LogFactory.getLog(SparkAnalyticsExecutor.class);
    private String sparkMaster;
    private SparkConf sparkConf;
    private SQLContext sqlCtx;
    private String myHost;
    private int portOffset;
    private SparkTableNamesHolder sparkTableNamesHolder;
    private int redundantMasterCount;
    private static final int MAX_RETRIES = 30;
    private static final long MAX_RETRY_WAIT_INTERVAL = 60000;
    private ClusterMode clusterMode;
    private int workerCount = 1;
    private Set<LeaderElectable> leaderElectable = new HashSet();
    private boolean masterActive = false;
    private boolean workerActive = false;
    private boolean clientActive = false;
    private boolean electedLeader = false;
    private Map<String, String> shorthandStringsMap = new HashMap();
    private UDFConfiguration udfConfiguration = loadUDFConfiguration();
    private AnalyticsClusterManager acm = AnalyticsServiceHolder.getAnalyticsClusterManager();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/analytics/spark/core/internal/SparkAnalyticsExecutor$ClusterMode.class */
    public enum ClusterMode {
        local("Local"),
        carbonSpark("Carbon Spark"),
        standaloneSpark("Standalone Spark"),
        yarn("Spark on YARN"),
        mesos("Spark on Mesos");

        private String name;

        ClusterMode(String str) {
            this.name = str;
        }

        private String getValue() {
            return this.name;
        }

        @Override // java.lang.Enum
        public String toString() {
            return getValue();
        }
    }

    public SparkAnalyticsExecutor(String str, int i) throws AnalyticsException {
        this.portOffset = 0;
        this.redundantMasterCount = 1;
        this.myHost = str;
        this.portOffset = i;
        String str2 = GenericUtils.getAnalyticsConfDirectory() + File.separator + "analytics" + File.separator + AnalyticsConstants.SPARK_CONF_DIR + File.separator + AnalyticsConstants.SPARK_DEFAULTS_FILE;
        if (!new File(str2).exists()) {
            throw new AnalyticsExecutionException("spark-defaults.conf file does not exists in path " + str2);
        }
        this.sparkConf = initializeSparkConf(this.portOffset, str2);
        this.sparkMaster = getStringFromSparkConf(AnalyticsConstants.CARBON_SPARK_MASTER, "local");
        this.clusterMode = getClusterMode(this.sparkMaster);
        this.redundantMasterCount = this.sparkConf.getInt(AnalyticsConstants.CARBON_SPARK_MASTER_COUNT, 1);
        this.sparkTableNamesHolder = new SparkTableNamesHolder(this.acm.isClusteringEnabled());
        registerShorthandStrings();
    }

    public void initializeSparkServer() throws AnalyticsException {
        this.sparkConf.setMaster(this.sparkMaster);
        switch (this.clusterMode) {
            case local:
            case standaloneSpark:
            case yarn:
            case mesos:
                log.info("Starting SPARK in the Client " + this.clusterMode.toString() + ". Master : " + this.sparkMaster);
                if (this.acm.isClusteringEnabled()) {
                    this.acm.joinGroup(CLUSTER_GROUP_NAME, this);
                }
                initializeAnalyticsClient();
                return;
            case carbonSpark:
                log.info("Starting SPARK in the Carbon Clustering mode");
                this.redundantMasterCount = this.sparkConf.getInt(AnalyticsConstants.CARBON_SPARK_MASTER_COUNT, 2);
                if (this.sparkTableNamesHolder == null) {
                    this.sparkTableNamesHolder = new SparkTableNamesHolder(true);
                }
                if (!this.acm.isClusteringEnabled()) {
                    throw new AnalyticsClusterException("Spark started in the cluster mode without enabling Carbon Clustering");
                }
                runClusteredSetupLogic();
                return;
            default:
                return;
        }
    }

    private void runClusteredSetupLogic() throws AnalyticsException {
        String str = "spark://" + this.myHost + ":" + this.sparkConf.get(AnalyticsConstants.SPARK_MASTER_PORT);
        logDebug("Spark master URL for this node : " + str);
        IMap map = AnalyticsServiceHolder.getHazelcastInstance().getMap(AnalyticsConstants.SPARK_MASTER_MAP);
        Object localMember = this.acm.getLocalMember();
        log.info("Local member : " + localMember);
        Set keySet = map.keySet();
        logDebug("Master URLs : " + Arrays.toString(keySet.toArray()));
        log.info("Current Spark Master map size : " + map.size());
        if (keySet.contains(str) || map.size() < this.redundantMasterCount) {
            log.info("Masters available are less than the redundant master count or This is/ has been a member of the MasterMap");
            if (!keySet.contains(str)) {
                log.info("Adding member to the Spark Master map : " + localMember);
                map.put(str, localMember);
            }
            log.info("Starting SPARK MASTER...");
            startMaster();
        }
        this.acm.joinGroup(CLUSTER_GROUP_NAME, this);
        log.info("Member joined the Carbon Analytics Execution cluster : " + localMember);
        processLeaderElectable();
        log.info("Spark Master map size after starting masters : " + map.size());
        if (map.size() >= this.redundantMasterCount) {
            log.info("Redundant master count reached. Starting workers in all members...");
            this.acm.executeAll(CLUSTER_GROUP_NAME, new StartWorkerExecutionCall());
            log.info("Redundant master count reached. Starting Spark client app in the carbon cluster master...");
            initializeAnalyticsClient();
        }
    }

    private String[] getSparkMastersFromCluster() {
        Set keySet = AnalyticsServiceHolder.getHazelcastInstance().getMap(AnalyticsConstants.SPARK_MASTER_MAP).keySet();
        return (String[]) keySet.toArray(new String[keySet.size()]);
    }

    private UDFConfiguration loadUDFConfiguration() throws AnalyticsException {
        try {
            File file = new File(GenericUtils.getAnalyticsConfDirectory() + File.separator + "analytics" + File.separator + AnalyticsConstants.SPARK_CONF_DIR + File.separator + AnalyticsConstants.SPARK_UDF_CONF_FILE);
            if (file.exists()) {
                return (UDFConfiguration) JAXBContext.newInstance(new Class[]{UDFConfiguration.class}).createUnmarshaller().unmarshal(file);
            }
            throw new AnalyticsUDFException("Cannot load UDFs, the UDF configuration file cannot be found at: " + file.getPath());
        } catch (JAXBException e) {
            throw new AnalyticsUDFException("Error in processing UDF configuration: " + e.getMessage(), e);
        }
    }

    private void initializeAnalyticsClient() throws AnalyticsException {
        if (this.acm.isClusteringEnabled()) {
            log.info("Sending a cluster message to the leader to initialize the Spark application");
            this.acm.executeOne(CLUSTER_GROUP_NAME, this.acm.getLeader(CLUSTER_GROUP_NAME), new InitClientExecutionCall());
        } else {
            log.info("Initializing the Spark application locally");
            initializeAnalyticsClientLocal();
        }
    }

    public synchronized void initializeAnalyticsClientLocal() throws AnalyticsException {
        if (!ServiceHolder.isAnalyticsSparkContextEnabled()) {
            logDebug("Analytics Spark Context is disabled in this node, therefore ignoring the client initiation.");
            return;
        }
        if (this.clientActive) {
            log.info("Client is already active in this node, therefore ignoring client init");
            return;
        }
        if (this.clusterMode == ClusterMode.carbonSpark) {
            updateMaster(this.sparkConf);
        }
        initializeSqlContext(initializeSparkContext(this.sparkConf));
        this.clientActive = true;
        log.info("Started Spark CLIENT in the cluster pointing to MASTER " + this.sparkConf.get(AnalyticsConstants.SPARK_MASTER) + " with the application name : " + this.sparkConf.get(AnalyticsConstants.SPARK_APP_NAME) + " and UI port : " + this.sparkConf.get(AnalyticsConstants.SPARK_UI_PORT));
    }

    private JavaSparkContext initializeSparkContext(SparkConf sparkConf) throws AnalyticsException {
        try {
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
            ServiceHolder.setJavaSparkContext(javaSparkContext);
            return javaSparkContext;
        } catch (Throwable th) {
            throw new AnalyticsException("Unable to create analytics client. " + th.getMessage(), th);
        }
    }

    private void initializeSqlContext(JavaSparkContext javaSparkContext) throws AnalyticsUDFException {
        this.sqlCtx = new SQLContext(javaSparkContext);
        registerUDFs(this.sqlCtx);
    }

    public void registerUDFFromOSGIComponent(CarbonUDF carbonUDF) throws AnalyticsUDFException {
        if (this.sqlCtx == null) {
            ServiceHolder.addCarbonUDFs(carbonUDF);
            return;
        }
        AnalyticsUDFsRegister analyticsUDFsRegister = AnalyticsUDFsRegister.getInstance();
        Class<?> cls = carbonUDF.getClass();
        for (Method method : cls.getDeclaredMethods()) {
            if (Modifier.isPublic(method.getModifiers())) {
                analyticsUDFsRegister.registerUDF(cls, method, this.sqlCtx);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r17v0, types: [java.lang.Throwable, org.wso2.carbon.analytics.spark.core.exception.AnalyticsUDFException] */
    private void registerUDFs(SQLContext sQLContext) throws AnalyticsUDFException {
        ArrayList arrayList = new ArrayList();
        if (!this.udfConfiguration.getCustomUDFClass().isEmpty()) {
            arrayList.addAll(this.udfConfiguration.getCustomUDFClass());
        }
        if (!ServiceHolder.getCarbonUDFs().isEmpty()) {
            arrayList.addAll(ServiceHolder.getCarbonUDFs().keySet());
        }
        AnalyticsUDFsRegister analyticsUDFsRegister = AnalyticsUDFsRegister.getInstance();
        try {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                String trim = ((String) it.next()).trim();
                if (!trim.isEmpty()) {
                    Class<?> cls = Class.forName(trim);
                    for (Method method : cls.getDeclaredMethods()) {
                        try {
                            if (Modifier.isPublic(method.getModifiers())) {
                                analyticsUDFsRegister.registerUDF(cls, method, sQLContext);
                            }
                        } catch (AnalyticsUDFException e) {
                            log.error("Error while registering the UDF method: " + method.getName() + ", " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (ClassNotFoundException e2) {
            throw new AnalyticsUDFException("Error While registering UDFs: " + e2.getMessage(), e2);
        }
    }

    private synchronized void startMaster() throws AnalyticsClusterException {
        if (this.masterActive) {
            logDebug("Master is already active in this node, therefore ignoring Master startup");
            return;
        }
        String str = this.myHost;
        int i = this.sparkConf.getInt(AnalyticsConstants.SPARK_MASTER_PORT, 7077 + this.portOffset);
        int i2 = this.sparkConf.getInt(AnalyticsConstants.SPARK_MASTER_WEBUI_PORT, 8081 + this.portOffset);
        Master.startSystemAndActor(str, i, i2, this.sparkConf);
        log.info("Started SPARK MASTER in spark://" + str + ":" + i + " with webUI port : " + i2);
        updateMaster(this.sparkConf);
        this.masterActive = true;
    }

    private void processLeaderElectable() throws AnalyticsClusterException {
        if (isElectedLeaderAvailable()) {
            return;
        }
        log.info("No elected leader is available. Hence electing this member as the leader");
        electAsLeader();
    }

    private boolean isElectedLeaderAvailable() throws AnalyticsClusterException {
        if (!this.acm.getMembers(CLUSTER_GROUP_NAME).isEmpty()) {
            return this.acm.executeAll(CLUSTER_GROUP_NAME, new CheckElectedLeaderExecutionCall()).contains(true);
        }
        log.info("Cluster is empty. Hence no elected leader available");
        return false;
    }

    public boolean isElectedLeader() {
        return this.electedLeader;
    }

    private void updateMaster(SparkConf sparkConf) {
        String[] sparkMastersFromCluster = getSparkMastersFromCluster();
        StringBuilder sb = new StringBuilder();
        sb.append("spark://");
        for (int i = 0; i < sparkMastersFromCluster.length; i++) {
            sb.append(sparkMastersFromCluster[i].replace("spark://", ""));
            if (i < sparkMastersFromCluster.length - 1) {
                sb.append(",");
            }
        }
        sparkConf.setMaster(sb.toString());
    }

    private void addSparkPropertiesPortOffset(SparkConf sparkConf, int i) {
        for (Tuple2 tuple2 : sparkConf.getAll()) {
            String trim = ((String) tuple2._1()).trim();
            if (trim.startsWith("spark.") && trim.endsWith(".port")) {
                sparkConf.set(trim, Integer.toString(Integer.parseInt((String) tuple2._2()) + i));
            }
        }
    }

    public synchronized void startWorker() {
        if (this.workerActive) {
            logDebug("Worker is already active in this node, therefore ignoring worker startup");
            return;
        }
        String str = this.myHost;
        int i = this.sparkConf.getInt(AnalyticsConstants.SPARK_WORKER_PORT, 10000 + this.portOffset);
        int i2 = this.sparkConf.getInt(AnalyticsConstants.SPARK_WORKER_WEBUI_PORT, 10500 + this.portOffset);
        int i3 = this.sparkConf.getInt(AnalyticsConstants.SPARK_WORKER_CORES, 1);
        String stringFromSparkConf = getStringFromSparkConf(AnalyticsConstants.SPARK_WORKER_MEMORY, "1g");
        String[] sparkMastersFromCluster = getSparkMastersFromCluster();
        Worker.startSystemAndActor(str, i, i2, i3, Utils.memoryStringToMb(stringFromSparkConf), sparkMastersFromCluster, getStringFromSparkConf(AnalyticsConstants.SPARK_WORKER_DIR, "work"), None$.MODULE$, this.sparkConf);
        log.info("Started SPARK WORKER in " + str + ":" + i + " with webUI port " + i2 + " with Masters " + Arrays.toString(sparkMastersFromCluster));
        this.workerActive = true;
    }

    private SparkConf initializeSparkConf(int i, String str) throws AnalyticsException {
        SparkConf sparkConf = new SparkConf(false);
        log.info("Loading Spark defaults from " + str);
        sparkConf.setAll(Utils.getPropertiesFromFile(str));
        setAdditionalConfigs(sparkConf);
        addSparkPropertiesPortOffset(sparkConf, i);
        return sparkConf;
    }

    private void setAdditionalConfigs(SparkConf sparkConf) throws AnalyticsException {
        String analyticsConfDirectory;
        String str = null;
        try {
            str = sparkConf.get(AnalyticsConstants.CARBON_DAS_SYMBOLIC_LINK);
            logDebug("CARBON HOME set with the symbolic link " + str);
        } catch (NoSuchElementException e) {
            try {
                str = CarbonUtils.getCarbonHome();
            } catch (Throwable th) {
                logDebug("CARBON HOME can not be found. Spark conf in non-carbon environment");
            }
        }
        logDebug("CARBON HOME used for Spark Conf : " + str);
        if (str != null) {
            analyticsConfDirectory = str + File.separator + "repository" + File.separator + "conf";
        } else {
            logDebug("CARBON HOME is NULL. Spark conf in non-carbon environment. Using the custom conf path");
            analyticsConfDirectory = GenericUtils.getAnalyticsConfDirectory();
        }
        String str2 = analyticsConfDirectory + File.separator + "analytics" + File.separator + AnalyticsConstants.SPARK_CONF_DIR;
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_APP_NAME, DEFAULT_SPARK_APP_NAME);
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_DRIVER_CORES, "1");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_DRIVER_MEMORY, "512m");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_EXECUTOR_MEMORY, "512m");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_UI_PORT, "4040");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_HISTORY_OPTS, "18080");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_SERIALIZER, KryoSerializer.class.getName());
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_KRYOSERIALIZER_BUFFER, "256k");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_KRYOSERIALIZER_BUFFER_MAX, "256m");
        sparkConf.setIfMissing("spark.blockManager.port", "12000");
        sparkConf.setIfMissing("spark.broadcast.port", "12500");
        sparkConf.setIfMissing("spark.driver.port", "13000");
        sparkConf.setIfMissing("spark.executor.port", "13500");
        sparkConf.setIfMissing("spark.fileserver.port", "14000");
        sparkConf.setIfMissing("spark.replClassServer.port", "14500");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_MASTER_PORT, "7077");
        sparkConf.setIfMissing("spark.master.rest.port", "6066");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_MASTER_WEBUI_PORT, "8081");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_WORKER_CORES, "1");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_WORKER_MEMORY, "1g");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_WORKER_DIR, "work");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_WORKER_PORT, "11000");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_WORKER_WEBUI_PORT, "11500");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_SCHEDULER_MODE, "FAIR");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_SCHEDULER_POOL, AnalyticsConstants.DEFAULT_CARBON_SCHEDULER_POOL_NAME);
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_SCHEDULER_ALLOCATION_FILE, str2 + File.separator + AnalyticsConstants.FAIR_SCHEDULER_XML);
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_RECOVERY_MODE, "CUSTOM");
        sparkConf.setIfMissing(AnalyticsConstants.SPARK_RECOVERY_MODE_FACTORY, AnalyticsRecoveryModeFactory.class.getName());
        String str3 = " -Dwso2_custom_conf_dir=" + analyticsConfDirectory + " -Dcarbon.home=" + str + " -DdisableLocalIndexQueue=true -DdisableIndexing=true -DdisableDataPurging=true -DdisableEventSink=true -Djavax.net.ssl.trustStore=" + System.getProperty("javax.net.ssl.trustStore") + " -Djavax.net.ssl.trustStorePassword=" + System.getProperty("javax.net.ssl.trustStorePassword") + " -DAgent.Config.Path=" + (str + File.separator + "repository" + File.separator + "conf" + File.separator + "data-bridge" + File.separator + "data-agent-config.xml") + getLog4jPropertiesJvmOpt(str2);
        sparkConf.set("spark.executor.extraJavaOptions", sparkConf.get("spark.executor.extraJavaOptions", "") + str3);
        sparkConf.set("spark.driver.extraJavaOptions", sparkConf.get("spark.driver.extraJavaOptions", "") + str3);
        sparkConf.setIfMissing("carbon.spark.results.limit", "1000");
        String property = System.getProperty("SPARK_CLASSPATH") == null ? "" : System.getProperty("SPARK_CLASSPATH");
        if (str != null) {
            try {
                ClusterMode clusterMode = getClusterMode(sparkConf.get(AnalyticsConstants.CARBON_SPARK_MASTER));
                property = (clusterMode == ClusterMode.local || clusterMode == ClusterMode.carbonSpark) ? ComputeClasspath.getSparkClasspath(property, str) : ComputeClasspath.getSparkClasspath(property, str, new String[]{"slf4j"});
            } catch (IOException e2) {
                throw new AnalyticsExecutionException("Unable to create the extra spark classpath" + e2.getMessage(), e2);
            }
        } else {
            logDebug("CARBON HOME is NULL. Spark conf in non-carbon environment");
        }
        try {
            sparkConf.set("spark.executor.extraClassPath", sparkConf.get("spark.executor.extraClassPath") + ";" + property);
        } catch (NoSuchElementException e3) {
            sparkConf.set("spark.executor.extraClassPath", property);
        }
        try {
            sparkConf.set("spark.driver.extraClassPath", sparkConf.get("spark.driver.extraClassPath") + ";" + property);
        } catch (NoSuchElementException e4) {
            sparkConf.set("spark.driver.extraClassPath", property);
        }
    }

    private String getLog4jPropertiesJvmOpt(String str) {
        File file = new File(str + File.separator + "log4j.properties");
        return file.exists() ? " -Dlog4j.configuration=file:" + File.separator + File.separator + file.getAbsolutePath() : "";
    }

    private String getStringFromSparkConf(String str, String str2) {
        try {
            return this.sparkConf.get(str);
        } catch (NoSuchElementException e) {
            return str2;
        }
    }

    public void stop() {
        if (this.sqlCtx != null) {
            this.sqlCtx.sparkContext().stop();
        }
    }

    public int getNumPartitionsHint() throws AnalyticsException {
        int workerCount = getWorkerCount();
        int i = workerCount * this.sparkConf.getInt(AnalyticsConstants.SPARK_WORKER_CORES, 1);
        if (workerCount == 0) {
            throw new AnalyticsException("Error while calculating NumPartitionsHint. Worker count is zero.");
        }
        if (log.isDebugEnabled()) {
            log.debug("Partition count: " + i);
        }
        return i;
    }

    public AnalyticsQueryResult executeQuery(int i, String str) throws AnalyticsExecutionException {
        AnalyticsClusterManager analyticsClusterManager = AnalyticsServiceHolder.getAnalyticsClusterManager();
        if (!analyticsClusterManager.isClusteringEnabled() || analyticsClusterManager.isLeader(CLUSTER_GROUP_NAME)) {
            return executeQueryLocal(i, str);
        }
        try {
            return (AnalyticsQueryResult) analyticsClusterManager.executeOne(CLUSTER_GROUP_NAME, analyticsClusterManager.getLeader(CLUSTER_GROUP_NAME), new AnalyticsExecutionCall(i, str));
        } catch (AnalyticsClusterException e) {
            throw new AnalyticsExecutionException("Error executing analytics query: " + e.getMessage(), e);
        }
    }

    private AnalyticsQueryResult executeQueryLocal(int i, String str) throws AnalyticsExecutionException {
        if (i != -1234 && isCarbonJDBCQuery(str)) {
            throw new RuntimeException("The CarbonJDBC relation provider is not available for tenants.");
        }
        if (AnalyticsDataServiceUtils.isCarbonServer()) {
            PrivilegedCarbonContext.startTenantFlow();
            PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(i);
        }
        String trim = str.trim();
        String trim2 = str.trim();
        if (trim2.endsWith(";")) {
            trim2 = trim2.substring(0, trim2.length() - 1).trim();
        }
        try {
            if (checkIncrementalQuery(trim2)) {
                AnalyticsQueryResult processIncQuery = processIncQuery(i, trim2);
                if (AnalyticsDataServiceUtils.isCarbonServer()) {
                    PrivilegedCarbonContext.endTenantFlow();
                }
                return processIncQuery;
            }
            String encodeQueryWithTenantId = encodeQueryWithTenantId(i, trim2);
            if (log.isDebugEnabled()) {
                log.debug("Executing : " + trim);
            }
            long currentTimeMillis = System.currentTimeMillis();
            try {
                if (this.sqlCtx == null) {
                    throw new AnalyticsExecutionException("Spark SQL Context is not available. Check if the cluster has instantiated properly.");
                }
                this.sqlCtx.sparkContext().setLocalProperty(AnalyticsConstants.SPARK_SCHEDULER_POOL, this.sparkConf.get(AnalyticsConstants.SPARK_SCHEDULER_POOL));
                AnalyticsQueryResult result = toResult(this.sqlCtx.sql(encodeQueryWithTenantId));
                long currentTimeMillis2 = System.currentTimeMillis();
                if (ServiceHolder.isAnalyticsStatsEnabled()) {
                    log.info("Executed query: " + trim + " \nTime Elapsed: " + ((currentTimeMillis2 - currentTimeMillis) / 1000.0d) + " seconds.");
                }
                if (AnalyticsDataServiceUtils.isCarbonServer()) {
                    PrivilegedCarbonContext.endTenantFlow();
                }
                return result;
            } catch (Throwable th) {
                long currentTimeMillis3 = System.currentTimeMillis();
                if (ServiceHolder.isAnalyticsStatsEnabled()) {
                    log.info("Executed query: " + trim + " \nTime Elapsed: " + ((currentTimeMillis3 - currentTimeMillis) / 1000.0d) + " seconds.");
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (AnalyticsDataServiceUtils.isCarbonServer()) {
                PrivilegedCarbonContext.endTenantFlow();
            }
            throw th2;
        }
    }

    private AnalyticsQueryResult processIncQuery(int i, String str) throws AnalyticsExecutionException {
        AnalyticsQueryResult processIncTableShow;
        String[] split = str.split("(\\s*,\\s*|\\s+)");
        String lowerCase = split[0].toLowerCase();
        boolean z = -1;
        switch (lowerCase.hashCode()) {
            case -1044051783:
                if (lowerCase.equals(AnalyticsConstants.INC_TABLE_RESET)) {
                    z = true;
                    break;
                }
                break;
            case 520542803:
                if (lowerCase.equals(AnalyticsConstants.INC_TABLE_SHOW)) {
                    z = 2;
                    break;
                }
                break;
            case 1573759757:
                if (lowerCase.equals(AnalyticsConstants.INC_TABLE_COMMIT)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                processIncTableShow = processIncTableCommit(i, (String[]) Arrays.copyOfRange(split, 1, split.length));
                break;
            case true:
                processIncTableShow = processIncTableReset(i, (String[]) Arrays.copyOfRange(split, 1, split.length));
                break;
            case true:
                processIncTableShow = processIncTableShow(i, (String[]) Arrays.copyOfRange(split, 1, split.length));
                break;
            default:
                throw new AnalyticsExecutionException("Invalid incremental query: " + str);
        }
        return processIncTableShow;
    }

    private AnalyticsQueryResult processIncTableShow(int i, String[] strArr) throws AnalyticsExecutionException {
        ArrayList arrayList = new ArrayList();
        for (String str : strArr) {
            ArrayList arrayList2 = new ArrayList(3);
            arrayList2.add(str);
            try {
                arrayList2.add(Long.valueOf(ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(i, str, false)));
                arrayList2.add(Long.valueOf(ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(i, str, true)));
                arrayList.add(arrayList2);
            } catch (AnalyticsException e) {
                throw new AnalyticsExecutionException(e.getMessage(), e);
            }
        }
        return new AnalyticsQueryResult(new String[]{"TABLE_ID", "TEMP_VAL", "PRIMARY_VAL"}, arrayList);
    }

    private AnalyticsQueryResult processIncTableReset(int i, String[] strArr) throws AnalyticsExecutionException {
        for (String str : strArr) {
            try {
                ServiceHolder.getIncrementalMetaStore().resetIncrementalTimestamps(i, str);
            } catch (AnalyticsException e) {
                throw new AnalyticsExecutionException(e.getMessage(), e);
            }
        }
        return AnalyticsQueryResult.emptyAnalyticsQueryResult();
    }

    private AnalyticsQueryResult processIncTableCommit(int i, String[] strArr) throws AnalyticsExecutionException {
        ArrayList arrayList = new ArrayList();
        for (String str : strArr) {
            ArrayList arrayList2 = new ArrayList(2);
            arrayList2.add(str);
            try {
                arrayList2.add(Long.valueOf(ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(i, str, true)));
                long lastProcessedTimestamp = ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(i, str, false);
                ServiceHolder.getIncrementalMetaStore().setLastProcessedTimestamp(i, str, lastProcessedTimestamp, true);
                arrayList2.add(Long.valueOf(lastProcessedTimestamp));
                arrayList.add(arrayList2);
            } catch (AnalyticsException e) {
                throw new AnalyticsExecutionException(e.getMessage(), e);
            }
        }
        return new AnalyticsQueryResult(new String[]{"TABLE_ID", "PREV_PRIMARY_VAL", "NEW_PRIMARY_VAL"}, arrayList);
    }

    private boolean checkIncrementalQuery(String str) {
        return str.trim().toLowerCase().startsWith(AnalyticsConstants.INC_TABLE);
    }

    private String encodeQueryWithTenantId(int i, String str) throws AnalyticsExecutionException {
        String replaceTableNamesInQuery;
        Matcher matcher = Pattern.compile("(?i)(?<=(create\\stemporary\\stable))\\s+\\w+(?=\\s+using\\s\\D+\\s+options\\s*\\()").matcher(str.trim());
        if (matcher.find()) {
            String trim = matcher.group().trim();
            if (trim.matches("(?i)if")) {
                throw new AnalyticsExecutionException("Malformed query: CREATE TEMPORARY TABLE IF NOT EXISTS is not supported");
            }
            synchronized (this.sparkTableNamesHolder) {
                this.sparkTableNamesHolder.addTableName(i, trim);
            }
            String replaceShorthandStrings = replaceShorthandStrings(str);
            int indexOf = replaceShorthandStrings.toLowerCase().indexOf(AnalyticsConstants.TERM_OPTIONS, matcher.end());
            int indexOf2 = replaceShorthandStrings.indexOf("(", indexOf);
            int indexOf3 = replaceShorthandStrings.indexOf(")", indexOf2);
            replaceTableNamesInQuery = replaceTableNamesInQuery(i, replaceShorthandStrings.substring(0, indexOf)) + (isCarbonQuery(str) ? replaceShorthandStrings.substring(indexOf, indexOf2 + 1) + addTenantIdToOptions(i, replaceShorthandStrings.substring(indexOf2 + 1, indexOf3)) + ")" : replaceShorthandStrings.substring(indexOf, indexOf3 + 1)) + replaceTableNamesInQuery(i, replaceShorthandStrings.substring(indexOf3 + 1, replaceShorthandStrings.length()));
        } else {
            replaceTableNamesInQuery = replaceTableNamesInQuery(i, str);
        }
        return replaceTableNamesInQuery.trim();
    }

    private boolean isCarbonQuery(String str) {
        return str.contains(AnalyticsConstants.SPARK_SHORTHAND_STRING) || str.contains(AnalyticsConstants.COMPRESSED_EVENT_ANALYTICS_SHORTHAND);
    }

    private boolean isCarbonJDBCQuery(String str) {
        return Pattern.compile("using\\s*CarbonJDBC").matcher(str.trim()).find();
    }

    private String replaceShorthandStrings(String str) {
        for (Map.Entry<String, String> entry : this.shorthandStringsMap.entrySet()) {
            str = str.replaceFirst("\\b" + entry.getKey() + "\\b", entry.getValue());
        }
        return str;
    }

    private void registerShorthandStrings() {
        addShorthandString(AnalyticsConstants.SPARK_SHORTHAND_STRING, AnalyticsRelationProvider.class.getName());
        addShorthandString(AnalyticsConstants.SPARK_JDBC_SHORTHAND_STRING, AnalyticsJDBCRelationProvider.class.getName());
        addShorthandString(AnalyticsConstants.COMPRESSED_EVENT_ANALYTICS_SHORTHAND, CompressedEventAnalyticsRelationProvider.class.getName());
    }

    private void addShorthandString(String str, String str2) {
        try {
            Class.forName(str2);
            this.shorthandStringsMap.put(str, str2);
        } catch (ClassNotFoundException e) {
            log.error(e);
        }
    }

    private String addTenantIdToOptions(int i, String str) throws AnalyticsExecutionException {
        boolean z = false;
        for (String str2 : str.split("\\s*,\\s*")) {
            String[] split = str2.trim().split("\\s+", 2);
            z = split[0].equals(AnalyticsConstants.TENANT_ID);
            if (z && i != Integer.parseInt(split[1].replaceAll("^\"|\"$", ""))) {
                throw new AnalyticsExecutionException("Mismatching tenants : " + i + " and " + split[1].replaceAll("^\"|\"$", ""));
            }
        }
        if (!z) {
            str = str + " , " + AnalyticsConstants.TENANT_ID + " \"" + i + "\"";
        }
        return str;
    }

    private String replaceTableNamesInQuery(int i, String str) {
        String str2;
        String str3 = str;
        synchronized (this.sparkTableNamesHolder) {
            for (String str4 : this.sparkTableNamesHolder.getTableNames(i)) {
                str3 = str3.replaceAll("\\b" + str4 + "\\b", AnalyticsCommonUtils.encodeTableNameWithTenantId(i, str4));
            }
            str2 = str3;
        }
        return str2;
    }

    private AnalyticsQueryResult toResult(DataFrame dataFrame) throws AnalyticsExecutionException {
        int i = this.sparkConf.getInt("carbon.spark.results.limit", -1);
        return i != -1 ? new AnalyticsQueryResult(dataFrame.schema().fieldNames(), convertRowsToObjects(dataFrame.limit(i).collect())) : new AnalyticsQueryResult(dataFrame.schema().fieldNames(), convertRowsToObjects(dataFrame.collect()));
    }

    private List<List<Object>> convertRowsToObjects(Row[] rowArr) {
        ArrayList arrayList = new ArrayList();
        for (Row row : rowArr) {
            ArrayList arrayList2 = new ArrayList();
            for (int i = 0; i < row.length(); i++) {
                arrayList2.add(row.get(i));
            }
            arrayList.add(arrayList2);
        }
        return arrayList;
    }

    public void onBecomingLeader() {
        log.info("This node is now the CARBON CLUSTERING LEADER");
        boolean z = !executeOnBecomingLeaderFlow();
        for (int i = 0; z && i < MAX_RETRIES; i++) {
            log.info("Retrying executing On Becoming Leader flow. Retry count = " + i);
            retryWait(Math.min(getWaitTimeExp(i), MAX_RETRY_WAIT_INTERVAL));
            z = !executeOnBecomingLeaderFlow();
        }
    }

    private boolean executeOnBecomingLeaderFlow() {
        if (log.isDebugEnabled()) {
            log.debug("Executing On Becoming Leader Flow : ");
        }
        try {
            if (this.clusterMode != ClusterMode.carbonSpark) {
                log.info("Analytics cluster leadership has changed. Hence, re-creating the Spark Client application");
                initializeAnalyticsClient();
                return true;
            }
            IMap map = AnalyticsServiceHolder.getHazelcastInstance().getMap(AnalyticsConstants.SPARK_MASTER_MAP);
            if (map.isEmpty()) {
                log.info("Spark master map is empty...");
                String str = "spark://" + this.myHost + ":" + this.sparkConf.getInt(AnalyticsConstants.SPARK_MASTER_PORT, 7077 + this.portOffset);
                map.put(str, this.acm.getLocalMember());
                log.info("Added " + str + " to the MasterMap");
            } else if (map.size() >= this.redundantMasterCount) {
                log.info("Redundant master count fulfilled : " + map.size());
                if (!isElectedLeaderAvailable()) {
                    log.info("No Elected SPARK LEADER in the cluster. Electing a suitable leader...");
                    try {
                        electSuitableLeader();
                    } catch (AnalyticsClusterException e) {
                        String str2 = "Unable to elect a suitable leader : " + e.getMessage();
                        log.error(str2, e);
                        throw new RuntimeException(str2, e);
                    }
                }
                log.info("Initializing new spark client app...");
                initializeAnalyticsClient();
            } else {
                log.info("Master map size is less than the redundant master count");
            }
            return true;
        } catch (Exception e2) {
            log.warn("Error in processing on becoming leader cluster message: " + e2.getMessage(), e2);
            return false;
        }
    }

    private void electSuitableLeader() throws AnalyticsClusterException {
        ArrayList arrayList = new ArrayList(AnalyticsServiceHolder.getHazelcastInstance().getMap(AnalyticsConstants.SPARK_MASTER_MAP).values());
        List members = this.acm.getMembers(CLUSTER_GROUP_NAME);
        boolean z = false;
        Iterator it = arrayList.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Object next = it.next();
            if (members.contains(next)) {
                this.acm.executeOne(CLUSTER_GROUP_NAME, next, new ElectLeaderExecutionCall());
                z = true;
                log.info("Suitable leader elected : " + next);
                break;
            }
        }
        if (z) {
            return;
        }
        log.error("No Spark master is available in the cluster to be elected as the leader");
    }

    public synchronized void electAsLeader() {
        log.info("Elected as the Spark Leader");
        Iterator<LeaderElectable> it = this.leaderElectable.iterator();
        while (it.hasNext()) {
            it.next().electedLeader();
        }
        this.electedLeader = true;
    }

    public void onLeaderUpdate() {
    }

    private int getWorkerCount() {
        return this.workerCount;
    }

    public void onMembersChangeForLeader(boolean z) {
        log.info("Member change, remove: " + z);
        boolean z2 = !executeOnMembersChangeForLeaderFlow(z);
        for (int i = 0; z2 && i < MAX_RETRIES; i++) {
            log.info("Retrying executing On Member Change for Leader Flow. Retry count = " + i);
            retryWait(Math.min(getWaitTimeExp(i), MAX_RETRY_WAIT_INTERVAL));
            z2 = !executeOnMembersChangeForLeaderFlow(z);
        }
    }

    private boolean executeOnMembersChangeForLeaderFlow(boolean z) {
        logDebug("Execute On Members Change For Leader Flow");
        try {
            if (this.clusterMode != ClusterMode.carbonSpark) {
                return true;
            }
            this.workerCount = AnalyticsServiceHolder.getAnalyticsClusterManager().getMembers(CLUSTER_GROUP_NAME).size();
            log.info("Analytics worker updated, total count: " + getWorkerCount());
            if (!z) {
                return true;
            }
            if (isElectedLeaderAvailable()) {
                log.info("Elected leader already available.");
                return true;
            }
            log.info("Removed member was the Spark elected leader. Electing a suitable leader...");
            electSuitableLeader();
            return true;
        } catch (Exception e) {
            log.warn("Error while executing On Members Change For Leader Flow: " + e.getMessage(), e);
            return false;
        }
    }

    public void registerLeaderElectable(LeaderElectable leaderElectable) {
        this.leaderElectable.add(leaderElectable);
        log.info("Spark leader electable registered");
    }

    private void logDebug(String str) {
        if (log.isDebugEnabled()) {
            log.debug(str);
        }
    }

    public void onMemberRemoved() {
    }

    private long getWaitTimeExp(int i) {
        return ((long) Math.pow(2.0d, i)) * 100;
    }

    private void retryWait(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private ClusterMode getClusterMode(String str) throws AnalyticsExecutionException {
        if (str.toLowerCase().startsWith("local")) {
            if (!this.acm.isClusteringEnabled()) {
                return ClusterMode.local;
            }
            log.warn("Using 'local' with Carbon clustering is deprecated. Please use 'carbon.spark.master carbon' instead!");
            return ClusterMode.carbonSpark;
        }
        if (str.toLowerCase().startsWith("carbon")) {
            if (this.acm.isClusteringEnabled()) {
                return ClusterMode.carbonSpark;
            }
            throw new AnalyticsExecutionException("Using Carbon Clustering without enabling clustering in axis2. Please refer axis2 settings.");
        }
        if (str.toLowerCase().startsWith(AnalyticsConstants.SPARK_CONF_DIR)) {
            return ClusterMode.standaloneSpark;
        }
        if (str.toLowerCase().startsWith("yarn")) {
            if (str.equalsIgnoreCase("yarn-cluster")) {
                throw new AnalyticsExecutionException("\"yarn-cluster\" mode is not supported in DAS. Please use \"yarn-cluster\"!");
            }
            return ClusterMode.yarn;
        }
        if (str.toLowerCase().startsWith("mesos")) {
            return ClusterMode.mesos;
        }
        throw new AnalyticsExecutionException("Unknown cluster mode for Spark : " + str);
    }
}
