package org.apache.dolphinscheduler.server.worker;

import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.AbstractServer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Worker server process.
 *
 * <p>On startup ({@link #run()}) it loads {@code worker.properties}, obtains the ZooKeeper worker
 * client and the task queue, and starts three background activities:
 * <ul>
 *   <li>a periodic heartbeat sent through the ZooKeeper worker client,</li>
 *   <li>a polling loop that handles entries of the {@code tasks_kill} set,</li>
 *   <li>a {@link FetchTaskThread}.</li>
 * </ul>
 * {@link #stop(String)} shuts these activities down again and releases the startup thread.
 */
@ComponentScan({"org.apache.dolphinscheduler"})
public class WorkerServer extends AbstractServer {
    private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);

    /** Key of the set holding pending kill requests. */
    private static final String TASKS_KILL_KEY = "tasks_kill";

    /** Key of the queue a not-yet-dispatched task is removed from when it is killed. */
    private static final String TASKS_QUEUE_KEY = "tasks_queue";

    /** Pause between two polls of the kill set, in milliseconds. */
    private static final long KILL_POLL_INTERVAL_MILLIS = 1000L;

    /** Delay between signalling the stop and tearing the executors down, in milliseconds. */
    private static final long STOP_GRACE_MILLIS = 3000L;

    /** Number of times the "server stopped" alert is sent from the shutdown hook. */
    private static final int STOPPED_ALERT_REPEAT = 3;

    private ZKWorkerClient zkWorkerClient = null;

    @Autowired
    private ProcessDao processDao;

    @Autowired
    private AlertDao alertDao;
    private ScheduledExecutorService heartbeatWorkerService;
    protected ITaskQueue taskQueue;
    private ExecutorService killExecutorService;
    private ExecutorService fetchTaskExecutorService;

    @Autowired
    private SpringApplicationContext springApplicationContext;

    /**
     * Released by {@link #stop(String)}; {@link #run()} blocks on it in standalone mode.
     * Created eagerly so that a stop request arriving while {@code run()} is still starting up
     * (the ZooKeeper client receives this server as its stoppable before startup completes)
     * cannot hit a null latch.
     */
    private final CountDownLatch latch = new CountDownLatch(1);

    @Value("${server.is-combined-server:false}")
    private Boolean isCombinedServer;

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param strArr command line arguments, passed through to Spring Boot
     */
    public static void main(String[] strArr) {
        SpringApplication.run(WorkerServer.class, strArr);
    }

    /**
     * Starts the worker: loads configuration, starts the heartbeat, kill and fetch threads and
     * registers a JVM shutdown hook. Unless the server runs in combined mode
     * ({@code server.is-combined-server=true}), this method then blocks until
     * {@link #stop(String)} is called.
     *
     * <p>Exits the JVM with status 1 when {@code worker.properties} cannot be loaded.
     */
    @PostConstruct
    public void run() {
        try {
            conf = new PropertiesConfiguration("worker.properties");
        } catch (ConfigurationException e) {
            logger.error("load configuration failed", e);
            System.exit(1);
        }

        this.zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
        this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
        this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
        this.heartBeatInterval = conf.getInt("worker.heartbeat.interval", 60);
        this.heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", 1);

        Runnable heartBeatThread = heartBeatThread();
        this.zkWorkerClient.setStoppable(this);
        this.heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5L, this.heartBeatInterval, TimeUnit.SECONDS);

        this.killExecutorService.execute(getKillProcessThread());

        int fetchTaskNum = conf.getInt("worker.fetch.task.num", 1);
        this.fetchTaskExecutorService.execute(
                new FetchTaskThread(fetchTaskNum, this.zkWorkerClient, this.processDao, conf, this.taskQueue));

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sendServerStoppedAlerts();
                } catch (Exception e) {
                    // Alerting is best effort; it must never prevent the actual shutdown below.
                    logger.error("send worker server stopped alert failed", e);
                } finally {
                    WorkerServer.this.stop("shutdownhook");
                }
            }
        }));

        // Null-safe: a missing injection is treated as "not combined".
        if (Boolean.TRUE.equals(this.isCombinedServer)) {
            return;
        }
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            logger.warn("worker server interrupted while waiting for stop", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends the "server stopped" alert {@link #STOPPED_ALERT_REPEAT} times, but only when the
     * ZooKeeper client reports at most one active master.
     */
    private void sendServerStoppedAlerts() {
        if (this.zkWorkerClient.getActiveMasterNum() <= 1) {
            for (int i = 0; i < STOPPED_ALERT_REPEAT; i++) {
                this.alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
            }
        }
    }

    /**
     * Stops the worker. The first call flips the global {@link Stopper} flag, waits
     * {@link #STOP_GRACE_MILLIS}, shuts down the heartbeat, shared thread pool, kill and fetch
     * executors, closes the ZooKeeper client and finally releases the thread blocked in
     * {@link #run()}. Subsequent calls return immediately.
     *
     * <p>Each shutdown step is isolated so that a failure in one does not skip the others.
     *
     * @param str human readable reason for the stop, only used for logging
     */
    @Override // org.apache.dolphinscheduler.server.master.AbstractServer
    public synchronized void stop(String str) {
        boolean interrupted = false;
        try {
            if (Stopper.isStoped()) {
                return;
            }
            logger.info("worker server is stopping ..., cause : {}", str);
            Stopper.stop();

            try {
                Thread.sleep(STOP_GRACE_MILLIS);
            } catch (InterruptedException e) {
                // Remember the interrupt but finish the cleanup first; the flag is restored at the end.
                interrupted = true;
                logger.warn("thread sleep exception:" + e.getMessage(), e);
            }

            try {
                if (this.heartbeatWorkerService != null) {
                    this.heartbeatWorkerService.shutdownNow();
                }
            } catch (Exception e) {
                logger.warn("heartbeat service stopped exception", e);
            }
            logger.info("heartbeat service stopped");

            try {
                ThreadPoolExecutors.getInstance().shutdown();
            } catch (Exception e) {
                logger.warn("threadpool service stopped exception:{}", e.getMessage());
            }
            logger.info("threadpool service stopped");

            try {
                if (this.killExecutorService != null) {
                    this.killExecutorService.shutdownNow();
                }
            } catch (Exception e) {
                logger.warn("worker kill executor service stopped exception:{}", e.getMessage());
            }
            logger.info("worker kill executor service stopped");

            try {
                if (this.fetchTaskExecutorService != null) {
                    this.fetchTaskExecutorService.shutdownNow();
                }
            } catch (Exception e) {
                logger.warn("worker fetch task service stopped exception:{}", e.getMessage());
            }
            logger.info("worker fetch task service stopped");

            try {
                if (this.zkWorkerClient != null) {
                    this.zkWorkerClient.close();
                }
            } catch (Exception e) {
                logger.warn("zookeeper service stopped exception:{}", e.getMessage());
            }
            this.latch.countDown();
            logger.info("zookeeper service stopped");
        } catch (Exception e) {
            logger.error("worker server stop exception : " + e.getMessage(), e);
            // NOTE(review): Runtime.exit blocks indefinitely when invoked while shutdown hooks
            // are running, and stop() is also called from the shutdown hook.
            System.exit(-1);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the periodic heartbeat task. It logs an error when the worker znode is empty and
     * then reports the heartbeat through the ZooKeeper worker client.
     *
     * <p>Exceptions are caught and logged here because a task scheduled with
     * {@code scheduleAtFixedRate} is silently cancelled once a single execution throws.
     *
     * @return the heartbeat task
     */
    private Runnable heartBeatThread() {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    String workerZNode = WorkerServer.this.zkWorkerClient.getWorkerZNode();
                    if (StringUtils.isEmpty(workerZNode)) {
                        logger.error("worker send heartbeat to zk failed");
                    }
                    // Still invoked for an empty znode, as before: how the client reacts to it
                    // is not visible from this class.
                    WorkerServer.this.zkWorkerClient.heartBeatForZk(workerZNode, "worker");
                } catch (Exception e) {
                    logger.error("worker heartbeat failed", e);
                }
            }
        };
    }

    /**
     * Builds the kill-request polling loop. While {@link Stopper#isRunning()} it sleeps
     * {@link #KILL_POLL_INTERVAL_MILLIS}, handles every entry of the previously fetched
     * {@code tasks_kill} set and then fetches the set again.
     *
     * <p>The loop ends when the server is stopped or the thread is interrupted.
     *
     * @return the polling task
     */
    private Runnable getKillProcessThread() {
        return new Runnable() {
            @Override
            public void run() {
                Set<String> pendingKillInfos = fetchKillInfos();
                while (Stopper.isRunning()) {
                    try {
                        Thread.sleep(KILL_POLL_INTERVAL_MILLIS);
                    } catch (InterruptedException e) {
                        logger.error("interrupted exception", e);
                        // Interruption is the executor's cancellation signal (shutdownNow):
                        // restore the flag and leave instead of spinning on a failing sleep.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (pendingKillInfos != null && CollectionUtils.isNotEmpty(pendingKillInfos)) {
                        for (String killInfo : pendingKillInfos) {
                            processKillInfo(killInfo);
                        }
                    }
                    pendingKillInfos = fetchKillInfos();
                }
            }
        };
    }

    /**
     * Reads the current members of the {@code tasks_kill} set.
     *
     * @return the kill entries, or {@code null} when the queue could not be read
     */
    private Set<String> fetchKillInfos() {
        try {
            return this.taskQueue.smembers(TASKS_KILL_KEY);
        } catch (Exception e) {
            // A failing read must not terminate the single kill thread; try again next cycle.
            logger.error("fetch kill info from tasks kill queue failed", e);
            return null;
        }
    }

    /**
     * Handles one kill entry and removes it from the kill set afterwards. When handling fails
     * with an exception the entry is left in the set, so it is picked up again on the next poll.
     *
     * @param killInfo raw entry from the {@code tasks_kill} set
     */
    private void processKillInfo(String killInfo) {
        try {
            killTask(killInfo, this.processDao);
            removeKillInfoFromQueue(killInfo);
        } catch (Exception e) {
            logger.error("process kill info failed: " + killInfo, e);
        }
    }

    /**
     * Handles a single kill entry of the form {@code <host>-<taskInstanceId>}.
     *
     * <ul>
     *   <li>Malformed entries and unknown task instances are logged and ignored.</li>
     *   <li>Host {@code "NULL"} and no host on the task instance: the task is removed from the
     *       task queue and saved with state {@link ExecutionStatus#KILL}.</li>
     *   <li>Task type {@link TaskType#DEPENDENT}: the task is saved with state
     *       {@link ExecutionStatus#KILL}.</li>
     *   <li>Already finished tasks are only logged.</li>
     *   <li>Everything else is handed to {@link ProcessUtils#kill}.</li>
     * </ul>
     *
     * <p>Visibility is public in this source; a decompiler note on the original marked it as
     * having been private.
     *
     * @param str        kill entry, {@code <host>-<taskInstanceId>}
     * @param processDao DAO used to load and save the task instance
     */
    public void killTask(String str, ProcessDao processDao) {
        logger.info("get one kill command from tasks kill queue: " + str);

        // NOTE(review): a host part that itself contains '-' is rejected as malformed here;
        // confirm against the producer of these entries whether that can occur.
        String[] parts = str.split("-");
        if (parts.length != 2) {
            logger.error("error format kill info: " + str);
            return;
        }

        String host = parts[0];
        int taskInstanceId;
        try {
            taskInstanceId = Integer.parseInt(parts[1]);
        } catch (NumberFormatException e) {
            // A non-numeric id is a format error like any other, not a reason to fail the caller.
            logger.error("error format kill info: " + str, e);
            return;
        }

        TaskInstance taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstanceId);
        if (taskInstance == null) {
            logger.error("cannot find the kill task :" + str);
            return;
        }

        if ("NULL".equals(host) && StringUtils.isEmpty(taskInstance.getHost())) {
            deleteTaskFromQueue(taskInstance, processDao);
            taskInstance.setState(ExecutionStatus.KILL);
            processDao.saveTaskInstance(taskInstance);
        } else if (TaskType.DEPENDENT.toString().equals(taskInstance.getTaskType())) {
            // Constant on the left: a task instance without a task type must not cause an NPE.
            taskInstance.setState(ExecutionStatus.KILL);
            processDao.saveTaskInstance(taskInstance);
        } else if (taskInstance.getState().typeIsFinished()) {
            logger.info("the task aleady finish: task id: " + taskInstance.getId() + " state: " + taskInstance.getState().toString());
        } else {
            ProcessUtils.kill(taskInstance);
        }
    }

    /**
     * Removes the given task from the {@code tasks_queue} if it is still queued. The check and
     * the removal run while holding the worker lock obtained from the ZooKeeper client; failures
     * are logged and not propagated, and the lock is released in every case.
     *
     * @param taskInstance task to remove from the queue
     * @param processDao   DAO used to check for and describe the queued task
     */
    private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessDao processDao) {
        logger.info("delete task from tasks queue: " + taskInstance.getId());
        InterProcessMutex mutex = null;
        try {
            mutex = this.zkWorkerClient.acquireZkLock(
                    this.zkWorkerClient.getZkClient(), this.zkWorkerClient.getWorkerLockPath());
            if (processDao.checkTaskExistsInTaskQueue(taskInstance)) {
                this.taskQueue.removeNode(TASKS_QUEUE_KEY, processDao.taskZkInfo(taskInstance));
            }
        } catch (Exception e) {
            logger.error("remove task thread failure", e);
        } finally {
            AbstractZKClient.releaseMutex(mutex);
        }
    }

    /**
     * Removes a handled entry from the {@code tasks_kill} set.
     *
     * <p>Visibility is public in this source; a decompiler note on the original marked it as
     * having been private.
     *
     * @param str the kill entry to remove
     */
    public void removeKillInfoFromQueue(String str) {
        this.taskQueue.srem(TASKS_KILL_KEY, str);
    }
}
