/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.common.queue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link ITaskQueue} implementation that stores tasks as child nodes in ZooKeeper.
 *
 * <p>Every queue/set identified by a {@code key} lives under
 * {@code <dsRoot>/<key>}; each task is a child node whose name (and data) is the
 * task string itself.
 *
 * <p>Task strings handled by {@link #poll(String, int)} consist of at least four
 * {@code '_'}-separated segments, where the 2nd and 4th segments are numeric ids
 * (process instance id and task id). An optional 5th segment is a comma-separated
 * list of worker hosts encoded as longs, or {@code -1} to disable host filtering.
 * NOTE(review): the 1st and 3rd segments look like priorities - confirm against the producer.
 */
@Service
public class TaskQueueZkImpl
implements ITaskQueue {
    private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);

    /** Name of the node holding tasks waiting to be executed. */
    private static final String TASKS_QUEUE = "tasks_queue";

    /** Name of the node holding tasks that should be killed. */
    private static final String TASKS_KILL = "tasks_kill";

    /** Separator between the segments of a task string. */
    private static final String SEGMENT_SEPARATOR = "_";

    /** Minimum number of segments a task string must have to be polled. */
    private static final int MIN_SEGMENTS = 4;

    /**
     * Orders zero-padded task strings lexicographically. When a string has more
     * than four segments its last segment (the host list) is ignored, so two
     * entries that differ only in that segment are considered equal.
     */
    private static final Comparator<String> TASK_ORDER = new Comparator<String>() {

        @Override
        public int compare(String o1, String o2) {
            return stripHosts(o1).compareTo(stripHosts(o2));
        }

        private String stripHosts(String task) {
            if (task.split(SEGMENT_SEPARATOR).length > MIN_SEGMENTS) {
                return task.substring(0, task.lastIndexOf(SEGMENT_SEPARATOR));
            }
            return task;
        }
    };

    private final ZookeeperOperator zookeeperOperator;

    /**
     * Creates the queue and makes sure the {@code tasks_queue} and
     * {@code tasks_kill} parent nodes exist. Failures are logged, not propagated.
     *
     * @param zookeeperOperator ZooKeeper access used for all operations
     */
    @Autowired
    public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
        this.zookeeperOperator = zookeeperOperator;
        try {
            String tasksQueuePath = this.getTasksPath(TASKS_QUEUE);
            String tasksCancelPath = this.getTasksPath(TASKS_KILL);
            for (String key : new String[]{tasksQueuePath, tasksCancelPath}) {
                if (zookeeperOperator.isExisted(key)) {
                    continue;
                }
                zookeeperOperator.persist(key, "");
                logger.info("create tasks queue parent node success : {}", key);
            }
        }
        catch (Exception e) {
            logger.error("create tasks queue parent node failure", e);
        }
    }

    /**
     * Returns the names of all child nodes under the given queue.
     *
     * @param key queue name
     * @return whatever the ZooKeeper operator returns for the children, or an
     *         empty list if the lookup throws
     */
    @Override
    public List<String> getAllTasks(String key) {
        try {
            return this.zookeeperOperator.getChildrenKeys(this.getTasksPath(key));
        }
        catch (Exception e) {
            logger.error("get all tasks from tasks queue exception", e);
            return new ArrayList<>();
        }
    }

    /**
     * Checks whether a task node exists under the given queue.
     *
     * @param key  queue name
     * @param task task string (node name)
     * @return {@code true} if the node exists
     */
    @Override
    public boolean checkTaskExists(String key, String task) {
        String taskPath = this.getTasksPath(key) + "/" + task;
        return this.zookeeperOperator.isExisted(taskPath);
    }

    /**
     * Adds a task node to the given queue.
     *
     * @param key   queue name
     * @param value task string, used both as node name and node data
     * @return {@code true} on success, {@code false} if persisting threw
     */
    @Override
    public boolean add(String key, String value) {
        try {
            String taskIdPath = this.getTasksPath(key) + "/" + value;
            this.zookeeperOperator.persist(taskIdPath, value);
            return true;
        }
        catch (Exception e) {
            logger.error("add task to tasks queue exception", e);
            return false;
        }
    }

    /**
     * Returns up to {@code tasksNum} tasks from the queue that this worker may
     * run, in sorted order. Nodes are only read, not removed.
     *
     * <p>Entries with fewer than four segments, entries with non-numeric ids,
     * and entries whose host list does not contain this worker are skipped.
     * When the queue is empty the call sleeps for one second before returning.
     *
     * @param key      queue name
     * @param tasksNum maximum number of tasks to return
     * @return the selected tasks in their original (unpadded) format; empty if
     *         there is nothing to do or an error occurred
     */
    @Override
    public List<String> poll(String key, int tasksNum) {
        try {
            List<String> list = this.zookeeperOperator.getChildrenKeys(this.getTasksPath(key));
            if (list == null || list.isEmpty()) {
                // Nothing queued: back off so callers polling in a loop do not spin.
                Thread.sleep(1000L);
                return new ArrayList<>();
            }
            String workerIp = OSUtils.getHost();
            String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
            int size = list.size();
            Set<String> taskTreeSet = new TreeSet<>(TASK_ORDER);
            for (String taskDetail : list) {
                String formatTask = formatForWorker(taskDetail, workerIpLongStr);
                if (formatTask != null) {
                    taskTreeSet.add(formatTask);
                }
            }
            List<String> taskslist = this.getTasksListFromTreeSet(tasksNum, taskTreeSet);
            logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
            return taskslist;
        }
        catch (InterruptedException e) {
            // Preserve the interrupt so the calling thread can shut down promptly.
            Thread.currentThread().interrupt();
            logger.warn("poll tasks from tasks queue interrupted", e);
        }
        catch (Exception e) {
            logger.error("poll tasks from tasks queue exception", e);
        }
        return new ArrayList<>();
    }

    /**
     * Converts a raw task string into its zero-padded sortable form, applying
     * the host filter.
     *
     * @param taskDetail      raw node name
     * @param workerIpLongStr this worker's ip encoded as a decimal long
     * @return the padded task string, or {@code null} if the entry must be skipped
     */
    private static String formatForWorker(String taskDetail, String workerIpLongStr) {
        String[] segments = taskDetail.split(SEGMENT_SEPARATOR);
        if (segments.length < MIN_SEGMENTS) {
            return null;
        }
        String formatTask;
        try {
            // Zero-pad the ids so that lexicographic order equals numeric order.
            formatTask = String.format("%s_%010d_%s_%010d", segments[0], Long.parseLong(segments[1]), segments[2], Long.parseLong(segments[3]));
        }
        catch (NumberFormatException e) {
            // A single malformed node must not block every other task in the queue.
            logger.warn("skip task with non-numeric id in tasks queue : {}", taskDetail);
            return null;
        }
        if (segments.length > MIN_SEGMENTS) {
            String taskHosts = segments[4];
            boolean anyHost = taskHosts.equals(String.valueOf(-1));
            if (!anyHost && !Arrays.asList(taskHosts.split(",")).contains(workerIpLongStr)) {
                return null;
            }
            formatTask = formatTask + SEGMENT_SEPARATOR + taskHosts;
        }
        return formatTask;
    }

    /**
     * Takes the first {@code tasksNum} entries of the given set, in iteration
     * order, and converts each back to its original (unpadded) format.
     *
     * @param tasksNum    maximum number of entries to take; values {@code <= 0}
     *                    yield an empty list
     * @param taskTreeSet padded task strings
     * @return the converted tasks
     * @throws NumberFormatException if an entry with four or more segments has
     *                               a non-numeric 2nd or 4th segment
     */
    public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
        // Clamp so a negative or oversized request cannot break the allocation.
        int limit = Math.max(0, Math.min(tasksNum, taskTreeSet.size()));
        List<String> taskslist = new ArrayList<>(limit);
        Iterator<String> iterator = taskTreeSet.iterator();
        while (taskslist.size() < limit && iterator.hasNext()) {
            taskslist.add(this.getOriginTaskFormat(iterator.next()));
        }
        return taskslist;
    }

    /**
     * Removes the zero padding from the 2nd and 4th segments of a padded task
     * string. Strings with fewer than four segments are returned unchanged;
     * any segments after the 4th are appended as-is.
     */
    private String getOriginTaskFormat(String formatTask) {
        String[] taskArray = formatTask.split(SEGMENT_SEPARATOR);
        if (taskArray.length < MIN_SEGMENTS) {
            return formatTask;
        }
        // Parsed as long to match the Long.parseLong used when padding in poll().
        long processInstanceId = Long.parseLong(taskArray[1]);
        long taskId = Long.parseLong(taskArray[3]);
        StringBuilder sb = new StringBuilder(50);
        sb.append(taskArray[0]).append(SEGMENT_SEPARATOR)
            .append(processInstanceId).append(SEGMENT_SEPARATOR)
            .append(taskArray[2]).append(SEGMENT_SEPARATOR)
            .append(taskId);
        for (int index = MIN_SEGMENTS; index < taskArray.length; ++index) {
            sb.append(SEGMENT_SEPARATOR).append(taskArray[index]);
        }
        return sb.toString();
    }

    /**
     * Removes a task node from the given queue. Failures are logged.
     *
     * @param key       queue name
     * @param nodeValue task string (node name)
     */
    @Override
    public void removeNode(String key, String nodeValue) {
        String taskIdPath = this.getTasksPath(key) + "/" + nodeValue;
        logger.info("removeNode task {}", taskIdPath);
        try {
            this.zookeeperOperator.remove(taskIdPath);
        }
        catch (Exception e) {
            logger.error("delete task:{} from zookeeper fail, exception:", nodeValue, e);
        }
    }

    /**
     * Adds a value to the set stored under {@code key} if it is not already
     * present. Null or blank values are rejected with a warning.
     *
     * @param key   set name
     * @param value member to add
     */
    @Override
    public void sadd(String key, String value) {
        try {
            if (value == null || value.trim().isEmpty()) {
                logger.warn("add host-taskId:{} to tasks set is empty ", value);
                return;
            }
            String memberPath = this.getTasksPath(key) + "/" + value;
            if (this.zookeeperOperator.isExisted(memberPath)) {
                logger.info("task {} exists in tasks set ", value);
            } else {
                this.zookeeperOperator.persist(memberPath, value);
                logger.info("add task:{} to tasks set ", value);
            }
        }
        catch (Exception e) {
            logger.error("add task to tasks set exception", e);
        }
    }

    /**
     * Removes a value from the set stored under {@code key}. Failures are logged.
     *
     * @param key   set name
     * @param value member to remove
     */
    @Override
    public void srem(String key, String value) {
        try {
            this.zookeeperOperator.remove(this.getTasksPath(key) + "/" + value);
        }
        catch (Exception e) {
            // Parameterized logging: the value must never be used as a format string.
            logger.error("delete task:{} exception", value, e);
        }
    }

    /**
     * Returns all members of the set stored under {@code key}.
     *
     * @param key set name
     * @return the members; empty if there are none or the lookup failed
     */
    @Override
    public Set<String> smembers(String key) {
        Set<String> tasksSet = new HashSet<>();
        try {
            List<String> list = this.zookeeperOperator.getChildrenKeys(this.getTasksPath(key));
            if (list != null) {
                tasksSet.addAll(list);
            }
        }
        catch (Exception e) {
            logger.error("get all tasks from tasks queue exception", e);
        }
        return tasksSet;
    }

    /**
     * Deletes every child node under the {@code tasks_queue} and
     * {@code tasks_kill} nodes. Stops at the first failure, which is logged.
     */
    @Override
    public void delete() {
        try {
            String tasksQueuePath = this.getTasksPath(TASKS_QUEUE);
            String tasksCancelPath = this.getTasksPath(TASKS_KILL);
            for (String taskQueuePath : new String[]{tasksQueuePath, tasksCancelPath}) {
                if (!this.zookeeperOperator.isExisted(taskQueuePath)) {
                    continue;
                }
                List<String> list = this.zookeeperOperator.getChildrenKeys(taskQueuePath);
                for (String task : list) {
                    this.zookeeperOperator.remove(taskQueuePath + "/" + task);
                    logger.info("delete task from tasks queue : {}/{} ", taskQueuePath, task);
                }
            }
        }
        catch (Exception e) {
            logger.error("delete all tasks in tasks queue failure", e);
        }
    }

    /**
     * Builds the ZooKeeper path of a queue/set node.
     *
     * @param key queue or set name
     * @return {@code <dsRoot>/<key>}
     */
    public String getTasksPath(String key) {
        return this.zookeeperOperator.getZookeeperConfig().getDsRoot() + "/" + key;
    }
}

