/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.daemon.Shutdownable;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ComponentSummary;
import backtype.storm.generated.Credentials;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.MonitorOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NimbusSummary;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.SupervisorSummary;
import backtype.storm.generated.SupervisorWorkers;
import backtype.storm.generated.TaskComponent;
import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TaskSummary;
import backtype.storm.generated.TopologyAssignException;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologyInitialStatus;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.generated.TopologySummary;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.generated.WorkerSummary;
import backtype.storm.nimbus.ITopologyActionNotifierPlugin;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.DaemonCommon;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssign;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.SimpleJStormMetric;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.error.TaskError;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.Thrift;
import com.alibaba.jstorm.utils.TimeUtils;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceHandler
implements Nimbus.Iface,
Shutdownable,
DaemonCommon {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ServiceHandler.class);
    /** NOTE(review): not referenced in the visible part of this class; presumably a server thread count — confirm against callers. */
    public static final int THREAD_NUM = 64;
    /** Shared nimbus state handed to the constructor. */
    private NimbusData data;
    /** Configuration map obtained from {@code data.getConf()} in the constructor. */
    private Map<Object, Object> conf;

    /**
     * Creates a handler backed by the given nimbus state and caches its
     * configuration map.
     *
     * @param data shared nimbus state
     */
    public ServiceHandler(NimbusData data) {
        this.data = data;
        this.conf = this.data.getConf();
    }

    /**
     * Shuts the handler down. If a topology-action notifier plugin is
     * registered on the nimbus state, its {@code cleanup()} is invoked.
     */
    @Override
    public void shutdown() {
        LOG.info("Begin to shut down master");
        final ITopologyActionNotifierPlugin notifier = this.data.getNimbusNotify();
        if (null != notifier) {
            notifier.cleanup();
        }
        LOG.info("Successfully shut down master");
    }

    /**
     * Always reports {@code false}.
     *
     * @return {@code false}, unconditionally
     */
    @Override
    public boolean waiting() {
        return false;
    }

    /**
     * Submits a topology with the initial status {@code ACTIVE}; a thin
     * convenience wrapper around {@code submitTopologyWithOpts}.
     *
     * @param name                topology name
     * @param uploadedJarLocation location of the previously uploaded jar
     * @param jsonConf            topology configuration as JSON
     * @param topology            the topology definition
     */
    @Override
    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws TException, AlreadyAliveException, InvalidTopologyException, TopologyAssignException {
        this.submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE));
    }

    /**
     * Queues an assignment event for the topology and blocks until the
     * assignment has finished.
     *
     * @param topologyName name of the topology being submitted
     * @param topologyId   id of the topology being submitted
     * @param status       initial status, converted to a storm status for the event
     * @throws FailedAssignTopologyException if the event reports that it did not finish successfully
     */
    private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status) throws FailedAssignTopologyException {
        TopologyAssignEvent event = new TopologyAssignEvent();
        event.setTopologyId(topologyId);
        event.setScratch(false);
        event.setTopologyName(topologyName);
        event.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status));

        TopologyAssign.push(event);
        if (!event.waitFinish()) {
            throw new FailedAssignTopologyException(event.getErrorMsg());
        }
        LOG.info("Finish submit for " + topologyName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a topology: validates the name, reserves a topology id, normalizes
     * the configuration and topology, stores the code, registers task info,
     * triggers the assignment and finally publishes a start event to the
     * metrics runnable.
     *
     * <p>The reserved topology id is kept in the pending-submit map while the
     * submission is in flight and is removed again on success and on every
     * failure path.
     *
     * @param topologyName        topology name; must pass {@code Common.charValidate}
     * @param uploadedJarLocation location of the previously uploaded jar
     * @param jsonConf            topology configuration as JSON
     * @param topology            the topology definition
     * @param options             submit options; supplies the initial status
     * @throws AlreadyAliveException    if the topology is already active or a submission with this name is pending
     * @throws InvalidTopologyException if the name or the topology/configuration is invalid
     * @throws TopologyAssignException  if assignment fails or any other unexpected error occurs
     * @throws TException               if the liveness check itself fails
     */
    @Override
    public void submitTopologyWithOpts(String topologyName, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException, TException {
        LOG.info("Receive " + topologyName + ", uploadedJarLocation:" + uploadedJarLocation);
        long start = System.nanoTime();
        if (!Common.charValidate(topologyName)) {
            throw new InvalidTopologyException(topologyName + " is not a valid topology name");
        }
        try {
            this.checkTopologyActive(this.data, topologyName, false);
        }
        catch (AlreadyAliveException e) {
            LOG.info(topologyName + " already exists ");
            throw e;
        }
        catch (Throwable e) {
            LOG.info("Failed to check whether topology is alive or not", e);
            throw new TException(e);
        }
        String topologyId;
        // Check-and-reserve must be atomic so two concurrent submissions of the
        // same name cannot both pass the pending check.
        synchronized (this.data) {
            Set<String> pendingTopologys = this.data.getPendingSubmitTopoloygs().keySet();
            for (String cachTopologyId : pendingTopologys) {
                if (cachTopologyId.contains(topologyName + "-")) {
                    throw new AlreadyAliveException(topologyName + "  were submitted");
                }
            }
            int counter = this.data.getSubmittedCount().incrementAndGet();
            topologyId = Common.topologyNameToId(topologyName, counter);
            this.data.getPendingSubmitTopoloygs().put(topologyId, null);
        }
        try {
            Map serializedConf = (Map)JStormUtils.from_json(jsonConf);
            if (serializedConf == null) {
                LOG.warn("Failed to serialized Configuration");
                throw new InvalidTopologyException("Failed to serialize topology configuration");
            }
            serializedConf.put("topology.id", topologyId);
            serializedConf.put("topology.name", topologyName);
            Map stormConf = NimbusUtils.normalizeConf(this.conf, serializedConf, topology);
            LOG.info("Normalized configuration:" + stormConf);
            HashMap<Object, Object> totalStormConf = new HashMap<Object, Object>(this.conf);
            totalStormConf.putAll(stormConf);
            StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true);
            Common.validate_basic(normalizedTopology, totalStormConf, topologyId);
            StormClusterState stormClusterState = this.data.getStormClusterState();
            double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf);
            this.setupStormCode(this.conf, topologyId, uploadedJarLocation, stormConf, normalizedTopology);
            this.setupZkTaskInfo(this.conf, topologyId, stormClusterState);
            LOG.info("Submit for " + topologyName + " with conf " + serializedConf);
            this.makeAssignment(topologyName, topologyId, options.get_initial_status());
            this.data.getPendingSubmitTopoloygs().remove(topologyId);
            TopologyMetricsRunnable.StartTopologyEvent startEvent = new TopologyMetricsRunnable.StartTopologyEvent();
            startEvent.clusterName = this.data.getClusterName();
            startEvent.topologyId = topologyId;
            startEvent.timestamp = System.currentTimeMillis();
            startEvent.sampleRate = metricsSampleRate;
            this.data.getMetricRunnable().pushEvent(startEvent);
            this.notifyTopologyActionListener(topologyName, "submitTopology");
        }
        catch (FailedAssignTopologyException e) {
            String rootCause = e.getMessage() == null ? "submit timeout" : e.getMessage();
            String msg = ServiceHandler.describeSubmitFailure("Fail to sumbit topology, Root cause:" + rootCause, topologyId, uploadedJarLocation);
            LOG.error(msg, e);
            this.data.getPendingSubmitTopoloygs().remove(topologyId);
            throw new TopologyAssignException(msg);
        }
        catch (InvalidParameterException e) {
            String msg = ServiceHandler.describeSubmitFailure("Fail to sumbit topology " + e.getMessage() + ", cause:" + e.getCause(), topologyId, uploadedJarLocation);
            LOG.error(msg, e);
            this.data.getPendingSubmitTopoloygs().remove(topologyId);
            // InvalidParameterException has no (String, Throwable) constructor,
            // so attach the original exception explicitly to keep its stack trace.
            InvalidParameterException wrapped = new InvalidParameterException(msg);
            wrapped.initCause(e);
            throw wrapped;
        }
        catch (InvalidTopologyException e) {
            LOG.error("Topology is invalid. " + e.get_msg());
            this.data.getPendingSubmitTopoloygs().remove(topologyId);
            throw e;
        }
        catch (Throwable e) {
            String msg = ServiceHandler.describeSubmitFailure("Fail to sumbit topology " + e.getMessage() + ", cause:" + e.getCause(), topologyId, uploadedJarLocation);
            LOG.error(msg, e);
            this.data.getPendingSubmitTopoloygs().remove(topologyId);
            throw new TopologyAssignException(msg);
        }
        finally {
            // nanoseconds / 1000 = microseconds; the histogram and the log both use microseconds.
            double spend = (System.nanoTime() - start) / 1000L;
            SimpleJStormMetric.updateNimbusHistogram("submitTopologyWithOpts", spend);
            LOG.info("submitTopologyWithOpts {} costs {}us", (Object)topologyName, (Object)spend);
        }
    }

    /**
     * Builds the multi-line failure text shared by the submit error paths:
     * the headline followed by the topology id and the jar location.
     */
    private static String describeSubmitFailure(String headline, String topologyId, String uploadedJarLocation) {
        StringBuilder sb = new StringBuilder();
        sb.append(headline);
        sb.append("\n\n");
        sb.append("topologyId:").append(topologyId);
        sb.append(", uploadedJarLocation:").append(uploadedJarLocation).append("\n");
        return sb.toString();
    }

    /**
     * Kills the named topology using default {@link KillOptions}.
     *
     * @param topologyName name of the topology to kill
     */
    @Override
    public void killTopology(String topologyName) throws TException, NotAliveException {
        KillOptions defaults = new KillOptions();
        this.killTopologyWithOpts(topologyName, defaults);
    }

    /**
     * Kills the named topology: verifies it is active, triggers the
     * {@code kill} status transition, pushes a remove event to the metrics
     * runnable and notifies the topology action listener.
     *
     * @param topologyName name of the topology to kill
     * @param options      kill options; may be {@code null}, in which case no
     *                     wait time is passed to the transition (same as an
     *                     options object without {@code wait_secs} set)
     * @throws NotAliveException if the topology is not alive
     * @throws TException        on any other failure; the original exception is attached as cause
     */
    @Override
    public void killTopologyWithOpts(String topologyName, KillOptions options) throws TException, NotAliveException {
        try {
            this.checkTopologyActive(this.data, topologyName, true);
            String topologyId = this.getTopologyId(topologyName);
            Integer wait_amt = null;
            // Tolerate null options the same way rebalance() does instead of
            // failing with a NullPointerException wrapped in a generic TException.
            if (options != null && options.is_set_wait_secs()) {
                wait_amt = options.get_wait_secs();
            }
            NimbusUtils.transitionName(this.data, topologyName, true, StatusType.kill, wait_amt);
            TopologyMetricsRunnable.Remove event = new TopologyMetricsRunnable.Remove();
            event.topologyId = topologyId;
            this.data.getMetricRunnable().pushEvent(event);
            this.notifyTopologyActionListener(topologyName, "killTopology");
        }
        catch (NotAliveException e) {
            String errMsg = "KillTopology Error, no this topology " + topologyName;
            LOG.error(errMsg, (Throwable)((Object)e));
            throw new NotAliveException(errMsg);
        }
        catch (Exception e) {
            String errMsg = "Failed to kill topology " + topologyName;
            LOG.error(errMsg, (Throwable)e);
            throw new TException(errMsg, e);
        }
    }

    /**
     * Triggers the {@code activate} status transition for the named topology
     * and notifies the topology action listener.
     *
     * @param topologyName name of the topology to activate
     * @throws NotAliveException if the transition reports the topology is not alive
     * @throws TException        on any other failure
     */
    @Override
    public void activate(String topologyName) throws TException, NotAliveException {
        try {
            Object[] noArgs = new Object[0];
            NimbusUtils.transitionName(this.data, topologyName, true, StatusType.activate, noArgs);
            this.notifyTopologyActionListener(topologyName, "activate");
        }
        catch (NotAliveException notAlive) {
            final String message = "Activate Error, no this topology " + topologyName;
            LOG.error(message, (Throwable)((Object)notAlive));
            throw new NotAliveException(message);
        }
        catch (Exception failure) {
            final String message = "Failed to active topology " + topologyName;
            LOG.error(message, (Throwable)failure);
            throw new TException(message);
        }
    }

    /**
     * Triggers the {@code inactivate} status transition for the named topology
     * and notifies the topology action listener.
     *
     * @param topologyName name of the topology to deactivate
     * @throws NotAliveException if the transition reports the topology is not alive
     * @throws TException        on any other failure
     */
    @Override
    public void deactivate(String topologyName) throws TException, NotAliveException {
        try {
            Object[] noArgs = new Object[0];
            NimbusUtils.transitionName(this.data, topologyName, true, StatusType.inactivate, noArgs);
            this.notifyTopologyActionListener(topologyName, "inactivate");
        }
        catch (NotAliveException notAlive) {
            final String message = "Deactivate Error, no this topology " + topologyName;
            LOG.error(message, (Throwable)((Object)notAlive));
            throw new NotAliveException(message);
        }
        catch (Exception failure) {
            final String message = "Failed to deactivate topology " + topologyName;
            LOG.error(message, (Throwable)failure);
            throw new TException(message);
        }
    }

    /**
     * Rebalances the named topology: verifies it is active, reads the optional
     * wait time, reassign flag and JSON configuration from {@code options},
     * triggers the {@code rebalance} status transition and notifies the
     * topology action listener.
     *
     * @param topologyName name of the topology to rebalance
     * @param options      rebalance options; may be {@code null}
     * @throws NotAliveException if the topology is not alive
     * @throws TException        on any other failure
     */
    @Override
    public void rebalance(String topologyName, RebalanceOptions options) throws TException, NotAliveException {
        try {
            this.checkTopologyActive(this.data, topologyName, true);

            Integer waitSecs = null;
            Boolean doReassign = false;
            String newConfJson = null;
            if (options != null) {
                if (options.is_set_wait_secs()) {
                    waitSecs = options.get_wait_secs();
                }
                if (options.is_set_reassign()) {
                    doReassign = options.is_reassign();
                }
                if (options.is_set_conf()) {
                    newConfJson = options.get_conf();
                }
            }

            LOG.info("Begin to rebalance " + topologyName + "wait_time:" + waitSecs + ", reassign: " + doReassign + ", new worker/bolt configuration:" + newConfJson);
            Map newConf = (Map)JStormUtils.from_json(newConfJson);
            NimbusUtils.transitionName(this.data, topologyName, true, StatusType.rebalance, waitSecs, doReassign, newConf);
            this.notifyTopologyActionListener(topologyName, "rebalance");
        }
        catch (NotAliveException notAlive) {
            final String message = "Rebalance Error, no this topology " + topologyName;
            LOG.error(message, (Throwable)((Object)notAlive));
            throw new NotAliveException(message);
        }
        catch (Exception failure) {
            final String message = "Failed to rebalance topology " + topologyName;
            LOG.error(message, (Throwable)failure);
            throw new TException(message);
        }
    }

    /**
     * Restarts a topology: deactivates it, copies its stored code/conf from the
     * dist directory into a fresh inbox directory, removes the old topology,
     * pushes kill/remove events to the metrics runnable and re-submits it
     * under the same name with the (optionally overridden) configuration.
     *
     * @param name     name of the topology to restart
     * @param jsonConf optional JSON configuration merged over the stored
     *                 topology configuration; {@code null} keeps the stored one
     * @throws NotAliveException if no topology id can be found for {@code name}
     * @throws TException        if the stored code/conf cannot be prepared, or the
     *                           re-submission reports the topology as still alive
     */
    @Override
    public void restart(String name, String jsonConf) throws TException, NotAliveException, InvalidTopologyException, TopologyAssignException {
        Map topologyConf;
        StormTopology topology;
        String topologyId;
        LOG.info("Begin to restart " + name + ", new configuration:" + jsonConf);
        StormClusterState stormClusterState = this.data.getStormClusterState();
        try {
            topologyId = Cluster.get_topology_id(stormClusterState, name);
        }
        catch (Exception e) {
            // A failed lookup is still reported as "not alive" below, but leave a trace of why.
            LOG.warn("Failed to look up topology id of " + name, e);
            topologyId = null;
        }
        if (topologyId == null) {
            LOG.info("No topology of " + name);
            throw new NotAliveException("No topology of " + name);
        }
        this.deactivate(name);
        JStormUtils.sleepMs(5000L);
        LOG.info("Deactivate " + name);
        String topologyCodeLocation = null;
        try {
            topology = StormConfig.read_nimbus_topology_code(this.conf, topologyId);
            topologyConf = StormConfig.read_nimbus_topology_conf(this.conf, topologyId);
            if (jsonConf != null) {
                Map newConf = (Map)JStormUtils.from_json(jsonConf);
                if (newConf == null) {
                    // Fail with a meaningful message instead of an NPE from putAll(null).
                    throw new IllegalArgumentException("Failed to parse new configuration: " + jsonConf);
                }
                topologyConf.putAll(newConf);
            }
            String oldDistDir = StormConfig.masterStormdistRoot(this.conf, topologyId);
            String parent = StormConfig.masterInbox(this.conf);
            topologyCodeLocation = parent + "/" + topologyId;
            File codeDir = new File(topologyCodeLocation);
            FileUtils.forceMkdir(codeDir);
            FileUtils.cleanDirectory(codeDir);
            File stormDistDir = new File(oldDistDir);
            stormDistDir.setLastModified(System.currentTimeMillis());
            FileUtils.copyDirectory(stormDistDir, codeDir);
            LOG.info("Successfully read old jar/conf/topology " + name);
            this.notifyTopologyActionListener(name, "restart");
        }
        catch (Exception e) {
            LOG.error("Failed to read old jar/conf/topology", (Throwable)e);
            if (topologyCodeLocation != null) {
                try {
                    PathUtils.rmr(topologyCodeLocation);
                }
                catch (IOException cleanupError) {
                    // Best-effort cleanup: the original failure is what gets reported.
                    LOG.warn("Failed to remove " + topologyCodeLocation, cleanupError);
                }
            }
            throw new TException("Failed to read old jar/conf/topology ", e);
        }
        RemoveTransitionCallback killCb = new RemoveTransitionCallback(this.data, topologyId);
        killCb.execute(new Object[0]);
        LOG.info("Successfully kill the topology " + name);
        TopologyMetricsRunnable.KillTopologyEvent killEvent = new TopologyMetricsRunnable.KillTopologyEvent();
        killEvent.clusterName = this.data.getClusterName();
        killEvent.topologyId = topologyId;
        killEvent.timestamp = System.currentTimeMillis();
        this.data.getMetricRunnable().pushEvent(killEvent);
        TopologyMetricsRunnable.Remove removeEvent = new TopologyMetricsRunnable.Remove();
        removeEvent.topologyId = topologyId;
        this.data.getMetricRunnable().pushEvent(removeEvent);
        try {
            this.submitTopology(name, topologyCodeLocation, JStormUtils.to_json(topologyConf), topology);
        }
        catch (AlreadyAliveException e) {
            LOG.info("Failed to kill the topology" + name);
            throw new TException("Failed to kill the topology" + name, e);
        }
        finally {
            try {
                PathUtils.rmr(topologyCodeLocation);
            }
            catch (IOException cleanupError) {
                // Best-effort cleanup of the temporary inbox copy; do not mask the submit outcome.
                LOG.warn("Failed to remove " + topologyCodeLocation, cleanupError);
            }
        }
    }

    /**
     * Prepares an upload to the given file: creates its parent directory,
     * opens a write channel on it and registers the channel in the uploader
     * map under {@code libName}.
     *
     * @param libName path of the file to be written
     * @throws TException wrapping any failure while preparing the upload
     */
    @Override
    public void beginLibUpload(String libName) throws TException {
        try {
            PathUtils.local_mkdirs(PathUtils.parent_path(libName));
            TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
            uploaders.put(libName, Channels.newChannel(new FileOutputStream(libName)));
            LOG.info("Begin upload file from client to " + libName);
        }
        catch (Exception failure) {
            LOG.error("Fail to upload jar " + libName, (Throwable)failure);
            throw new TException((Throwable)failure);
        }
    }

    /**
     * Prepares a jar upload: creates an empty, uniquely named directory under
     * the master inbox, opens a write channel on
     * {@code <dir>/stormjar-<uuid>.jar} and registers it in the uploader map.
     *
     * @return the path of the newly created directory (not the jar file itself)
     * @throws TException wrapping any I/O failure
     */
    @Override
    public String beginFileUpload() throws TException {
        String jarPath = null;
        try {
            final String uuid = UUID.randomUUID().toString();
            final String dirPath = StormConfig.masterInbox(this.conf) + "/" + uuid;
            FileUtils.forceMkdir(new File(dirPath));
            FileUtils.cleanDirectory(new File(dirPath));
            jarPath = dirPath + "/stormjar-" + uuid + ".jar";
            this.data.getUploaders().put(jarPath, Channels.newChannel(new FileOutputStream(jarPath)));
            LOG.info("Begin upload file from client to " + jarPath);
            return dirPath;
        }
        catch (FileNotFoundException notFound) {
            LOG.error("File not found: " + jarPath, (Throwable)notFound);
            throw new TException((Throwable)notFound);
        }
        catch (IOException ioFailure) {
            LOG.error("Upload file error: " + jarPath, (Throwable)ioFailure);
            throw new TException((Throwable)ioFailure);
        }
    }

    /**
     * Appends one chunk to the upload registered under {@code location} and
     * re-inserts the channel into the uploader map.
     *
     * @param location key under which the upload channel was registered
     * @param chunk    bytes to append; written completely before returning
     * @throws TException if no upload is registered for {@code location}, the
     *                    registered object is not a {@link WritableByteChannel},
     *                    or the write fails
     */
    @Override
    public void uploadChunk(String location, ByteBuffer chunk) throws TException {
        TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
        Object obj = uploaders.get(location);
        if (obj == null) {
            throw new TException("File for that location does not exist (or timed out) " + location);
        }
        if (!(obj instanceof WritableByteChannel)) {
            throw new TException("Object isn't WritableByteChannel for " + location);
        }
        WritableByteChannel channel = (WritableByteChannel)obj;
        try {
            // WritableByteChannel.write may write fewer bytes than remain in the
            // buffer; loop so a short write cannot silently truncate the upload.
            while (chunk.hasRemaining()) {
                channel.write(chunk);
            }
            uploaders.put(location, channel);
        }
        catch (IOException e) {
            String errMsg = " WritableByteChannel write filed when uploadChunk " + location;
            LOG.error(errMsg, (Throwable)e);
            throw new TException((Throwable)e);
        }
    }

    /**
     * Finishes the upload registered under {@code location}: closes its
     * channel and removes it from the uploader map.
     *
     * <p>A failure while closing is logged (with its stack trace) but not
     * propagated; the entry is removed in either case so a channel whose
     * close failed does not stay registered.
     *
     * @param location key under which the upload channel was registered
     * @throws TException if no upload is registered for {@code location} or the
     *                    registered object is not a {@link WritableByteChannel}
     */
    @Override
    public void finishFileUpload(String location) throws TException {
        TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
        Object obj = uploaders.get(location);
        if (obj == null) {
            throw new TException("File for that location does not exist (or timed out)");
        }
        if (!(obj instanceof WritableByteChannel)) {
            throw new TException("Object isn't WritableByteChannel for " + location);
        }
        WritableByteChannel channel = (WritableByteChannel)obj;
        try {
            channel.close();
            LOG.info("Finished uploading file from client: " + location);
        }
        catch (IOException e) {
            // NOTE(review): the client is not told about the failed close; kept as-is
            // to preserve the existing contract, but the cause is now logged.
            LOG.error(" WritableByteChannel close failed when finishFileUpload " + location, (Throwable)e);
        }
        finally {
            uploaders.remove(location);
        }
    }

    /**
     * Opens a buffered input stream on {@code file} and registers it in the
     * downloader map under a freshly generated id. The stream is created with
     * half of {@code nimbus.thrift.max_buffer_size} (default 0x100000).
     *
     * @param file path of the file to download
     * @return the id under which the stream was registered
     * @throws TException if the file cannot be found
     */
    @Override
    public String beginFileDownload(String file) throws TException {
        try {
            int halfMaxBuffer = JStormUtils.parseInt(this.conf.get("nimbus.thrift.max_buffer_size"), 0x100000) / 2;
            BufferFileInputStream stream = new BufferFileInputStream(file, halfMaxBuffer);
            String downloadId = UUID.randomUUID().toString();
            this.data.getDownloaders().put(downloadId, stream);
            return downloadId;
        }
        catch (FileNotFoundException notFound) {
            LOG.error(notFound + "file:" + file + " not found");
            throw new TException((Throwable)notFound);
        }
    }

    /**
     * Reads the next chunk of the download registered under {@code id}.
     *
     * <p>When the stream yields data, the stream is re-inserted into the
     * downloader map and the data is returned. When the stream's
     * {@code read()} returns {@code null}, an empty buffer is returned and the
     * map is left untouched.
     *
     * @param id id returned by {@code beginFileDownload}
     * @return the next chunk, or an empty buffer when {@code read()} returned {@code null}
     * @throws TException if no stream is registered for {@code id}, the
     *                    registered object is not a {@code BufferFileInputStream},
     *                    or reading fails
     */
    @Override
    public ByteBuffer downloadChunk(String id) throws TException {
        TimeCacheMap<Object, Object> downloaders = this.data.getDownloaders();
        Object registered = downloaders.get(id);
        if (registered == null) {
            throw new TException("Could not find input stream for that id");
        }
        if (!(registered instanceof BufferFileInputStream)) {
            throw new TException("Object isn't BufferFileInputStream for " + id);
        }
        BufferFileInputStream stream = (BufferFileInputStream)registered;
        byte[] data;
        try {
            data = stream.read();
        }
        catch (IOException readFailure) {
            LOG.error("BufferFileInputStream read failed when downloadChunk ", (Throwable)readFailure);
            throw new TException((Throwable)readFailure);
        }
        if (data == null) {
            return ByteBuffer.wrap(new byte[]{});
        }
        downloaders.put(id, stream);
        return ByteBuffer.wrap(data);
    }

    /**
     * Removes the download registered under {@code id} from the downloader map.
     *
     * @param id id returned by {@code beginFileDownload}
     */
    @Override
    public void finishFileDownload(String id) throws TException {
        TimeCacheMap<Object, Object> downloaders = this.data.getDownloaders();
        downloaders.remove(id);
    }

    /**
     * Builds a cluster summary from the topology, supervisor and nimbus
     * summaries. The elapsed time (nanoseconds / 1000) is recorded in the
     * {@code getClusterInfo} nimbus histogram whether or not the call succeeds.
     *
     * @return the assembled cluster summary
     * @throws TException if any of the lookups fails
     */
    @Override
    public ClusterSummary getClusterInfo() throws TException {
        final long startNanos = System.nanoTime();
        try {
            StormClusterState clusterState = this.data.getStormClusterState();
            HashMap<String, Assignment> assignments = new HashMap<String, Assignment>();
            List<TopologySummary> topologies = NimbusUtils.getTopologySummary(clusterState, assignments);
            Map<String, SupervisorInfo> supervisorInfos = Cluster.get_all_SupervisorInfo(clusterState, null);
            List<SupervisorSummary> supervisors = NimbusUtils.mkSupervisorSummaries(supervisorInfos, assignments);
            NimbusSummary nimbus = NimbusUtils.getNimbusSummary(clusterState, supervisors, this.data);
            return new ClusterSummary(nimbus, supervisors, topologies);
        }
        catch (TException thriftFailure) {
            LOG.info("Failed to get ClusterSummary ", (Throwable)thriftFailure);
            throw thriftFailure;
        }
        catch (Exception failure) {
            LOG.info("Failed to get ClusterSummary ", (Throwable)failure);
            throw new TException((Throwable)failure);
        }
        finally {
            SimpleJStormMetric.updateNimbusHistogram("getClusterInfo", (System.nanoTime() - startNanos) / 1000L);
        }
    }

    /**
     * Returns the version string reported by {@code Utils.getVersion()}.
     *
     * @return the version string
     */
    @Override
    public String getVersion() throws TException {
        return Utils.getVersion();
    }

    /**
     * Collects the supervisor summary, per-port worker summaries and worker
     * metrics for the supervisor running on {@code host}.
     *
     * <p>The supervisor is located by comparing each supervisor's host name
     * with both the resolved host name and the resolved IP of {@code host}.
     * The elapsed time (nanoseconds / 1000) is recorded in the
     * {@code getSupervisorWorkers} nimbus histogram whether or not the call
     * succeeds.
     *
     * @param host host name or IP of the supervisor
     * @return summary of the supervisor, its workers and their metrics
     * @throws TException if no supervisor matches {@code host} or any lookup fails
     */
    @Override
    public SupervisorWorkers getSupervisorWorkers(String host) throws NotAliveException, TException {
        long start = System.nanoTime();
        try {
            StormClusterState stormClusterState = this.data.getStormClusterState();
            String supervisorId = null;
            SupervisorInfo supervisorInfo = null;
            String ip = NetWorkUtils.host2Ip(host);
            String hostName = NetWorkUtils.ip2Host(host);
            Map<String, SupervisorInfo> supervisorInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
            for (Map.Entry<String, SupervisorInfo> entry : supervisorInfos.entrySet()) {
                SupervisorInfo info = entry.getValue();
                if (info.getHostName().equals(hostName) || info.getHostName().equals(ip)) {
                    supervisorId = entry.getKey();
                    supervisorInfo = info;
                    break;
                }
            }
            if (supervisorId == null) {
                throw new TException("No supervisor of " + host);
            }
            Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, null);
            // Sorted by port so the resulting worker list is ordered by port.
            TreeMap<Integer, WorkerSummary> portWorkerSummarys = new TreeMap<Integer, WorkerSummary>();
            int usedSlotNumber = 0;
            // Per-topology cache of task id -> component name, filled lazily below.
            HashMap<String, Map<Integer, String>> topologyTaskToComponent = new HashMap<String, Map<Integer, String>>();
            HashMap<String, MetricInfo> metricInfoMap = new HashMap<String, MetricInfo>();
            for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
                String topologyId = entry.getKey();
                Assignment assignment = entry.getValue();
                Set<ResourceWorkerSlot> workers = assignment.getWorkers();
                for (ResourceWorkerSlot worker : workers) {
                    if (!supervisorId.equals(worker.getNodeId())) {
                        continue;
                    }
                    ++usedSlotNumber;
                    Integer port = worker.getPort();
                    WorkerSummary workerSummary = portWorkerSummarys.get(port);
                    if (workerSummary == null) {
                        workerSummary = new WorkerSummary();
                        workerSummary.set_port(port);
                        workerSummary.set_topology(topologyId);
                        workerSummary.set_tasks(new ArrayList<TaskComponent>());
                        portWorkerSummarys.put(port, workerSummary);
                    }
                    Map<Integer, String> taskToComponent = topologyTaskToComponent.get(topologyId);
                    if (taskToComponent == null) {
                        taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
                        topologyTaskToComponent.put(topologyId, taskToComponent);
                    }
                    // Worker uptime is measured from the earliest start time among its tasks.
                    int earliest = TimeUtils.current_time_secs();
                    for (Integer taskId : worker.getTasks()) {
                        TaskComponent taskComponent = new TaskComponent();
                        taskComponent.set_component(taskToComponent.get(taskId));
                        taskComponent.set_taskId(taskId);
                        Integer startTime = assignment.getTaskStartTimeSecs().get(taskId);
                        if (startTime != null && startTime < earliest) {
                            earliest = startTime;
                        }
                        workerSummary.add_to_tasks(taskComponent);
                    }
                    workerSummary.set_uptime(TimeUtils.time_delta(earliest));
                    String workerSlotName = TopologyMetricsRunnable.getWorkerSlotName(supervisorInfo.getHostName(), port);
                    List<MetricInfo> workerMetricInfoList = this.data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
                    if (workerMetricInfoList.size() <= 0) {
                        continue;
                    }
                    MetricInfo workerMetricInfo = workerMetricInfoList.get(0);
                    // Keep only the metrics whose name mentions the requested host.
                    // NOTE(review): this removes entries from the object returned by the
                    // metric cache — confirm getMetricData hands out a private copy.
                    Iterator<String> itr = workerMetricInfo.get_metrics().keySet().iterator();
                    while (itr.hasNext()) {
                        String metricName = itr.next();
                        if (!metricName.contains(host)) {
                            itr.remove();
                        }
                    }
                    metricInfoMap.put(workerSlotName, workerMetricInfo);
                }
            }
            ArrayList<WorkerSummary> workerList = new ArrayList<WorkerSummary>();
            workerList.addAll(portWorkerSummarys.values());
            HashMap<String, Integer> supervisorToUsedSlotNum = new HashMap<String, Integer>();
            supervisorToUsedSlotNum.put(supervisorId, usedSlotNumber);
            SupervisorSummary supervisorSummary = NimbusUtils.mkSupervisorSummary(supervisorInfo, supervisorId, supervisorToUsedSlotNum);
            return new SupervisorWorkers(supervisorSummary, workerList, metricInfoMap);
        }
        catch (TException e) {
            LOG.info("Failed to get SupervisorWorkers of " + host, (Throwable)e);
            throw e;
        }
        catch (Exception e) {
            LOG.info("Failed to get SupervisorWorkers of " + host, (Throwable)e);
            throw new TException((Throwable)e);
        }
        finally {
            long end = System.nanoTime();
            SimpleJStormMetric.updateNimbusHistogram("getSupervisorWorkers", (end - start) / 1000L);
        }
    }

    /**
     * Assembles the status report for one topology: the topology summary, one summary per
     * component and per task (including recorded task errors), the host/port every task is
     * assigned to, and the topology, component and worker metrics held in the metric cache.
     * Task, stream and netty metrics are returned as fresh, empty placeholders.
     *
     * <p>The elapsed time of the call is always recorded in the {@code getTopologyInfo}
     * nimbus histogram (nanoseconds divided by 1000).
     *
     * @param topologyId id of the topology to describe
     * @return the populated {@link TopologyInfo}
     * @throws NotAliveException raised when no StormBase or no assignment is stored for
     *         {@code topologyId}
     * @throws TException if collecting the information fails for any other reason; the
     *         original failure is attached as the cause
     */
    @Override
    public TopologyInfo getTopologyInfo(String topologyId) throws NotAliveException, TException {
        long start = System.nanoTime();
        StormClusterState stormClusterState = this.data.getStormClusterState();
        try {
            StormBase base = stormClusterState.storm_base(topologyId, null);
            if (base == null) {
                throw new NotAliveException("No topology of " + topologyId);
            }
            Assignment assignment = stormClusterState.assignment_info(topologyId, null);
            if (assignment == null) {
                throw new NotAliveException("No topology of " + topologyId);
            }

            // Heartbeats may not have been reported yet; a null map means "no heartbeat for any task".
            TopologyTaskHbInfo topologyTaskHbInfo = this.data.getTasksHeartbeat().get(topologyId);
            Map<Integer, TaskHeartbeat> taskHbMap = null;
            if (topologyTaskHbInfo != null) {
                taskHbMap = topologyTaskHbInfo.get_taskHbs();
            }

            Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(stormClusterState, topologyId);
            Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, taskInfoMap);
            Map<Integer, String> taskToType = Cluster.get_all_task_type(stormClusterState, topologyId, taskInfoMap);
            String errorString = Cluster.is_topology_exist_error(stormClusterState, topologyId) ? "Y" : "";

            TopologySummary topologySummary = new TopologySummary();
            topologySummary.set_id(topologyId);
            topologySummary.set_name(base.getStormName());
            topologySummary.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs()));
            topologySummary.set_status(base.getStatusString());
            topologySummary.set_numTasks(NimbusUtils.getTopologyTaskNum(assignment));
            topologySummary.set_numWorkers(assignment.getWorkers().size());
            topologySummary.set_errorInfo(errorString);

            // One ComponentSummary per component that owns at least one task.
            HashMap<String, ComponentSummary> componentSummaryMap = new HashMap<String, ComponentSummary>();
            HashMap<String, List<Integer>> componentToTasks = JStormUtils.reverse_map(taskToComponent);
            for (Map.Entry<String, List<Integer>> entry : componentToTasks.entrySet()) {
                String name = entry.getKey();
                List<Integer> taskIds = entry.getValue();
                if (taskIds == null || taskIds.size() == 0) {
                    LOG.warn("No task of component " + name);
                    continue;
                }
                ComponentSummary componentSummary = new ComponentSummary();
                componentSummaryMap.put(name, componentSummary);
                componentSummary.set_name(name);
                componentSummary.set_type(taskToType.get(taskIds.get(0)));
                componentSummary.set_parallel(taskIds.size());
                componentSummary.set_taskIds(taskIds);
            }

            // One TaskSummary per known task, ordered by task id.
            TreeMap<Integer, TaskSummary> taskSummaryMap = new TreeMap<Integer, TaskSummary>();
            Map<Integer, List<TaskError>> taskErrors = Cluster.get_all_task_errors(stormClusterState, topologyId);
            for (Integer taskId : taskInfoMap.keySet()) {
                TaskSummary taskSummary = new TaskSummary();
                taskSummaryMap.put(taskId, taskSummary);
                taskSummary.set_taskId(taskId);

                TaskHeartbeat hb = taskHbMap == null ? null : taskHbMap.get(taskId);
                if (hb == null) {
                    // No heartbeat received for this task yet.
                    taskSummary.set_status("Starting");
                    taskSummary.set_uptime(0);
                } else {
                    boolean isInactive = NimbusUtils.isTaskDead(this.data, topologyId, taskId);
                    taskSummary.set_status(isInactive ? "INACTIVE" : "ACTIVE");
                    taskSummary.set_uptime(hb.get_uptime());
                }

                // Task errors are only attached when the topology is flagged as having errors.
                if (StringUtils.isBlank(errorString)) {
                    continue;
                }
                List<TaskError> taskErrorList = taskErrors.get(taskId);
                if (taskErrorList == null || taskErrorList.size() == 0) {
                    continue;
                }
                // The owning component may be missing from the map; in that case the error is
                // still reported on the task instead of failing the whole request.
                ComponentSummary owner = componentSummaryMap.get(taskToComponent.get(taskId));
                for (TaskError taskError : taskErrorList) {
                    ErrorInfo errorInfo = new ErrorInfo(taskError.getError(), taskError.getTimSecs(), taskError.getLevel(), taskError.getCode());
                    taskSummary.add_to_errors(errorInfo);
                    if (owner != null) {
                        owner.add_to_errors(errorInfo);
                    }
                }
            }

            // Fill in where every task is running according to the assignment.
            for (ResourceWorkerSlot workerSlot : assignment.getWorkers()) {
                String hostname = workerSlot.getHostname();
                int port = workerSlot.getPort();
                for (Integer taskId : workerSlot.getTasks()) {
                    TaskSummary taskSummary = taskSummaryMap.get(taskId);
                    if (taskSummary == null) {
                        // Assignment references a task that has no TaskInfo entry; skip it
                        // rather than aborting the whole report with a NullPointerException.
                        LOG.warn("No task info of task " + taskId + " in topology " + topologyId);
                        continue;
                    }
                    taskSummary.set_host(hostname);
                    taskSummary.set_port(port);
                }
            }

            TopologyInfo topologyInfo = new TopologyInfo();
            topologyInfo.set_topology(topologySummary);
            topologyInfo.set_components(JStormUtils.mk_list(componentSummaryMap.values()));
            topologyInfo.set_tasks(JStormUtils.mk_list(taskSummaryMap.values()));

            List<MetricInfo> tpMetricList = this.data.getMetricCache().getMetricData(topologyId, MetaType.TOPOLOGY);
            List<MetricInfo> compMetricList = this.data.getMetricCache().getMetricData(topologyId, MetaType.COMPONENT);
            List<MetricInfo> workerMetricList = this.data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
            MetricInfo taskMetric = MetricUtils.mkMetricInfo();
            MetricInfo streamMetric = MetricUtils.mkMetricInfo();
            MetricInfo nettyMetric = MetricUtils.mkMetricInfo();
            // Topology metrics use the LAST cached entry, component and worker metrics the FIRST.
            MetricInfo tpMetric = tpMetricList == null || tpMetricList.size() == 0 ? MetricUtils.mkMetricInfo() : tpMetricList.get(tpMetricList.size() - 1);
            MetricInfo compMetric = compMetricList == null || compMetricList.size() == 0 ? MetricUtils.mkMetricInfo() : compMetricList.get(0);
            MetricInfo workerMetric = workerMetricList == null || workerMetricList.size() == 0 ? MetricUtils.mkMetricInfo() : workerMetricList.get(0);
            TopologyMetric topologyMetrics = new TopologyMetric(tpMetric, compMetric, workerMetric, taskMetric, streamMetric, nettyMetric);
            topologyInfo.set_metrics(topologyMetrics);
            return topologyInfo;
        }
        catch (TException e) {
            // NOTE(review): NotAliveException only leaves through this branch if it is a
            // TException subtype; otherwise it is wrapped by the branch below - confirm.
            LOG.info("Failed to get topologyInfo " + topologyId, e);
            throw e;
        }
        catch (Exception e) {
            LOG.info("Failed to get topologyInfo " + topologyId, e);
            throw new TException("Failed to get topologyInfo " + topologyId, e);
        }
        finally {
            long end = System.nanoTime();
            SimpleJStormMetric.updateNimbusHistogram("getTopologyInfo", (end - start) / 1000L);
        }
    }

    /**
     * Looks a topology up by name and returns the same report as
     * {@link #getTopologyInfo(String)} does for its id.
     *
     * @param topologyName name of the topology
     * @return the topology report
     * @throws NotAliveException declared by the id lookup and by the report builder
     * @throws TException if either delegated call fails
     */
    @Override
    public TopologyInfo getTopologyInfoByName(String topologyName) throws NotAliveException, TException {
        return this.getTopologyInfo(this.getTopologyId(topologyName));
    }

    /**
     * Serialises the nimbus configuration held by {@code this.data} to JSON.
     *
     * @return the configuration as a JSON string
     * @throws TException if the configuration cannot be read or serialised
     */
    @Override
    public String getNimbusConf() throws TException {
        final String failureMessage = "Failed to generate Nimbus configuration";
        String confJson;
        try {
            confJson = JStormUtils.to_json(this.data.getConf());
        }
        catch (Exception failure) {
            LOG.error(failureMessage, failure);
            throw new TException(failureMessage);
        }
        return confJson;
    }

    /**
     * Reads the configuration stored on nimbus for a topology and returns it as JSON.
     *
     * @param id topology id
     * @return the topology configuration as a JSON string
     * @throws NotAliveException declared by the interface; not raised directly here
     * @throws TException wrapping the {@link IOException} if the configuration cannot be read
     */
    @Override
    public String getTopologyConf(String id) throws NotAliveException, TException {
        try {
            Map storedConf = StormConfig.read_nimbus_topology_conf(this.conf, id);
            return JStormUtils.to_json(storedConf);
        }
        catch (IOException ioFailure) {
            LOG.info("Failed to get configuration of " + id, ioFailure);
            throw new TException(ioFailure);
        }
    }

    /**
     * Resolves a topology name to its id using the cluster state.
     *
     * @param topologyName name of the topology
     * @return the id registered for {@code topologyName}
     * @throws NotAliveException if the lookup yields no id
     * @throws TException if the lookup itself fails
     */
    @Override
    public String getTopologyId(String topologyName) throws NotAliveException, TException {
        StormClusterState clusterState = this.data.getStormClusterState();
        String resolvedId;
        try {
            resolvedId = Cluster.get_topology_id(clusterState, topologyName);
        }
        catch (Exception lookupFailure) {
            LOG.info("Failed to get getTopologyId " + topologyName, lookupFailure);
            throw new TException("Failed to get getTopologyId " + topologyName);
        }
        if (resolvedId == null) {
            throw new NotAliveException("No topology of " + topologyName);
        }
        return resolvedId;
    }

    /**
     * Returns the topology as produced by {@code Common.system_topology} from the topology
     * code and configuration stored on nimbus.
     *
     * @param id topology id
     * @return the result of {@code Common.system_topology} for the stored code and conf
     * @throws NotAliveException if no topology code is stored for {@code id}; this is now
     *         propagated to the caller instead of being converted into a generic
     *         {@link TException}
     * @throws TException if reading or building the topology fails for any other reason; the
     *         original failure is attached as the cause
     */
    @Override
    public StormTopology getTopology(String id) throws NotAliveException, TException {
        try {
            StormTopology userTopology = StormConfig.read_nimbus_topology_code(this.conf, id);
            if (userTopology == null) {
                throw new NotAliveException("No topology of " + id);
            }
            Map topologyConf = StormConfig.read_nimbus_topology_conf(this.conf, id);
            return Common.system_topology(topologyConf, userTopology);
        }
        catch (NotAliveException e) {
            // Keep the declared exception type so callers can tell "unknown topology"
            // apart from an internal failure.
            LOG.error("Failed to get topology " + id + ",", e);
            throw e;
        }
        catch (Exception e) {
            LOG.error("Failed to get topology " + id + ",", e);
            throw new TException("Failed to get system_topology", e);
        }
    }

    /**
     * Returns the topology code stored on nimbus for {@code id}, exactly as read by
     * {@code StormConfig.read_nimbus_topology_code}.
     *
     * <p>Previously this method returned a local that was never assigned, so every
     * successful call yielded {@code null}; it now returns the topology it read.
     *
     * @param id topology id
     * @return the stored topology, never {@code null}
     * @throws NotAliveException if no topology code is stored for {@code id}
     * @throws TException if reading the topology fails for any other reason; the original
     *         failure is attached as the cause
     */
    @Override
    public StormTopology getUserTopology(String id) throws NotAliveException, TException {
        try {
            StormTopology userTopology = StormConfig.read_nimbus_topology_code(this.conf, id);
            if (userTopology == null) {
                throw new NotAliveException("No topology of " + id);
            }
            return userTopology;
        }
        catch (NotAliveException e) {
            // Keep the declared exception type so callers can tell "unknown topology"
            // apart from an internal failure.
            LOG.error("Failed to get topology " + id + ",", e);
            throw e;
        }
        catch (Exception e) {
            LOG.error("Failed to get topology " + id + ",", e);
            throw new TException("Failed to get system_topology", e);
        }
    }

    /**
     * Verifies that the topology's liveness matches what the caller expects.
     *
     * @param nimbus source of the cluster state used for the lookup
     * @param topologyName name of the topology to check
     * @param bActive {@code true} if the topology is expected to exist, {@code false} if it
     *        is expected not to
     * @throws NotAliveException if the topology is expected to exist but does not
     * @throws AlreadyAliveException if the topology is expected to be absent but exists
     * @throws Exception if the lookup fails
     */
    public void checkTopologyActive(NimbusData nimbus, String topologyName, boolean bActive) throws Exception {
        boolean currentlyActive = this.isTopologyActive(nimbus.getStormClusterState(), topologyName);
        if (currentlyActive == bActive) {
            return;
        }
        if (!bActive) {
            throw new AlreadyAliveException(topologyName + " is already active");
        }
        throw new NotAliveException(topologyName + " is not alive");
    }

    /**
     * Tells whether a topology id is registered for the given name.
     *
     * @param stormClusterState cluster state to query
     * @param topologyName name of the topology
     * @return {@code true} if {@code Cluster.get_topology_id} returns a non-null id
     * @throws Exception if the lookup fails
     */
    public boolean isTopologyActive(StormClusterState stormClusterState, String topologyName) throws Exception {
        return Cluster.get_topology_id(stormClusterState, topologyName) != null;
    }

    /**
     * Prepares the nimbus distribution directory of a topology: creates and empties the
     * directory, installs the submitted jar(s), writes the serialised topology and its
     * configuration, and records the current time as the topology timestamp.
     *
     * @param conf nimbus configuration used to locate the distribution directory
     * @param topologyId id of the topology being set up
     * @param tmpJarLocation directory holding the uploaded jar(s)
     * @param stormConf topology configuration to serialise
     * @param topology topology to serialise
     * @throws IOException if any file operation fails
     */
    private void setupStormCode(Map<Object, Object> conf, String topologyId, String tmpJarLocation, Map<Object, Object> stormConf, StormTopology topology) throws IOException {
        String stormroot = StormConfig.masterStormdistRoot(conf, topologyId);
        File stormrootDir = new File(stormroot);
        FileUtils.forceMkdir(stormrootDir);
        FileUtils.cleanDirectory(stormrootDir);

        this.setupJar(conf, tmpJarLocation, stormroot);

        File codeFile = new File(StormConfig.stormcode_path(stormroot));
        FileUtils.writeByteArrayToFile(codeFile, Utils.serialize(topology));

        File confFile = new File(StormConfig.stormconf_path(stormroot));
        FileUtils.writeByteArrayToFile(confFile, Utils.serialize(stormConf));

        long now = System.currentTimeMillis();
        StormConfig.write_nimbus_topology_timestamp(this.data.getConf(), topologyId, now);
    }

    /**
     * Copies the lib directory of an upload into the topology's distribution directory and
     * then removes the source lib directory.
     *
     * @param tmpJarLocation upload directory whose lib path is the source
     * @param stormroot distribution directory whose lib path is the destination
     * @return {@code true} if a lib directory existed and was copied, {@code false} if the
     *         source lib path does not exist
     * @throws IOException if copying or removing fails
     */
    private boolean copyLibJars(String tmpJarLocation, String stormroot) throws IOException {
        String libSource = StormConfig.stormlib_path(tmpJarLocation);
        String libTarget = StormConfig.stormlib_path(stormroot);
        LOG.info("Begin to copy from " + libSource + " to " + libTarget);

        File sourceDir = new File(libSource);
        if (sourceDir.exists()) {
            FileUtils.copyDirectory(sourceDir, new File(libTarget));
            PathUtils.rmr(libSource);
            LOG.info("Successfully copy libs " + libTarget);
            return true;
        }
        LOG.info("No lib jars " + libSource);
        return false;
    }

    /**
     * Installs the submitted jar into the topology's distribution directory. Does nothing
     * in local mode. Lib jars are copied first; then the first {@code .jar} entry found in
     * the upload directory is copied to the stormjar path and the source file is deleted.
     *
     * @param conf configuration used to detect local mode
     * @param tmpJarLocation upload directory
     * @param stormroot distribution directory of the topology
     * @throws IOException if copying fails
     * @throws IllegalArgumentException if neither a jar nor lib jars were uploaded, or the
     *         selected jar does not exist
     */
    private void setupJar(Map<Object, Object> conf, String tmpJarLocation, String stormroot) throws IOException {
        if (StormConfig.local_mode(conf)) {
            return;
        }
        boolean libsCopied = this.copyLibJars(tmpJarLocation, stormroot);

        // Pick the first entry of the upload directory that looks like a jar.
        String submittedJar = null;
        for (String entry : PathUtils.read_dir_contents(tmpJarLocation)) {
            if (entry.endsWith(".jar")) {
                submittedJar = tmpJarLocation + "/" + entry;
                break;
            }
        }

        if (submittedJar == null) {
            // A submission consisting only of lib jars is acceptable.
            if (libsCopied) {
                LOG.info("No submit jar");
                return;
            }
            throw new IllegalArgumentException("No jar under " + tmpJarLocation);
        }

        File source = new File(submittedJar);
        if (!source.exists()) {
            throw new IllegalArgumentException(submittedJar + " to copy to " + stormroot + " does not exist!");
        }
        File target = new File(StormConfig.stormjar_path(stormroot));
        FileUtils.copyFile(source, target);
        source.delete();
    }

    /**
     * Generates the task table of a topology, registers an initial heartbeat record for it
     * (in the nimbus heartbeat cache and in the cluster state), and stores the task table
     * in the cluster state.
     *
     * <p>The generated task table is validated before anything is derived from it or
     * written, so an invalid topology no longer leaves a heartbeat record behind.
     *
     * @param conf nimbus configuration
     * @param topologyId id of the topology
     * @param stormClusterState cluster state to write to
     * @throws InvalidTopologyException if no task could be generated for the topology
     * @throws Exception if reading the topology or writing the cluster state fails
     */
    public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
        Map<Integer, TaskInfo> taskToTaskInfo = this.mkTaskComponentAssignments(conf, topologyId);
        // Validate first: the master-id lookup and the heartbeat registration below both
        // depend on a usable task table.
        if (taskToTaskInfo == null || taskToTaskInfo.isEmpty()) {
            throw new InvalidTopologyException("Failed to generate TaskIDs map");
        }

        int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
        TopologyTaskHbInfo topoTaskHbinfo = new TopologyTaskHbInfo(topologyId, masterId);
        this.data.getTasksHeartbeat().put(topologyId, topoTaskHbinfo);
        stormClusterState.topology_heartbeat(topologyId, topoTaskHbinfo);

        stormClusterState.set_task(topologyId, taskToTaskInfo);
    }

    /**
     * Builds the task table of a topology from the code and configuration stored on nimbus.
     *
     * @param conf nimbus configuration used to locate the stored topology
     * @param topologyid id of the topology
     * @return the result of {@code Common.mkTaskInfo} for the system topology
     * @throws IOException if the stored topology or its configuration cannot be read
     * @throws InvalidTopologyException declared for the topology construction steps
     */
    public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyid) throws IOException, InvalidTopologyException {
        Map topologyConf = StormConfig.read_nimbus_topology_conf(conf, topologyid);
        StormTopology userTopology = StormConfig.read_nimbus_topology_code(conf, topologyid);
        StormTopology systemTopology = Common.system_topology(topologyConf, userTopology);
        Map<Integer, TaskInfo> taskTable = Common.mkTaskInfo(topologyConf, systemTopology, topologyid);
        return taskTable;
    }

    /**
     * Switches metric monitoring of a topology on or off in the cluster state.
     *
     * @param topologyName name of the topology
     * @param options carries the desired monitor flag
     * @throws TException wrapping any failure, including the topology not being found
     */
    @Override
    public void metricMonitor(String topologyName, MonitorOptions options) throws TException {
        boolean monitorEnabled = options.is_isEnable();
        StormClusterState zkState = this.data.getStormClusterState();
        try {
            String topologyId = Cluster.get_topology_id(zkState, topologyName);
            if (topologyId == null) {
                throw new NotAliveException("Failed to update metricsMonitor status as " + topologyName + " is not alive");
            }
            zkState.set_storm_monitor(topologyId, monitorEnabled);
        }
        catch (Exception failure) {
            LOG.error("Failed to update metricsMonitor " + topologyName, failure);
            throw new TException(failure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the metrics of a topology as provided by the metric runnable, and records the
     * elapsed time of the call in the {@code getTopologyMetric} nimbus histogram
     * (nanoseconds divided by 1000).
     *
     * @param topologyId id of the topology
     * @return the topology's metrics
     * @throws TException declared by the interface
     */
    @Override
    public TopologyMetric getTopologyMetrics(String topologyId) throws TException {
        LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: " + topologyId);
        long startNanos = System.nanoTime();
        try {
            return this.data.getMetricRunnable().getTopologyMetric(topologyId);
        }
        finally {
            long elapsedNanos = System.nanoTime() - startNanos;
            SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", elapsedNanos / 1000L);
        }
    }

    /**
     * Wraps uploaded metrics in an update event stamped with the current time and hands the
     * event to the metric runnable.
     *
     * @param topologyId id of the topology the metrics belong to
     * @param uploadMetrics the uploaded metrics
     * @throws TException declared by the interface
     */
    @Override
    public void uploadTopologyMetrics(String topologyId, TopologyMetric uploadMetrics) throws TException {
        LOG.info("Received topology metrics:{}", (Object)topologyId);
        TopologyMetricsRunnable.Update update = new TopologyMetricsRunnable.Update();
        update.timestamp = System.currentTimeMillis();
        update.topologyId = topologyId;
        update.topologyMetrics = uploadMetrics;
        this.data.getMetricRunnable().pushEvent(update);
    }

    /**
     * Registers metric names for a topology through the metric runnable.
     *
     * <p>Registration is best-effort: a failure is logged and reported to the caller as a
     * {@code null} result rather than as an exception. Previously the failure was dropped
     * without any trace.
     *
     * @param topologyId id of the topology
     * @param metrics metric names to register
     * @return the map returned by the metric runnable, or {@code null} if registration failed
     * @throws TException declared by the interface; not raised directly here
     */
    @Override
    public Map<String, Long> registerMetrics(String topologyId, Set<String> metrics) throws TException {
        try {
            return this.data.getMetricRunnable().registerMetrics(topologyId, metrics);
        }
        catch (Exception ex) {
            LOG.error("Failed to register metrics of " + topologyId, ex);
            return null;
        }
    }

    /**
     * Intentionally does nothing: both arguments are ignored.
     *
     * @param topologyName name of the topology (unused)
     * @param creds credentials to upload (unused)
     */
    public void uploadNewCredentials(String topologyName, Credentials creds) {
    }

    /**
     * Fetches the cached metrics of one meta type for a topology.
     *
     * @param topologyId id of the topology
     * @param type numeric meta type, converted with {@code MetaType.parse}
     * @return whatever the metric cache holds for that topology and meta type
     * @throws TException declared by the interface
     */
    @Override
    public List<MetricInfo> getMetrics(String topologyId, int type) throws TException {
        return this.data.getMetricCache().getMetricData(topologyId, MetaType.parse(type));
    }

    /**
     * Returns the first cached netty metric entry of a topology.
     *
     * @param topologyId id of the topology
     * @return the first cached entry, or a new empty {@link MetricInfo} if the cache holds
     *         nothing for the topology
     * @throws TException declared by the interface
     */
    @Override
    public MetricInfo getNettyMetrics(String topologyId) throws TException {
        List<MetricInfo> cached = this.data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
        if (cached == null || cached.isEmpty()) {
            return new MetricInfo();
        }
        return cached.get(0);
    }

    /**
     * Collects, from the first cached netty metric entry of a topology, every metric whose
     * name contains the given host string.
     *
     * @param topologyId id of the topology
     * @param host substring a metric name must contain to be included
     * @return a new {@link MetricInfo} holding the matching metrics (possibly none)
     * @throws TException declared by the interface
     */
    @Override
    public MetricInfo getNettyMetricsByHost(String topologyId, String host) throws TException {
        MetricInfo matching = new MetricInfo();
        List<MetricInfo> cached = this.data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
        if (cached != null && !cached.isEmpty()) {
            Map<String, Map<Integer, MetricSnapshot>> allMetrics = cached.get(0).get_metrics();
            for (Map.Entry<String, Map<Integer, MetricSnapshot>> candidate : allMetrics.entrySet()) {
                String metricName = candidate.getKey();
                Map<Integer, MetricSnapshot> snapshots = candidate.getValue();
                if (metricName.contains(host)) {
                    matching.put_to_metrics(metricName, snapshots);
                }
            }
        }
        LOG.info("getNettyMetricsByHost, total size:{}", (Object)matching.get_metrics_size());
        return matching;
    }

    /**
     * Counts the netty metrics of a topology whose name contains the given host string.
     *
     * @param topologyId id of the topology
     * @param host substring a metric name must contain to be counted
     * @return number of metrics returned by {@link #getNettyMetricsByHost(String, String)}
     * @throws TException declared by the interface
     */
    @Override
    public int getNettyMetricSizeByHost(String topologyId, String host) throws TException {
        MetricInfo hostMetrics = this.getNettyMetricsByHost(topologyId, host);
        return hostMetrics.get_metrics_size();
    }

    /**
     * Returns one page of the netty metrics of a topology whose name contains the given
     * host string. Matching metrics are numbered from 0 in the iteration order of the first
     * cached netty entry; page {@code p} holds the matches numbered
     * {@code (p - 1) * 200} (inclusive) to {@code p * 200} (exclusive).
     *
     * @param topologyId id of the topology
     * @param host substring a metric name must contain to be included
     * @param page 1-based page number
     * @return a new {@link MetricInfo} holding at most 200 matching metrics
     * @throws TException declared by the interface
     */
    @Override
    public MetricInfo getPagingNettyMetrics(String topologyId, String host, int page) throws TException {
        final int pageSize = 200;
        MetricInfo ret = new MetricInfo();
        int start = (page - 1) * pageSize;
        int end = page * pageSize;
        // Index of the most recently seen matching metric; -1 means none seen yet.
        int cur = -1;
        List<MetricInfo> metricInfoList = this.data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
        if (metricInfoList != null && metricInfoList.size() > 0) {
            MetricInfo metricInfo = metricInfoList.get(0);
            for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metricInfo.get_metrics().entrySet()) {
                String metricName = metricEntry.getKey();
                if (!metricName.contains(host)) {
                    continue;
                }
                ++cur;
                if (cur >= end) {
                    // Past the requested page: nothing further can qualify.
                    break;
                }
                if (cur >= start) {
                    ret.put_to_metrics(metricName, metricEntry.getValue());
                }
            }
        }
        // The label used to read "getNettyMetricsByHost", which made the two calls
        // indistinguishable in the log.
        LOG.info("getPagingNettyMetrics, total size:{}", (Object)ret.get_metrics_size());
        return ret;
    }

    /**
     * Returns the task metrics of one component. The first task metric entry obtained from
     * {@link #getMetrics(String, int)} is filtered in place: a metric is kept only if its
     * name, split on {@code "@"}, has at least 7 parts and the third part equals
     * {@code component}.
     *
     * <p>NOTE(review): the filter removes keys from the returned entry's own metric map;
     * confirm the metric cache hands out a private copy rather than shared state.
     *
     * @param topologyId id of the topology
     * @param component component name to keep
     * @return the filtered entry, or a fresh {@code MetricUtils.mkMetricInfo()} if no task
     *         metrics are available
     * @throws TException declared by the interface
     */
    @Override
    public MetricInfo getTaskMetrics(String topologyId, String component) throws TException {
        List<MetricInfo> taskMetricList = this.getMetrics(topologyId, MetaType.TASK.getT());
        if (taskMetricList == null || taskMetricList.isEmpty()) {
            return MetricUtils.mkMetricInfo();
        }
        MetricInfo firstEntry = taskMetricList.get(0);
        Map<String, Map<Integer, MetricSnapshot>> metricsByName = firstEntry.get_metrics();
        for (Iterator<String> names = metricsByName.keySet().iterator(); names.hasNext(); ) {
            String[] nameParts = names.next().split("@");
            boolean belongsToComponent = nameParts.length >= 7 && nameParts[2].equals(component);
            if (!belongsToComponent) {
                names.remove();
            }
        }
        LOG.info("taskMetric, total size:{}", (Object)firstEntry.get_metrics_size());
        return firstEntry;
    }

    /**
     * Returns the task metrics and the stream metrics of a single task.
     *
     * @param topologyId id of the topology
     * @param taskId id of the task whose metrics are wanted
     * @return a two-element list: the filtered task metrics followed by the filtered stream
     *         metrics; each is a fresh {@code MetricUtils.mkMetricInfo()} when no metrics of
     *         that kind are available
     * @throws TException declared by the interface
     */
    @Override
    public List<MetricInfo> getTaskAndStreamMetrics(String topologyId, int taskId) throws TException {
        List<MetricInfo> taskMetricList = this.getMetrics(topologyId, MetaType.TASK.getT());
        List<MetricInfo> streamMetricList = this.getMetrics(topologyId, MetaType.STREAM.getT());
        String taskIdStr = String.valueOf(taskId);

        MetricInfo taskMetricInfo = this.retainMetricsOfTask(taskMetricList, taskIdStr);
        MetricInfo streamMetricInfo = this.retainMetricsOfTask(streamMetricList, taskIdStr);
        return Lists.newArrayList(taskMetricInfo, streamMetricInfo);
    }

    /**
     * Takes the first entry of {@code metricList} and removes, in place, every metric whose
     * name split on {@code "@"} does not have at least 7 parts with the fourth part equal
     * to {@code taskIdStr}.
     *
     * <p>NOTE(review): the filter mutates the entry's own metric map; confirm the metric
     * cache hands out a private copy rather than shared state.
     *
     * @param metricList metric entries to read from, may be {@code null} or empty
     * @param taskIdStr task id in decimal string form
     * @return the filtered first entry, or a fresh {@code MetricUtils.mkMetricInfo()} if
     *         {@code metricList} is {@code null} or empty
     */
    private MetricInfo retainMetricsOfTask(List<MetricInfo> metricList, String taskIdStr) {
        if (metricList == null || metricList.isEmpty()) {
            return MetricUtils.mkMetricInfo();
        }
        MetricInfo metricInfo = metricList.get(0);
        Iterator<String> itr = metricInfo.get_metrics().keySet().iterator();
        while (itr.hasNext()) {
            String[] parts = itr.next().split("@");
            if (parts.length >= 7 && parts[3].equals(taskIdStr)) {
                continue;
            }
            itr.remove();
        }
        return metricInfo;
    }

    /**
     * Returns the cached topology-level metric entries of a topology.
     *
     * @param topologyId id of the topology
     * @return whatever the metric cache holds for {@code MetaType.TOPOLOGY}
     * @throws TException declared by the interface
     */
    @Override
    public List<MetricInfo> getSummarizedTopologyMetrics(String topologyId) throws TException {
        List<MetricInfo> topologyLevelMetrics = this.data.getMetricCache().getMetricData(topologyId, MetaType.TOPOLOGY);
        return topologyLevelMetrics;
    }

    /**
     * Updates a running topology: optionally replaces its jar, merges {@code updateConf}
     * into the stored topology configuration, triggers the {@code update_topology} status
     * transition and notifies the topology action listener.
     *
     * <p>When {@code uploadedLocation} is given, the uploaded file is renamed to the
     * stormjar path inside its own directory and that whole directory is then moved (or,
     * if the destination already exists, copied) into the topology's distribution
     * directory. A failed rename now aborts the update instead of being ignored and
     * reported as success.
     *
     * @param name name of the topology to update
     * @param uploadedLocation path of the uploaded jar, or {@code null} to keep the current jar
     * @param updateConf JSON text of the configuration entries to merge
     * @throws NotAliveException if the topology is not alive
     * @throws InvalidTopologyException declared by the interface; not raised directly here
     * @throws TException if the update fails for any other reason; the original failure is
     *         attached as the cause
     */
    @Override
    public void updateTopology(String name, String uploadedLocation, String updateConf) throws NotAliveException, InvalidTopologyException, TException {
        try {
            this.checkTopologyActive(this.data, name, true);
            StormClusterState stormClusterState = this.data.getStormClusterState();
            String topologyId = Cluster.get_topology_id(stormClusterState, name);
            if (topologyId == null) {
                throw new NotAliveException(name);
            }
            if (uploadedLocation != null) {
                String stormroot = StormConfig.masterStormdistRoot(this.conf, topologyId);
                int lastIndexOf = uploadedLocation.lastIndexOf("/");
                String tmpDir = uploadedLocation.substring(0, lastIndexOf);
                String stormJarPath = StormConfig.stormjar_path(tmpDir);
                File file = new File(uploadedLocation);
                if (!file.exists()) {
                    throw new FileNotFoundException("Source '" + uploadedLocation + "' does not exist");
                }
                // File.renameTo signals failure only through its return value; without this
                // check the directory would be installed without a correctly named jar.
                File stormJar = new File(stormJarPath);
                if (!file.equals(stormJar) && !file.renameTo(stormJar)) {
                    throw new IOException("Failed to rename '" + uploadedLocation + "' to '" + stormJarPath + "'");
                }
                File srcDir = new File(tmpDir);
                File destDir = new File(stormroot);
                try {
                    FileUtils.moveDirectory(srcDir, destDir);
                }
                catch (FileExistsException e) {
                    // Destination already exists: merge the upload into it instead.
                    FileUtils.copyDirectory(srcDir, destDir);
                    FileUtils.deleteQuietly(srcDir);
                }
                StormConfig.write_nimbus_topology_timestamp(this.data.getConf(), topologyId, System.currentTimeMillis());
                LOG.info("update jar of " + name + " successfully");
            }
            Map topoConf = StormConfig.read_nimbus_topology_conf(this.data.getConf(), topologyId);
            Map config = (Map)JStormUtils.from_json(updateConf);
            topoConf.putAll(config);
            StormConfig.write_nimbus_topology_conf(this.data.getConf(), topologyId, topoConf);
            NimbusUtils.transitionName(this.data, name, true, StatusType.update_topology, config);
            LOG.info("update topology " + name + " successfully");
            this.notifyTopologyActionListener(name, "updateTopology");
        }
        catch (NotAliveException e) {
            String errMsg = "Error, no this topology " + name;
            LOG.error(errMsg, (Throwable)((Object)e));
            throw new NotAliveException(errMsg);
        }
        catch (Exception e) {
            String errMsg = "Failed to update topology " + name;
            LOG.error(errMsg, e);
            throw new TException(errMsg, e);
        }
    }

    /**
     * Merges reported task heartbeats into the heartbeat record nimbus keeps for the
     * topology. A record (and its heartbeat map) is created on first use; every reported
     * heartbeat then replaces the stored one for the same task id.
     *
     * @param taskHbs heartbeats reported for one topology
     * @throws TException declared by the interface
     */
    @Override
    public void updateTaskHeartbeat(TopologyTaskHbInfo taskHbs) throws TException {
        String topologyId = taskHbs.get_topologyId();
        Integer topologyMasterId = taskHbs.get_topologyMasterId();

        TopologyTaskHbInfo storedRecord = this.data.getTasksHeartbeat().get(topologyId);
        if (storedRecord == null) {
            storedRecord = new TopologyTaskHbInfo(topologyId, topologyMasterId);
            this.data.getTasksHeartbeat().put(topologyId, storedRecord);
        }

        Map<Integer, TaskHeartbeat> storedHeartbeats = storedRecord.get_taskHbs();
        if (storedHeartbeats == null) {
            storedHeartbeats = new ConcurrentHashMap<Integer, TaskHeartbeat>();
            storedRecord.set_taskHbs(storedHeartbeats);
        }

        Map<Integer, TaskHeartbeat> reportedHeartbeats = taskHbs.get_taskHbs();
        if (reportedHeartbeats == null) {
            return;
        }
        storedHeartbeats.putAll(reportedHeartbeats);
    }

    /**
     * Forwards a topology action to the configured notifier plugin, if there is one.
     *
     * @param topologyName name of the topology the action applies to
     * @param action name of the action that was performed
     */
    private void notifyTopologyActionListener(String topologyName, String action) {
        ITopologyActionNotifierPlugin notifier = this.data.getNimbusNotify();
        if (notifier == null) {
            return;
        }
        notifier.notify(topologyName, action);
    }
}

