package org.wso2.micro.integrator.ntask.coordination.task;

import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.micro.integrator.coordination.ClusterCoordinator;
import org.wso2.micro.integrator.coordination.MemberEventListener;
import org.wso2.micro.integrator.coordination.node.NodeDetail;
import org.wso2.micro.integrator.ntask.common.TaskException;
import org.wso2.micro.integrator.ntask.coordination.TaskCoordinationException;
import org.wso2.micro.integrator.ntask.coordination.task.resolver.TaskLocationResolver;
import org.wso2.micro.integrator.ntask.coordination.task.scehduler.CoordinatedTaskScheduler;
import org.wso2.micro.integrator.ntask.coordination.task.store.TaskStore;
import org.wso2.micro.integrator.ntask.core.impl.standalone.ScheduledTaskManager;
import org.wso2.micro.integrator.ntask.core.internal.CoordinatedTaskScheduleManager;
import org.wso2.micro.integrator.ntask.core.internal.DataHolder;

/* loaded from: input_file:org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.class */
public class TaskEventListener extends MemberEventListener {
    private static final Log LOG = LogFactory.getLog(TaskEventListener.class);
    private DataHolder dataHolder = DataHolder.getInstance();
    private ClusterCoordinator clusterCoordinator = this.dataHolder.getClusterCoordinator();
    private TaskStore taskStore;
    private TaskLocationResolver locationResolver;
    private ScheduledTaskManager taskManager;

    public TaskEventListener(ScheduledTaskManager scheduledTaskManager, TaskStore taskStore, TaskLocationResolver taskLocationResolver) {
        this.taskManager = scheduledTaskManager;
        this.taskStore = taskStore;
        this.locationResolver = taskLocationResolver;
    }

    public void memberAdded(NodeDetail nodeDetail) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Member added : " + nodeDetail.getNodeId());
        }
        if (this.clusterCoordinator.isLeader()) {
            LOG.debug("Current node is leader, hence resolving unassigned tasks upon member addition.");
            try {
                new CoordinatedTaskScheduler(this.taskManager, this.taskStore, this.locationResolver, new ClusterCommunicator(this.clusterCoordinator)).resolveUnassignedNotCompletedTasksAndUpdateStore();
            } catch (TaskCoordinationException e) {
                LOG.error("Exception occurred while resolving un assigned tasks upon member addition " + nodeDetail.getNodeId(), e);
            }
        }
    }

    public void memberRemoved(NodeDetail nodeDetail) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Member removed : " + nodeDetail.getNodeId());
        }
        String nodeId = nodeDetail.getNodeId();
        try {
            this.taskStore.unAssignAndUpdateState(nodeId);
        } catch (TaskCoordinationException e) {
            LOG.error("Error occurred while cleaning the tasks of node " + nodeId, e);
        }
    }

    public void coordinatorChanged(NodeDetail nodeDetail) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Coordinator changed : " + nodeDetail.getNodeId());
        }
    }

    public void becameUnresponsive(String str) {
        LOG.debug("This node became unresponsive.");
        ScheduledExecutorService taskScheduler = this.dataHolder.getTaskScheduler();
        if (taskScheduler != null) {
            LOG.info("Shutting down coordinated task scheduler scheduler since the node became unresponsive.");
            taskScheduler.shutdown();
            this.dataHolder.setTaskScheduler(null);
        }
        this.taskManager.getLocallyRunningCoordinatedTasks().forEach(str2 -> {
            try {
                this.taskManager.stopExecution(str2);
            } catch (TaskException e) {
                LOG.error("Unable to pause the task " + str2, e);
            }
        });
    }

    public void reJoined(String str) {
        LOG.debug("This node re-joined the cluster successfully.");
        try {
            this.taskStore.unAssignAndUpdateState(str);
        } catch (Throwable th) {
            LOG.error("Error occurred while cleaning the tasks of node " + str, th);
        }
        new CoordinatedTaskScheduleManager(this.taskManager, this.taskStore, this.clusterCoordinator, this.locationResolver).startTaskScheduler(" upon rejoining the cluster");
    }
}
