/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.task.flink;

import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;

/**
 * Worker task that assembles a {@code flink run ...} command line from the
 * {@link FlinkParameters} carried as JSON in the task properties.
 *
 * <p>Lifecycle as visible in this class: {@link #init()} parses and validates the
 * parameters and resolves placeholders in the main arguments; {@link #buildCommand()}
 * renders the final command string. How and where that command is executed is decided
 * by {@link AbstractYarnTask} and is not shown here.
 */
public class FlinkTask
extends AbstractYarnTask {
    /** First token of the generated command line. */
    private static final String FLINK_COMMAND = "flink";
    /** Sub-command passed to {@link #FLINK_COMMAND}. */
    private static final String FLINK_RUN = "run";
    /** Parsed task parameters; assigned by {@link #init()} once they pass validation. */
    private FlinkParameters flinkParameters;

    /**
     * Creates the task; both arguments are forwarded unchanged to the superclass.
     *
     * @param props  task properties (raw JSON params, queue, defined params, ...)
     * @param logger logger used for all messages emitted by this task
     */
    public FlinkTask(TaskProps props, Logger logger) {
        super(props, logger);
    }

    /**
     * Parses the task's JSON parameters, validates them, applies the queue from the
     * task properties and, when main arguments are present, substitutes parameter
     * placeholders in them.
     *
     * @throws RuntimeException      if the parameters cannot be parsed or fail
     *                               {@code checkParameters()}
     * @throws IllegalStateException if main arguments are present but no process
     *                               instance is found for the task instance id
     */
    @Override
    public void init() {
        this.logger.info("flink task params {}", this.taskProps.getTaskParams());

        // Parse into a local first so the field is only ever assigned validated parameters.
        // NOTE(review): parseObject is assumed to be able to return null for unparseable
        // input; the null guard turns that case into the descriptive error below instead
        // of a bare NullPointerException.
        FlinkParameters parsed = (FlinkParameters)JSONUtils.parseObject(this.taskProps.getTaskParams(), FlinkParameters.class);
        if (parsed == null || !parsed.checkParameters()) {
            throw new RuntimeException("flink task params is not valid");
        }
        parsed.setQueue(this.taskProps.getQueue());
        this.flinkParameters = parsed;

        String mainArgs = parsed.getMainArgs();
        if (StringUtils.isNotEmpty(mainArgs)) {
            parsed.setMainArgs(this.resolveMainArgs(mainArgs));
        }
    }

    /**
     * Substitutes parameter placeholders in the given main arguments using the merged
     * user-defined, defined and local parameters of this task.
     *
     * @param mainArgs raw, non-empty main arguments
     * @return the arguments with placeholders replaced, or {@code mainArgs} unchanged
     *         when the merged parameter map is {@code null}
     */
    private String resolveMainArgs(String mainArgs) {
        // The process instance supplies the command type and schedule time used for the merge.
        ProcessInstance processInstance = this.processDao.findProcessInstanceByTaskId(this.taskProps.getTaskInstId());
        if (processInstance == null) {
            throw new IllegalStateException("process instance not found for task instance id " + this.taskProps.getTaskInstId());
        }

        Map<String, Property> paramsMap = ParamUtils.convert(
                this.taskProps.getUserDefParamsMap(),
                this.taskProps.getDefinedParams(),
                this.flinkParameters.getLocalParametersMap(),
                processInstance.getCmdTypeIfComplement(),
                processInstance.getScheduleTime());
        this.logger.info("param Map : {}", paramsMap);
        if (paramsMap == null) {
            return mainArgs;
        }

        String resolved = ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap));
        this.logger.info("param args : {}", resolved);
        return resolved;
    }

    /**
     * Builds the command string: {@code flink run} followed by the arguments produced by
     * {@link FlinkArgsUtils#buildArgs}, joined with single spaces, with placeholders
     * replaced from the task's defined params.
     *
     * @return the rendered command line
     */
    @Override
    protected String buildCommand() {
        ArrayList<String> args = new ArrayList<String>();
        args.add(FLINK_COMMAND);
        args.add(FLINK_RUN);
        args.addAll(FlinkArgsUtils.buildArgs(this.flinkParameters));
        // Log after the real arguments are appended; logging earlier only ever showed [flink, run].
        this.logger.info("flink task args : {}", args);

        String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), this.taskProps.getDefinedParams());
        this.logger.info("flink task command : {}", command);
        return command;
    }

    /**
     * Returns the parameters parsed by {@link #init()}.
     *
     * @return the Flink parameters, or {@code null} if {@link #init()} has not completed
     *         successfully
     */
    @Override
    public AbstractParameters getParameters() {
        return this.flinkParameters;
    }
}

