/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineplugin.spark.client.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
import org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.RestartPolicy;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplication;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplicationList;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplicationSpec;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkApplicationStatus;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.SparkPodSpec;
import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClusterDescriptorAdapter} that submits a Spark job by creating a {@link SparkApplication}
 * custom resource through the Kubernetes client and follows its status with a watch.
 *
 * <p>The adapter creates its own {@link KubernetesClient} in the constructor and closes it in
 * {@link #close()}. Watch callbacks write the inherited {@code applicationId} and {@code jobState}
 * fields.
 *
 * <p>NOTE(review): watch callbacks are presumably delivered on a client-managed thread; confirm
 * that the inherited fields are safely published to the thread that polls {@link #initJobId()}.
 */
public class KubernetesOperatorClusterDescriptorAdapter
extends ClusterDescriptorAdapter {
    private static final Logger logger = LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class);

    /** Pause between submitting the resource and reading its status back for logging. */
    private static final long STATUS_LOG_DELAY_MS = 3000L;

    /** Pause before the single retry when opening the watch fails. */
    private static final long WATCH_RETRY_DELAY_MS = 5000L;

    protected SparkConfig sparkConfig;
    protected KubernetesClient client;

    /** Guards {@link #stateWatch} and {@link #closed}. */
    private final Object watchLock = new Object();

    /**
     * Handle of the currently open status watch, or {@code null} when none is open. Typed as
     * {@link AutoCloseable} because only {@code close()} is ever called on it.
     */
    private AutoCloseable stateWatch;

    /** Set by {@link #close()} so that a failing watch is not re-opened afterwards. */
    private boolean closed;

    /**
     * Builds the adapter and a Kubernetes client from the Spark configuration carried by the
     * execution context (config file, master URL, username and password).
     *
     * @param executionContext context providing the {@link SparkConfig}
     */
    public KubernetesOperatorClusterDescriptorAdapter(ExecutionContext executionContext) {
        super(executionContext);
        this.sparkConfig = executionContext.getSparkConfig();
        this.client = KubernetesHelper.getKubernetesClient(this.sparkConfig.getK8sConfigFile(), this.sparkConfig.getK8sMasterUrl(), this.sparkConfig.getK8sUsername(), this.sparkConfig.getK8sPassword());
    }

    /**
     * Creates (or replaces) the {@link SparkApplication} resource described by the Spark
     * configuration, then waits briefly and logs the status reported for it.
     *
     * <p>{@code args} and {@code confMap} are not used by this implementation.
     *
     * @param mainClass main class placed into the application spec
     * @param args unused
     * @param confMap unused
     * @throws RuntimeException if the SparkApplication custom resource definition is not installed
     */
    @Override
    public void deployCluster(String mainClass, String args, Map<String, String> confMap) {
        logger.info("The spark k8s operator task start\uff0ck8sNamespace: {},appName: {}", this.sparkConfig.getK8sNamespace(), this.sparkConfig.getAppName());
        this.ensureSparkOperatorCrdExists();

        MixedOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>> sparkApplicationClient = KubernetesOperatorClusterDescriptorAdapter.getSparkApplicationClient(this.client);
        SparkApplication sparkApplication = KubernetesOperatorClusterDescriptorAdapter.getSparkApplication(this.sparkConfig.getAppName(), this.sparkConfig.getK8sNamespace());
        SparkApplicationSpec sparkApplicationSpec = this.buildSparkApplicationSpec(mainClass);
        logger.info("Spark k8s operator task parameters: {}", sparkApplicationSpec);
        sparkApplication.setSpec(sparkApplicationSpec);

        SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication);
        logger.info("Preparing to submit the Spark k8s operator Task: {}", created);

        try {
            Thread.sleep(STATUS_LOG_DELAY_MS);
        }
        catch (InterruptedException interrupted) {
            // The resource is already submitted; the status lookup below only feeds a log line,
            // so restore the interrupt flag for the caller and skip it.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting to read the Spark k8s task status");
            return;
        }
        this.logSubmittedApplicationStatus();
    }

    /** Fails fast when the SparkApplication custom resource definition is not registered. */
    private void ensureSparkOperatorCrdExists() {
        CustomResourceDefinitionList crds = this.client.apiextensions().v1().customResourceDefinitions().list();
        String sparkApplicationCRDName = CustomResource.getCRDName(SparkApplication.class);
        boolean missing = crds.getItems().stream().noneMatch(crd -> crd.getMetadata().getName().equals(sparkApplicationCRDName));
        if (missing) {
            throw new RuntimeException("The Spark operator crd does not exist");
        }
    }

    /** Assembles the application spec (driver, executor, image, restart policy) from the config. */
    private SparkApplicationSpec buildSparkApplicationSpec(String mainClass) {
        SparkPodSpec driver = SparkPodSpec.Builder()
            .cores(this.sparkConfig.getDriverCores())
            .memory(this.sparkConfig.getDriverMemory())
            .serviceAccount(this.sparkConfig.getK8sServiceAccount())
            .build();
        SparkPodSpec executor = SparkPodSpec.Builder()
            .cores(this.sparkConfig.getExecutorCores())
            .instances(this.sparkConfig.getNumExecutors())
            .memory(this.sparkConfig.getExecutorMemory())
            .build();
        Map<String, String> sparkConfMap = new HashMap<>();
        sparkConfMap.put(SparkConfiguration.SPARK_KUBERNETES_FILE_UPLOAD_PATH().key(), this.sparkConfig.getK8sFileUploadPath());
        return SparkApplicationSpec.Builder()
            .type(this.sparkConfig.getK8sLanguageType())
            .mode("cluster")
            .image(this.sparkConfig.getK8sImage())
            .imagePullPolicy(this.sparkConfig.getK8sImagePullPolicy())
            .mainClass(mainClass)
            .mainApplicationFile(this.sparkConfig.getAppResource())
            .sparkVersion(this.sparkConfig.getK8sSparkVersion())
            .restartPolicy(new RestartPolicy(this.sparkConfig.getK8sRestartPolicy()))
            .driver(driver)
            .executor(executor)
            .sparkConf(sparkConfMap)
            .build();
    }

    /**
     * Logs the state of the first listed application whose name matches the configured app name.
     * Nothing is logged when no application matches or when it has no state yet.
     */
    private void logSubmittedApplicationStatus() {
        SparkApplicationList applications = KubernetesOperatorClusterDescriptorAdapter.getSparkApplicationClient(this.client).list();
        for (SparkApplication application : applications.getItems()) {
            if (!application.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
                continue;
            }
            SparkApplicationStatus status = application.getStatus();
            // A freshly created resource may carry a status without an application state.
            if (status != null && status.getApplicationState() != null) {
                logger.info("Spark k8s task: {},status: {}", this.sparkConfig.getAppName(), status.getApplicationState().getState());
            }
            return;
        }
    }

    /**
     * Makes sure a status watch is open (retrying once after a short pause if opening fails) and
     * reports whether the job can be identified yet.
     *
     * @return {@code true} once an application id is known or the job has reached a final state
     */
    @Override
    public boolean initJobId() {
        try {
            this.getKubernetesOperatorState();
        }
        catch (Exception e) {
            logger.warn("First attempt to watch the SparkApplication failed, retrying once", e);
            try {
                Thread.sleep(WATCH_RETRY_DELAY_MS);
                // A failure of the retry is deliberately left to propagate to the caller.
                this.getKubernetesOperatorState();
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                logger.error("Use k8s watch obtain the status failed", interrupted);
            }
        }
        return null != this.getApplicationId() || this.jobState != null && this.jobState.isFinal();
    }

    /**
     * Opens a watch on the configured SparkApplication unless one is already open or the adapter
     * has been closed. Received events update {@code applicationId} and {@code jobState}; when the
     * watch is closed with an error, the handle is dropped and a new watch is opened.
     */
    private void getKubernetesOperatorState() {
        synchronized (this.watchLock) {
            if (this.closed || this.stateWatch != null) {
                // Reuse the existing watch instead of opening a new one on every poll.
                return;
            }
            this.stateWatch = KubernetesOperatorClusterDescriptorAdapter.getSparkApplicationClient(this.client)
                .inNamespace(this.sparkConfig.getK8sNamespace())
                .withName(this.sparkConfig.getAppName())
                .watch(new Watcher<SparkApplication>(){

                    @Override
                    public void eventReceived(Watcher.Action action, SparkApplication sparkApplication) {
                        SparkApplicationStatus status = sparkApplication.getStatus();
                        if (status == null) {
                            return;
                        }
                        KubernetesOperatorClusterDescriptorAdapter.this.applicationId = status.getSparkApplicationId();
                        // Keep the previous job state when the event carries no application state.
                        if (status.getApplicationState() != null) {
                            KubernetesOperatorClusterDescriptorAdapter.this.jobState = KubernetesOperatorClusterDescriptorAdapter.this.kubernetesOperatorStateConvertSparkState(status.getApplicationState().getState());
                        }
                    }

                    @Override
                    public void onClose(WatcherException e) {
                        logger.error("Use k8s watch obtain the status failed", (Throwable)e);
                        synchronized (KubernetesOperatorClusterDescriptorAdapter.this.watchLock) {
                            // Forget the failed handle so that the call below opens a fresh watch.
                            KubernetesOperatorClusterDescriptorAdapter.this.stateWatch = null;
                        }
                        KubernetesOperatorClusterDescriptorAdapter.this.getKubernetesOperatorState();
                    }
                });
        }
    }

    /**
     * Maps the state string reported in the SparkApplication status to a Spark launcher state.
     *
     * @param kubernetesState state string from the resource status; may be blank or {@code null}
     * @return {@code CONNECTED} for {@code "PENDING"}, {@code RUNNING} for {@code "RUNNING"},
     *     {@code FINISHED} for {@code "COMPLETED"}, {@code FAILED} for {@code "FAILED"}, and
     *     {@code UNKNOWN} for blank or any other value
     */
    public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) {
        if (StringUtils.isBlank((CharSequence)kubernetesState)) {
            return SparkAppHandle.State.UNKNOWN;
        }
        switch (kubernetesState) {
            case "PENDING":
                return SparkAppHandle.State.CONNECTED;
            case "RUNNING":
                return SparkAppHandle.State.RUNNING;
            case "COMPLETED":
                return SparkAppHandle.State.FINISHED;
            case "FAILED":
                return SparkAppHandle.State.FAILED;
            default:
                return SparkAppHandle.State.UNKNOWN;
        }
    }

    /**
     * @return {@code true} when a job state has been observed and it is final; {@code false} while
     *     no state has been received yet
     */
    @Override
    public boolean isDisposed() {
        return this.jobState != null && this.jobState.isFinal();
    }

    @Override
    public String toString() {
        return "ClusterDescriptorAdapter{applicationId=" + this.getApplicationId() + '}';
    }

    /**
     * Stops the status watch, deletes the SparkApplication resource and closes the Kubernetes
     * client. The client is closed even when the delete call fails.
     */
    @Override
    public void close() {
        logger.info("Start to close job {}.", this.getApplicationId());
        AutoCloseable watch;
        synchronized (this.watchLock) {
            this.closed = true;
            watch = this.stateWatch;
            this.stateWatch = null;
        }
        try {
            // Closed outside the lock so a callback triggered by the close cannot block on it.
            KubernetesOperatorClusterDescriptorAdapter.closeWatch(watch);
            SparkApplication application = KubernetesOperatorClusterDescriptorAdapter.getSparkApplication(this.sparkConfig.getAppName(), this.sparkConfig.getK8sNamespace());
            KubernetesOperatorClusterDescriptorAdapter.getSparkApplicationClient(this.client).delete(application);
        }
        finally {
            this.client.close();
        }
    }

    /** Closes a watch handle, logging instead of propagating any failure. */
    private static void closeWatch(AutoCloseable watch) {
        if (watch == null) {
            return;
        }
        try {
            watch.close();
        }
        catch (Exception e) {
            logger.warn("Failed to close the k8s watch", e);
        }
    }

    /**
     * Returns the typed custom-resource operation for {@link SparkApplication} on the given client.
     *
     * @param client Kubernetes client to build the operation from
     * @return operation handle for SparkApplication resources
     */
    public static MixedOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>> getSparkApplicationClient(KubernetesClient client) {
        return client.customResources(SparkApplication.class, SparkApplicationList.class);
    }

    /**
     * Creates a {@link SparkApplication} that carries only metadata (name and namespace).
     *
     * @param sparkOperatorName resource name
     * @param namespace resource namespace
     * @return a new SparkApplication with the metadata set and no spec
     */
    public static SparkApplication getSparkApplication(String sparkOperatorName, String namespace) {
        ObjectMeta metadata = new ObjectMeta();
        metadata.setName(sparkOperatorName);
        metadata.setNamespace(namespace);
        SparkApplication sparkApplication = new SparkApplication();
        sparkApplication.setMetadata(metadata);
        return sparkApplication;
    }
}

