/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.schedule;

import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TopologyTaskHbInfo;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that walks every topology returned by
 * {@code StormClusterState.assignments(null)}, collects the tasks that
 * {@link NimbusUtils#isTaskDead} reports as dead, records an error for each
 * dead task that still has a worker slot, pushes a
 * {@link TopologyMetricsRunnable.TaskDeadEvent}, triggers the
 * {@link StatusType#monitor} transition, and finally writes the cached task
 * heartbeat info of the topology back through the cluster state.
 */
public class MonitorRunnable
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MonitorRunnable.class);
    private final NimbusData data;

    /**
     * @param data nimbus state used to reach the cluster state, the cached
     *             task heartbeats, the configuration and the metric runnable
     */
    public MonitorRunnable(NimbusData data) {
        this.data = data;
    }

    /**
     * Checks all topologies once. Any exception aborts the current pass and is
     * logged; nothing is rethrown to the caller.
     */
    @Override
    public void run() {
        StormClusterState clusterState = this.data.getStormClusterState();
        try {
            List<String> activeTopologies = clusterState.assignments(null);
            if (activeTopologies == null) {
                LOG.info("Failed to get active topologies");
                return;
            }
            for (String topologyid : activeTopologies) {
                this.checkTopology(clusterState, topologyid);
            }
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Runs the dead-task check for a single topology. Returning from this
     * method skips only this topology; the caller keeps iterating over the
     * remaining ones.
     *
     * @param clusterState cluster state handle used for all reads and writes
     * @param topologyid   id of the topology to check
     * @throws Exception whatever the cluster state or nimbus helpers throw
     */
    private void checkTopology(StormClusterState clusterState, String topologyid) throws Exception {
        if (clusterState.storm_base(topologyid, null) == null) {
            return;
        }
        LOG.debug("Check tasks {}", topologyid);
        Set<Integer> taskIds = clusterState.task_ids(topologyid);
        if (taskIds == null) {
            LOG.info("Failed to get task ids of {}", topologyid);
            return;
        }
        Assignment assignment = clusterState.assignment_info(topologyid, null);

        Set<Integer> deadTasks = new HashSet<Integer>();
        for (Integer task : taskIds) {
            if (NimbusUtils.isTaskDead(this.data, topologyid, task)) {
                deadTasks.add(task);
            }
        }
        // Captured before deadTasks is rewritten below: the transition must fire
        // whenever the scan found a dead task, even if the set ends up empty.
        boolean needReassign = !deadTasks.isEmpty();

        TopologyTaskHbInfo topologyHbInfo = this.data.getTasksHeartbeat().get(topologyid);
        if (needReassign) {
            if (topologyHbInfo != null) {
                int topologyMasterId = topologyHbInfo.get_topologyMasterId();
                if (deadTasks.contains(topologyMasterId)) {
                    // The topology master itself is dead: replace the dead set with
                    // all tasks of the master's worker (or just the master task when
                    // no worker is found for it).
                    deadTasks.clear();
                    if (assignment != null) {
                        ResourceWorkerSlot masterWorker = assignment.getWorkerByTaskId(topologyMasterId);
                        if (masterWorker != null) {
                            deadTasks.addAll(masterWorker.getTasks());
                        } else {
                            deadTasks.add(topologyMasterId);
                        }
                    }
                } else if (!this.isTopologyMasterLaunched(topologyHbInfo, topologyMasterId)) {
                    // Skip this topology only (no report, no transition, no heartbeat
                    // write). The other topologies must still be checked in this
                    // pass, otherwise one topology stuck in this state would starve
                    // the monitoring of all topologies that come after it.
                    return;
                }
                this.reportDeadTasks(clusterState, topologyid, assignment, deadTasks);
            }
            NimbusUtils.transition(this.data, topologyid, false, StatusType.monitor, new Object[0]);
        }
        if (topologyHbInfo == null) {
            return;
        }
        try {
            clusterState.topology_heartbeat(topologyid, topologyHbInfo);
        }
        catch (Exception e) {
            // Best effort: a failed heartbeat write must not abort the pass.
            LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e);
        }
    }

    /**
     * Tells whether the topology master has a heartbeat whose uptime is at
     * least the value configured under {@code nimbus.task.launch.secs}.
     *
     * @param topologyHbInfo   cached heartbeat info of the topology
     * @param topologyMasterId task id of the topology master
     * @return {@code false} when there is no heartbeat map, no heartbeat for
     *         the master, or the master's uptime is below the configured value
     */
    private boolean isTopologyMasterLaunched(TopologyTaskHbInfo topologyHbInfo, int topologyMasterId) {
        Map<Integer, TaskHeartbeat> taskHbs = topologyHbInfo.get_taskHbs();
        // Read the config before inspecting the heartbeats, so a missing/invalid
        // setting surfaces regardless of the heartbeat state.
        int launchTime = JStormUtils.parseInt(this.data.getConf().get("nimbus.task.launch.secs"));
        if (taskHbs == null) {
            return false;
        }
        TaskHeartbeat masterHb = taskHbs.get(topologyMasterId);
        return masterHb != null && masterHb.get_uptime() >= launchTime;
    }

    /**
     * Logs every dead task, records a task error for each one that still maps
     * to a worker slot in the assignment, and pushes a single
     * {@link TopologyMetricsRunnable.TaskDeadEvent} covering those tasks.
     *
     * @param clusterState cluster state used to record the task errors
     * @param topologyid   id of the topology the tasks belong to
     * @param assignment   current assignment of the topology, may be {@code null}
     * @param deadTasks    task ids considered dead
     * @throws Exception whatever {@code report_task_error} throws
     */
    private void reportDeadTasks(StormClusterState clusterState, String topologyid, Assignment assignment, Set<Integer> deadTasks) throws Exception {
        HashMap<Integer, ResourceWorkerSlot> deadTaskWorkers = new HashMap<Integer, ResourceWorkerSlot>();
        for (Integer task : deadTasks) {
            LOG.info("Found {},taskid:{} is dead", topologyid, task);
            ResourceWorkerSlot resource = assignment == null ? null : assignment.getWorkerByTaskId(task);
            if (resource == null) {
                continue;
            }
            deadTaskWorkers.put(task, resource);
            String nowStr = TimeFormat.getSecond(new Date());
            String errorInfo = "Task-" + task + " is dead on " + resource.getHostname() + ":" + resource.getPort() + ", " + nowStr;
            LOG.info(errorInfo);
            // NOTE(review): 300 and 259200 are constants inlined by the decompiler;
            // confirm their meaning against StormClusterState.report_task_error.
            clusterState.report_task_error(topologyid, task, errorInfo, "error", 300, 259200);
        }
        if (!deadTaskWorkers.isEmpty()) {
            TopologyMetricsRunnable.TaskDeadEvent event = new TopologyMetricsRunnable.TaskDeadEvent();
            event.clusterName = this.data.getClusterName();
            event.topologyId = topologyid;
            event.deadTasks = deadTaskWorkers;
            event.timestamp = System.currentTimeMillis();
            this.data.getMetricRunnable().pushEvent(event);
        }
    }
}

