package org.apache.dolphinscheduler.server.worker.task.sql;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlBinds;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.class */
public class SqlTask extends AbstractTask {
    private SqlParameters sqlParameters;
    private AlertDao alertDao;
    private BaseDataSource baseDataSource;
    private TaskExecutionContext taskExecutionContext;
    private static final int QUERY_LIMIT = 10000;

    public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) {
        super(taskExecutionContext, logger);
        this.taskExecutionContext = taskExecutionContext;
        logger.info("sql task params {}", taskExecutionContext.getTaskParams());
        this.sqlParameters = (SqlParameters) JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
        if (!this.sqlParameters.checkParameters()) {
            throw new RuntimeException("sql task params is not valid");
        }
        this.alertDao = (AlertDao) SpringApplicationContext.getBean(AlertDao.class);
    }

    @Override // org.apache.dolphinscheduler.server.worker.task.AbstractTask
    public void handle() throws Exception {
        Thread.currentThread().setName(String.format("TaskLogInfo-%s", this.taskExecutionContext.getTaskAppId()));
        this.logger.info("Full sql parameters: {}", this.sqlParameters);
        this.logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}, query max result limit : {}", new Object[]{this.sqlParameters.getType(), Integer.valueOf(this.sqlParameters.getDatasource()), this.sqlParameters.getSql(), this.sqlParameters.getLocalParams(), this.sqlParameters.getUdfs(), this.sqlParameters.getShowType(), this.sqlParameters.getConnParams(), Integer.valueOf(this.sqlParameters.getLimit())});
        try {
            SQLTaskExecutionContext sqlTaskExecutionContext = this.taskExecutionContext.getSqlTaskExecutionContext();
            DataSourceFactory.loadClass(DbType.valueOf(this.sqlParameters.getType()));
            this.baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(this.sqlParameters.getType()), sqlTaskExecutionContext.getConnectionParams());
            executeFuncAndSql(getSqlAndSqlParamsMap(this.sqlParameters.getSql()), (List) ((List) Optional.ofNullable(this.sqlParameters.getPreStatements()).orElse(new ArrayList())).stream().map(this::getSqlAndSqlParamsMap).collect(Collectors.toList()), (List) ((List) Optional.ofNullable(this.sqlParameters.getPostStatements()).orElse(new ArrayList())).stream().map(this::getSqlAndSqlParamsMap).collect(Collectors.toList()), UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(), this.logger));
            setExitStatusCode(0);
        } catch (Exception e) {
            setExitStatusCode(-1);
            this.logger.error("sql task error: {}", e.toString());
            throw e;
        }
    }

    private SqlBinds getSqlAndSqlParamsMap(String str) {
        HashMap hashMap = new HashMap();
        StringBuilder sb = new StringBuilder();
        Map<String, Property> convert = ParamUtils.convert(ParamUtils.getUserDefParamsMap(this.taskExecutionContext.getDefinedParams()), this.taskExecutionContext.getDefinedParams(), this.sqlParameters.getLocalParametersMap(), CommandType.of(Integer.valueOf(this.taskExecutionContext.getCmdTypeIfComplement())), this.taskExecutionContext.getScheduleTime());
        if (convert == null) {
            sb.append(str);
            return new SqlBinds(sb.toString(), hashMap);
        }
        if (MapUtils.isNotEmpty(this.taskExecutionContext.getParamsMap())) {
            convert.putAll(this.taskExecutionContext.getParamsMap());
        }
        if (StringUtils.isNotEmpty(this.sqlParameters.getTitle())) {
            String convertParameterPlaceholders = ParameterUtils.convertParameterPlaceholders(this.sqlParameters.getTitle(), ParamUtils.convert(convert));
            this.logger.info("SQL title : {}", convertParameterPlaceholders);
            this.sqlParameters.setTitle(convertParameterPlaceholders);
        }
        String replaceScheduleTime = ParameterUtils.replaceScheduleTime(str, this.taskExecutionContext.getScheduleTime());
        setSqlParamsMap(replaceScheduleTime, "['\"]*\\$\\{(.*?)\\}['\"]*", hashMap, convert);
        String replaceAll = replaceScheduleTime.replaceAll("['\"]*\\$\\{(.*?)\\}['\"]*", "?");
        sb.append(replaceAll);
        printReplacedSql(replaceScheduleTime, replaceAll, "['\"]*\\$\\{(.*?)\\}['\"]*", hashMap);
        return new SqlBinds(sb.toString(), hashMap);
    }

    @Override // org.apache.dolphinscheduler.server.worker.task.AbstractTask
    public AbstractParameters getParameters() {
        return this.sqlParameters;
    }

    public void executeFuncAndSql(SqlBinds sqlBinds, List<SqlBinds> list, List<SqlBinds> list2, List<String> list3) throws Exception {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            try {
                CommonUtils.loadKerberosConf();
                connection = createConnection();
                if (CollectionUtils.isNotEmpty(list3)) {
                    createTempFunction(connection, list3);
                }
                preSql(connection, list);
                preparedStatement = prepareStatementAndBind(connection, sqlBinds);
                if (this.sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
                    resultSet = preparedStatement.executeQuery();
                    resultProcess(resultSet);
                } else if (this.sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
                    preparedStatement.executeUpdate();
                }
                postSql(connection, list2);
                close(resultSet, preparedStatement, connection);
            } catch (Exception e) {
                this.logger.error("execute sql error: {}", e.getMessage());
                throw e;
            }
        } catch (Throwable th) {
            close(resultSet, preparedStatement, connection);
            throw th;
        }
    }

    private void resultProcess(ResultSet resultSet) throws Exception {
        JSONArray jSONArray = new JSONArray();
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        int limit = this.sqlParameters.getLimit() == 0 ? QUERY_LIMIT : this.sqlParameters.getLimit();
        for (int i = 0; i < limit && resultSet.next(); i++) {
            JSONObject jSONObject = new JSONObject(true);
            for (int i2 = 1; i2 <= columnCount; i2++) {
                jSONObject.put(metaData.getColumnLabel(i2), resultSet.getObject(i2));
            }
            jSONArray.add(jSONObject);
        }
        this.logger.debug("execute sql result : {}", JSONUtils.toJsonString(jSONArray));
        int min = Math.min(this.sqlParameters.getDisplayRows() > 0 ? this.sqlParameters.getDisplayRows() : 10, jSONArray.size());
        this.logger.info("display sql result {} rows as follows:", Integer.valueOf(min));
        for (int i3 = 0; i3 < min; i3++) {
            this.logger.info("row {} : {}", Integer.valueOf(i3 + 1), JSONUtils.toJsonString(jSONArray.get(i3)));
        }
        if (resultSet.next()) {
            this.logger.info("sql result limit : {} exceeding results are filtered", Integer.valueOf(limit));
            jSONArray.add(String.format("sql result limit : %d exceeding results are filtered", Integer.valueOf(limit)));
        }
        if (this.sqlParameters.getSendEmail() == null || this.sqlParameters.getSendEmail().booleanValue()) {
            sendAttachment(StringUtils.isNotEmpty(this.sqlParameters.getTitle()) ? this.sqlParameters.getTitle() : this.taskExecutionContext.getTaskName() + " query result sets", JSONUtils.toJsonString(jSONArray));
        }
    }

    private void preSql(Connection connection, List<SqlBinds> list) throws Exception {
        for (SqlBinds sqlBinds : list) {
            PreparedStatement prepareStatementAndBind = prepareStatementAndBind(connection, sqlBinds);
            Throwable th = null;
            try {
                try {
                    this.logger.info("pre statement execute result: {}, for sql: {}", Integer.valueOf(prepareStatementAndBind.executeUpdate()), sqlBinds.getSql());
                    if (prepareStatementAndBind != null) {
                        if (0 != 0) {
                            try {
                                prepareStatementAndBind.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            prepareStatementAndBind.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (prepareStatementAndBind != null) {
                    if (th != null) {
                        try {
                            prepareStatementAndBind.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        prepareStatementAndBind.close();
                    }
                }
                throw th3;
            }
        }
    }

    private void postSql(Connection connection, List<SqlBinds> list) throws Exception {
        for (SqlBinds sqlBinds : list) {
            PreparedStatement prepareStatementAndBind = prepareStatementAndBind(connection, sqlBinds);
            Throwable th = null;
            try {
                try {
                    this.logger.info("post statement execute result: {},for sql: {}", Integer.valueOf(prepareStatementAndBind.executeUpdate()), sqlBinds.getSql());
                    if (prepareStatementAndBind != null) {
                        if (0 != 0) {
                            try {
                                prepareStatementAndBind.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            prepareStatementAndBind.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (prepareStatementAndBind != null) {
                    if (th != null) {
                        try {
                            prepareStatementAndBind.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        prepareStatementAndBind.close();
                    }
                }
                throw th3;
            }
        }
    }

    private void createTempFunction(Connection connection, List<String> list) throws Exception {
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            for (String str : list) {
                this.logger.info("hive create function sql: {}", str);
                createStatement.execute(str);
            }
            if (createStatement != null) {
                if (0 == 0) {
                    createStatement.close();
                    return;
                }
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th3;
        }
    }

    private Connection createConnection() throws Exception {
        Connection connection;
        if (DbType.HIVE == DbType.valueOf(this.sqlParameters.getType())) {
            Properties properties = new Properties();
            properties.setProperty("user", this.baseDataSource.getUser());
            properties.setProperty("password", this.baseDataSource.getPassword());
            properties.putAll(CollectionUtils.stringToMap(this.sqlParameters.getConnParams(), ";", "hiveconf:"));
            connection = DriverManager.getConnection(this.baseDataSource.getJdbcUrl(), properties);
        } else {
            connection = DriverManager.getConnection(this.baseDataSource.getJdbcUrl(), this.baseDataSource.getUser(), this.baseDataSource.getPassword());
        }
        return connection;
    }

    private void close(ResultSet resultSet, PreparedStatement preparedStatement, Connection connection) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                this.logger.error("close result set error : {}", e.getMessage(), e);
            }
        }
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e2) {
                this.logger.error("close prepared statement error : {}", e2.getMessage(), e2);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e3) {
                this.logger.error("close connection error : {}", e3.getMessage(), e3);
            }
        }
    }

    private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
        boolean z = TaskTimeoutStrategy.of(this.taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED || TaskTimeoutStrategy.of(this.taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
        PreparedStatement prepareStatement = connection.prepareStatement(sqlBinds.getSql());
        if (z) {
            prepareStatement.setQueryTimeout(this.taskExecutionContext.getTaskTimeout());
        }
        Map paramsMap = sqlBinds.getParamsMap();
        if (paramsMap != null) {
            for (Map.Entry entry : paramsMap.entrySet()) {
                Property property = (Property) entry.getValue();
                ParameterUtils.setInParameter(((Integer) entry.getKey()).intValue(), prepareStatement, property.getType(), property.getValue());
            }
        }
        this.logger.info("prepare statement replace sql : {} ", prepareStatement);
        return prepareStatement;
    }

    public void sendAttachment(String str, String str2) {
        List queryUserByAlertGroupId = this.alertDao.queryUserByAlertGroupId(this.taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
        ArrayList arrayList = new ArrayList();
        Iterator it = queryUserByAlertGroupId.iterator();
        while (it.hasNext()) {
            arrayList.add(((User) it.next()).getEmail().trim());
        }
        String receivers = this.sqlParameters.getReceivers();
        if (StringUtils.isNotEmpty(receivers)) {
            for (String str3 : receivers.split(",")) {
                arrayList.add(str3.trim());
            }
        }
        ArrayList arrayList2 = new ArrayList();
        String receiversCc = this.sqlParameters.getReceiversCc();
        if (StringUtils.isNotEmpty(receiversCc)) {
            for (String str4 : receiversCc.split(",")) {
                arrayList2.add(str4.trim());
            }
        }
        String trim = this.sqlParameters.getShowType().replace(",", "").trim();
        if (!EnumUtils.isValidEnum(ShowType.class, trim)) {
            this.logger.error("showType: {} is not valid ", trim);
            throw new RuntimeException(String.format("showType: %s is not valid ", trim));
        }
        if (!((Boolean) MailUtils.sendMails(arrayList, arrayList2, str, str2, ShowType.valueOf(trim).getDescp()).get("status")).booleanValue()) {
            throw new RuntimeException("send mail failed!");
        }
    }

    public void setSqlParamsMap(String str, String str2, Map<Integer, Property> map, Map<String, Property> map2) {
        Matcher matcher = Pattern.compile(str2).matcher(str);
        int i = 1;
        while (matcher.find()) {
            String group = matcher.group(1);
            Property property = map2.get(group);
            if (property == null) {
                this.logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance with id: {}. So couldn't put Property in sqlParamsMap.", group, Integer.valueOf(this.taskExecutionContext.getTaskInstanceId()));
            } else {
                map.put(Integer.valueOf(i), property);
                i++;
                this.logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", group, str);
            }
        }
    }

    public void printReplacedSql(String str, String str2, String str3, Map<Integer, Property> map) {
        this.logger.info("after replace sql , preparing : {}", str2);
        StringBuilder sb = new StringBuilder("replaced sql , parameters:");
        if (map == null) {
            this.logger.info("printReplacedSql: sqlParamsMap is null.");
        } else {
            for (int i = 1; i <= map.size(); i++) {
                sb.append(map.get(Integer.valueOf(i)).getValue() + "(" + map.get(Integer.valueOf(i)).getType() + ")");
            }
        }
        this.logger.info("Sql Params are {}", sb);
    }
}
