/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.supervisor;

import backtype.storm.utils.LocalState;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.Heartbeat;
import com.alibaba.jstorm.daemon.supervisor.MachineCheckStatus;
import com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent;
import com.alibaba.jstorm.daemon.worker.LocalAssignment;
import com.alibaba.jstorm.event.EventManager;
import com.alibaba.jstorm.event.EventManagerZkPusher;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SyncSupervisorEvent
extends RunnableCallback {
    private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);
    private String supervisorId;
    private EventManager syncSupEventManager;
    private StormClusterState stormClusterState;
    private LocalState localState;
    private Map<Object, Object> conf;
    private SyncProcessEvent syncProcesses;
    private int lastTime;
    private Heartbeat heartbeat;

    public SyncSupervisorEvent(String supervisorId, Map conf, EventManager syncSupEventManager, StormClusterState stormClusterState, LocalState localState, SyncProcessEvent syncProcesses, Heartbeat heartbeat) {
        this.syncProcesses = syncProcesses;
        this.syncSupEventManager = syncSupEventManager;
        this.stormClusterState = stormClusterState;
        this.conf = conf;
        this.supervisorId = supervisorId;
        this.localState = localState;
        this.heartbeat = heartbeat;
    }

    @Override
    public void run() {
        LOG.debug("Synchronizing supervisor, interval seconds:" + TimeUtils.time_delta(this.lastTime));
        this.lastTime = TimeUtils.current_time_secs();
        MachineCheckStatus checkStatus = new MachineCheckStatus();
        checkStatus.SetType(this.heartbeat.getCheckStatus().getType());
        try {
            HashMap<Integer, LocalAssignment> localAssignment;
            HashMap<String, Assignment> assignments;
            EventManagerZkPusher syncCallback = new EventManagerZkPusher(this, this.syncSupEventManager);
            HashMap<String, Integer> assignmentVersion = (HashMap<String, Integer>)this.localState.get("lcoal-zk-assignment-version");
            if (assignmentVersion == null) {
                assignmentVersion = new HashMap<String, Integer>();
            }
            if ((assignments = (HashMap<String, Assignment>)this.localState.get("local-zk-assignments")) == null) {
                assignments = new HashMap<String, Assignment>();
            }
            LOG.debug("get local assignments  " + assignments);
            LOG.debug("get local assignments version " + assignmentVersion);
            if (checkStatus.getType().equals((Object)MachineCheckStatus.StatusType.panic) || checkStatus.getType().equals((Object)MachineCheckStatus.StatusType.error)) {
                assignmentVersion.clear();
                assignments.clear();
                LOG.warn("Supervisor Machine Check Status :" + (Object)((Object)checkStatus.getType()) + ", so kill all workers.");
            } else {
                this.getAllAssignments(assignmentVersion, assignments, syncCallback);
            }
            LOG.debug("Get all assignments " + assignments);
            List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(this.conf);
            LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);
            Map<Integer, LocalAssignment> zkAssignment = this.getLocalAssign(this.stormClusterState, this.supervisorId, assignments);
            try {
                LOG.debug("Writing local assignment " + zkAssignment);
                localAssignment = (HashMap<Integer, LocalAssignment>)this.localState.get("local-assignments");
                if (localAssignment == null) {
                    localAssignment = new HashMap<Integer, LocalAssignment>();
                }
                this.localState.put("local-assignments", zkAssignment);
            }
            catch (IOException e) {
                LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " of localState failed");
                throw e;
            }
            Set<String> updateTopologys = this.getUpdateTopologys(localAssignment, zkAssignment, assignments);
            Set<String> reDownloadTopologys = this.getNeedReDownloadTopologys(localAssignment);
            if (reDownloadTopologys != null) {
                updateTopologys.addAll(reDownloadTopologys);
            }
            Map<String, String> topologyCodes = SyncSupervisorEvent.getTopologyCodeLocations(assignments, this.supervisorId);
            HashSet<String> downloadFailedTopologyIds = new HashSet<String>();
            this.downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologys, assignments, downloadFailedTopologyIds);
            this.removeUselessTopology(topologyCodes, downloadedTopologyIds);
            this.syncProcesses.run(zkAssignment, downloadFailedTopologyIds);
            this.heartbeat.updateHbTrigger(true);
            try {
                this.localState.put("lcoal-zk-assignment-version", assignmentVersion);
                this.localState.put("local-zk-assignments", assignments);
            }
            catch (IOException e) {
                LOG.error("put LS_LOCAL_ZK_ASSIGNMENT_VERSION&&LS_LOCAl_ZK_ASSIGNMENTS  failed");
                throw e;
            }
        }
        catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", (Throwable)e);
        }
        if (checkStatus.getType().equals((Object)MachineCheckStatus.StatusType.panic)) {
            JStormUtils.halt_process(0, "Supervisor Machine Check Status : Panic , !!!!shutdown!!!!");
        }
    }

    private void downloadStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
        String clusterMode = StormConfig.cluster_mode(conf);
        if (clusterMode.endsWith("distributed")) {
            this.downloadDistributeStormCode(conf, topologyId, masterCodeDir);
        } else if (clusterMode.endsWith("local")) {
            this.downloadLocalStormCode(conf, topologyId, masterCodeDir);
        }
    }

    private void downloadLocalStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
        String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
        FileUtils.copyDirectory((File)new File(masterCodeDir), (File)new File(stormroot));
        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
        String resourcesJar = this.resourcesJar();
        URL url = classloader.getResource("resources");
        String targetDir = stormroot + '/' + "resources";
        if (resourcesJar != null) {
            LOG.info("Extracting resources from jar at " + resourcesJar + " to " + targetDir);
            JStormUtils.extractDirFromJar(resourcesJar, "resources", stormroot);
        } else if (url != null) {
            LOG.info("Copying resources at " + url.toString() + " to " + targetDir);
            FileUtils.copyDirectory((File)new File(url.getFile()), (File)new File(targetDir));
        }
    }

    private void downloadDistributeStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
        String tmproot = StormConfig.supervisorTmpDir(conf) + File.separator + UUID.randomUUID().toString();
        String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
        JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir, topologyId, true);
        String localFileJarTmp = StormConfig.stormjar_path(tmproot);
        JStormUtils.extractDirFromJar(localFileJarTmp, "resources", tmproot);
        File srcDir = new File(tmproot);
        File destDir = new File(stormroot);
        try {
            FileUtils.moveDirectory((File)srcDir, (File)destDir);
        }
        catch (FileExistsException e) {
            FileUtils.copyDirectory((File)srcDir, (File)destDir);
            FileUtils.deleteQuietly((File)srcDir);
        }
    }

    private String resourcesJar() {
        String path = System.getProperty("java.class.path");
        if (path == null) {
            return null;
        }
        String[] paths = path.split(File.pathSeparator);
        ArrayList<String> jarPaths = new ArrayList<String>();
        for (String s : paths) {
            if (!s.endsWith(".jar")) continue;
            jarPaths.add(s);
        }
        ArrayList rtn = new ArrayList();
        int size = jarPaths.size();
        for (int i = 0; i < size; ++i) {
            if (!JStormUtils.zipContainsDir((String)jarPaths.get(i), "resources")) continue;
            rtn.add(jarPaths.get(i));
        }
        if (rtn.size() == 0) {
            return null;
        }
        return (String)rtn.get(0);
    }

    private Map<Integer, LocalAssignment> getLocalAssign(StormClusterState stormClusterState, String supervisorId, Map<String, Assignment> assignments) throws Exception {
        HashMap<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>();
        for (Map.Entry<String, Assignment> assignEntry : assignments.entrySet()) {
            Assignment assignment;
            String topologyId = assignEntry.getKey();
            Map<Integer, LocalAssignment> portTasks = this.readMyTasks(stormClusterState, topologyId, supervisorId, assignment = assignEntry.getValue());
            if (portTasks == null) continue;
            for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) {
                Integer port = entry.getKey();
                LocalAssignment la = entry.getValue();
                if (!portLA.containsKey(port)) {
                    portLA.put(port, la);
                    continue;
                }
                throw new RuntimeException("Should not have multiple topologys assigned to one port");
            }
        }
        return portLA;
    }

    private Map<Integer, LocalAssignment> readMyTasks(StormClusterState stormClusterState, String topologyId, String supervisorId, Assignment assignmentInfo) throws Exception {
        HashMap<Integer, LocalAssignment> portTasks = new HashMap<Integer, LocalAssignment>();
        Set<ResourceWorkerSlot> workers = assignmentInfo.getWorkers();
        if (workers == null) {
            LOG.error("No worker of assignment's " + assignmentInfo);
            return portTasks;
        }
        for (ResourceWorkerSlot worker : workers) {
            if (!supervisorId.equals(worker.getNodeId())) continue;
            portTasks.put(worker.getPort(), new LocalAssignment(topologyId, worker.getTasks(), Common.topologyIdToName(topologyId), worker.getMemSize(), worker.getCpu(), worker.getJvm(), assignmentInfo.getTimeStamp()));
        }
        return portTasks;
    }

    public static Map<String, String> getTopologyCodeLocations(Map<String, Assignment> assignments, String supervisorId) throws Exception {
        HashMap<String, String> rtn = new HashMap<String, String>();
        block0: for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
            String topologyid = entry.getKey();
            Assignment assignmenInfo = entry.getValue();
            Set<ResourceWorkerSlot> workers = assignmenInfo.getWorkers();
            for (ResourceWorkerSlot worker : workers) {
                String node = worker.getNodeId();
                if (!supervisorId.equals(node)) continue;
                rtn.put(topologyid, assignmenInfo.getMasterCodeDir());
                continue block0;
            }
        }
        return rtn;
    }

    public void downloadTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds, Set<String> updateTopologys, Map<String, Assignment> assignments, Set<String> downloadFailedTopologyIds) throws Exception {
        HashSet<String> downloadTopologys = new HashSet<String>();
        for (Map.Entry<String, String> entry : topologyCodes.entrySet()) {
            int retry;
            String topologyId = entry.getKey();
            String masterCodeDir = entry.getValue();
            if (downloadedTopologyIds.contains(topologyId) && !updateTopologys.contains(topologyId)) continue;
            LOG.info("Downloading code for storm id " + topologyId + " from " + masterCodeDir);
            for (retry = 0; retry < 3; ++retry) {
                try {
                    this.downloadStormCode(this.conf, topologyId, masterCodeDir);
                    StormConfig.write_supervisor_topology_timestamp(this.conf, topologyId, assignments.get(topologyId).getTimeStamp());
                    break;
                }
                catch (IOException e) {
                    LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir);
                    continue;
                }
                catch (TException e) {
                    LOG.error((Object)((Object)e) + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir);
                }
            }
            if (retry < 3) {
                LOG.info("Finished downloading code for storm id " + topologyId + " from " + masterCodeDir);
                downloadTopologys.add(topologyId);
                continue;
            }
            LOG.error("Cann't  download code for storm id " + topologyId + " from " + masterCodeDir);
            downloadFailedTopologyIds.add(topologyId);
        }
        for (String topologyId : downloadFailedTopologyIds) {
            if (downloadedTopologyIds.contains(topologyId)) continue;
            try {
                String stormroot = StormConfig.supervisor_stormdist_root(this.conf, topologyId);
                File destDir = new File(stormroot);
                FileUtils.deleteQuietly((File)destDir);
            }
            catch (Exception e) {
                LOG.error("Cann't  clear directory about storm id " + topologyId + " on supervisor ");
            }
        }
        this.updateTaskCleanupTimeout(downloadTopologys);
    }

    public void removeUselessTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds) {
        for (String topologyId : downloadedTopologyIds) {
            if (topologyCodes.containsKey(topologyId)) continue;
            LOG.info("Removing code for storm id " + topologyId);
            String path = null;
            try {
                path = StormConfig.supervisor_stormdist_root(this.conf, topologyId);
                PathUtils.rmr(path);
            }
            catch (IOException e) {
                String errMsg = "rmr the path:" + path + "failed\n";
                LOG.error(errMsg, (Throwable)e);
            }
        }
    }

    private Set<String> getUpdateTopologys(Map<Integer, LocalAssignment> localAssignments, Map<Integer, LocalAssignment> zkAssignments, Map<String, Assignment> assignments) {
        HashSet<String> ret = new HashSet<String>();
        if (localAssignments != null && zkAssignments != null) {
            for (Map.Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) {
                Integer port = entry.getKey();
                LocalAssignment localAssignment = entry.getValue();
                LocalAssignment zkAssignment = zkAssignments.get(port);
                if (localAssignment == null || zkAssignment == null) continue;
                Assignment assignment = assignments.get(localAssignment.getTopologyId());
                if (!localAssignment.getTopologyId().equals(zkAssignment.getTopologyId()) || assignment == null || !assignment.isTopologyChange(localAssignment.getTimeStamp()) || !ret.add(localAssignment.getTopologyId())) continue;
                LOG.info("Topology " + localAssignment.getTopologyId() + " has been updated. LocalTs=" + localAssignment.getTimeStamp() + ", ZkTs=" + zkAssignment.getTimeStamp());
            }
        }
        return ret;
    }

    private Set<String> getNeedReDownloadTopologys(Map<Integer, LocalAssignment> localAssignment) {
        Set reDownloadTopologys = this.syncProcesses.getTopologyIdNeedDownload().getAndSet(null);
        if (reDownloadTopologys == null || reDownloadTopologys.size() == 0) {
            return null;
        }
        HashSet<String> needRemoveTopologys = new HashSet<String>();
        Map<Integer, String> portToStartWorkerId = this.syncProcesses.getPortToWorkerId();
        for (Map.Entry<Integer, LocalAssignment> entry : localAssignment.entrySet()) {
            if (!portToStartWorkerId.containsKey(entry.getKey())) continue;
            needRemoveTopologys.add(entry.getValue().getTopologyId());
        }
        LOG.debug("worker is starting on these topology, so delay download topology binary: " + needRemoveTopologys);
        reDownloadTopologys.removeAll(needRemoveTopologys);
        if (reDownloadTopologys.size() > 0) {
            LOG.info("Following topologys is going to re-download the jars, " + reDownloadTopologys);
        }
        return reDownloadTopologys;
    }

    private void updateTaskCleanupTimeout(Set<String> topologys) {
        Map topologyConf = null;
        HashMap<String, Integer> taskCleanupTimeouts = new HashMap<String, Integer>();
        for (String topologyId : topologys) {
            try {
                topologyConf = StormConfig.read_supervisor_topology_conf(this.conf, topologyId);
            }
            catch (IOException e) {
                LOG.info("Failed to read conf for " + topologyId);
            }
            Integer cleanupTimeout = null;
            if (topologyConf != null) {
                cleanupTimeout = JStormUtils.parseInt(topologyConf.get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC));
            }
            if (cleanupTimeout == null) {
                cleanupTimeout = ConfigExtension.getTaskCleanupTimeoutSec(this.conf);
            }
            taskCleanupTimeouts.put(topologyId, cleanupTimeout);
        }
        HashMap<String, Integer> localTaskCleanupTimeouts = null;
        try {
            localTaskCleanupTimeouts = (HashMap<String, Integer>)this.localState.get("task-cleanup-timeout");
        }
        catch (IOException e) {
            LOG.error("Failed to read local task cleanup timeout map", (Throwable)e);
        }
        if (localTaskCleanupTimeouts == null) {
            localTaskCleanupTimeouts = taskCleanupTimeouts;
        } else {
            localTaskCleanupTimeouts.putAll(taskCleanupTimeouts);
        }
        try {
            this.localState.put("task-cleanup-timeout", localTaskCleanupTimeouts);
        }
        catch (IOException e) {
            LOG.error("Failed to write local task cleanup timeout map", (Throwable)e);
        }
    }

    private void getAllAssignments(Map<String, Integer> assignmentVersion, Map<String, Assignment> localZkAssignments, RunnableCallback callback) throws Exception {
        HashMap<String, Assignment> ret = new HashMap<String, Assignment>();
        HashMap<String, Integer> updateAssignmentVersion = new HashMap<String, Integer>();
        List<String> assignments = this.stormClusterState.assignments(callback);
        if (assignments == null) {
            assignmentVersion.clear();
            localZkAssignments.clear();
            LOG.debug("No assignment of ZK");
            return;
        }
        for (String topology_id : assignments) {
            Integer zkVersion = this.stormClusterState.assignment_version(topology_id, callback);
            LOG.debug(topology_id + "'s assigment version of zk is :" + zkVersion);
            Integer recordedVersion = assignmentVersion.get(topology_id);
            LOG.debug(topology_id + "'s assigment version of local is :" + recordedVersion);
            Assignment assignment = null;
            if (recordedVersion != null && zkVersion != null && recordedVersion.equals(zkVersion)) {
                assignment = localZkAssignments.get(topology_id);
            }
            if (assignment == null) {
                assignment = this.stormClusterState.assignment_info(topology_id, callback);
            }
            if (assignment == null) {
                LOG.error("Failed to get Assignment of " + topology_id + " from ZK");
                continue;
            }
            updateAssignmentVersion.put(topology_id, zkVersion);
            ret.put(topology_id, assignment);
        }
        assignmentVersion.clear();
        assignmentVersion.putAll(updateAssignmentVersion);
        localZkAssignments.clear();
        localZkAssignments.putAll(ret);
    }
}

