package org.apache.dolphinscheduler.service.storage.impl;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.dolphinscheduler.common.utils.HttpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.dolphinscheduler.service.utils.CommonUtils;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/service/storage/impl/HadoopUtils.class */
public class HadoopUtils implements Closeable, StorageOperate {
    // User the FileSystem is opened as; read from "resource.hdfs.root.user" in the constructor
    // and reset to "" by init() when CommonUtils.loadKerberosConf(...) returns true.
    private String hdfsUser;
    // The only key ever looked up in the instance cache below.
    private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
    // Set to true the first time getApplicationUrl(...) is called; exposed via isYarnEnabled().
    private volatile boolean yarnEnabled;
    // Hadoop configuration built by init().
    private Configuration configuration;
    // File system handle created by init(); stays null if init() failed (the failure is only logged).
    private FileSystem fs;
    private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
    // Comma-separated ResourceManager ids/hosts; empty means no HA lookup is done in getApplicationUrl(...).
    public static final String RM_HA_IDS = PropertyUtils.getString("yarn.resourcemanager.ha.rm.ids");
    // Format template for the application status URL; formatted with (port, applicationId).
    public static final String APP_ADDRESS = PropertyUtils.getString("yarn.application.status.address");
    // Format template for the job history URL; formatted with the job id.
    public static final String JOB_HISTORY_ADDRESS = PropertyUtils.getString("yarn.job.history.status.address");
    // ResourceManager HTTP port, defaulting to 8088 when the property is not set.
    public static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE = PropertyUtils.getInt("resource.manager.httpaddress.port", 8088);
    // Cache holding the HadoopUtils instance under HADOOP_UTILS_KEY. Entries expire
    // "kerberos.expire.time" hours (default 2) after being written, after which the next
    // getInstance() call builds a fresh instance through the loader.
    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder.newBuilder().expireAfterWrite(PropertyUtils.getInt("kerberos.expire.time", 2), TimeUnit.HOURS).build(new CacheLoader<String, HadoopUtils>() { // from class: org.apache.dolphinscheduler.service.storage.impl.HadoopUtils.1
        // The key is ignored: every load simply constructs a new HadoopUtils.
        public HadoopUtils load(String str) throws Exception {
            return new HadoopUtils();
        }
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/dolphinscheduler/service/storage/impl/HadoopUtils$YarnHAAdminUtils.class */
    public static final class YarnHAAdminUtils {
        private YarnHAAdminUtils() {
        }

        public static String getActiveRMName(String str, String str2) {
            String[] split = str2.split(",");
            String str3 = str + "%s:" + HadoopUtils.HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
            try {
                for (String str4 : split) {
                    if ("ACTIVE".equals(getRMState(String.format(str3, str4)))) {
                        return str4;
                    }
                }
                return null;
            } catch (Exception e) {
                HadoopUtils.logger.error("yarn ha application url generation failed, message:{}", e.getMessage());
                return null;
            }
        }

        public static String getRMState(String str) {
            String str2 = Boolean.TRUE.equals(PropertyUtils.getBoolean("hadoop.security.authentication.startup.state", false)) ? KerberosHttpClient.get(str) : HttpUtils.get(str);
            if (StringUtils.isEmpty(str2)) {
                return null;
            }
            ObjectNode parseObject = JSONUtils.parseObject(str2);
            if (parseObject.has("clusterInfo")) {
                return parseObject.get("clusterInfo").path("haState").asText();
            }
            return null;
        }
    }

    /**
     * Creates an instance: reads the HDFS user from {@code resource.hdfs.root.user}, builds the
     * configuration and file system via {@link #init()}, then makes sure the upload root exists.
     * Instances are only created by the cache loader behind {@link #getInstance()}.
     */
    private HadoopUtils() {
        hdfsUser = PropertyUtils.getString("resource.hdfs.root.user");
        yarnEnabled = false;
        // init() must run before initHdfsPath(), which uses the file system init() creates.
        init();
        initHdfsPath();
    }

    /**
     * Returns the cached instance, constructing a new one through the cache loader when no
     * entry is present (for example after the entry expired).
     *
     * @return the current {@code HadoopUtils} instance
     */
    public static HadoopUtils getInstance() {
        final HadoopUtils instance = cache.getUnchecked(HADOOP_UTILS_KEY);
        return instance;
    }

    /**
     * Creates the directory {@code RESOURCE_UPLOAD_PATH} on the file system if it is missing.
     * Any exception raised while checking or creating it is logged and otherwise ignored.
     */
    private void initHdfsPath() {
        final Path uploadRoot = new Path(RESOURCE_UPLOAD_PATH);
        try {
            final boolean alreadyPresent = fs.exists(uploadRoot);
            if (alreadyPresent) {
                return;
            }
            fs.mkdirs(uploadRoot);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the Hadoop {@link Configuration} and opens the {@link FileSystem}.
     *
     * <p>The default file system is taken from {@code resource.hdfs.fs.defaultFS}, first from the
     * Hadoop configuration and then from {@code PropertyUtils}; it is stored under
     * {@code fs.defaultFS} together with every property whose name starts with {@code fs.}.
     * When an HDFS user is configured the file system is opened as that remote user, otherwise
     * as the current user.
     *
     * <p>NOTE: every failure, including the missing-property error raised below, is caught and
     * only logged, so {@code fs} may remain {@code null} after this method returns. That
     * best-effort behaviour is preserved from the original implementation.
     */
    private void init() throws NullPointerException {
        try {
            this.configuration = new HdfsConfiguration();
            if (CommonUtils.loadKerberosConf(this.configuration)) {
                // Kerberos configuration was loaded: do not impersonate a remote user below.
                this.hdfsUser = "";
            }
            String defaultFs = this.configuration.get("resource.hdfs.fs.defaultFS");
            if (StringUtils.isBlank(defaultFs)) {
                defaultFs = PropertyUtils.getString("resource.hdfs.fs.defaultFS");
            }
            if (StringUtils.isBlank(defaultFs)) {
                logger.error("property:{} can not to be empty, please set!", "resource.hdfs.fs.defaultFS");
                throw new NullPointerException(String.format("property: %s can not to be empty, please set!", "resource.hdfs.fs.defaultFS"));
            }
            // Typed map: a raw Map would make the lambda parameters Object and fail to compile.
            Map<String, String> fsProperties = PropertyUtils.getPrefixedProperties("fs.");
            this.configuration.set("fs.defaultFS", defaultFs);
            fsProperties.forEach((key, value) -> this.configuration.set(key, value));
            if (!defaultFs.startsWith("file")) {
                logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", "resource.hdfs.fs.defaultFS", defaultFs);
            }
            if (StringUtils.isNotEmpty(this.hdfsUser)) {
                // The cast selects the PrivilegedExceptionAction overload of doAs; without it the
                // lambda is ambiguous between that and the PrivilegedAction overload.
                UserGroupInformation.createRemoteUser(this.hdfsUser).doAs((PrivilegedExceptionAction<Boolean>) () -> {
                    this.fs = FileSystem.get(this.configuration);
                    return true;
                });
            } else {
                logger.warn("resource.hdfs.root.user is not set value!");
                this.fs = FileSystem.get(this.configuration);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error(e.getMessage(), e);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * @return the Hadoop configuration built by {@code init()}
     */
    public Configuration getConfiguration() {
        return configuration;
    }

    /**
     * Returns the default file system URI of this instance's configuration.
     *
     * <p>{@code resource.hdfs.fs.defaultFS} is consulted first, as before. Because {@code init()}
     * stores the effective value under {@code fs.defaultFS} (it may have come from
     * {@code PropertyUtils} rather than the Hadoop configuration), that key is used as a
     * fallback when the first lookup is blank.
     *
     * @return the configured default file system, or {@code null} if neither key is set
     */
    public String getDefaultFS() {
        Configuration conf = getConfiguration();
        String defaultFs = conf.get("resource.hdfs.fs.defaultFS");
        if (StringUtils.isBlank(defaultFs)) {
            defaultFs = conf.get("fs.defaultFS");
        }
        return defaultFs;
    }

    /**
     * Builds the ResourceManager status URL for an application and marks YARN as enabled.
     *
     * <p>Without configured HA ids the template {@code APP_ADDRESS} is used as is; otherwise the
     * template is resolved against the active ResourceManager through
     * {@link #getAppAddress(String, String)}.
     *
     * @param str application id inserted into the URL template
     * @return the template formatted with the ResourceManager HTTP port and {@code str}
     * @throws BaseException if no usable address template could be determined
     */
    public String getApplicationUrl(String str) throws BaseException {
        yarnEnabled = true;
        final String addressTemplate;
        if (StringUtils.isEmpty(RM_HA_IDS)) {
            addressTemplate = APP_ADDRESS;
        } else {
            addressTemplate = getAppAddress(APP_ADDRESS, RM_HA_IDS);
        }
        if (StringUtils.isBlank(addressTemplate)) {
            throw new BaseException("yarn application url generation failed");
        }
        logger.debug("yarn application url:{}, applicationId:{}", addressTemplate, str);
        final Integer port = Integer.valueOf(HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE);
        return String.format(addressTemplate, port, str);
    }

    /**
     * Builds the job history URL for an application.
     *
     * @param str application id; every occurrence of {@code "application"} in it is replaced
     *            by {@code "job"} before formatting
     * @return {@code JOB_HISTORY_ADDRESS} formatted with the derived job id
     */
    public String getJobHistoryUrl(String str) {
        final String jobId = str.replace("application", "job");
        return String.format(JOB_HISTORY_ADDRESS, jobId);
    }

    /**
     * Reads the whole content of a file.
     *
     * @param str path of the file to read
     * @return the file content, or an empty array when {@code str} is blank
     * @throws IOException if the file cannot be opened or read
     */
    public byte[] catFile(String str) throws IOException {
        if (StringUtils.isBlank(str)) {
            logger.error("hdfs file path:{} is blank", str);
            return new byte[0];
        }
        // try-with-resources closes the stream on both the normal and the failure path.
        try (FSDataInputStream in = fs.open(new Path(str))) {
            return IOUtils.toByteArray(in);
        }
    }

    /**
     * Reads a window of lines from a UTF-8 text file.
     *
     * @param str path of the file to read
     * @param i   number of leading lines to skip
     * @param i2  maximum number of lines to return
     * @return the selected lines, or an empty list when {@code str} is blank
     * @throws IOException if the file cannot be opened
     */
    public List<String> catFile(String str, int i, int i2) throws IOException {
        if (StringUtils.isBlank(str)) {
            logger.error("hdfs file path:{} is blank", str);
            return Collections.emptyList();
        }
        // Closing the underlying stream is enough; the reader wrapping it holds no other resource.
        try (FSDataInputStream in = fs.open(new Path(str))) {
            final BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream) in, StandardCharsets.UTF_8));
            return reader.lines()
                    .skip(i)
                    .limit(i2)
                    .collect(Collectors.toList());
        }
    }

    /**
     * Reads a window of lines from a file; delegates to {@link #catFile(String, int, int)}.
     *
     * @param str  not used by this implementation
     * @param str2 path of the file to read
     * @param i    number of leading lines to skip
     * @param i2   maximum number of lines to return
     * @return the selected lines
     * @throws IOException if the file cannot be opened
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public List<String> vimFile(String str, String str2, int i, int i2) throws IOException {
        final List<String> lines = catFile(str2, i, i2);
        return lines;
    }

    /**
     * Creates the resources and UDF directories of a tenant, using the instance currently
     * returned by {@link #getInstance()}.
     *
     * @param str tenant identifier from which both directory paths are derived
     * @throws IOException if a directory cannot be created
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public void createTenantDirIfNotExists(String str) throws IOException {
        final String resourcesDir = getHdfsResDir(str);
        getInstance().mkdir(str, resourcesDir);
        final String udfDir = getHdfsUdfDir(str);
        getInstance().mkdir(str, udfDir);
    }

    /**
     * @param str tenant identifier
     * @return the tenant's resources directory, as computed by {@link #getHdfsResDir(String)}
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public String getResDir(String str) {
        final String resourcesDir = getHdfsResDir(str);
        return resourcesDir;
    }

    /**
     * @param str tenant identifier
     * @return the tenant's UDF directory, as computed by {@link #getHdfsUdfDir(String)}
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public String getUdfDir(String str) {
        final String udfDir = getHdfsUdfDir(str);
        return udfDir;
    }

    /**
     * Creates a directory (including missing parents) on the file system.
     *
     * @param str  not used by this implementation
     * @param str2 path of the directory to create
     * @return the result of {@code FileSystem.mkdirs}
     * @throws IOException if the file system reports an error
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public boolean mkdir(String str, String str2) throws IOException {
        final Path directory = new Path(str2);
        return fs.mkdirs(directory);
    }

    /**
     * @param str  tenant identifier
     * @param str2 file name, with or without a leading slash
     * @return the path computed by {@link #getHdfsResourceFileName(String, String)}
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public String getResourceFileName(String str, String str2) {
        final String fullPath = getHdfsResourceFileName(str, str2);
        return fullPath;
    }

    /**
     * @param resourceType kind of resource, which selects the base directory
     * @param str          tenant identifier
     * @param str2         file name, with or without a leading slash
     * @return the path computed by {@link #getHdfsFileName(ResourceType, String, String)}
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public String getFileName(ResourceType resourceType, String str, String str2) {
        final String fullPath = getHdfsFileName(resourceType, str, str2);
        return fullPath;
    }

    /**
     * Copies a file from the file system to the local disk; delegates to
     * {@link #copyHdfsToLocal(String, String, boolean, boolean)} and discards its result.
     *
     * @param str  not used by this implementation
     * @param str2 source path on the file system
     * @param str3 destination path on the local disk
     * @param z    whether the source is deleted after copying
     * @param z2   whether an existing destination file is deleted first
     * @throws IOException if the copy fails
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public void download(String str, String str2, String str3, boolean z, boolean z2) throws IOException {
        // The boolean outcome is intentionally not propagated by this interface method.
        this.copyHdfsToLocal(str2, str3, z, z2);
    }

    /**
     * Copies a path to another location on the same file system.
     *
     * @param str  source path
     * @param str2 destination path
     * @param z    passed to {@code FileUtil.copy} as its delete-source flag
     * @param z2   passed to {@code FileUtil.copy} as its overwrite flag
     * @return the result of {@code FileUtil.copy}
     * @throws IOException if the copy fails
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public boolean copy(String str, String str2, boolean z, boolean z2) throws IOException {
        final Path source = new Path(str);
        final Path target = new Path(str2);
        return FileUtil.copy(fs, source, fs, target, z, z2, fs.getConf());
    }

    /**
     * Copies a local file onto the file system.
     *
     * @param str  source path on the local disk
     * @param str2 destination path on the file system
     * @param z    passed to {@code copyFromLocalFile} as its delete-source flag
     * @param z2   passed to {@code copyFromLocalFile} as its overwrite flag
     * @return always {@code true}; failures surface as exceptions
     * @throws IOException if the copy fails
     */
    public boolean copyLocalToHdfs(String str, String str2, boolean z, boolean z2) throws IOException {
        final Path localSource = new Path(str);
        final Path remoteTarget = new Path(str2);
        fs.copyFromLocalFile(z, z2, localSource, remoteTarget);
        return true;
    }

    /**
     * Uploads a local file; delegates to {@link #copyLocalToHdfs(String, String, boolean, boolean)}.
     *
     * @param str  not used by this implementation
     * @param str2 source path on the local disk
     * @param str3 destination path on the file system
     * @param z    delete-source flag
     * @param z2   overwrite flag
     * @return the result of {@code copyLocalToHdfs}
     * @throws IOException if the copy fails
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public boolean upload(String str, String str2, String str3, boolean z, boolean z2) throws IOException {
        final boolean uploaded = copyLocalToHdfs(str2, str3, z, z2);
        return uploaded;
    }

    /**
     * Copies a file from the file system to the local disk.
     *
     * <p>If the destination already exists as a regular file it is deleted first when
     * {@code z2} is set; if it exists but is not a regular file an error is logged and the copy
     * is still attempted. Missing parent directories of the destination are created.
     *
     * @param str  source path on the file system
     * @param str2 destination path on the local disk
     * @param z    passed to {@code FileUtil.copy} as its delete-source flag
     * @param z2   whether an existing destination file is deleted before copying
     * @return {@code false} if the destination's parent directory could not be created,
     *         otherwise the result of {@code FileUtil.copy}
     * @throws IOException if deleting the old destination or copying fails
     */
    public boolean copyHdfsToLocal(String str, String str2, boolean z, boolean z2) throws IOException {
        Path source = new Path(str);
        File target = new File(str2);
        if (target.exists()) {
            if (!target.isFile()) {
                logger.error("destination file must be a file");
            } else if (z2) {
                Files.delete(target.toPath());
            }
        }
        // getParentFile() is null for a destination without a parent component (e.g. "out.txt");
        // in that case there is nothing to create and the copy goes to the working directory.
        File parent = target.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            return false;
        }
        return FileUtil.copy(this.fs, source, target, z, this.fs.getConf());
    }

    /**
     * Deletes a path on the file system.
     *
     * @param str  not used by this implementation
     * @param str2 path to delete
     * @param z    passed to {@code FileSystem.delete} as its recursive flag
     * @return the result of {@code FileSystem.delete}
     * @throws IOException if the file system reports an error
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public boolean delete(String str, String str2, boolean z) throws IOException {
        final Path victim = new Path(str2);
        return fs.delete(victim, z);
    }

    /**
     * Checks whether a path exists on the file system.
     *
     * @param str  not used by this implementation
     * @param str2 path to check
     * @return the result of {@code FileSystem.exists}
     * @throws IOException if the file system reports an error
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public boolean exists(String str, String str2) throws IOException {
        final Path candidate = new Path(str2);
        return fs.exists(candidate);
    }

    /**
     * Lists the entries under a path.
     *
     * @param str path to list
     * @return the statuses returned by {@code FileSystem.listStatus}
     * @throws IOException wrapping the original failure, after it has been logged
     */
    public FileStatus[] listFileStatus(String str) throws IOException {
        final Path directory = new Path(str);
        try {
            final FileStatus[] entries = fs.listStatus(directory);
            return entries;
        } catch (IOException e) {
            logger.error("Get file list exception", e);
            throw new IOException("Get file list exception", e);
        }
    }

    /**
     * Renames (moves) a path on the file system.
     *
     * @param str  current path
     * @param str2 new path
     * @return the result of {@code FileSystem.rename}
     * @throws IOException if the file system reports an error
     */
    public boolean rename(String str, String str2) throws IOException {
        final Path from = new Path(str);
        final Path to = new Path(str2);
        return fs.rename(from, to);
    }

    /**
     * @return {@code true} once {@link #getApplicationUrl(String)} has been called on this instance
     */
    public boolean isYarnEnabled() {
        return yarnEnabled;
    }

    /**
     * Determines the execution status of a YARN application.
     *
     * <p>The ResourceManager is asked first and its {@code app.finalStatus} is used. When that
     * request yields no response, the job history URL is queried instead and {@code job.state}
     * is used. Requests go through {@code KerberosHttpClient} when the property
     * {@code hadoop.security.authentication.startup.state} is true, otherwise through
     * {@code HttpUtils}.
     *
     * @param str application id
     * @return {@code null} for an empty id; {@code FAILURE} when the job history response is
     *         missing or a response lacks the expected member; otherwise the mapped status
     * @throws BaseException if the application URL cannot be generated
     */
    public TaskExecutionStatus getApplicationStatus(String str) throws BaseException {
        if (StringUtils.isEmpty(str)) {
            return null;
        }

        final String applicationUrl = getApplicationUrl(str);
        logger.debug("generate yarn application url, applicationUrl={}", applicationUrl);
        final String rmResponse;
        if (Boolean.TRUE.equals(PropertyUtils.getBoolean("hadoop.security.authentication.startup.state", false))) {
            rmResponse = KerberosHttpClient.get(applicationUrl);
        } else {
            rmResponse = HttpUtils.get(applicationUrl);
        }

        if (rmResponse != null) {
            final ObjectNode rmJson = JSONUtils.parseObject(rmResponse);
            if (!rmJson.has("app")) {
                return TaskExecutionStatus.FAILURE;
            }
            return getExecutionStatus(rmJson.path("app").path("finalStatus").asText());
        }

        // No answer from the ResourceManager: fall back to the job history endpoint.
        final String jobHistoryUrl = getJobHistoryUrl(str);
        logger.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl);
        final String historyResponse;
        if (Boolean.TRUE.equals(PropertyUtils.getBoolean("hadoop.security.authentication.startup.state", false))) {
            historyResponse = KerberosHttpClient.get(jobHistoryUrl);
        } else {
            historyResponse = HttpUtils.get(jobHistoryUrl);
        }
        if (historyResponse == null) {
            return TaskExecutionStatus.FAILURE;
        }
        final ObjectNode historyJson = JSONUtils.parseObject(historyResponse);
        if (!historyJson.has("job")) {
            return TaskExecutionStatus.FAILURE;
        }
        return getExecutionStatus(historyJson.path("job").path("state").asText());
    }

    /**
     * Maps a YARN state string to a {@link TaskExecutionStatus}.
     *
     * <p>Rewritten as a plain string switch; the previous two-stage hash/index switch was a
     * decompiler artifact that is not valid Java. The mapping itself is unchanged:
     * <ul>
     *   <li>{@code ACCEPTED} - {@code SUBMITTED_SUCCESS}</li>
     *   <li>{@code SUCCEEDED}, {@code ENDED} - {@code SUCCESS}</li>
     *   <li>{@code NEW}, {@code NEW_SAVING}, {@code SUBMITTED}, {@code FAILED} - {@code FAILURE}</li>
     *   <li>{@code KILLED} - {@code KILL}</li>
     *   <li>{@code RUNNING} and any other value - {@code RUNNING_EXECUTION}</li>
     * </ul>
     *
     * @param str state text taken from the YARN response; must not be {@code null}
     * @return the corresponding execution status
     */
    private TaskExecutionStatus getExecutionStatus(String str) {
        switch (str) {
            case "ACCEPTED":
                return TaskExecutionStatus.SUBMITTED_SUCCESS;
            case "SUCCEEDED":
            case "ENDED":
                return TaskExecutionStatus.SUCCESS;
            case "NEW":
            case "NEW_SAVING":
            case "SUBMITTED":
            case "FAILED":
                return TaskExecutionStatus.FAILURE;
            case "KILLED":
                return TaskExecutionStatus.KILL;
            case "RUNNING":
            default:
                return TaskExecutionStatus.RUNNING_EXECUTION;
        }
    }

    /**
     * Returns the base path under which tenant directories live.
     *
     * @return an empty string when {@code RESOURCE_UPLOAD_PATH} is exactly {@code "/"} (so that
     *         appending {@code "/..."} does not produce a double slash), otherwise
     *         {@code RESOURCE_UPLOAD_PATH} unchanged
     */
    public static String getHdfsDataBasePath() {
        if ("/".equals(RESOURCE_UPLOAD_PATH)) {
            return "";
        }
        return RESOURCE_UPLOAD_PATH;
    }

    /**
     * Selects a tenant's base directory for a resource type.
     *
     * @param resourceType kind of resource; must not be {@code null}
     * @param str          tenant identifier
     * @return the resources directory for {@code FILE}, the UDF directory for {@code UDF},
     *         and an empty string for any other type
     */
    public static String getHdfsDir(ResourceType resourceType, String str) {
        if (resourceType.equals(ResourceType.FILE)) {
            return getHdfsResDir(str);
        }
        if (resourceType.equals(ResourceType.UDF)) {
            return getHdfsUdfDir(str);
        }
        return "";
    }

    /**
     * @param resourceType kind of resource
     * @param str          tenant identifier
     * @return the directory computed by {@link #getHdfsDir(ResourceType, String)}
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public String getDir(ResourceType resourceType, String str) {
        final String directory = getHdfsDir(resourceType, str);
        return directory;
    }

    /**
     * @param str tenant identifier
     * @return the tenant directory followed by {@code /resources}
     */
    public static String getHdfsResDir(String str) {
        final String tenantDir = getHdfsTenantDir(str);
        return tenantDir + "/resources";
    }

    /**
     * @param str tenant identifier
     * @return the tenant directory followed by {@code /udfs}
     */
    public static String getHdfsUdfDir(String str) {
        final String tenantDir = getHdfsTenantDir(str);
        return tenantDir + "/udfs";
    }

    /**
     * Builds the full path of a file inside the tenant directory selected by resource type.
     *
     * @param resourceType kind of resource, which selects the base directory
     * @param str          tenant identifier
     * @param str2         file name; a single leading slash, if present, is dropped
     * @return base directory, a slash, and the file name
     */
    public static String getHdfsFileName(ResourceType resourceType, String str, String str2) {
        final String relativeName = str2.startsWith("/") ? str2.substring(1) : str2;
        return getHdfsDir(resourceType, str) + "/" + relativeName;
    }

    /**
     * Builds the full path of a file inside a tenant's resources directory.
     *
     * @param str  tenant identifier
     * @param str2 file name; a single leading slash, if present, is dropped
     * @return resources directory, a slash, and the file name
     */
    public static String getHdfsResourceFileName(String str, String str2) {
        final String relativeName = str2.startsWith("/") ? str2.substring(1) : str2;
        return getHdfsResDir(str) + "/" + relativeName;
    }

    /**
     * Builds the full path of a file inside a tenant's UDF directory.
     *
     * @param str  tenant identifier
     * @param str2 file name; a single leading slash, if present, is dropped
     * @return UDF directory, a slash, and the file name
     */
    public static String getHdfsUdfFileName(String str, String str2) {
        final String relativeName = str2.startsWith("/") ? str2.substring(1) : str2;
        return getHdfsUdfDir(str) + "/" + relativeName;
    }

    /**
     * @param str tenant identifier
     * @return the base path from {@link #getHdfsDataBasePath()}, a slash, and {@code str}
     */
    public static String getHdfsTenantDir(String str) {
        final String basePath = getHdfsDataBasePath();
        return basePath + "/" + str;
    }

    /**
     * Rewrites an application address template so that its host is the active ResourceManager.
     *
     * <p>The template is expected to look like {@code <protocol>//<host>:<rest>}, i.e. to contain
     * exactly one {@code //} and exactly one {@code :} after it. The host part is replaced by
     * the active ResourceManager found among {@code str2}.
     *
     * @param str  address template; a {@code null} or blank value yields {@code null} instead of
     *             a {@code NullPointerException}
     * @param str2 comma-separated ResourceManager host names to probe
     * @return the template with the active host substituted, or {@code null} when the template
     *         is missing, does not have the expected shape, or no active ResourceManager is found
     */
    public static String getAppAddress(String str, String str2) {
        if (StringUtils.isBlank(str)) {
            // Unset address property: report "no address" so the caller can fail cleanly.
            return null;
        }
        String[] protocolAndRest = str.split("//");
        if (protocolAndRest.length != 2) {
            return null;
        }
        String protocolPrefix = protocolAndRest[0] + "//";
        String[] hostAndTail = protocolAndRest[1].split(":");
        if (hostAndTail.length != 2) {
            return null;
        }
        String tail = ":" + hostAndTail[1];
        String activeRMName = YarnHAAdminUtils.getActiveRMName(protocolPrefix, str2);
        if (StringUtils.isEmpty(activeRMName)) {
            return null;
        }
        return protocolPrefix + activeRMName + tail;
    }

    /**
     * Closes the underlying file system, if one was opened.
     *
     * @throws IOException wrapping the original failure, after it has been logged
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (fs == null) {
            return;
        }
        try {
            fs.close();
        } catch (IOException e) {
            logger.error("Close HadoopUtils instance failed", e);
            throw new IOException("Close HadoopUtils instance failed", e);
        }
    }

    /**
     * Recursively deletes a tenant's directory if it exists.
     *
     * @param str tenant identifier
     * @throws Exception if the existence check or the deletion fails
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public void deleteTenant(String str) throws Exception {
        final String tenantDir = getHdfsTenantDir(str);
        if (!exists(str, tenantDir)) {
            return;
        }
        delete(str, tenantDir, true);
    }

    /**
     * @return {@code ResUploadType.HDFS}, the storage type this implementation reports
     */
    @Override // org.apache.dolphinscheduler.service.storage.StorageOperate
    public ResUploadType returnStorageType() {
        final ResUploadType storageType = ResUploadType.HDFS;
        return storageType;
    }
}
