package org.apache.linkis.engineplugin.spark.client.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.RestartPolicy;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplication;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplicationList;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplicationSpec;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplicationStatus;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkPodSpec;
import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.class */
public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescriptorAdapter {
    private static final Logger logger = LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class);
    protected SparkConfig sparkConfig;
    protected KubernetesClient client;

    public KubernetesOperatorClusterDescriptorAdapter(ExecutionContext executionContext) {
        super(executionContext);
        this.sparkConfig = executionContext.getSparkConfig();
        this.client = KubernetesHelper.getKubernetesClient(this.sparkConfig.getK8sConfigFile(), this.sparkConfig.getK8sMasterUrl(), this.sparkConfig.getK8sUsername(), this.sparkConfig.getK8sPassword());
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public void deployCluster(String str, String str2, Map<String, String> map) {
        logger.info("The spark k8s operator task start，k8sNamespace: {},appName: {}", this.sparkConfig.getK8sNamespace(), this.sparkConfig.getAppName());
        CustomResourceDefinitionList customResourceDefinitionList = (CustomResourceDefinitionList) this.client.apiextensions().v1().customResourceDefinitions().list();
        String cRDName = CustomResource.getCRDName(SparkApplication.class);
        if (CollectionUtils.isEmpty((List) customResourceDefinitionList.getItems().stream().filter(customResourceDefinition -> {
            return customResourceDefinition.getMetadata().getName().equals(cRDName);
        }).collect(Collectors.toList()))) {
            throw new RuntimeException("The Spark operator crd does not exist");
        }
        NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>> sparkApplicationClient = getSparkApplicationClient(this.client);
        SparkApplication sparkApplication = getSparkApplication(this.sparkConfig.getAppName(), this.sparkConfig.getK8sNamespace());
        SparkPodSpec build = SparkPodSpec.Builder().cores(this.sparkConfig.getDriverCores()).memory(this.sparkConfig.getDriverMemory()).serviceAccount(this.sparkConfig.getK8sServiceAccount()).build();
        SparkPodSpec build2 = SparkPodSpec.Builder().cores(this.sparkConfig.getExecutorCores()).instances(this.sparkConfig.getNumExecutors()).memory(this.sparkConfig.getExecutorMemory()).build();
        HashMap hashMap = new HashMap();
        hashMap.put(SparkConfiguration.SPARK_KUBERNETES_FILE_UPLOAD_PATH().key(), this.sparkConfig.getK8sFileUploadPath());
        SparkApplicationSpec build3 = SparkApplicationSpec.Builder().type(this.sparkConfig.getK8sLanguageType()).mode("cluster").image(this.sparkConfig.getK8sImage()).imagePullPolicy(this.sparkConfig.getK8sImagePullPolicy()).mainClass(str).mainApplicationFile(this.sparkConfig.getAppResource()).sparkVersion(this.sparkConfig.getK8sSparkVersion()).restartPolicy(new RestartPolicy(this.sparkConfig.getK8sRestartPolicy())).driver(build).executor(build2).sparkConf(hashMap).build();
        logger.info("Spark k8s operator task parameters: {}", build3);
        sparkApplication.setSpec(build3);
        logger.info("Preparing to submit the Spark k8s operator Task: {}", (SparkApplication) sparkApplicationClient.createOrReplace(new SparkApplication[]{sparkApplication}));
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
        }
        List list = (List) ((SparkApplicationList) getSparkApplicationClient(this.client).list()).getItems().stream().filter(sparkApplication2 -> {
            return sparkApplication2.getMetadata().getName().equals(this.sparkConfig.getAppName());
        }).collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(list)) {
            SparkApplicationStatus sparkApplicationStatus = (SparkApplicationStatus) ((SparkApplication) list.get(0)).getStatus();
            if (Objects.nonNull(sparkApplicationStatus)) {
                logger.info("Spark k8s task: {},status: {}", this.sparkConfig.getAppName(), sparkApplicationStatus.getApplicationState().getState());
            }
        }
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public boolean initJobId() {
        SparkApplicationStatus kubernetesOperatorState = getKubernetesOperatorState();
        if (Objects.nonNull(kubernetesOperatorState)) {
            this.applicationId = kubernetesOperatorState.getSparkApplicationId();
            this.jobState = kubernetesOperatorStateConvertSparkState(kubernetesOperatorState.getApplicationState().getState());
        }
        return null != getApplicationId() || (this.jobState != null && this.jobState.isFinal());
    }

    private SparkApplicationStatus getKubernetesOperatorState() {
        List<SparkApplication> items = ((SparkApplicationList) getSparkApplicationClient(this.client).list()).getItems();
        if (!CollectionUtils.isNotEmpty(items)) {
            return null;
        }
        for (SparkApplication sparkApplication : items) {
            if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace()) && sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
                return (SparkApplicationStatus) sparkApplication.getStatus();
            }
        }
        return null;
    }

    public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String str) {
        if (StringUtils.isBlank(str)) {
            return SparkAppHandle.State.UNKNOWN;
        }
        boolean z = -1;
        switch (str.hashCode()) {
            case -2026200673:
                if (str.equals("RUNNING")) {
                    z = true;
                    break;
                }
                break;
            case 35394935:
                if (str.equals("PENDING")) {
                    z = false;
                    break;
                }
                break;
            case 1383663147:
                if (str.equals("COMPLETED")) {
                    z = 2;
                    break;
                }
                break;
            case 2066319421:
                if (str.equals("FAILED")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return SparkAppHandle.State.CONNECTED;
            case true:
                return SparkAppHandle.State.RUNNING;
            case true:
                return SparkAppHandle.State.FINISHED;
            case true:
                return SparkAppHandle.State.FAILED;
            default:
                return SparkAppHandle.State.UNKNOWN;
        }
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public boolean isDisposed() {
        return this.jobState.isFinal();
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
    public String toString() {
        return "ClusterDescriptorAdapter{applicationId=" + getApplicationId() + '}';
    }

    @Override // org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.info("Start to close job {}.", getApplicationId());
        getSparkApplicationClient(this.client).delete(new SparkApplication[]{getSparkApplication(this.sparkConfig.getAppName(), this.sparkConfig.getK8sNamespace())});
        this.client.close();
    }

    public static NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>> getSparkApplicationClient(KubernetesClient kubernetesClient) {
        return kubernetesClient.customResources(SparkApplication.class, SparkApplicationList.class);
    }

    public static SparkApplication getSparkApplication(String str, String str2) {
        SparkApplication sparkApplication = new SparkApplication();
        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setName(str);
        objectMeta.setNamespace(str2);
        sparkApplication.setMetadata(objectMeta);
        return sparkApplication;
    }
}
