/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.NimbusStat;
import backtype.storm.generated.NimbusSummary;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SupervisorSummary;
import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TopologySummary;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.utils.ThriftTopologyUtils;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NimbusUtils {
    private static Logger LOG = LoggerFactory.getLogger(NimbusUtils.class);

    private static Map mapifySerializations(List sers) {
        HashMap rtn = new HashMap();
        if (sers != null) {
            int size = sers.size();
            for (int i = 0; i < size; ++i) {
                if (sers.get(i) instanceof Map) {
                    rtn.putAll((Map)sers.get(i));
                    continue;
                }
                rtn.put(sers.get(i), null);
            }
        }
        return rtn;
    }

    public static Map normalizeConf(Map conf, Map stormConf, StormTopology topology) throws Exception {
        Object totalDecorator;
        ArrayList<Object> kryoRegisterList = new ArrayList<Object>();
        ArrayList<Object> kryoDecoratorList = new ArrayList<Object>();
        HashMap totalConf = new HashMap();
        totalConf.putAll(conf);
        totalConf.putAll(stormConf);
        Object totalRegister = totalConf.get("topology.kryo.register");
        if (totalRegister != null) {
            LOG.info("topology:" + stormConf.get("topology.name") + ", TOPOLOGY_KRYO_REGISTER" + totalRegister.getClass().getName());
            JStormUtils.mergeList(kryoRegisterList, totalRegister);
        }
        if ((totalDecorator = totalConf.get("topology.kryo.decorators")) != null) {
            LOG.info("topology:" + stormConf.get("topology.name") + ", TOPOLOGY_KRYO_DECORATOR" + totalDecorator.getClass().getName());
            JStormUtils.mergeList(kryoDecoratorList, totalDecorator);
        }
        Set<String> cids = ThriftTopologyUtils.getComponentIds(topology);
        for (String componentId : cids) {
            Object componentDecorator;
            ComponentCommon common = ThriftTopologyUtils.getComponentCommon(topology, componentId);
            String json = common.get_json_conf();
            if (json == null) continue;
            Map mtmp = (Map)JStormUtils.from_json(json);
            if (mtmp == null) {
                StringBuilder sb = new StringBuilder();
                sb.append("Failed to deserilaize " + componentId);
                sb.append(" json configuration: ");
                sb.append(json);
                LOG.info(sb.toString());
                throw new Exception(sb.toString());
            }
            Object componentKryoRegister = mtmp.get("topology.kryo.register");
            if (componentKryoRegister != null) {
                LOG.info("topology:" + stormConf.get("topology.name") + ", componentId:" + componentId + ", TOPOLOGY_KRYO_REGISTER" + componentKryoRegister.getClass().getName());
                JStormUtils.mergeList(kryoRegisterList, componentKryoRegister);
            }
            if ((componentDecorator = mtmp.get("topology.kryo.decorators")) == null) continue;
            LOG.info("topology:" + stormConf.get("topology.name") + ", componentId:" + componentId + ", TOPOLOGY_KRYO_DECORATOR" + componentDecorator.getClass().getName());
            JStormUtils.mergeList(kryoDecoratorList, componentDecorator);
        }
        Map kryoRegisterMap = NimbusUtils.mapifySerializations(kryoRegisterList);
        List<Object> decoratorList = JStormUtils.distinctList(kryoDecoratorList);
        Integer ackerNum = JStormUtils.parseInt(totalConf.get("topology.acker.executors"));
        if (ackerNum == null) {
            ackerNum = 1;
        }
        HashMap rtn = new HashMap();
        rtn.put("storm.cluster.mode", conf.get("storm.cluster.mode"));
        rtn.putAll(stormConf);
        rtn.put("topology.kryo.decorators", decoratorList);
        rtn.put("topology.kryo.register", kryoRegisterMap);
        rtn.put("topology.acker.executors", ackerNum);
        rtn.put("topology.max.task.parallelism", totalConf.get("topology.max.task.parallelism"));
        return rtn;
    }

    public static Integer componentParalism(Map stormConf, ComponentCommon common) {
        Object maxTaskParalismObject;
        Integer taskNum;
        HashMap mergeMap = new HashMap();
        mergeMap.putAll(stormConf);
        String jsonConfString = common.get_json_conf();
        if (jsonConfString != null) {
            Map componentMap = (Map)JStormUtils.from_json(jsonConfString);
            mergeMap.putAll(componentMap);
        }
        if ((taskNum = Integer.valueOf(common.get_parallelism_hint())) == null) {
            taskNum = 1;
        }
        if ((maxTaskParalismObject = mergeMap.get("topology.max.task.parallelism")) == null) {
            return taskNum;
        }
        int maxTaskParalism = JStormUtils.parseInt(maxTaskParalismObject);
        return Math.min(maxTaskParalism, taskNum);
    }

    public static StormTopology normalizeTopology(Map stormConf, StormTopology topology, boolean fromConf) {
        StormTopology ret = topology.deepCopy();
        Map<String, Object> rawComponents = ThriftTopologyUtils.getComponents(topology);
        Map<String, Object> components = ThriftTopologyUtils.getComponents(ret);
        if (!rawComponents.keySet().equals(components.keySet())) {
            String errMsg = "Failed to normalize topology binary, maybe due to wrong dependency";
            LOG.info(errMsg + " raw components:" + rawComponents.keySet() + ", normalized " + components.keySet());
            throw new InvalidParameterException(errMsg);
        }
        for (Map.Entry<String, Object> entry : components.entrySet()) {
            Integer paraNum;
            Object component = entry.getValue();
            String componentName = entry.getKey();
            ComponentCommon common = null;
            if (component instanceof Bolt) {
                common = ((Bolt)component).get_common();
                if (fromConf && (paraNum = ConfigExtension.getBoltParallelism(stormConf, componentName)) != null) {
                    LOG.info("Set " + componentName + " as " + paraNum);
                    common.set_parallelism_hint(paraNum);
                }
            }
            if (component instanceof SpoutSpec) {
                common = ((SpoutSpec)component).get_common();
                if (fromConf && (paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName)) != null) {
                    LOG.info("Set " + componentName + " as " + paraNum);
                    common.set_parallelism_hint(paraNum);
                }
            }
            if (component instanceof StateSpoutSpec) {
                common = ((StateSpoutSpec)component).get_common();
                if (fromConf && (paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName)) != null) {
                    LOG.info("Set " + componentName + " as " + paraNum);
                    common.set_parallelism_hint(paraNum);
                }
            }
            HashMap<String, Integer> componentMap = new HashMap<String, Integer>();
            String jsonConfString = common.get_json_conf();
            if (jsonConfString != null) {
                componentMap.putAll((Map)JStormUtils.from_json(jsonConfString));
            }
            Integer taskNum = NimbusUtils.componentParalism(stormConf, common);
            componentMap.put("topology.tasks", taskNum);
            common.set_parallelism_hint(taskNum);
            LOG.info("Set " + componentName + " parallelism " + taskNum);
            common.set_json_conf(JStormUtils.to_json(componentMap));
        }
        return ret;
    }

    public static void cleanupCorruptTopologies(NimbusData data) throws Exception {
        StormClusterState stormClusterState = data.getStormClusterState();
        String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf());
        List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root);
        List<String> active_ids = data.getStormClusterState().active_storms();
        if (active_ids != null && active_ids.size() > 0) {
            if (code_ids != null) {
                active_ids.removeAll(code_ids);
            }
            for (String corrupt : active_ids) {
                LOG.info("Corrupt topology " + corrupt + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...");
                stormClusterState.remove_storm(corrupt);
            }
        }
        LOG.info("Successfully cleanup all old toplogies");
    }

    public static boolean isTaskDead(NimbusData data, String topologyId, Integer taskId) {
        Map<Integer, TkHbCacheTime> taskHBs;
        TkHbCacheTime taskHB;
        String idStr = " topology:" + topologyId + ",taskid:" + taskId;
        TopologyTaskHbInfo topoTasksHbInfo = data.getTasksHeartbeat().get(topologyId);
        Map<Integer, TaskHeartbeat> taskHbMap = null;
        Integer taskReportTime = null;
        if (topoTasksHbInfo != null && (taskHbMap = topoTasksHbInfo.get_taskHbs()) != null) {
            TaskHeartbeat tHb = taskHbMap.get(taskId);
            Integer n = taskReportTime = tHb != null ? Integer.valueOf(tHb.get_time()) : null;
        }
        if ((taskHB = (taskHBs = data.getTaskHeartbeatsCache(topologyId, true)).get(taskId)) == null) {
            LOG.info("No task heartbeat cache " + idStr);
            if (topoTasksHbInfo == null || taskHbMap == null) {
                LOG.info("No task hearbeat was reported for " + idStr);
                return true;
            }
            taskHB = new TkHbCacheTime();
            taskHB.update(taskHbMap.get(taskId));
            taskHBs.put(taskId, taskHB);
            return false;
        }
        if (taskReportTime == null || taskReportTime < taskHB.getTaskAssignedTime()) {
            LOG.debug("No task heartbeat was reported for " + idStr);
            int nowSecs = TimeUtils.current_time_secs();
            int assignSecs = taskHB.getTaskAssignedTime();
            int waitInitTimeout = JStormUtils.parseInt(data.getConf().get("nimbus.task.launch.secs"));
            if (nowSecs - assignSecs > waitInitTimeout) {
                LOG.info(idStr + " failed to init ");
                return true;
            }
            return false;
        }
        int nimbusTime = taskHB.getNimbusTime();
        int reportTime = taskHB.getTaskReportedTime();
        int nowSecs = TimeUtils.current_time_secs();
        if (nimbusTime == 0) {
            taskHB.setNimbusTime(nowSecs);
            taskHB.setTaskReportedTime(taskReportTime);
            LOG.info("Update taskheartbeat to nimbus cache " + idStr);
            return false;
        }
        if (reportTime != taskReportTime) {
            taskHB.setNimbusTime(nowSecs);
            taskHB.setTaskReportedTime(taskReportTime);
            LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:" + taskReportTime + ",report:" + reportTime);
            return false;
        }
        Integer taskHBTimeout = data.getTopologyTaskTimeout().get(topologyId);
        if (taskHBTimeout == null) {
            taskHBTimeout = JStormUtils.parseInt(data.getConf().get("nimbus.task.timeout.secs"));
        }
        if (taskId.intValue() == topoTasksHbInfo.get_topologyMasterId()) {
            taskHBTimeout = taskHBTimeout / 2;
        }
        if (nowSecs - nimbusTime > taskHBTimeout) {
            long ts = (long)nimbusTime * 1000L;
            Date lastTaskHBDate = new Date(ts);
            StringBuilder sb = new StringBuilder();
            sb.append(idStr);
            sb.append(" last tasktime is ");
            sb.append(nimbusTime);
            sb.append(":").append(lastTaskHBDate);
            sb.append(",current ");
            sb.append(nowSecs);
            sb.append(":").append(new Date((long)nowSecs * 1000L));
            LOG.debug(sb.toString());
            return true;
        }
        return false;
    }

    public static void updateTaskHbStartTime(NimbusData data, Assignment assignment, String topologyId) {
        Map<Integer, TkHbCacheTime> taskHBs = data.getTaskHeartbeatsCache(topologyId, true);
        Map<Integer, Integer> taskStartTimes = assignment.getTaskStartTimeSecs();
        for (Map.Entry<Integer, Integer> entry : taskStartTimes.entrySet()) {
            Integer taskId = entry.getKey();
            Integer taskStartTime = entry.getValue();
            TkHbCacheTime taskHB = taskHBs.get(taskId);
            if (taskHB == null) {
                taskHB = new TkHbCacheTime();
                taskHBs.put(taskId, taskHB);
            }
            taskHB.setTaskAssignedTime(taskStartTime);
        }
    }

    public static <T> void transitionName(NimbusData data, String topologyName, boolean errorOnNoTransition, StatusType transition_status, T ... args) throws Exception {
        StormClusterState stormClusterState = data.getStormClusterState();
        String topologyId = Cluster.get_topology_id(stormClusterState, topologyName);
        if (topologyId == null) {
            throw new NotAliveException(topologyName);
        }
        NimbusUtils.transition(data, topologyId, errorOnNoTransition, transition_status, args);
    }

    public static <T> void transition(NimbusData data, String topologyid, boolean errorOnNoTransition, StatusType transition_status, T ... args) {
        try {
            data.getStatusTransition().transition(topologyid, errorOnNoTransition, transition_status, args);
        }
        catch (Exception e) {
            LOG.error("Failed to do status transition,", (Throwable)e);
        }
    }

    public static int getTopologyTaskNum(Assignment assignment) {
        int numTasks = 0;
        for (ResourceWorkerSlot worker : assignment.getWorkers()) {
            numTasks += worker.getTasks().size();
        }
        return numTasks;
    }

    public static List<TopologySummary> getTopologySummary(StormClusterState stormClusterState, Map<String, Assignment> assignments) throws Exception {
        ArrayList<TopologySummary> topologySummaries = new ArrayList<TopologySummary>();
        HashMap<String, StormBase> bases = Cluster.get_all_StormBase(stormClusterState);
        for (Map.Entry entry : bases.entrySet()) {
            String topologyId = (String)entry.getKey();
            StormBase base = (StormBase)entry.getValue();
            Assignment assignment = stormClusterState.assignment_info(topologyId, null);
            if (assignment == null) {
                LOG.error("Failed to get assignment of " + topologyId);
                continue;
            }
            assignments.put(topologyId, assignment);
            int num_workers = assignment.getWorkers().size();
            int num_tasks = NimbusUtils.getTopologyTaskNum(assignment);
            String errorString = null;
            errorString = Cluster.is_topology_exist_error(stormClusterState, topologyId) ? "Y" : "";
            TopologySummary topology = new TopologySummary();
            topology.set_id(topologyId);
            topology.set_name(base.getStormName());
            topology.set_status(base.getStatusString());
            topology.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs()));
            topology.set_numWorkers(num_workers);
            topology.set_numTasks(num_tasks);
            topology.set_errorInfo(errorString);
            topologySummaries.add(topology);
        }
        return topologySummaries;
    }

    public static SupervisorSummary mkSupervisorSummary(SupervisorInfo supervisorInfo, String supervisorId, Map<String, Integer> supervisorToUsedSlotNum) {
        Integer usedNum = supervisorToUsedSlotNum.get(supervisorId);
        SupervisorSummary summary = new SupervisorSummary(supervisorInfo.getHostName(), supervisorId, supervisorInfo.getUptimeSecs(), supervisorInfo.getWorkerPorts().size(), usedNum == null ? 0 : usedNum);
        return summary;
    }

    public static List<SupervisorSummary> mkSupervisorSummaries(Map<String, SupervisorInfo> supervisorInfos, Map<String, Assignment> assignments) {
        HashMap<String, Integer> supervisorToLeftSlotNum = new HashMap<String, Integer>();
        for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
            Set<ResourceWorkerSlot> workers = entry.getValue().getWorkers();
            for (ResourceWorkerSlot worker : workers) {
                String supervisorId = worker.getNodeId();
                SupervisorInfo supervisorInfo = supervisorInfos.get(supervisorId);
                if (supervisorInfo == null) continue;
                Integer slots = (Integer)supervisorToLeftSlotNum.get(supervisorId);
                if (slots == null) {
                    slots = 0;
                    supervisorToLeftSlotNum.put(supervisorId, slots);
                }
                slots = slots + 1;
                supervisorToLeftSlotNum.put(supervisorId, slots);
            }
        }
        ArrayList<SupervisorSummary> ret = new ArrayList<SupervisorSummary>();
        for (Map.Entry<String, SupervisorInfo> entry : supervisorInfos.entrySet()) {
            String supervisorId = entry.getKey();
            SupervisorInfo supervisorInfo = entry.getValue();
            SupervisorSummary summary = NimbusUtils.mkSupervisorSummary(supervisorInfo, supervisorId, supervisorToLeftSlotNum);
            ret.add(summary);
        }
        Collections.sort(ret, new Comparator<SupervisorSummary>(){

            @Override
            public int compare(SupervisorSummary o1, SupervisorSummary o2) {
                return o1.get_host().compareTo(o2.get_host());
            }
        });
        return ret;
    }

    public static NimbusSummary getNimbusSummary(StormClusterState stormClusterState, List<SupervisorSummary> supervisorSummaries, NimbusData data) throws Exception {
        NimbusSummary ret = new NimbusSummary();
        String master = stormClusterState.get_leader_host();
        NimbusStat nimbusMaster = new NimbusStat();
        nimbusMaster.set_host(master);
        nimbusMaster.set_uptimeSecs(String.valueOf(data.uptime()));
        ret.set_nimbusMaster(nimbusMaster);
        ArrayList<NimbusStat> nimbusSlaveList = new ArrayList<NimbusStat>();
        ret.set_nimbusSlaves(nimbusSlaveList);
        Map<String, String> nimbusSlaveMap = Cluster.get_all_nimbus_slave(stormClusterState);
        if (nimbusSlaveMap != null) {
            for (Map.Entry<String, String> entry : nimbusSlaveMap.entrySet()) {
                NimbusStat slave = new NimbusStat();
                slave.set_host(entry.getKey());
                slave.set_uptimeSecs(entry.getValue());
                nimbusSlaveList.add(slave);
            }
        }
        int totalPort = 0;
        int usedPort = 0;
        for (SupervisorSummary supervisor : supervisorSummaries) {
            totalPort += supervisor.get_numWorkers();
            usedPort += supervisor.get_numUsedWorkers();
        }
        ret.set_supervisorNum(supervisorSummaries.size());
        ret.set_totalPortNum(totalPort);
        ret.set_usedPortNum(usedPort);
        ret.set_freePortNum(totalPort - usedPort);
        ret.set_version(Utils.getVersion());
        return ret;
    }

    public static void updateTopologyTaskTimeout(NimbusData data, String topologyId) {
        Map topologyConf = null;
        try {
            topologyConf = StormConfig.read_nimbus_topology_conf(data.getConf(), topologyId);
        }
        catch (IOException e) {
            LOG.warn("Failed to read configuration of " + topologyId + ", " + e.getMessage());
        }
        Integer timeout = JStormUtils.parseInt(topologyConf.get("nimbus.task.timeout.secs"));
        if (timeout == null) {
            timeout = JStormUtils.parseInt(data.getConf().get("nimbus.task.timeout.secs"));
        }
        LOG.info("Setting taskTimeout:" + timeout + " for " + topologyId);
        data.getTopologyTaskTimeout().put(topologyId, timeout);
    }

    public static void removeTopologyTaskTimeout(NimbusData data, String topologyId) {
        data.getTopologyTaskTimeout().remove(topologyId);
    }

    public static void updateTopologyTaskHb(NimbusData data, String topologyId) {
        StormClusterState clusterState = data.getStormClusterState();
        TopologyTaskHbInfo topologyTaskHb = null;
        try {
            topologyTaskHb = clusterState.topology_heartbeat(topologyId);
        }
        catch (Exception e) {
            LOG.error("updateTopologyTaskHb: Failed to get topology task heartbeat info", (Throwable)e);
        }
        if (topologyTaskHb != null) {
            data.getTasksHeartbeat().put(topologyId, topologyTaskHb);
        }
    }

    public static void removeTopologyTaskHb(NimbusData data, String topologyId, int taskId) {
        Map<Integer, TaskHeartbeat> taskHbs;
        TopologyTaskHbInfo topologyTaskHbs = data.getTasksHeartbeat().get(topologyId);
        if (topologyTaskHbs != null && (taskHbs = topologyTaskHbs.get_taskHbs()) != null) {
            taskHbs.remove(taskId);
        }
    }

    public static int getTopologyMasterId(Map<Integer, TaskInfo> tasksInfo) {
        int ret = 0;
        for (Map.Entry<Integer, TaskInfo> entry : tasksInfo.entrySet()) {
            if (!entry.getValue().getComponentId().equalsIgnoreCase("__topology_master")) continue;
            ret = entry.getKey();
            break;
        }
        return ret;
    }
}

