/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.api.k8s.impl;

import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.PodSpecFluent;
import io.fabric8.kubernetes.api.model.PodTemplateSpecFluent;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.JobFluent;
import io.fabric8.kubernetes.api.model.batch.v1.JobSpecFluent;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.slf4j.Logger;

public class K8sTaskExecutor
extends AbstractK8sTaskExecutor {
    private Job job;

    public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
        super(logger, taskRequest);
    }

    public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
        String taskInstanceId = String.valueOf(this.taskRequest.getTaskInstanceId());
        String taskName = this.taskRequest.getTaskName().toLowerCase(Locale.ROOT);
        String image = k8STaskMainParameters.getImage();
        String namespaceName = k8STaskMainParameters.getNamespaceName();
        Map<String, String> otherParams = k8STaskMainParameters.getParamsMap();
        Double podMem = k8STaskMainParameters.getMinMemorySpace();
        Double podCpu = k8STaskMainParameters.getMinCpuCores();
        Double limitPodMem = podMem * 2.0;
        Double limitPodCpu = podCpu * 2.0;
        int retryNum = 0;
        String k8sJobName = String.format("%s-%s", taskName, taskInstanceId);
        HashMap<String, Quantity> reqRes = new HashMap<String, Quantity>();
        reqRes.put("memory", new Quantity(String.format("%s%s", podMem, "Mi")));
        reqRes.put("cpu", new Quantity(String.valueOf(podCpu)));
        HashMap<String, Quantity> limitRes = new HashMap<String, Quantity>();
        limitRes.put("memory", new Quantity(String.format("%s%s", limitPodMem, "Mi")));
        limitRes.put("cpu", new Quantity(String.valueOf(limitPodCpu)));
        HashMap<String, String> labelMap = new HashMap<String, String>();
        labelMap.put("k8s.cn/layer", "batch");
        labelMap.put("k8s.cn/name", k8sJobName);
        EnvVar taskInstanceIdVar = new EnvVar("taskInstanceId", taskInstanceId, null);
        ArrayList<EnvVar> envVars = new ArrayList<EnvVar>();
        envVars.add(taskInstanceIdVar);
        if (MapUtils.isNotEmpty(otherParams)) {
            for (Map.Entry<String, String> entry : otherParams.entrySet()) {
                String param = entry.getKey();
                String paramValue = entry.getValue();
                EnvVar envVar = new EnvVar(param, paramValue, null);
                envVars.add(envVar);
            }
        }
        return ((JobBuilder)((JobFluent.SpecNested)((JobFluent.SpecNested)((JobSpecFluent.TemplateNested)((PodTemplateSpecFluent.SpecNested)((PodTemplateSpecFluent.SpecNested)((PodSpecFluent.ContainersNested)((PodSpecFluent.ContainersNested)((PodSpecFluent.ContainersNested)((PodSpecFluent.ContainersNested)((PodSpecFluent.ContainersNested)((JobFluent.SpecNested)((JobBuilder)((JobFluent.MetadataNested)((JobFluent.MetadataNested)((JobFluent.MetadataNested)((JobBuilder)new JobBuilder().withApiVersion("batch/v1")).withNewMetadata().withName(k8sJobName)).withLabels(labelMap)).withNamespace(namespaceName)).endMetadata()).withNewSpec().withTtlSecondsAfterFinished(Integer.valueOf(300))).withNewTemplate().withNewSpec().addNewContainer().withName(k8sJobName)).withImage(image)).withImagePullPolicy("Always")).withResources(new ResourceRequirements(limitRes, reqRes))).withEnv(envVars)).endContainer()).withRestartPolicy("Never")).endSpec()).endTemplate()).withBackoffLimit(Integer.valueOf(retryNum))).endSpec()).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerBatchJobWatcher(final Job job, final String taskInstanceId, final TaskResponse taskResponse, final K8sTaskMainParameters k8STaskMainParameters) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Watcher<Job> watcher = new Watcher<Job>(){

            public void eventReceived(Watcher.Action action, Job job2) {
                if (action != Watcher.Action.ADDED) {
                    int jobStatus = K8sTaskExecutor.this.getK8sJobStatus(job2);
                    K8sTaskExecutor.this.setTaskStatus(jobStatus, taskInstanceId, taskResponse, k8STaskMainParameters);
                    countDownLatch.countDown();
                }
            }

            public void onClose(WatcherException e) {
                K8sTaskExecutor.this.logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), e.getMessage()));
                taskResponse.setExitStatusCode(-1);
                countDownLatch.countDown();
            }
        };
        try (Watch watch = null;){
            boolean timeoutFlag;
            watch = this.k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher);
            boolean bl = timeoutFlag = this.taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || this.taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
            if (timeoutFlag) {
                Boolean timeout = !countDownLatch.await(this.taskRequest.getTaskTimeout(), TimeUnit.SECONDS);
                this.waitTimeout(timeout);
            } else {
                countDownLatch.await();
            }
            this.flushLog(taskResponse);
        }
    }

    @Override
    public TaskResponse run(String k8sParameterStr) throws Exception {
        TaskResponse result = new TaskResponse();
        int taskInstanceId = this.taskRequest.getTaskInstanceId();
        K8sTaskMainParameters k8STaskMainParameters = (K8sTaskMainParameters)JSONUtils.parseObject((String)k8sParameterStr, K8sTaskMainParameters.class);
        try {
            if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
                result.setExitStatusCode(137);
                return result;
            }
            if (StringUtils.isEmpty((CharSequence)k8sParameterStr)) {
                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
                return result;
            }
            K8sTaskExecutionContext k8sTaskExecutionContext = this.taskRequest.getK8sTaskExecutionContext();
            String configYaml = k8sTaskExecutionContext.getConfigYaml();
            this.k8sUtils.buildClient(configYaml);
            this.submitJob2k8s(k8sParameterStr);
            this.registerBatchJobWatcher(this.job, Integer.toString(taskInstanceId), result, k8STaskMainParameters);
        }
        catch (Exception e) {
            this.cancelApplication(k8sParameterStr);
            result.setExitStatusCode(-1);
            throw e;
        }
        return result;
    }

    @Override
    public void cancelApplication(String k8sParameterStr) {
        if (this.job != null) {
            this.stopJobOnK8s(k8sParameterStr);
        }
    }

    @Override
    public void submitJob2k8s(String k8sParameterStr) {
        int taskInstanceId = this.taskRequest.getTaskInstanceId();
        String taskName = this.taskRequest.getTaskName().toLowerCase(Locale.ROOT);
        K8sTaskMainParameters k8STaskMainParameters = (K8sTaskMainParameters)JSONUtils.parseObject((String)k8sParameterStr, K8sTaskMainParameters.class);
        try {
            this.logger.info("[K8sJobExecutor-{}-{}] start to submit job", (Object)taskName, (Object)taskInstanceId);
            this.job = this.buildK8sJob(k8STaskMainParameters);
            this.stopJobOnK8s(k8sParameterStr);
            String namespaceName = k8STaskMainParameters.getNamespaceName();
            this.k8sUtils.createJob(namespaceName, this.job);
            this.logger.info("[K8sJobExecutor-{}-{}]  submitted job successfully", (Object)taskName, (Object)taskInstanceId);
        }
        catch (Exception e) {
            this.logger.error("[K8sJobExecutor-{}-{}]  fail to submit job", (Object)taskName, (Object)taskInstanceId);
            throw new TaskException("K8sJobExecutor fail to submit job", e);
        }
    }

    @Override
    public void stopJobOnK8s(String k8sParameterStr) {
        K8sTaskMainParameters k8STaskMainParameters = (K8sTaskMainParameters)JSONUtils.parseObject((String)k8sParameterStr, K8sTaskMainParameters.class);
        String namespaceName = k8STaskMainParameters.getNamespaceName();
        String jobName = this.job.getMetadata().getName();
        try {
            if (Boolean.TRUE.equals(this.k8sUtils.jobExist(jobName, namespaceName))) {
                this.k8sUtils.deleteJob(jobName, namespaceName);
            }
        }
        catch (Exception e) {
            this.logger.error("[K8sJobExecutor-{}]  fail to stop job", (Object)jobName);
            throw new TaskException("K8sJobExecutor fail to stop job", e);
        }
    }

    public int getK8sJobStatus(Job job) {
        JobStatus jobStatus = job.getStatus();
        if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) {
            return 0;
        }
        if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) {
            return -1;
        }
        return 1;
    }

    public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
        if (jobStatus == 0 || jobStatus == -1) {
            if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) {
                this.logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed", this.job.getMetadata().getName()));
                taskResponse.setExitStatusCode(137);
            } else if (jobStatus == 0) {
                this.logStringBuffer.append(String.format("[K8sJobExecutor-%s] succeed in k8s", this.job.getMetadata().getName()));
                taskResponse.setExitStatusCode(0);
            } else {
                String errorMessage = this.k8sUtils.getPodLog(this.job.getMetadata().getName(), k8STaskMainParameters.getNamespaceName());
                this.logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", this.job.getMetadata().getName(), errorMessage));
                taskResponse.setExitStatusCode(-1);
            }
        }
    }

    public Job getJob() {
        return this.job;
    }

    public void setJob(Job job) {
        this.job = job;
    }
}

