package com.alibaba.jstorm.daemon.supervisor;

import backtype.storm.utils.LocalState;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.MachineCheckStatus;
import com.alibaba.jstorm.daemon.worker.LocalAssignment;
import com.alibaba.jstorm.event.EventManager;
import com.alibaba.jstorm.event.EventManagerZkPusher;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.class */
class SyncSupervisorEvent extends RunnableCallback {
    private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);
    private String supervisorId;
    private EventManager syncSupEventManager;
    private StormClusterState stormClusterState;
    private LocalState localState;
    private Map<Object, Object> conf;
    private SyncProcessEvent syncProcesses;
    private int lastTime;
    private Heartbeat heartbeat;

    public SyncSupervisorEvent(String str, Map map, EventManager eventManager, StormClusterState stormClusterState, LocalState localState, SyncProcessEvent syncProcessEvent, Heartbeat heartbeat) {
        this.syncProcesses = syncProcessEvent;
        this.syncSupEventManager = eventManager;
        this.stormClusterState = stormClusterState;
        this.conf = map;
        this.supervisorId = str;
        this.localState = localState;
        this.heartbeat = heartbeat;
    }

    @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
    public void run() {
        LOG.debug("Synchronizing supervisor, interval seconds:" + TimeUtils.time_delta(this.lastTime));
        this.lastTime = TimeUtils.current_time_secs();
        MachineCheckStatus machineCheckStatus = new MachineCheckStatus();
        machineCheckStatus.SetType(this.heartbeat.getCheckStatus().getType());
        try {
            EventManagerZkPusher eventManagerZkPusher = new EventManagerZkPusher(this, this.syncSupEventManager);
            Map<String, Integer> map = (Map) this.localState.get(Common.LS_LOCAL_ZK_ASSIGNMENT_VERSION);
            if (map == null) {
                map = new HashMap();
            }
            Map<String, Assignment> map2 = (Map) this.localState.get(Common.LS_LOCAl_ZK_ASSIGNMENTS);
            if (map2 == null) {
                map2 = new HashMap();
            }
            LOG.debug("get local assignments  " + map2);
            LOG.debug("get local assignments version " + map);
            if (machineCheckStatus.getType().equals(MachineCheckStatus.StatusType.panic) || machineCheckStatus.getType().equals(MachineCheckStatus.StatusType.error)) {
                map.clear();
                map2.clear();
                LOG.warn("Supervisor Machine Check Status :" + machineCheckStatus.getType() + ", so kill all workers.");
            } else {
                getAllAssignments(map, map2, eventManagerZkPusher);
            }
            LOG.debug("Get all assignments " + map2);
            List<String> list = StormConfig.get_supervisor_toplogy_list(this.conf);
            LOG.debug("Downloaded storm ids: " + list);
            Map<Integer, LocalAssignment> localAssign = getLocalAssign(this.stormClusterState, this.supervisorId, map2);
            try {
                LOG.debug("Writing local assignment " + localAssign);
                Map<Integer, LocalAssignment> map3 = (Map) this.localState.get(Common.LS_LOCAL_ASSIGNMENTS);
                if (map3 == null) {
                    map3 = new HashMap();
                }
                this.localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssign);
                Set<String> updateTopologys = getUpdateTopologys(map3, localAssign, map2);
                Set<String> needReDownloadTopologys = getNeedReDownloadTopologys(map3);
                if (needReDownloadTopologys != null) {
                    updateTopologys.addAll(needReDownloadTopologys);
                }
                Map<String, String> topologyCodeLocations = getTopologyCodeLocations(map2, this.supervisorId);
                HashSet hashSet = new HashSet();
                downloadTopology(topologyCodeLocations, list, updateTopologys, map2, hashSet);
                removeUselessTopology(topologyCodeLocations, list);
                this.syncProcesses.run(localAssign, hashSet);
                this.heartbeat.updateHbTrigger(true);
                try {
                    this.localState.put(Common.LS_LOCAL_ZK_ASSIGNMENT_VERSION, map);
                    this.localState.put(Common.LS_LOCAl_ZK_ASSIGNMENTS, map2);
                } catch (IOException e) {
                    LOG.error("put LS_LOCAL_ZK_ASSIGNMENT_VERSION&&LS_LOCAl_ZK_ASSIGNMENTS  failed");
                    throw e;
                }
            } catch (IOException e2) {
                LOG.error("put LS_LOCAL_ASSIGNMENTS " + localAssign + " of localState failed");
                throw e2;
            }
        } catch (Exception e3) {
            LOG.error("Failed to Sync Supervisor", e3);
        }
        if (machineCheckStatus.getType().equals(MachineCheckStatus.StatusType.panic)) {
            JStormUtils.halt_process(0, "Supervisor Machine Check Status : Panic , !!!!shutdown!!!!");
        }
    }

    private void downloadStormCode(Map map, String str, String str2) throws IOException, TException {
        String cluster_mode = StormConfig.cluster_mode(map);
        if (cluster_mode.endsWith("distributed")) {
            downloadDistributeStormCode(map, str, str2);
        } else if (cluster_mode.endsWith("local")) {
            downloadLocalStormCode(map, str, str2);
        }
    }

    private void downloadLocalStormCode(Map map, String str, String str2) throws IOException, TException {
        String supervisor_stormdist_root = StormConfig.supervisor_stormdist_root(map, str);
        FileUtils.copyDirectory(new File(str2), new File(supervisor_stormdist_root));
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        String resourcesJar = resourcesJar();
        URL resource = contextClassLoader.getResource(StormConfig.RESOURCES_SUBDIR);
        String str3 = supervisor_stormdist_root + '/' + StormConfig.RESOURCES_SUBDIR;
        if (resourcesJar != null) {
            LOG.info("Extracting resources from jar at " + resourcesJar + " to " + str3);
            JStormUtils.extractDirFromJar(resourcesJar, StormConfig.RESOURCES_SUBDIR, supervisor_stormdist_root);
        } else if (resource != null) {
            LOG.info("Copying resources at " + resource.toString() + " to " + str3);
            FileUtils.copyDirectory(new File(resource.getFile()), new File(str3));
        }
    }

    private void downloadDistributeStormCode(Map map, String str, String str2) throws IOException, TException {
        String str3 = StormConfig.supervisorTmpDir(map) + File.separator + UUID.randomUUID().toString();
        String supervisor_stormdist_root = StormConfig.supervisor_stormdist_root(map, str);
        JStormServerUtils.downloadCodeFromMaster(map, str3, str2, str, true);
        JStormUtils.extractDirFromJar(StormConfig.stormjar_path(str3), StormConfig.RESOURCES_SUBDIR, str3);
        File file = new File(str3);
        File file2 = new File(supervisor_stormdist_root);
        try {
            FileUtils.moveDirectory(file, file2);
        } catch (FileExistsException e) {
            FileUtils.copyDirectory(file, file2);
            FileUtils.deleteQuietly(file);
        }
    }

    private String resourcesJar() {
        String property = System.getProperty("java.class.path");
        if (property == null) {
            return null;
        }
        String[] split = property.split(File.pathSeparator);
        ArrayList arrayList = new ArrayList();
        for (String str : split) {
            if (str.endsWith(".jar")) {
                arrayList.add(str);
            }
        }
        ArrayList arrayList2 = new ArrayList();
        int size = arrayList.size();
        for (int i = 0; i < size; i++) {
            if (JStormUtils.zipContainsDir((String) arrayList.get(i), StormConfig.RESOURCES_SUBDIR)) {
                arrayList2.add(arrayList.get(i));
            }
        }
        if (arrayList2.size() == 0) {
            return null;
        }
        return (String) arrayList2.get(0);
    }

    private Map<Integer, LocalAssignment> getLocalAssign(StormClusterState stormClusterState, String str, Map<String, Assignment> map) throws Exception {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Assignment> entry : map.entrySet()) {
            Map<Integer, LocalAssignment> readMyTasks = readMyTasks(stormClusterState, entry.getKey(), str, entry.getValue());
            if (readMyTasks != null) {
                for (Map.Entry<Integer, LocalAssignment> entry2 : readMyTasks.entrySet()) {
                    Integer key = entry2.getKey();
                    LocalAssignment value = entry2.getValue();
                    if (hashMap.containsKey(key)) {
                        throw new RuntimeException("Should not have multiple topologys assigned to one port");
                    }
                    hashMap.put(key, value);
                }
            }
        }
        return hashMap;
    }

    private Map<Integer, LocalAssignment> readMyTasks(StormClusterState stormClusterState, String str, String str2, Assignment assignment) throws Exception {
        HashMap hashMap = new HashMap();
        Set<ResourceWorkerSlot> workers = assignment.getWorkers();
        if (workers == null) {
            LOG.error("No worker of assignment's " + assignment);
            return hashMap;
        }
        for (ResourceWorkerSlot resourceWorkerSlot : workers) {
            if (str2.equals(resourceWorkerSlot.getNodeId())) {
                hashMap.put(Integer.valueOf(resourceWorkerSlot.getPort()), new LocalAssignment(str, resourceWorkerSlot.getTasks(), Common.topologyIdToName(str), resourceWorkerSlot.getMemSize(), resourceWorkerSlot.getCpu(), resourceWorkerSlot.getJvm(), assignment.getTimeStamp()));
            }
        }
        return hashMap;
    }

    public static Map<String, String> getTopologyCodeLocations(Map<String, Assignment> map, String str) throws Exception {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Assignment> entry : map.entrySet()) {
            String key = entry.getKey();
            Assignment value = entry.getValue();
            Iterator<ResourceWorkerSlot> it = value.getWorkers().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                if (str.equals(it.next().getNodeId())) {
                    hashMap.put(key, value.getMasterCodeDir());
                    break;
                }
            }
        }
        return hashMap;
    }

    public void downloadTopology(Map<String, String> map, List<String> list, Set<String> set, Map<String, Assignment> map2, Set<String> set2) throws Exception {
        HashSet hashSet = new HashSet();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (!list.contains(key) || set.contains(key)) {
                LOG.info("Downloading code for storm id " + key + " from " + value);
                int i = 0;
                while (i < 3) {
                    try {
                        downloadStormCode(this.conf, key, value);
                        StormConfig.write_supervisor_topology_timestamp(this.conf, key, map2.get(key).getTimeStamp());
                        break;
                    } catch (TException e) {
                        LOG.error(e + " downloadStormCode failed topologyId:" + key + "masterCodeDir:" + value);
                        i++;
                    } catch (IOException e2) {
                        LOG.error(e2 + " downloadStormCode failed topologyId:" + key + "masterCodeDir:" + value);
                        i++;
                    }
                }
                if (i < 3) {
                    LOG.info("Finished downloading code for storm id " + key + " from " + value);
                    hashSet.add(key);
                } else {
                    LOG.error("Cann't  download code for storm id " + key + " from " + value);
                    set2.add(key);
                }
            }
        }
        for (String str : set2) {
            if (!list.contains(str)) {
                try {
                    FileUtils.deleteQuietly(new File(StormConfig.supervisor_stormdist_root(this.conf, str)));
                } catch (Exception e3) {
                    LOG.error("Cann't  clear directory about storm id " + str + " on supervisor ");
                }
            }
        }
        updateTaskCleanupTimeout(hashSet);
    }

    public void removeUselessTopology(Map<String, String> map, List<String> list) {
        for (String str : list) {
            if (!map.containsKey(str)) {
                LOG.info("Removing code for storm id " + str);
                String str2 = null;
                try {
                    str2 = StormConfig.supervisor_stormdist_root(this.conf, str);
                    PathUtils.rmr(str2);
                } catch (IOException e) {
                    LOG.error("rmr the path:" + str2 + "failed\n", e);
                }
            }
        }
    }

    private Set<String> getUpdateTopologys(Map<Integer, LocalAssignment> map, Map<Integer, LocalAssignment> map2, Map<String, Assignment> map3) {
        HashSet hashSet = new HashSet();
        if (map != null && map2 != null) {
            for (Map.Entry<Integer, LocalAssignment> entry : map.entrySet()) {
                Integer key = entry.getKey();
                LocalAssignment value = entry.getValue();
                LocalAssignment localAssignment = map2.get(key);
                if (value != null && localAssignment != null) {
                    Assignment assignment = map3.get(value.getTopologyId());
                    if (value.getTopologyId().equals(localAssignment.getTopologyId()) && assignment != null && assignment.isTopologyChange(value.getTimeStamp()) && hashSet.add(value.getTopologyId())) {
                        LOG.info("Topology " + value.getTopologyId() + " has been updated. LocalTs=" + value.getTimeStamp() + ", ZkTs=" + localAssignment.getTimeStamp());
                    }
                }
            }
        }
        return hashSet;
    }

    private Set<String> getNeedReDownloadTopologys(Map<Integer, LocalAssignment> map) {
        Set<String> andSet = this.syncProcesses.getTopologyIdNeedDownload().getAndSet(null);
        if (andSet == null || andSet.size() == 0) {
            return null;
        }
        HashSet hashSet = new HashSet();
        Map<Integer, String> portToWorkerId = this.syncProcesses.getPortToWorkerId();
        for (Map.Entry<Integer, LocalAssignment> entry : map.entrySet()) {
            if (portToWorkerId.containsKey(entry.getKey())) {
                hashSet.add(entry.getValue().getTopologyId());
            }
        }
        LOG.debug("worker is starting on these topology, so delay download topology binary: " + hashSet);
        andSet.removeAll(hashSet);
        if (andSet.size() > 0) {
            LOG.info("Following topologys is going to re-download the jars, " + andSet);
        }
        return andSet;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v17, types: [java.util.Map] */
    private void updateTaskCleanupTimeout(Set<String> set) {
        Map map = null;
        HashMap hashMap = new HashMap();
        for (String str : set) {
            try {
                map = StormConfig.read_supervisor_topology_conf(this.conf, str);
            } catch (IOException e) {
                LOG.info("Failed to read conf for " + str);
            }
            Integer parseInt = map != null ? JStormUtils.parseInt(map.get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC)) : null;
            if (parseInt == null) {
                parseInt = Integer.valueOf(ConfigExtension.getTaskCleanupTimeoutSec(this.conf));
            }
            hashMap.put(str, parseInt);
        }
        HashMap hashMap2 = null;
        try {
            hashMap2 = (Map) this.localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
        } catch (IOException e2) {
            LOG.error("Failed to read local task cleanup timeout map", e2);
        }
        if (hashMap2 == null) {
            hashMap2 = hashMap;
        } else {
            hashMap2.putAll(hashMap);
        }
        try {
            this.localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, hashMap2);
        } catch (IOException e3) {
            LOG.error("Failed to write local task cleanup timeout map", e3);
        }
    }

    private void getAllAssignments(Map<String, Integer> map, Map<String, Assignment> map2, RunnableCallback runnableCallback) throws Exception {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        List<String> assignments = this.stormClusterState.assignments(runnableCallback);
        if (assignments == null) {
            map.clear();
            map2.clear();
            LOG.debug("No assignment of ZK");
            return;
        }
        for (String str : assignments) {
            Integer assignment_version = this.stormClusterState.assignment_version(str, runnableCallback);
            LOG.debug(str + "'s assigment version of zk is :" + assignment_version);
            Integer num = map.get(str);
            LOG.debug(str + "'s assigment version of local is :" + num);
            Assignment assignment = null;
            if (num != null && assignment_version != null && num.equals(assignment_version)) {
                assignment = map2.get(str);
            }
            if (assignment == null) {
                assignment = this.stormClusterState.assignment_info(str, runnableCallback);
            }
            if (assignment == null) {
                LOG.error("Failed to get Assignment of " + str + " from ZK");
            } else {
                hashMap2.put(str, assignment_version);
                hashMap.put(str, assignment);
            }
        }
        map.clear();
        map.putAll(hashMap2);
        map2.clear();
        map2.putAll(hashMap);
    }
}
