/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.AbstractServer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Worker server entry point.
 *
 * <p>It is bootstrapped through {@link SpringApplication} (see {@link #main(String[])}).
 * After dependency injection, {@link #run()} loads {@code worker.properties}, starts
 * a heartbeat schedule, a thread that polls the {@code tasks_kill} set, and a
 * {@link FetchTaskThread}, and registers a JVM shutdown hook that calls
 * {@link #stop(String)}.
 */
@ComponentScan(value={"org.apache.dolphinscheduler"})
public class WorkerServer extends AbstractServer {
    private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);

    /** ZooKeeper client of this worker; assigned in {@link #run()}. */
    private ZKWorkerClient zkWorkerClient = null;

    @Autowired
    private ProcessDao processDao;

    @Autowired
    private AlertDao alertDao;

    /** Scheduler that periodically runs the heartbeat task. */
    private ScheduledExecutorService heartbeatWorkerService;

    /** Task queue used to read kill commands and to remove queued tasks. */
    protected ITaskQueue taskQueue;

    /** Single-thread executor running the kill-command polling loop. */
    private ExecutorService killExecutorService;

    /** Single-thread executor running the {@link FetchTaskThread}. */
    private ExecutorService fetchTaskExecutorService;

    @Autowired
    private SpringApplicationContext springApplicationContext;

    /**
     * Counted down at the end of {@link #stop(String)}; {@link #run()} waits on it
     * when not running as a combined server. Created eagerly so that a stop request
     * arriving before {@link #run()} has finished cannot hit a {@code null} latch.
     */
    private final CountDownLatch latch = new CountDownLatch(1);

    @Value(value="${server.is-combined-server:false}")
    private Boolean isCombinedServer;

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(WorkerServer.class, args);
    }

    /**
     * Initializes and starts the worker.
     *
     * <p>Loads {@code worker.properties} (exits the JVM with status 1 on failure),
     * creates the executors, schedules the heartbeat (first run after 5 seconds, then
     * every {@code worker.heartbeat.interval} seconds, default 60), starts the kill
     * polling loop and the fetch-task thread, and registers the shutdown hook. Unless
     * {@code server.is-combined-server} is {@code true}, the calling thread then
     * blocks until {@link #stop(String)} counts the latch down.
     */
    @PostConstruct
    public void run() {
        try {
            conf = new PropertiesConfiguration("worker.properties");
        } catch (ConfigurationException e) {
            logger.error("load configuration failed", e);
            System.exit(1);
        }

        this.zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
        this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
        this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");

        // heartbeat
        this.heartBeatInterval = conf.getInt("worker.heartbeat.interval", 60);
        this.heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", 1);
        Runnable heartBeatThread = heartBeatThread();
        // From here on the ZooKeeper client holds a reference to this server as its stoppable.
        this.zkWorkerClient.setStoppable(this);
        this.heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5L, this.heartBeatInterval, TimeUnit.SECONDS);

        // kill-command polling
        Runnable killProcessThread = getKillProcessThread();
        this.killExecutorService.execute(killProcessThread);

        // task fetching
        int taskNum = conf.getInt("worker.fetch.task.num", 1);
        FetchTaskThread fetchTaskThread = new FetchTaskThread(taskNum, this.zkWorkerClient, this.processDao, conf, this.taskQueue);
        this.fetchTaskExecutorService.execute(fetchTaskThread);

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                // The stop alert is sent three times when at most one master is reported active.
                if (zkWorkerClient.getActiveMasterNum() <= 1) {
                    for (int i = 0; i < 3; ++i) {
                        alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
                    }
                }
                WorkerServer.this.stop("shutdownhook");
            }
        }));

        if (!this.isCombinedServer) {
            try {
                this.latch.await();
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stops the worker: flips the global {@link Stopper} flag, waits three seconds,
     * then shuts down the heartbeat scheduler, the shared thread pool, the kill and
     * fetch executors and the ZooKeeper client, and finally releases the latch that
     * {@link #run()} may be waiting on.
     *
     * <p>Does nothing if {@link Stopper#isStoped()} already reports {@code true}.
     * Failures of individual shutdown steps are logged and do not abort the remaining
     * steps; any other exception is logged and ends the JVM with status -1.
     *
     * @param cause human readable reason for stopping, used for logging only
     */
    @Override
    public synchronized void stop(String cause) {
        try {
            if (Stopper.isStoped()) {
                return;
            }
            logger.info("worker server is stopping ..., cause : {}", cause);

            // signal all loops that check Stopper to finish
            Stopper.stop();

            try {
                // grace period before the executors are shut down
                Thread.sleep(3000L);
            } catch (Exception e) {
                // Only logged; shutdown continues with the remaining steps.
                logger.warn("thread sleep exception:" + e.getMessage(), e);
            }

            try {
                this.heartbeatWorkerService.shutdownNow();
            } catch (Exception e) {
                logger.warn("heartbeat service stopped exception");
            }
            logger.info("heartbeat service stopped");

            try {
                ThreadPoolExecutors.getInstance().shutdown();
            } catch (Exception e) {
                logger.warn("threadpool service stopped exception:{}", e.getMessage());
            }
            logger.info("threadpool service stopped");

            try {
                this.killExecutorService.shutdownNow();
            } catch (Exception e) {
                logger.warn("worker kill executor service stopped exception:{}", e.getMessage());
            }
            logger.info("worker kill executor service stopped");

            try {
                this.fetchTaskExecutorService.shutdownNow();
            } catch (Exception e) {
                logger.warn("worker fetch task service stopped exception:{}", e.getMessage());
            }
            logger.info("worker fetch task service stopped");

            try {
                this.zkWorkerClient.close();
            } catch (Exception e) {
                logger.warn("zookeeper service stopped exception:{}", e.getMessage());
            }
            // release the thread blocked in run()
            this.latch.countDown();
            logger.info("zookeeper service stopped");
        } catch (Exception e) {
            logger.error("worker server stop exception : " + e.getMessage(), e);
            System.exit(-1);
        }
    }

    /**
     * Builds the task that is scheduled as the worker heartbeat.
     *
     * @return a runnable that reports this worker's znode to ZooKeeper
     */
    private Runnable heartBeatThread() {
        return new Runnable() {

            @Override
            public void run() {
                if (StringUtils.isEmpty(zkWorkerClient.getWorkerZNode())) {
                    logger.error("worker send heartbeat to zk failed");
                }
                // NOTE(review): the heartbeat call is still made when the znode is empty;
                // confirm this is intended before adding an early return here.
                zkWorkerClient.heartBeatForZk(zkWorkerClient.getWorkerZNode(), "worker");
            }
        };
    }

    /**
     * Builds the kill-command polling loop.
     *
     * <p>While {@link Stopper#isRunning()} is {@code true} the loop sleeps one second,
     * handles every entry read from the {@code tasks_kill} set and removes each handled
     * entry from that set, then re-reads the set. A failure while handling one entry is
     * logged and does not end the loop; that entry is left in the set. Interruption ends
     * the loop with the interrupt flag restored.
     *
     * @return the runnable executed on the kill executor
     */
    private Runnable getKillProcessThread() {
        return new Runnable() {

            @Override
            public void run() {
                Set<String> taskInfoSet = taskQueue.smembers("tasks_kill");
                while (Stopper.isRunning()) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        logger.error("interrupted exception", e);
                        // Interrupted (e.g. by shutdownNow()): restore the flag and leave the loop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (CollectionUtils.isNotEmpty(taskInfoSet)) {
                        for (String taskInfo : taskInfoSet) {
                            try {
                                killTask(taskInfo, processDao);
                                removeKillInfoFromQueue(taskInfo);
                            } catch (Exception e) {
                                // One failing entry must not terminate kill polling for good.
                                logger.error("kill task failed, kill info: " + taskInfo, e);
                            }
                        }
                    }
                    taskInfoSet = taskQueue.smembers("tasks_kill");
                }
            }
        };
    }

    /**
     * Handles one kill command of the form {@code <host>-<taskInstanceId>}.
     *
     * <ul>
     *   <li>Malformed commands and unknown task instances are logged and ignored.</li>
     *   <li>If the command host is {@code "NULL"} and the task instance has no host,
     *       the task is removed from the task queue and saved with state
     *       {@link ExecutionStatus#KILL}.</li>
     *   <li>A {@link TaskType#DEPENDENT} task is saved with state
     *       {@link ExecutionStatus#KILL}.</li>
     *   <li>Any other task whose state is not finished is passed to
     *       {@link ProcessUtils#kill}.</li>
     * </ul>
     *
     * @param taskInfo kill command read from the {@code tasks_kill} set
     * @param pd       DAO used to load and save the task instance
     */
    private void killTask(String taskInfo, ProcessDao pd) {
        logger.info("get one kill command from tasks kill queue: " + taskInfo);
        // NOTE(review): a host containing '-' yields more than two parts and is
        // rejected below; confirm producers never write such hosts.
        String[] taskInfoArray = taskInfo.split("-");
        if (taskInfoArray.length != 2) {
            logger.error("error format kill info: " + taskInfo);
            return;
        }
        String host = taskInfoArray[0];
        final int taskInstanceId;
        try {
            taskInstanceId = Integer.parseInt(taskInfoArray[1]);
        } catch (NumberFormatException e) {
            // A non-numeric id is a malformed command, handled like the case above.
            logger.error("error format kill info: " + taskInfo, e);
            return;
        }

        TaskInstance taskInstance = pd.getTaskInstanceDetailByTaskId(taskInstanceId);
        if (taskInstance == null) {
            logger.error("cannot find the kill task :" + taskInfo);
            return;
        }

        if ("NULL".equals(host) && StringUtils.isEmpty(taskInstance.getHost())) {
            deleteTaskFromQueue(taskInstance, pd);
            taskInstance.setState(ExecutionStatus.KILL);
            pd.saveTaskInstance(taskInstance);
        } else if (TaskType.DEPENDENT.toString().equals(taskInstance.getTaskType())) {
            // comparison written constant-first so a null task type cannot throw
            taskInstance.setState(ExecutionStatus.KILL);
            pd.saveTaskInstance(taskInstance);
        } else if (!taskInstance.getState().typeIsFinished()) {
            ProcessUtils.kill(taskInstance);
        } else {
            logger.info("the task aleady finish: task id: " + taskInstance.getId() + " state: " + taskInstance.getState().toString());
        }
    }

    /**
     * Removes the given task from the {@code tasks_queue} node, if it is still queued,
     * while holding the ZooKeeper lock at the worker lock path.
     *
     * <p>Failures are logged; the lock is released exactly once, in {@code finally}.
     *
     * @param taskInstance task to remove from the queue
     * @param pd           DAO used to check for and describe the queued task
     */
    private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessDao pd) {
        InterProcessMutex mutex = null;
        logger.info("delete task from tasks queue: " + taskInstance.getId());
        try {
            mutex = this.zkWorkerClient.acquireZkLock(this.zkWorkerClient.getZkClient(), this.zkWorkerClient.getWorkerLockPath());
            if (pd.checkTaskExistsInTaskQueue(taskInstance)) {
                String taskQueueStr = pd.taskZkInfo(taskInstance);
                this.taskQueue.removeNode("tasks_queue", taskQueueStr);
            }
        } catch (Exception e) {
            logger.error("remove task thread failure", e);
        } finally {
            AbstractZKClient.releaseMutex(mutex);
        }
    }

    /**
     * Removes a kill command from the {@code tasks_kill} set.
     *
     * @param taskInfo the kill command to remove
     */
    private void removeKillInfoFromQueue(String taskInfo) {
        this.taskQueue.srem("tasks_kill", taskInfo);
    }
}

