package org.wso2.micro.integrator.ntask.coordination.task.scehduler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.micro.integrator.coordination.ClusterCoordinator;
import org.wso2.micro.integrator.ntask.common.TaskException;
import org.wso2.micro.integrator.ntask.coordination.TaskCoordinationException;
import org.wso2.micro.integrator.ntask.coordination.task.ClusterCommunicator;
import org.wso2.micro.integrator.ntask.coordination.task.CoordinatedTask;
import org.wso2.micro.integrator.ntask.coordination.task.resolver.TaskLocationResolver;
import org.wso2.micro.integrator.ntask.coordination.task.store.TaskStore;
import org.wso2.micro.integrator.ntask.coordination.task.store.cleaner.TaskStoreCleaner;
import org.wso2.micro.integrator.ntask.core.impl.standalone.ScheduledTaskManager;
import org.wso2.micro.integrator.ntask.core.internal.DataHolder;

/* loaded from: input_file:org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.class */
/**
 * Runnable that performs one coordination cycle for coordinated tasks on this node.
 *
 * <p>Each invocation of {@link #run()} does the following, in order:
 * <ol>
 *   <li>stops locally deployed tasks that the task store lists for this node in the
 *       {@code DEACTIVATED} state and marks them {@code PAUSED};</li>
 *   <li>schedules tasks listed for this node in the {@code ACTIVATED} state;</li>
 *   <li>if this node is the cluster leader: runs the task store cleaner on every
 *       {@code resolvingFrequency}-th leader cycle (when a cleaner was supplied), pushes tasks
 *       from the task manager's addition-failed list into the store, and resolves a node
 *       location for unassigned, incomplete tasks;</li>
 *   <li>schedules tasks listed for this node in the {@code NONE} state.</li>
 * </ol>
 *
 * <p>The cycle counter is not synchronized; NOTE(review): this presumably relies on a single
 * scheduler thread invoking {@link #run()} — confirm against the caller.
 */
public class CoordinatedTaskScheduler implements Runnable {
    private static final Log LOG = LogFactory.getLog(CoordinatedTaskScheduler.class);

    private final DataHolder dataHolder;
    private final TaskLocationResolver taskLocationResolver;
    private final ClusterCoordinator clusterCoordinator;
    private final TaskStore taskStore;
    /** May be {@code null} when the scheduler was created without a cleaner. */
    private final TaskStoreCleaner taskStoreCleaner;
    /** Number of leader cycles between two cleaner runs; never zero. */
    private final int resolvingFrequency;
    /** Leader cycles completed since the last cleaner slot. */
    private int resolveCount;
    private final ClusterCommunicator clusterCommunicator;
    private final ScheduledTaskManager taskManager;
    private final String localNodeId;

    /**
     * Creates a scheduler without a task store cleaner and with a resolving frequency of 1.
     *
     * @param scheduledTaskManager manager used to schedule and stop tasks on this node
     * @param taskStore            store holding task assignments and states
     * @param taskLocationResolver resolver used to pick a node for unassigned tasks
     * @param clusterCommunicator  communicator handed to the resolver
     */
    public CoordinatedTaskScheduler(ScheduledTaskManager scheduledTaskManager, TaskStore taskStore, TaskLocationResolver taskLocationResolver, ClusterCommunicator clusterCommunicator) {
        this(scheduledTaskManager, taskStore, taskLocationResolver, clusterCommunicator, null, 1);
    }

    /**
     * Creates a scheduler.
     *
     * @param scheduledTaskManager manager used to schedule and stop tasks on this node
     * @param taskStore            store holding task assignments and states
     * @param taskLocationResolver resolver used to pick a node for unassigned tasks
     * @param clusterCommunicator  communicator handed to the resolver
     * @param taskStoreCleaner     cleaner run by the leader; may be {@code null}, in which case
     *                             cleaning is skipped
     * @param resolvingFrequency   number of leader cycles between two cleaner runs; a value of
     *                             {@code 0} is treated as {@code 1}
     */
    public CoordinatedTaskScheduler(ScheduledTaskManager scheduledTaskManager, TaskStore taskStore, TaskLocationResolver taskLocationResolver, ClusterCommunicator clusterCommunicator, TaskStoreCleaner taskStoreCleaner, int resolvingFrequency) {
        this.dataHolder = DataHolder.getInstance();
        this.clusterCoordinator = this.dataHolder.getClusterCoordinator();
        this.resolveCount = 0;
        this.taskManager = scheduledTaskManager;
        this.taskStore = taskStore;
        this.taskLocationResolver = taskLocationResolver;
        this.clusterCommunicator = clusterCommunicator;
        this.taskStoreCleaner = taskStoreCleaner;
        // A frequency of 0 would make "resolveCount % resolvingFrequency" throw
        // ArithmeticException on every leader cycle, so fall back to cleaning every cycle.
        this.resolvingFrequency = resolvingFrequency == 0 ? 1 : resolvingFrequency;
        this.localNodeId = this.clusterCoordinator.getThisNodeId();
    }

    /**
     * Executes one coordination cycle. Any {@link Throwable} raised during the cycle is logged
     * and not propagated, so nothing escapes from this method.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            pauseDeactivatedTasks();
            scheduleAssignedTasks(CoordinatedTask.States.ACTIVATED);
            if (this.clusterCoordinator.isLeader()) {
                if (this.resolveCount % this.resolvingFrequency == 0) {
                    // The four-argument constructor supplies no cleaner; skip cleaning instead
                    // of failing the whole leader cycle with a NullPointerException.
                    if (this.taskStoreCleaner != null) {
                        LOG.debug("This node is leader hence cleaning task store.");
                        this.taskStoreCleaner.clean();
                    }
                    this.resolveCount = 0;
                }
                LOG.debug("This node is leader hence resolving unassigned tasks.");
                addFailedTasks();
                this.resolveCount++;
                resolveUnassignedNotCompletedTasksAndUpdateStore();
            } else {
                LOG.debug("This node is not leader. Hence not cleaning task store or resolving un assigned tasks.");
            }
            scheduleAssignedTasks(CoordinatedTask.States.NONE);
        } catch (Throwable th) {
            LOG.fatal("Unexpected error occurred while trying to schedule tasks.", th);
        }
    }

    /**
     * Stops every task the store lists for this node in the {@code DEACTIVATED} state, provided
     * the task is deployed on this node, and marks the successfully stopped ones {@code PAUSED}.
     * Tasks that are not deployed locally, or that fail to stop, are logged and left untouched.
     *
     * @throws TaskCoordinationException if the task store cannot be read or updated
     */
    private void pauseDeactivatedTasks() throws TaskCoordinationException {
        List<String> deactivatedTasks = this.taskStore.retrieveTaskNames(this.localNodeId, CoordinatedTask.States.DEACTIVATED);
        if (LOG.isDebugEnabled()) {
            for (String task : deactivatedTasks) {
                LOG.debug("Task [" + task + "] retrieved in [" + CoordinatedTask.States.DEACTIVATED + "] state.");
            }
        }
        List<String> pausedTasks = new ArrayList<>();
        List<String> deployedTasks = this.taskManager.getAllCoordinatedTasksDeployed();
        for (String task : deactivatedTasks) {
            if (!deployedTasks.contains(task)) {
                LOG.info("The task [" + task + "] retrieved to be paused is not a deployed task in this node or an invalid entry, hence ignoring it.");
                continue;
            }
            try {
                this.taskManager.stopExecution(task);
                // Only tasks that actually stopped are reported as paused.
                pausedTasks.add(task);
            } catch (TaskException e) {
                LOG.error("Error stopping the task [" + task + "]", e);
            }
        }
        this.taskStore.updateTaskState(pausedTasks, CoordinatedTask.States.PAUSED);
    }

    /**
     * Adds every task from the task manager's addition-failed list to the task store (if not
     * already present) and removes it from that list.
     *
     * @throws TaskCoordinationException if adding a task to the store fails
     */
    private void addFailedTasks() throws TaskCoordinationException {
        // Iterate over a snapshot: entries are removed through the task manager inside the loop,
        // and it is not visible here whether the returned list is a live view of that state.
        List<String> failedTasks = new ArrayList<>(this.taskManager.getAdditionFailedTasks());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Following list of tasks were found in the failed list.");
            failedTasks.forEach(LOG::debug);
        }
        for (String task : failedTasks) {
            this.taskStore.addTaskIfNotExist(task);
            this.taskManager.removeTaskFromAdditionFailedTaskList(task);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully added the failed task [" + task + "]");
            }
        }
    }

    /**
     * Schedules the tasks the store lists for this node in the given state. Tasks not deployed
     * on this node are ignored. Tasks whose scheduling fails with anything other than
     * {@code DATABASE_ERROR} have their state set to {@code NONE} in the store.
     *
     * @param states the store state whose tasks should be scheduled
     * @throws TaskCoordinationException if the task store cannot be read or updated
     */
    private void scheduleAssignedTasks(CoordinatedTask.States states) throws TaskCoordinationException {
        LOG.debug("Retrieving tasks assigned to this node and to be scheduled.");
        List<String> assignedTasks = this.taskStore.retrieveTaskNames(this.localNodeId, states);
        if (assignedTasks.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No tasks assigned to this node to be scheduled in state " + states);
            }
            return;
        }
        List<String> deployedTasks = this.taskManager.getAllCoordinatedTasksDeployed();
        List<String> tasksToReset = new ArrayList<>();
        for (String task : assignedTasks) {
            if (!deployedTasks.contains(task)) {
                LOG.info("The task [" + task + "] retrieved to be scheduled is not a deployed task in this node or an invalid entry, hence ignoring it.");
                continue;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Submitting retrieved task [" + task + "] to the task manager.");
            }
            try {
                this.taskManager.scheduleCoordinatedTask(task);
            } catch (TaskException e) {
                // Database errors leave the stored state as is; every other failure resets it.
                if (!TaskException.Code.DATABASE_ERROR.equals(e.getCode())) {
                    tasksToReset.add(task);
                }
                LOG.error("Exception occurred while scheduling coordinated task : " + task, e);
            }
        }
        this.taskStore.updateTaskState(tasksToReset, CoordinatedTask.States.NONE);
    }

    /**
     * Asks the location resolver for a node for every unassigned, incomplete task in the store
     * and writes the resolved assignments back. Tasks for which the resolver returns
     * {@code null} are left out of the update.
     *
     * @throws TaskCoordinationException if the task store cannot be read or updated
     */
    public synchronized void resolveUnassignedNotCompletedTasksAndUpdateStore() throws TaskCoordinationException {
        List<String> unassignedTasks = this.taskStore.retrieveAllUnAssignedAndIncompleteTasks();
        if (unassignedTasks.isEmpty()) {
            LOG.debug("No un assigned tasks found.");
            return;
        }
        HashMap<String, String> assignments = new HashMap<>();
        for (String task : unassignedTasks) {
            String destinedNode = this.taskLocationResolver.getTaskNodeLocation(this.clusterCommunicator, task);
            if (destinedNode != null) {
                assignments.put(task, destinedNode);
            }
        }
        this.taskStore.updateAssignmentAndState(assignments);
    }
}
