/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.api.am;

import com.google.auto.service.AutoService;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.ContainerResource;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.am.ApplicationManager;
import org.apache.dolphinscheduler.plugin.task.api.am.ApplicationManagerContext;
import org.apache.dolphinscheduler.plugin.task.api.am.KubernetesApplicationManagerContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(value={ApplicationManager.class})
public class KubernetesApplicationManager
implements ApplicationManager {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KubernetesApplicationManager.class);
    private static final String PENDING = "Pending";
    private static final String RUNNING = "Running";
    private static final String FINISH = "Succeeded";
    private static final String FAILED = "Failed";
    private static final String UNKNOWN = "Unknown";
    private static final int MAX_RETRY_TIMES = 10;
    private final Map<String, KubernetesClient> cacheClientMap = new ConcurrentHashMap<String, KubernetesClient>();

    @Override
    public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
        boolean isKill;
        KubernetesApplicationManagerContext kubernetesApplicationManagerContext = (KubernetesApplicationManagerContext)applicationManagerContext;
        String labelValue = kubernetesApplicationManagerContext.getLabelValue();
        FilterWatchListDeletable<Pod, PodList, PodResource> watchList = this.getListenPod(kubernetesApplicationManagerContext);
        try {
            if (this.getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) {
                log.error("Driver pod is in FAILED or UNKNOWN status.");
                isKill = false;
            } else {
                watchList.delete();
                isKill = true;
            }
        }
        catch (Exception e) {
            throw new TaskException("Failed to kill Kubernetes application with label " + labelValue, e);
        }
        finally {
            this.removeCache(labelValue);
        }
        return isKill;
    }

    @Override
    public ResourceManagerType getResourceManagerType() {
        return ResourceManagerType.KUBERNETES;
    }

    private FilterWatchListDeletable<Pod, PodList, PodResource> getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
        KubernetesClient client = this.getClient(kubernetesApplicationManagerContext);
        String labelValue = kubernetesApplicationManagerContext.getLabelValue();
        List podList = null;
        FilterWatchListDeletable watchList = null;
        for (int retryTimes = 0; CollectionUtils.isEmpty(podList) && retryTimes < 10 && CollectionUtils.isEmpty((Collection)(podList = ((PodList)(watchList = (FilterWatchListDeletable)((NonNamespaceOperation)client.pods().inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())).withLabel("dolphinscheduler-label", labelValue)).list()).getItems())); ++retryTimes) {
            ThreadUtils.sleep((long)1000L);
        }
        return watchList;
    }

    private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
        K8sTaskExecutionContext k8sTaskExecutionContext = kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
        return this.cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(), key -> new KubernetesClientBuilder().withConfig(Config.fromKubeconfig((String)k8sTaskExecutionContext.getConfigYaml())).build());
    }

    public void removeCache(String cacheKey) {
        KubernetesClient ignored = this.cacheClientMap.remove(cacheKey);
        Throwable throwable = null;
        if (ignored != null) {
            if (throwable != null) {
                try {
                    ignored.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
            } else {
                ignored.close();
            }
        }
    }

    public TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) throws TaskException {
        return this.getApplicationStatus(kubernetesApplicationManagerContext, null);
    }

    private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext, FilterWatchListDeletable<Pod, PodList, PodResource> watchList) throws TaskException {
        String phase;
        try {
            List driverPod;
            if (Objects.isNull(watchList)) {
                watchList = this.getListenPod(kubernetesApplicationManagerContext);
            }
            if (!(driverPod = ((PodList)watchList.list()).getItems()).isEmpty()) {
                Pod driver = (Pod)driverPod.get(0);
                phase = driver.getStatus().getPhase();
            } else {
                phase = FINISH;
            }
        }
        catch (Exception e) {
            throw new TaskException("Failed to get Kubernetes application status", e);
        }
        return phase.equals(FAILED) || phase.equals(UNKNOWN) ? TaskExecutionStatus.FAILURE : TaskExecutionStatus.SUCCESS;
    }

    public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
        KubernetesClient client = this.getClient(kubernetesApplicationManagerContext);
        boolean podIsReady = false;
        Pod pod = null;
        while (!podIsReady) {
            List podList;
            FilterWatchListDeletable<Pod, PodList, PodResource> watchList = this.getListenPod(kubernetesApplicationManagerContext);
            List list = podList = watchList == null ? null : ((PodList)watchList.list()).getItems();
            if (CollectionUtils.isEmpty((Collection)podList)) {
                return null;
            }
            pod = (Pod)podList.get(0);
            String phase = pod.getStatus().getPhase();
            if (phase.equals(PENDING) || phase.equals(UNKNOWN)) {
                ThreadUtils.sleep((long)1000L);
                continue;
            }
            podIsReady = true;
        }
        return ((ContainerResource)((PodResource)((NonNamespaceOperation)client.pods().inNamespace(pod.getMetadata().getNamespace())).withName(pod.getMetadata().getName())).inContainer((Object)kubernetesApplicationManagerContext.getContainerName())).watchLog();
    }
}

