/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.task.datax;

import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.slf4j.Logger;

public class DataxTask
extends AbstractTask {
    private static final String DATAX_PYTHON = "python2.7";
    private static final String DATAX_PATH = "${DATAX_HOME}/bin/datax.py";
    private static final int DATAX_CHANNEL_COUNT = 1;
    private DataxParameters dataXParameters;
    private ShellCommandExecutor shellCommandExecutor;
    private TaskExecutionContext taskExecutionContext;

    public DataxTask(TaskExecutionContext taskExecutionContext, Logger logger) {
        super(taskExecutionContext, logger);
        this.taskExecutionContext = taskExecutionContext;
        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
    }

    @Override
    public void init() {
        this.logger.info("datax task params {}", (Object)this.taskExecutionContext.getTaskParams());
        this.dataXParameters = (DataxParameters)JSONUtils.parseObject((String)this.taskExecutionContext.getTaskParams(), DataxParameters.class);
        if (!this.dataXParameters.checkParameters()) {
            throw new RuntimeException("datax task params is not valid");
        }
    }

    @Override
    public void handle() throws Exception {
        try {
            String threadLoggerInfoName = String.format("TaskLogInfo-%s", this.taskExecutionContext.getTaskAppId());
            Thread.currentThread().setName(threadLoggerInfoName);
            Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(this.taskExecutionContext.getDefinedParams()), this.taskExecutionContext.getDefinedParams(), this.dataXParameters.getLocalParametersMap(), CommandType.of((Integer)this.taskExecutionContext.getCmdTypeIfComplement()), this.taskExecutionContext.getScheduleTime());
            String jsonFilePath = this.buildDataxJsonFile(paramsMap);
            String shellCommandFilePath = this.buildShellCommandFile(jsonFilePath, paramsMap);
            CommandExecuteResult commandExecuteResult = this.shellCommandExecutor.run(shellCommandFilePath);
            this.setExitStatusCode(commandExecuteResult.getExitStatusCode());
            this.setAppIds(commandExecuteResult.getAppIds());
            this.setProcessId(commandExecuteResult.getProcessId());
        }
        catch (Exception e) {
            this.logger.error("datax task failure", (Throwable)e);
            this.setExitStatusCode(-1);
            throw e;
        }
    }

    @Override
    public void cancelApplication(boolean cancelApplication) throws Exception {
        this.shellCommandExecutor.cancelApplication();
    }

    private String buildDataxJsonFile(Map<String, Property> paramsMap) throws Exception {
        String json;
        String fileName = String.format("%s/%s_job.json", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId());
        Path path = new File(fileName).toPath();
        if (Files.exists(path, new LinkOption[0])) {
            return fileName;
        }
        if (this.dataXParameters.getCustomConfig() == Flag.YES.ordinal()) {
            json = this.dataXParameters.getJson().replaceAll("\\r\\n", "\n");
        } else {
            JSONObject job = new JSONObject();
            job.put("content", this.buildDataxJobContentJson());
            job.put("setting", (Object)this.buildDataxJobSettingJson());
            JSONObject root = new JSONObject();
            root.put("job", (Object)job);
            root.put("core", (Object)this.buildDataxCoreJson());
            json = root.toString();
        }
        json = ParameterUtils.convertParameterPlaceholders((String)json, ParamUtils.convert(paramsMap));
        this.logger.debug("datax job json : {}", (Object)json);
        FileUtils.writeStringToFile((File)new File(fileName), (String)json, (Charset)StandardCharsets.UTF_8);
        return fileName;
    }

    private List<JSONObject> buildDataxJobContentJson() throws SQLException {
        DataxTaskExecutionContext dataxTaskExecutionContext = this.taskExecutionContext.getDataxTaskExecutionContext();
        BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource((DbType)DbType.of((int)dataxTaskExecutionContext.getSourcetype()), (String)dataxTaskExecutionContext.getSourceConnectionParams());
        BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource((DbType)DbType.of((int)dataxTaskExecutionContext.getTargetType()), (String)dataxTaskExecutionContext.getTargetConnectionParams());
        ArrayList<JSONObject> readerConnArr = new ArrayList<JSONObject>();
        JSONObject readerConn = new JSONObject();
        readerConn.put("querySql", (Object)new String[]{this.dataXParameters.getSql()});
        readerConn.put("jdbcUrl", (Object)new String[]{dataSourceCfg.getJdbcUrl()});
        readerConnArr.add(readerConn);
        JSONObject readerParam = new JSONObject();
        readerParam.put("username", (Object)dataSourceCfg.getUser());
        readerParam.put("password", (Object)dataSourceCfg.getPassword());
        readerParam.put("connection", readerConnArr);
        JSONObject reader = new JSONObject();
        reader.put("name", (Object)DataxUtils.getReaderPluginName(DbType.of((int)dataxTaskExecutionContext.getSourcetype())));
        reader.put("parameter", (Object)readerParam);
        ArrayList<JSONObject> writerConnArr = new ArrayList<JSONObject>();
        JSONObject writerConn = new JSONObject();
        writerConn.put("table", (Object)new String[]{this.dataXParameters.getTargetTable()});
        writerConn.put("jdbcUrl", (Object)dataTargetCfg.getJdbcUrl());
        writerConnArr.add(writerConn);
        JSONObject writerParam = new JSONObject();
        writerParam.put("username", (Object)dataTargetCfg.getUser());
        writerParam.put("password", (Object)dataTargetCfg.getPassword());
        writerParam.put("column", (Object)this.parsingSqlColumnNames(DbType.of((int)dataxTaskExecutionContext.getSourcetype()), DbType.of((int)dataxTaskExecutionContext.getTargetType()), dataSourceCfg, this.dataXParameters.getSql()));
        writerParam.put("connection", writerConnArr);
        if (CollectionUtils.isNotEmpty((Collection)this.dataXParameters.getPreStatements())) {
            writerParam.put("preSql", (Object)this.dataXParameters.getPreStatements());
        }
        if (CollectionUtils.isNotEmpty((Collection)this.dataXParameters.getPostStatements())) {
            writerParam.put("postSql", (Object)this.dataXParameters.getPostStatements());
        }
        JSONObject writer = new JSONObject();
        writer.put("name", (Object)DataxUtils.getWriterPluginName(DbType.of((int)dataxTaskExecutionContext.getTargetType())));
        writer.put("parameter", (Object)writerParam);
        ArrayList<JSONObject> contentList = new ArrayList<JSONObject>();
        JSONObject content = new JSONObject();
        content.put("reader", (Object)reader);
        content.put("writer", (Object)writer);
        contentList.add(content);
        return contentList;
    }

    private JSONObject buildDataxJobSettingJson() {
        JSONObject speed = new JSONObject();
        speed.put("channel", (Object)1);
        if (this.dataXParameters.getJobSpeedByte() > 0) {
            speed.put("byte", (Object)this.dataXParameters.getJobSpeedByte());
        }
        if (this.dataXParameters.getJobSpeedRecord() > 0) {
            speed.put("record", (Object)this.dataXParameters.getJobSpeedRecord());
        }
        JSONObject errorLimit = new JSONObject();
        errorLimit.put("record", (Object)0);
        errorLimit.put("percentage", (Object)0);
        JSONObject setting = new JSONObject();
        setting.put("speed", (Object)speed);
        setting.put("errorLimit", (Object)errorLimit);
        return setting;
    }

    private JSONObject buildDataxCoreJson() {
        JSONObject speed = new JSONObject();
        speed.put("channel", (Object)1);
        if (this.dataXParameters.getJobSpeedByte() > 0) {
            speed.put("byte", (Object)this.dataXParameters.getJobSpeedByte());
        }
        if (this.dataXParameters.getJobSpeedRecord() > 0) {
            speed.put("record", (Object)this.dataXParameters.getJobSpeedRecord());
        }
        JSONObject channel = new JSONObject();
        channel.put("speed", (Object)speed);
        JSONObject transport = new JSONObject();
        transport.put("channel", (Object)channel);
        JSONObject core = new JSONObject();
        core.put("transport", (Object)transport);
        return core;
    }

    private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
        String fileName = String.format("%s/%s_node.%s", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
        Path path = new File(fileName).toPath();
        if (Files.exists(path, new LinkOption[0])) {
            return fileName;
        }
        StringBuilder sbr = new StringBuilder();
        sbr.append(DATAX_PYTHON);
        sbr.append(" ");
        sbr.append(DATAX_PATH);
        sbr.append(" ");
        sbr.append(jobConfigFilePath);
        String dataxCommand = ParameterUtils.convertParameterPlaceholders((String)sbr.toString(), ParamUtils.convert(paramsMap));
        this.logger.debug("raw script : {}", (Object)dataxCommand);
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
        if (OSUtils.isWindows()) {
            Files.createFile(path, new FileAttribute[0]);
        } else {
            Files.createFile(path, attr);
        }
        Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
        return fileName;
    }

    private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) {
        String[] columnNames = this.tryGrammaticalAnalysisSqlColumnNames(dsType, sql);
        if (columnNames == null || columnNames.length == 0) {
            this.logger.info("try to execute sql analysis query column name");
            columnNames = this.tryExecuteSqlResolveColumnNames(dataSourceCfg, sql);
        }
        this.notNull(columnNames, String.format("parsing sql columns failed : %s", sql));
        return DataxUtils.convertKeywordsColumns(dtType, columnNames);
    }

    private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) {
        String[] columnNames;
        try {
            SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
            this.notNull(parser, String.format("database driver [%s] is not support", dbType.toString()));
            SQLStatement sqlStatement = parser.parseStatement();
            SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement;
            SQLSelect sqlSelect = sqlSelectStatement.getSelect();
            List selectItemList = null;
            if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
                SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery();
                selectItemList = block.getSelectList();
            } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
                SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery();
                SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight();
                selectItemList = block.getSelectList();
            }
            this.notNull(selectItemList, String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));
            columnNames = new String[selectItemList.size()];
            for (int i = 0; i < selectItemList.size(); ++i) {
                SQLSelectItem item = (SQLSelectItem)selectItemList.get(i);
                String columnName = null;
                if (item.getAlias() != null) {
                    columnName = item.getAlias();
                } else if (item.getExpr() != null) {
                    SQLPropertyExpr expr;
                    if (item.getExpr() instanceof SQLPropertyExpr) {
                        expr = (SQLPropertyExpr)item.getExpr();
                        columnName = expr.getName();
                    } else if (item.getExpr() instanceof SQLIdentifierExpr) {
                        expr = (SQLIdentifierExpr)item.getExpr();
                        columnName = expr.getName();
                    }
                } else {
                    throw new RuntimeException(String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
                }
                if (columnName == null) {
                    throw new RuntimeException(String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
                }
                columnNames[i] = columnName;
            }
        }
        catch (Exception e) {
            this.logger.warn(e.getMessage(), (Throwable)e);
            return null;
        }
        return columnNames;
    }

    public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) {
        String[] columnNames;
        sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
        sql = sql.replace(";", "");
        try (Connection connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), baseDataSource.getUser(), baseDataSource.getPassword());
             PreparedStatement stmt = connection.prepareStatement(sql);
             ResultSet resultSet = stmt.executeQuery();){
            ResultSetMetaData md = resultSet.getMetaData();
            int num = md.getColumnCount();
            columnNames = new String[num];
            for (int i = 1; i <= num; ++i) {
                columnNames[i - 1] = md.getColumnName(i);
            }
        }
        catch (SQLException e) {
            this.logger.warn(e.getMessage(), (Throwable)e);
            return null;
        }
        return columnNames;
    }

    @Override
    public AbstractParameters getParameters() {
        return this.dataXParameters;
    }

    private void notNull(Object obj, String message) {
        if (obj == null) {
            throw new RuntimeException(message);
        }
    }
}

