package org.apache.linkis.engineplugin.spark.client.deployment;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
import org.apache.logging.log4j.util.Strings;
import org.apache.spark.launcher.CustomSparkSubmitLauncher;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.class */
public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
    private static final Logger logger = LoggerFactory.getLogger(KubernetesApplicationClusterDescriptorAdapter.class);
    protected SparkConfig sparkConfig;
    protected KubernetesClient client;
    protected String driverPodName;
    protected String namespace;

    public KubernetesApplicationClusterDescriptorAdapter(ExecutionContext executionContext) {
        super(executionContext);
        this.sparkConfig = executionContext.getSparkConfig();
        this.client = KubernetesHelper.getKubernetesClient(this.sparkConfig.getK8sConfigFile(), this.sparkConfig.getK8sMasterUrl(), this.sparkConfig.getK8sUsername(), this.sparkConfig.getK8sPassword());
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public void deployCluster(String str, String str2, Map<String, String> map) throws IOException {
        SparkConfig sparkConfig = this.executionContext.getSparkConfig();
        this.sparkLauncher = new CustomSparkSubmitLauncher();
        this.sparkLauncher.setJavaHome(sparkConfig.getJavaHome()).setSparkHome(sparkConfig.getSparkHome()).setMaster(sparkConfig.getK8sMasterUrl()).setDeployMode("cluster").setAppName(sparkConfig.getAppName()).setVerbose(true);
        this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
        this.namespace = sparkConfig.getK8sNamespace();
        setConf(this.sparkLauncher, "spark.app.name", sparkConfig.getAppName());
        setConf(this.sparkLauncher, "spark.ui.port", sparkConfig.getK8sSparkUIPort());
        setConf(this.sparkLauncher, "spark.kubernetes.namespace", this.namespace);
        setConf(this.sparkLauncher, "spark.kubernetes.container.image", sparkConfig.getK8sImage());
        setConf(this.sparkLauncher, "spark.kubernetes.driver.pod.name", this.driverPodName);
        setConf(this.sparkLauncher, "spark.kubernetes.driver.request.cores", sparkConfig.getK8sDriverRequestCores());
        setConf(this.sparkLauncher, "spark.kubernetes.executor.request.cores", sparkConfig.getK8sExecutorRequestCores());
        setConf(this.sparkLauncher, "spark.kubernetes.container.image.pullPolicy", sparkConfig.getK8sImagePullPolicy());
        setConf(this.sparkLauncher, "spark.kubernetes.authenticate.driver.serviceAccountName", sparkConfig.getK8sServiceAccount());
        if (map != null) {
            map.forEach((str3, str4) -> {
                this.sparkLauncher.setConf(str3, str4);
            });
        }
        addSparkArg(this.sparkLauncher, "--jars", sparkConfig.getJars());
        addSparkArg(this.sparkLauncher, "--packages", sparkConfig.getPackages());
        addSparkArg(this.sparkLauncher, "--exclude-packages", sparkConfig.getExcludePackages());
        addSparkArg(this.sparkLauncher, "--repositories", sparkConfig.getRepositories());
        addSparkArg(this.sparkLauncher, "--files", sparkConfig.getFiles());
        addSparkArg(this.sparkLauncher, "--archives", sparkConfig.getArchives());
        addSparkArg(this.sparkLauncher, "--driver-memory", sparkConfig.getDriverMemory());
        addSparkArg(this.sparkLauncher, "--driver-java-options", sparkConfig.getDriverJavaOptions());
        addSparkArg(this.sparkLauncher, "--driver-library-path", sparkConfig.getDriverLibraryPath());
        addSparkArg(this.sparkLauncher, "--driver-class-path", sparkConfig.getDriverClassPath());
        addSparkArg(this.sparkLauncher, "--executor-memory", sparkConfig.getExecutorMemory());
        addSparkArg(this.sparkLauncher, "--proxy-user", sparkConfig.getProxyUser());
        addSparkArg(this.sparkLauncher, "--driver-cores", sparkConfig.getDriverCores().toString());
        addSparkArg(this.sparkLauncher, "--total-executor-cores", sparkConfig.getTotalExecutorCores());
        addSparkArg(this.sparkLauncher, "--executor-cores", sparkConfig.getExecutorCores().toString());
        addSparkArg(this.sparkLauncher, "--num-executors", sparkConfig.getNumExecutors().toString());
        addSparkArg(this.sparkLauncher, "--principal", sparkConfig.getPrincipal());
        addSparkArg(this.sparkLauncher, "--keytab", sparkConfig.getKeytab());
        addSparkArg(this.sparkLauncher, "--py-files", sparkConfig.getPyFiles());
        this.sparkLauncher.setAppResource(sparkConfig.getAppResource());
        this.sparkLauncher.setMainClass(str);
        Arrays.stream(str2.split("\\s+")).filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        }).forEach(str5 -> {
            this.sparkLauncher.addAppArgs(new String[]{str5});
        });
        this.sparkAppHandle = this.sparkLauncher.startApplication(new SparkAppHandle.Listener[]{new SparkAppHandle.Listener() { // from class: org.apache.linkis.engineplugin.spark.client.deployment.KubernetesApplicationClusterDescriptorAdapter.1
            public void stateChanged(SparkAppHandle sparkAppHandle) {
            }

            public void infoChanged(SparkAppHandle sparkAppHandle) {
            }
        }});
        this.sparkLauncher.setSparkAppHandle(this.sparkAppHandle);
    }

    private void addSparkArg(SparkLauncher sparkLauncher, String str, String str2) {
        if (StringUtils.isNotBlank(str) && StringUtils.isNotBlank(str2)) {
            sparkLauncher.addSparkArg(str, str2);
        }
    }

    private void setConf(SparkLauncher sparkLauncher, String str, String str2) {
        if (StringUtils.isNotBlank(str) && StringUtils.isNotBlank(str2)) {
            sparkLauncher.setConf(str, str2);
        }
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public boolean initJobId() {
        Pod sparkDriverPod = getSparkDriverPod();
        if (null == sparkDriverPod) {
            return false;
        }
        String phase = sparkDriverPod.getStatus().getPhase();
        String str = (String) sparkDriverPod.getMetadata().getLabels().get("spark-app-selector");
        if (Strings.isNotBlank(str)) {
            this.applicationId = str;
        }
        if (Strings.isNotBlank(phase)) {
            this.jobState = kubernetesPodStateConvertSparkState(phase);
        }
        return null != getApplicationId() || (this.jobState != null && this.jobState.isFinal());
    }

    protected Pod getSparkDriverPod() {
        return (Pod) ((PodResource) ((NonNamespaceOperation) this.client.pods().inNamespace(this.namespace)).withName(this.driverPodName)).get();
    }

    public String getSparkUIUrl() {
        Pod sparkDriverPod = getSparkDriverPod();
        if (null == sparkDriverPod) {
            logger.info("spark driver pod is not exist");
            return "";
        }
        String podIP = sparkDriverPod.getStatus().getPodIP();
        if (StringUtils.isNotBlank(podIP)) {
            return podIP + ":" + this.sparkConfig.getK8sSparkUIPort();
        }
        logger.info("spark driver pod IP is null, the application may be pending");
        return "";
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public SparkAppHandle.State getJobState() {
        Pod sparkDriverPod = getSparkDriverPod();
        if (null == sparkDriverPod) {
            return null;
        }
        this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPod.getStatus().getPhase());
        logger.info("Job {} state is {}.", getApplicationId(), this.jobState);
        return this.jobState;
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.info("Start to close job {}.", getApplicationId());
        this.client.close();
        if (isDisposed()) {
            logger.info("Job has finished, close action return.");
            return;
        }
        PodResource podResource = (PodResource) ((NonNamespaceOperation) this.client.pods().inNamespace(this.namespace)).withName(this.driverPodName);
        if (null != podResource.get()) {
            podResource.delete();
        }
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public boolean isDisposed() {
        return this.jobState.isFinal();
    }

    public SparkAppHandle.State kubernetesPodStateConvertSparkState(String str) {
        if (StringUtils.isBlank(str)) {
            return SparkAppHandle.State.UNKNOWN;
        }
        String upperCase = str.toUpperCase();
        boolean z = -1;
        switch (upperCase.hashCode()) {
            case -2026200673:
                if (upperCase.equals("RUNNING")) {
                    z = true;
                    break;
                }
                break;
            case -562638271:
                if (upperCase.equals("SUCCEEDED")) {
                    z = 2;
                    break;
                }
                break;
            case 35394935:
                if (upperCase.equals("PENDING")) {
                    z = false;
                    break;
                }
                break;
            case 2066319421:
                if (upperCase.equals("FAILED")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return SparkAppHandle.State.CONNECTED;
            case true:
                return SparkAppHandle.State.RUNNING;
            case true:
                return SparkAppHandle.State.FINISHED;
            case true:
                return SparkAppHandle.State.FAILED;
            default:
                return SparkAppHandle.State.UNKNOWN;
        }
    }

    public String generateDriverPodName(String str) {
        return str + "-" + UUID.randomUUID().toString().replace("-", "") + "-driver";
    }
}
