package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.Config;
import backtype.storm.daemon.Shutdownable;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ComponentSummary;
import backtype.storm.generated.Credentials;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.MonitorOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.SupervisorSummary;
import backtype.storm.generated.SupervisorWorkers;
import backtype.storm.generated.TaskComponent;
import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TaskSummary;
import backtype.storm.generated.TopologyAssignException;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologyInitialStatus;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.generated.TopologySummary;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.generated.WorkerSummary;
import backtype.storm.nimbus.ITopologyActionNotifierPlugin;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.DaemonCommon;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.SimpleJStormMetric;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.error.TaskError;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.Thrift;
import com.alibaba.jstorm.utils.TimeUtils;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/ServiceHandler.class */
public class ServiceHandler implements Nimbus.Iface, Shutdownable, DaemonCommon {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ServiceHandler.class);
    /** NOTE(review): not referenced in the visible part of this file; presumably a server worker-thread count -- confirm at the usage site. */
    public static final int THREAD_NUM = 64;
    /** Nimbus state holder handed to the constructor. */
    private NimbusData data;
    /** Configuration map read from {@code data.getConf()} in the constructor. */
    private Map<Object, Object> conf;

    /**
     * Creates a handler bound to the given Nimbus state.
     *
     * @param nimbusData Nimbus state holder; its configuration map is cached in {@code conf}
     */
    public ServiceHandler(NimbusData nimbusData) {
        data = nimbusData;
        conf = data.getConf();
    }

    /**
     * Shuts the handler down. When a topology-action notifier plugin is configured
     * on the Nimbus data, its {@code cleanup()} is invoked.
     */
    @Override // backtype.storm.daemon.Shutdownable
    public void shutdown() {
        LOG.info("Begin to shut down master");
        final ITopologyActionNotifierPlugin notifier = data.getNimbusNotify();
        if (null != notifier) {
            notifier.cleanup();
        }
        LOG.info("Successfully shut down master");
    }

    /**
     * Reports whether this daemon is waiting.
     *
     * @return always {@code false}
     */
    @Override // com.alibaba.jstorm.cluster.DaemonCommon
    public boolean waiting() {
        final boolean isWaiting = false;
        return isWaiting;
    }

    /**
     * Submits a topology with the initial status {@link TopologyInitialStatus#ACTIVE};
     * delegates to {@link #submitTopologyWithOpts}.
     *
     * @param str topology name
     * @param str2 uploaded jar location
     * @param str3 topology configuration as JSON
     * @param stormTopology topology definition
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void submitTopology(String str, String str2, String str3, StormTopology stormTopology) throws TException, AlreadyAliveException, InvalidTopologyException, TopologyAssignException {
        final SubmitOptions activeOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
        submitTopologyWithOpts(str, str2, str3, stormTopology, activeOptions);
    }

    /**
     * Pushes a non-scratch assignment event for the topology onto {@link TopologyAssign}
     * and blocks until the event reports completion.
     *
     * @param str topology name
     * @param str2 topology id
     * @param topologyInitialStatus initial status, converted to a storm status for the event
     * @throws FailedAssignTopologyException when the event does not finish successfully;
     *         carries the event's error message
     */
    private void makeAssignment(String str, String str2, TopologyInitialStatus topologyInitialStatus) throws FailedAssignTopologyException {
        final TopologyAssignEvent event = new TopologyAssignEvent();
        event.setTopologyId(str2);
        event.setScratch(false);
        event.setTopologyName(str);
        event.setOldStatus(Thrift.topologyInitialStatusToStormStatus(topologyInitialStatus));
        TopologyAssign.push(event);

        final boolean finished = event.waitFinish();
        if (finished) {
            LOG.info("Finish submit for " + str);
            return;
        }
        throw new FailedAssignTopologyException(event.getErrorMsg());
    }

    /**
     * Validates, stores and schedules a newly submitted topology.
     * <p>
     * Flow: validate the name, reject the submission when a topology with that name is
     * already active or pending, reserve a topology id, normalize configuration and
     * topology, store code and task info, make the assignment, then push a start event
     * to the metric runnable and notify the topology-action listener.
     * <p>
     * The exception handlers are siblings of a single {@code try}, so each failure is
     * translated and logged exactly once and the declared exceptions reach the caller
     * unwrapped. The liveness-check handler only guards the liveness check itself.
     *
     * @param str topology name
     * @param str2 uploaded jar location
     * @param str3 topology configuration as JSON
     * @param stormTopology topology definition
     * @param submitOptions supplies the initial status used for the assignment
     * @throws AlreadyAliveException when the topology is already active or a submission
     *         of the same name is pending
     * @throws InvalidTopologyException when the name or the topology/configuration is invalid
     * @throws TopologyAssignException when assignment or any other submission step fails
     * @throws TException when the liveness check itself fails
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void submitTopologyWithOpts(String str, String str2, String str3, StormTopology stormTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException, TException {
        LOG.info("Receive " + str + ", uploadedJarLocation:" + str2);
        long startNanos = System.nanoTime();
        if (!Common.charValidate(str)) {
            throw new InvalidTopologyException(str + " is not a valid topology name");
        }

        try {
            checkTopologyActive(this.data, str, false);
        } catch (AlreadyAliveException e) {
            LOG.info(str + " already exists ");
            throw e;
        } catch (Throwable th) {
            LOG.info("Failed to check whether topology is alive or not", th);
            throw new TException(th);
        }

        String topologyId;
        // Serialize id reservation so the same name cannot be submitted twice concurrently.
        synchronized (this.data) {
            Iterator<String> pendingIds = this.data.getPendingSubmitTopoloygs().keySet().iterator();
            while (pendingIds.hasNext()) {
                if (pendingIds.next().contains(str + "-")) {
                    LOG.info(str + " already exists ");
                    throw new AlreadyAliveException(str + "  were submitted");
                }
            }
            topologyId = Common.topologyNameToId(str, this.data.getSubmittedCount().incrementAndGet());
            this.data.getPendingSubmitTopoloygs().put(topologyId, null);
        }

        try {
            Map map = (Map) JStormUtils.from_json(str3);
            if (map == null) {
                LOG.warn("Failed to serialized Configuration");
                throw new InvalidTopologyException("Failed to serialize topology configuration");
            }
            map.put("topology.id", topologyId);
            map.put(Config.TOPOLOGY_NAME, str);
            Map<Object, Object> normalizeConf = NimbusUtils.normalizeConf(this.conf, map, stormTopology);
            LOG.info("Normalized configuration:" + normalizeConf);
            HashMap totalConf = new HashMap(this.conf);
            totalConf.putAll(normalizeConf);
            StormTopology normalizeTopology = NimbusUtils.normalizeTopology(normalizeConf, stormTopology, true);
            Common.validate_basic(normalizeTopology, totalConf, topologyId);
            StormClusterState stormClusterState = this.data.getStormClusterState();
            double metricSampleRate = ConfigExtension.getMetricSampleRate(normalizeConf);
            setupStormCode(this.conf, topologyId, str2, normalizeConf, normalizeTopology);
            setupZkTaskInfo(this.conf, topologyId, stormClusterState);
            LOG.info("Submit for " + str + " with conf " + map);
            makeAssignment(str, topologyId, submitOptions.get_initial_status());
            // Release the pending marker as soon as the assignment is done.
            this.data.getPendingSubmitTopoloygs().remove(topologyId);
            TopologyMetricsRunnable.StartTopologyEvent startTopologyEvent = new TopologyMetricsRunnable.StartTopologyEvent();
            startTopologyEvent.clusterName = this.data.getClusterName();
            startTopologyEvent.topologyId = topologyId;
            startTopologyEvent.timestamp = System.currentTimeMillis();
            startTopologyEvent.sampleRate = metricSampleRate;
            this.data.getMetricRunnable().pushEvent(startTopologyEvent);
            notifyTopologyActionListener(str, "submitTopology");
        } catch (FailedAssignTopologyException e) {
            String rootCause = e.getMessage() == null ? "submit timeout" : e.getMessage();
            String msg = describeSubmitFailure("Fail to sumbit topology, Root cause:" + rootCause, topologyId, str2);
            LOG.error(msg, e);
            throw new TopologyAssignException(msg);
        } catch (InvalidParameterException e) {
            String msg = describeSubmitFailure("Fail to sumbit topology " + e.getMessage() + ", cause:" + e.getCause(), topologyId, str2);
            LOG.error(msg, e);
            throw new InvalidParameterException(msg);
        } catch (InvalidTopologyException e) {
            LOG.error("Topology is invalid. " + e.get_msg());
            throw e;
        } catch (Throwable th) {
            String msg = describeSubmitFailure("Fail to sumbit topology " + th.getMessage() + ", cause:" + th.getCause(), topologyId, str2);
            LOG.error(msg, th);
            throw new TopologyAssignException(msg);
        } finally {
            // Idempotent: covers every failure path; a no-op after the success-path removal.
            this.data.getPendingSubmitTopoloygs().remove(topologyId);
            // nanoseconds / 1000 yields microseconds.
            double costMicros = (System.nanoTime() - startNanos) / 1000;
            SimpleJStormMetric.updateNimbusHistogram("submitTopologyWithOpts", Double.valueOf(costMicros));
            LOG.info("submitTopologyWithOpts {} costs {}us", str, Double.valueOf(costMicros));
        }
    }

    /** Appends the topology id and jar location to a submission-failure headline. */
    private static String describeSubmitFailure(String headline, String topologyId, String uploadedJarLocation) {
        StringBuilder sb = new StringBuilder();
        sb.append(headline);
        sb.append("\n\n");
        sb.append("topologyId:" + topologyId);
        sb.append(", uploadedJarLocation:" + uploadedJarLocation + "\n");
        return sb.toString();
    }

    /**
     * Kills the named topology using default {@link KillOptions}.
     *
     * @param str topology name
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void killTopology(String str) throws TException, NotAliveException {
        final KillOptions defaultOptions = new KillOptions();
        killTopologyWithOpts(str, defaultOptions);
    }

    /**
     * Kills the named topology: checks it is active, issues the {@code kill} status
     * transition, pushes a {@code Remove} event to the metric runnable and notifies
     * the topology-action listener.
     *
     * @param str topology name
     * @param killOptions options; the wait seconds are forwarded to the transition only when set
     * @throws NotAliveException when the topology is not alive
     * @throws TException on any other failure; the root cause is attached
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void killTopologyWithOpts(String str, KillOptions killOptions) throws TException, NotAliveException {
        try {
            checkTopologyActive(this.data, str, true);
            String topologyId = getTopologyId(str);
            Integer waitSecs = null;
            if (killOptions.is_set_wait_secs()) {
                waitSecs = Integer.valueOf(killOptions.get_wait_secs());
            }
            NimbusUtils.transitionName(this.data, str, true, StatusType.kill, waitSecs);
            TopologyMetricsRunnable.Remove remove = new TopologyMetricsRunnable.Remove();
            remove.topologyId = topologyId;
            this.data.getMetricRunnable().pushEvent(remove);
            notifyTopologyActionListener(str, "killTopology");
        } catch (NotAliveException e) {
            String msg = "KillTopology Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to kill topology " + str;
            LOG.error(msg, e);
            // Keep the root cause attached instead of discarding it.
            throw new TException(msg, e);
        }
    }

    /**
     * Issues the {@code activate} status transition for the named topology and
     * notifies the topology-action listener.
     *
     * @param str topology name
     * @throws NotAliveException when the topology is not alive
     * @throws TException on any other failure; the root cause is attached
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void activate(String str) throws TException, NotAliveException {
        try {
            NimbusUtils.transitionName(this.data, str, true, StatusType.activate, new Object[0]);
            notifyTopologyActionListener(str, "activate");
        } catch (NotAliveException e) {
            String msg = "Activate Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to active topology " + str;
            LOG.error(msg, e);
            // Keep the root cause attached instead of discarding it.
            throw new TException(msg, e);
        }
    }

    /**
     * Issues the {@code inactivate} status transition for the named topology and
     * notifies the topology-action listener.
     *
     * @param str topology name
     * @throws NotAliveException when the topology is not alive
     * @throws TException on any other failure; the root cause is attached
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void deactivate(String str) throws TException, NotAliveException {
        try {
            NimbusUtils.transitionName(this.data, str, true, StatusType.inactivate, new Object[0]);
            notifyTopologyActionListener(str, "inactivate");
        } catch (NotAliveException e) {
            String msg = "Deactivate Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to deactivate topology " + str;
            LOG.error(msg, e);
            // Keep the root cause attached instead of discarding it.
            throw new TException(msg, e);
        }
    }

    /**
     * Rebalances the named topology: checks it is active, issues the {@code rebalance}
     * status transition with the wait time, reassign flag and parsed configuration,
     * then notifies the topology-action listener.
     *
     * @param str topology name
     * @param rebalanceOptions optional; when {@code null} (or a field is unset) the wait
     *        time and configuration stay {@code null} and reassign stays {@code false}
     * @throws NotAliveException when the topology is not alive
     * @throws TException on any other failure; the root cause is attached
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void rebalance(String str, RebalanceOptions rebalanceOptions) throws TException, NotAliveException {
        try {
            checkTopologyActive(this.data, str, true);
            Integer waitSecs = null;
            String jsonConf = null;
            Boolean reassign = Boolean.FALSE;
            if (rebalanceOptions != null) {
                if (rebalanceOptions.is_set_wait_secs()) {
                    waitSecs = Integer.valueOf(rebalanceOptions.get_wait_secs());
                }
                if (rebalanceOptions.is_set_reassign()) {
                    reassign = Boolean.valueOf(rebalanceOptions.is_reassign());
                }
                if (rebalanceOptions.is_set_conf()) {
                    jsonConf = rebalanceOptions.get_conf();
                }
            }
            // Separator added: the topology name previously ran straight into "wait_time:".
            LOG.info("Begin to rebalance " + str + ", wait_time:" + waitSecs + ", reassign: " + reassign + ", new worker/bolt configuration:" + jsonConf);
            NimbusUtils.transitionName(this.data, str, true, StatusType.rebalance, waitSecs, reassign, (Map) JStormUtils.from_json(jsonConf));
            notifyTopologyActionListener(str, "rebalance");
        } catch (NotAliveException e) {
            String msg = "Rebalance Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to rebalance topology " + str;
            LOG.error(msg, e);
            // Keep the root cause attached instead of discarding it.
            throw new TException(msg, e);
        }
    }

    /**
     * Restarts a topology as deactivate, back up, kill, resubmit.
     * <ol>
     * <li>Resolve the topology id; fail with {@link NotAliveException} when there is none.</li>
     * <li>Deactivate the topology and sleep 5 seconds.</li>
     * <li>Read the stored code/configuration (merged with {@code str2} when non-null) and
     *     copy the stormdist directory into a directory under the master inbox.</li>
     * <li>Remove the old topology via {@link RemoveTransitionCallback} and push kill/remove
     *     metric events.</li>
     * <li>Resubmit from the backup directory, which is deleted afterwards in every case.</li>
     * </ol>
     * The backup phase and the kill/resubmit phase have separate handlers, so only backup
     * failures are reported as "Failed to read old jar/conf/topology" and the declared
     * exceptions of the resubmission reach the caller unchanged.
     *
     * @param str topology name
     * @param str2 JSON configuration overrides, may be {@code null}
     * @throws NotAliveException when no topology with that name exists
     * @throws InvalidTopologyException propagated from the resubmission
     * @throws TopologyAssignException propagated from the resubmission
     * @throws TException when the backup fails (cause attached) or the topology is still
     *         alive at resubmission time
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void restart(String str, String str2) throws TException, NotAliveException, InvalidTopologyException, TopologyAssignException {
        LOG.info("Begin to restart " + str + ", new configuration:" + str2);
        String topologyId;
        try {
            topologyId = Cluster.get_topology_id(this.data.getStormClusterState(), str);
        } catch (Exception e) {
            // Treated like "not found" below, but leave a trace of the real reason.
            LOG.warn("Failed to look up topology id of " + str, e);
            topologyId = null;
        }
        if (topologyId == null) {
            LOG.info("No topology of " + str);
            throw new NotAliveException("No topology of " + str);
        }

        deactivate(str);
        JStormUtils.sleepMs(5000L);
        LOG.info("Deactivate " + str);

        StormTopology topologyCode;
        Map topologyConf;
        String backupDir = null;
        try {
            topologyCode = StormConfig.read_nimbus_topology_code(this.conf, topologyId);
            topologyConf = StormConfig.read_nimbus_topology_conf(this.conf, topologyId);
            if (str2 != null) {
                topologyConf.putAll((Map) JStormUtils.from_json(str2));
            }
            String stormdistRoot = StormConfig.masterStormdistRoot(this.conf, topologyId);
            backupDir = StormConfig.masterInbox(this.conf) + "/" + topologyId;
            FileUtils.forceMkdir(new File(backupDir));
            FileUtils.cleanDirectory(new File(backupDir));
            File stormdistDir = new File(stormdistRoot);
            stormdistDir.setLastModified(System.currentTimeMillis());
            FileUtils.copyDirectory(stormdistDir, new File(backupDir));
            LOG.info("Successfully read old jar/conf/topology " + str);
            notifyTopologyActionListener(str, "restart");
        } catch (Exception e) {
            LOG.error("Failed to read old jar/conf/topology", e);
            removeRestartBackup(backupDir);
            throw new TException("Failed to read old jar/conf/topology ", e);
        }

        try {
            new RemoveTransitionCallback(this.data, topologyId).execute(new Object[0]);
            LOG.info("Successfully kill the topology " + str);
            TopologyMetricsRunnable.KillTopologyEvent killTopologyEvent = new TopologyMetricsRunnable.KillTopologyEvent();
            killTopologyEvent.clusterName = this.data.getClusterName();
            killTopologyEvent.topologyId = topologyId;
            killTopologyEvent.timestamp = System.currentTimeMillis();
            this.data.getMetricRunnable().pushEvent(killTopologyEvent);
            TopologyMetricsRunnable.Remove remove = new TopologyMetricsRunnable.Remove();
            remove.topologyId = topologyId;
            this.data.getMetricRunnable().pushEvent(remove);

            submitTopology(str, backupDir, JStormUtils.to_json(topologyConf), topologyCode);
        } catch (AlreadyAliveException e) {
            LOG.info("Failed to kill the topology" + str);
            throw new TException("Failed to kill the topology" + str);
        } finally {
            removeRestartBackup(backupDir);
        }
    }

    /** Best-effort removal of the restart backup directory; a failure is logged, never thrown. */
    private void removeRestartBackup(String backupDir) {
        if (backupDir == null) {
            return;
        }
        try {
            PathUtils.rmr(backupDir);
        } catch (IOException e) {
            LOG.warn("Failed to remove restart backup directory " + backupDir, e);
        }
    }

    /**
     * Prepares an upload to the given path: creates the parent directory, opens a
     * writable channel on the file and registers it in the uploaders map under the path.
     *
     * @param str destination file path
     * @throws TException wrapping any failure
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void beginLibUpload(String str) throws TException {
        try {
            final String parentDir = PathUtils.parent_path(str);
            PathUtils.local_mkdirs(parentDir);
            final WritableByteChannel channel = Channels.newChannel(new FileOutputStream(str));
            this.data.getUploaders().put(str, channel);
            LOG.info("Begin upload file from client to " + str);
        } catch (Exception failure) {
            LOG.error("Fail to upload jar " + str, failure);
            throw new TException(failure);
        }
    }

    /**
     * Prepares a jar upload: creates an empty, UUID-named directory under the master
     * inbox, opens a writable channel on {@code stormjar-<uuid>.jar} inside it and
     * registers the channel in the uploaders map under the jar path.
     *
     * @return the created upload directory (not the jar path)
     * @throws TException wrapping any I/O failure
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public String beginFileUpload() throws TException {
        // Stays null until the jar path is known; only used for the error messages.
        String jarPath = null;
        try {
            final String id = UUID.randomUUID().toString();
            final String uploadDir = StormConfig.masterInbox(this.conf) + "/" + id;
            FileUtils.forceMkdir(new File(uploadDir));
            FileUtils.cleanDirectory(new File(uploadDir));
            jarPath = uploadDir + "/stormjar-" + id + ".jar";
            final WritableByteChannel channel = Channels.newChannel(new FileOutputStream(jarPath));
            this.data.getUploaders().put(jarPath, channel);
            LOG.info("Begin upload file from client to " + jarPath);
            return uploadDir;
        } catch (FileNotFoundException notFound) {
            LOG.error("File not found: " + jarPath, notFound);
            throw new TException(notFound);
        } catch (IOException ioFailure) {
            LOG.error("Upload file error: " + jarPath, ioFailure);
            throw new TException(ioFailure);
        }
    }

    /**
     * Writes one chunk to the channel registered for the given location and puts the
     * channel back into the uploaders map.
     *
     * @param str upload location used as the uploaders key
     * @param byteBuffer chunk to write
     * @throws TException when no entry exists for the location, the entry is not a
     *         {@link WritableByteChannel}, or the write fails
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void uploadChunk(String str, ByteBuffer byteBuffer) throws TException {
        final TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
        final Object entry = uploaders.get(str);
        if (entry == null) {
            throw new TException("File for that location does not exist (or timed out) " + str);
        }
        if (!(entry instanceof WritableByteChannel)) {
            throw new TException("Object isn't WritableByteChannel for " + str);
        }
        final WritableByteChannel channel = (WritableByteChannel) entry;
        try {
            channel.write(byteBuffer);
            uploaders.put(str, channel);
        } catch (IOException writeFailure) {
            LOG.error(" WritableByteChannel write filed when uploadChunk " + str);
            throw new TException(writeFailure);
        }
    }

    /**
     * Finishes an upload by closing the channel registered for the given location and
     * removing it from the uploaders map.
     * <p>
     * A failed close is reported to the caller (consistent with {@link #uploadChunk},
     * which reports failed writes) instead of being swallowed, and the entry is removed
     * whether or not the close succeeds.
     *
     * @param str upload location used as the uploaders key
     * @throws TException when no entry exists for the location, the entry is not a
     *         {@link WritableByteChannel}, or closing the channel fails
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void finishFileUpload(String str) throws TException {
        TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
        Object obj = uploaders.get(str);
        if (obj == null) {
            throw new TException("File for that location does not exist (or timed out)");
        }
        if (!(obj instanceof WritableByteChannel)) {
            throw new TException("Object isn't WritableByteChannel for " + str);
        }
        try {
            ((WritableByteChannel) obj).close();
            LOG.info("Finished uploading file from client: " + str);
        } catch (IOException e) {
            LOG.error(" WritableByteChannel close failed when finishFileUpload " + str, e);
            // The file may be incomplete; the client must not be told the upload succeeded.
            throw new TException(e);
        } finally {
            uploaders.remove(str);
        }
    }

    /**
     * Opens a buffered input stream on the given file and registers it in the
     * downloaders map under a fresh UUID. The buffer size is half of the configured
     * {@code Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE} (1048576 is passed as the fallback
     * to {@code JStormUtils.parseInt}).
     *
     * @param str path of the file to download
     * @return the id to pass to {@link #downloadChunk} and {@link #finishFileDownload}
     * @throws TException when the file cannot be found
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public String beginFileDownload(String str) throws TException {
        try {
            final Object configuredMax = this.conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE);
            final int bufferSize = JStormUtils.parseInt(configuredMax, 1048576).intValue() / 2;
            final BufferFileInputStream input = new BufferFileInputStream(str, bufferSize);
            final String downloadId = UUID.randomUUID().toString();
            this.data.getDownloaders().put(downloadId, input);
            return downloadId;
        } catch (FileNotFoundException notFound) {
            LOG.error(notFound + "file:" + str + " not found");
            throw new TException(notFound);
        }
    }

    /**
     * Reads the next chunk from the stream registered under the given download id.
     * After a non-null read the stream is put back into the downloaders map.
     *
     * @param str download id
     * @return the bytes read, or an empty buffer when the stream returns {@code null}
     * @throws TException when no entry exists for the id, the entry is not a
     *         {@link BufferFileInputStream}, or the read fails
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public ByteBuffer downloadChunk(String str) throws TException {
        final TimeCacheMap<Object, Object> downloaders = this.data.getDownloaders();
        final Object entry = downloaders.get(str);
        if (entry == null) {
            throw new TException("Could not find input stream for that id");
        }
        if (!(entry instanceof BufferFileInputStream)) {
            throw new TException("Object isn't BufferFileInputStream for " + str);
        }
        final BufferFileInputStream input = (BufferFileInputStream) entry;
        try {
            final byte[] chunk = input.read();
            if (chunk != null) {
                downloaders.put(str, input);
                return ByteBuffer.wrap(chunk);
            }
            return ByteBuffer.wrap(new byte[0]);
        } catch (IOException readFailure) {
            LOG.error("BufferFileInputStream read failed when downloadChunk ", readFailure);
            throw new TException(readFailure);
        }
    }

    /**
     * Removes the entry registered under the given download id from the downloaders map.
     *
     * @param str download id
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void finishFileDownload(String str) throws TException {
        final TimeCacheMap<Object, Object> downloaders = this.data.getDownloaders();
        downloaders.remove(str);
    }

    /**
     * Builds the cluster summary from the Nimbus summary, the supervisor summaries
     * and the topology summaries.
     * <p>
     * The elapsed time (nanoseconds / 1000) is recorded in the {@code getClusterInfo}
     * Nimbus histogram on success and on failure alike.
     *
     * @return the assembled {@link ClusterSummary}
     * @throws TException rethrown as-is, or wrapping any other exception
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public ClusterSummary getClusterInfo() throws TException {
        long startNanos = System.nanoTime();
        try {
            StormClusterState stormClusterState = this.data.getStormClusterState();
            // Shared between the two helpers below; filled by getTopologySummary
            // (NOTE(review): element types are not visible here, so it is left raw).
            HashMap shared = new HashMap();
            List<TopologySummary> topologySummaries = NimbusUtils.getTopologySummary(stormClusterState, shared);
            List<SupervisorSummary> supervisorSummaries = NimbusUtils.mkSupervisorSummaries(Cluster.get_all_SupervisorInfo(stormClusterState, null), shared);
            return new ClusterSummary(NimbusUtils.getNimbusSummary(stormClusterState, supervisorSummaries, this.data), supervisorSummaries, topologySummaries);
        } catch (TException e) {
            // The more specific handler must come first; after catch (Exception) it is unreachable.
            LOG.info("Failed to get ClusterSummary ", e);
            throw e;
        } catch (Exception e) {
            LOG.info("Failed to get ClusterSummary ", e);
            throw new TException(e);
        } finally {
            SimpleJStormMetric.updateNimbusHistogram("getClusterInfo", Long.valueOf((System.nanoTime() - startNanos) / 1000));
        }
    }

    /**
     * Returns the version string reported by {@link Utils#getVersion()}.
     *
     * @return the version string
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public String getVersion() throws TException {
        final String version = Utils.getVersion();
        return version;
    }

    /**
     * Collects the workers that every topology assignment places on the supervisor
     * running on the given host, together with their worker metrics.
     * <p>
     * The supervisor is the first one whose host name equals either the IP or the host
     * name resolved from {@code str}. Workers are keyed by port; a worker's uptime is
     * derived from the earliest start time among its tasks. The elapsed time
     * (nanoseconds / 1000) is recorded in the {@code getSupervisorWorkers} Nimbus
     * histogram on success and on failure alike.
     *
     * @param str host name or IP of the supervisor
     * @return supervisor summary, its workers ordered by port, and metrics per worker slot
     * @throws TException when no supervisor matches the host (rethrown as-is), or
     *         wrapping any other exception
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public SupervisorWorkers getSupervisorWorkers(String str) throws NotAliveException, TException {
        long startNanos = System.nanoTime();
        try {
            StormClusterState stormClusterState = this.data.getStormClusterState();
            String supervisorId = null;
            SupervisorInfo supervisorInfo = null;
            String ip = NetWorkUtils.host2Ip(str);
            String hostName = NetWorkUtils.ip2Host(str);
            for (Map.Entry<String, SupervisorInfo> entry : Cluster.get_all_SupervisorInfo(stormClusterState, null).entrySet()) {
                SupervisorInfo candidate = entry.getValue();
                if (candidate.getHostName().equals(hostName) || candidate.getHostName().equals(ip)) {
                    supervisorId = entry.getKey();
                    supervisorInfo = candidate;
                    break;
                }
            }
            if (supervisorId == null) {
                throw new TException("No supervisor of " + str);
            }

            Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, null);
            // Sorted by port so the resulting worker list is ordered.
            TreeMap<Integer, WorkerSummary> portToWorker = new TreeMap<Integer, WorkerSummary>();
            int usedSlots = 0;
            // Per-topology cache of task id -> component, so each topology is looked up once.
            Map<String, Map<Integer, String>> taskComponentsByTopology = new HashMap<String, Map<Integer, String>>();
            Map<String, MetricInfo> workerMetrics = new HashMap<String, MetricInfo>();
            for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
                String topologyId = assignmentEntry.getKey();
                Assignment assignment = assignmentEntry.getValue();
                for (ResourceWorkerSlot slot : assignment.getWorkers()) {
                    if (!supervisorId.equals(slot.getNodeId())) {
                        continue;
                    }
                    usedSlots++;
                    Integer port = Integer.valueOf(slot.getPort());
                    WorkerSummary workerSummary = portToWorker.get(port);
                    if (workerSummary == null) {
                        workerSummary = new WorkerSummary();
                        workerSummary.set_port(port.intValue());
                        workerSummary.set_topology(topologyId);
                        workerSummary.set_tasks(new ArrayList<TaskComponent>());
                        portToWorker.put(port, workerSummary);
                    }
                    Map<Integer, String> taskToComponent = taskComponentsByTopology.get(topologyId);
                    if (taskToComponent == null) {
                        taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
                        taskComponentsByTopology.put(topologyId, taskToComponent);
                    }
                    // Starts at "now" and is lowered to the earliest task start time.
                    int earliestStartSecs = TimeUtils.current_time_secs();
                    for (Integer taskId : slot.getTasks()) {
                        TaskComponent taskComponent = new TaskComponent();
                        taskComponent.set_component(taskToComponent.get(taskId));
                        taskComponent.set_taskId(taskId.intValue());
                        Integer taskStartSecs = assignment.getTaskStartTimeSecs().get(taskId);
                        if (taskStartSecs != null && taskStartSecs.intValue() < earliestStartSecs) {
                            earliestStartSecs = taskStartSecs.intValue();
                        }
                        workerSummary.add_to_tasks(taskComponent);
                    }
                    workerSummary.set_uptime(TimeUtils.time_delta(earliestStartSecs));
                    String workerSlotName = TopologyMetricsRunnable.getWorkerSlotName(supervisorInfo.getHostName(), port);
                    List<MetricInfo> metricData = this.data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
                    if (metricData.size() > 0) {
                        MetricInfo metricInfo = metricData.get(0);
                        // Keep only the metrics whose name contains the requested host.
                        // NOTE(review): this removes entries from the object returned by the
                        // metric cache -- confirm the cache hands out a copy.
                        Iterator<String> metricNames = metricInfo.get_metrics().keySet().iterator();
                        while (metricNames.hasNext()) {
                            if (!metricNames.next().contains(str)) {
                                metricNames.remove();
                            }
                        }
                        workerMetrics.put(workerSlotName, metricInfo);
                    }
                }
            }

            List<WorkerSummary> workers = new ArrayList<WorkerSummary>(portToWorker.values());
            HashMap<String, Integer> usedSlotsBySupervisor = new HashMap<String, Integer>();
            usedSlotsBySupervisor.put(supervisorId, Integer.valueOf(usedSlots));
            return new SupervisorWorkers(NimbusUtils.mkSupervisorSummary(supervisorInfo, supervisorId, usedSlotsBySupervisor), workers, workerMetrics);
        } catch (TException e) {
            LOG.info("Failed to get SupervisorWorkers of " + str, e);
            throw e;
        } catch (Exception e) {
            LOG.info("Failed to get SupervisorWorkers of " + str, e);
            throw new TException(e);
        } finally {
            SimpleJStormMetric.updateNimbusHistogram("getSupervisorWorkers", Long.valueOf((System.nanoTime() - startNanos) / 1000));
        }
    }

    /**
     * Assembles the {@link TopologyInfo} of one topology: the topology summary, one summary per
     * component, one summary per task (status, uptime, host/port and errors) and the metrics
     * currently held by the metric cache.
     *
     * <p>The elapsed time of every call, successful or not, is reported to the
     * {@code getTopologyInfo} Nimbus histogram.
     *
     * @param str the topology id
     * @return the assembled topology information
     * @throws NotAliveException if the cluster state has no storm base or no assignment for the id
     * @throws TException if anything else fails; the original failure is attached as the cause
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public TopologyInfo getTopologyInfo(String str) throws NotAliveException, TException {
        long nanoTime = System.nanoTime();
        StormClusterState stormClusterState = this.data.getStormClusterState();
        try {
            StormBase stormBase = stormClusterState.storm_base(str, null);
            if (stormBase == null) {
                throw new NotAliveException("No topology of " + str);
            }
            Assignment assignment = stormClusterState.assignment_info(str, null);
            if (assignment == null) {
                throw new NotAliveException("No topology of " + str);
            }

            TopologyTaskHbInfo hbInfo = this.data.getTasksHeartbeat().get(str);
            Map<Integer, TaskHeartbeat> taskHbs = hbInfo != null ? hbInfo.get_taskHbs() : null;
            Map<Integer, TaskInfo> taskInfos = Cluster.get_all_taskInfo(stormClusterState, str);
            Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, str, taskInfos);
            Map<Integer, String> taskToType = Cluster.get_all_task_type(stormClusterState, str, taskInfos);
            String errorFlag = Cluster.is_topology_exist_error(stormClusterState, str) ? "Y" : "";

            TopologySummary topologySummary = new TopologySummary();
            topologySummary.set_id(str);
            topologySummary.set_name(stormBase.getStormName());
            topologySummary.set_uptimeSecs(TimeUtils.time_delta(stormBase.getLanchTimeSecs()));
            topologySummary.set_status(stormBase.getStatusString());
            topologySummary.set_numTasks(NimbusUtils.getTopologyTaskNum(assignment));
            topologySummary.set_numWorkers(assignment.getWorkers().size());
            topologySummary.set_errorInfo(errorFlag);

            // One summary per component, keyed by component name.
            Map<String, ComponentSummary> componentSummaries = new HashMap<>();
            for (Map.Entry entry : JStormUtils.reverse_map(taskToComponent).entrySet()) {
                String componentName = (String) entry.getKey();
                List<Integer> taskIds = (List) entry.getValue();
                if (taskIds == null || taskIds.isEmpty()) {
                    LOG.warn("No task of component " + componentName);
                    continue;
                }
                ComponentSummary componentSummary = new ComponentSummary();
                componentSummary.set_name(componentName);
                componentSummary.set_type(taskToType.get(taskIds.get(0)));
                componentSummary.set_parallel(taskIds.size());
                componentSummary.set_taskIds(taskIds);
                componentSummaries.put(componentName, componentSummary);
            }

            // One summary per task, ordered by task id.
            Map<Integer, TaskSummary> taskSummaries = new TreeMap<>();
            Map<Integer, List<TaskError>> taskErrors = Cluster.get_all_task_errors(stormClusterState, str);
            boolean topologyHasErrors = !StringUtils.isBlank(errorFlag);
            for (Integer taskId : taskInfos.keySet()) {
                TaskSummary taskSummary = new TaskSummary();
                taskSummaries.put(taskId, taskSummary);
                taskSummary.set_taskId(taskId.intValue());

                TaskHeartbeat heartbeat = taskHbs == null ? null : taskHbs.get(taskId);
                if (heartbeat == null) {
                    // No heartbeat recorded yet for this task.
                    taskSummary.set_status("Starting");
                    taskSummary.set_uptime(0);
                } else {
                    taskSummary.set_status(NimbusUtils.isTaskDead(this.data, str, taskId) ? "INACTIVE" : "ACTIVE");
                    taskSummary.set_uptime(heartbeat.get_uptime());
                }

                if (!topologyHasErrors || taskErrors == null) {
                    continue;
                }
                List<TaskError> errors = taskErrors.get(taskId);
                if (errors == null || errors.isEmpty()) {
                    continue;
                }
                // Guarded: a component without a summary must not abort the whole request.
                ComponentSummary owner = componentSummaries.get(taskToComponent.get(taskId));
                for (TaskError taskError : errors) {
                    ErrorInfo errorInfo = new ErrorInfo(taskError.getError(), taskError.getTimSecs(), taskError.getLevel(), taskError.getCode());
                    taskSummary.add_to_errors(errorInfo);
                    if (owner != null) {
                        owner.add_to_errors(errorInfo);
                    }
                }
            }

            // Attach the host/port of the worker each task is assigned to.
            for (ResourceWorkerSlot worker : assignment.getWorkers()) {
                String hostname = worker.getHostname();
                int port = worker.getPort();
                for (Integer taskId : worker.getTasks()) {
                    TaskSummary taskSummary = taskSummaries.get(taskId);
                    if (taskSummary == null) {
                        // The assignment references a task that has no task info; skip it instead of failing.
                        LOG.warn("No task info of task " + taskId + " assigned to " + hostname + ":" + port);
                        continue;
                    }
                    taskSummary.set_host(hostname);
                    taskSummary.set_port(port);
                }
            }

            TopologyInfo topologyInfo = new TopologyInfo();
            topologyInfo.set_topology(topologySummary);
            topologyInfo.set_components(JStormUtils.mk_list(componentSummaries.values()));
            topologyInfo.set_tasks(JStormUtils.mk_list(taskSummaries.values()));

            List<MetricInfo> topologyData = this.data.getMetricCache().getMetricData(str, MetaType.TOPOLOGY);
            List<MetricInfo> componentData = this.data.getMetricCache().getMetricData(str, MetaType.COMPONENT);
            List<MetricInfo> workerData = this.data.getMetricCache().getMetricData(str, MetaType.WORKER);
            // Topology metrics use the last cached element, component and worker metrics the first.
            MetricInfo topologyMetric = (topologyData == null || topologyData.isEmpty())
                    ? MetricUtils.mkMetricInfo() : topologyData.get(topologyData.size() - 1);
            MetricInfo componentMetric = (componentData == null || componentData.isEmpty())
                    ? MetricUtils.mkMetricInfo() : componentData.get(0);
            MetricInfo workerMetric = (workerData == null || workerData.isEmpty())
                    ? MetricUtils.mkMetricInfo() : workerData.get(0);
            topologyInfo.set_metrics(new TopologyMetric(topologyMetric, componentMetric, workerMetric,
                    MetricUtils.mkMetricInfo(), MetricUtils.mkMetricInfo(), MetricUtils.mkMetricInfo()));
            return topologyInfo;
        } catch (TException e) {
            // Sibling catch: declared Thrift exceptions (including NotAliveException) reach the caller unchanged.
            LOG.info("Failed to get topologyInfo " + str, e);
            throw e;
        } catch (Exception e) {
            LOG.info("Failed to get topologyInfo " + str, e);
            throw new TException("Failed to get topologyInfo " + str, e);
        } finally {
            SimpleJStormMetric.updateNimbusHistogram("getTopologyInfo", Long.valueOf((System.nanoTime() - nanoTime) / 1000));
        }
    }

    /**
     * Looks up a topology by name and returns its {@link TopologyInfo}.
     *
     * @param str the topology name, resolved to an id through {@link #getTopologyId(String)}
     * @return the result of {@link #getTopologyInfo(String)} for the resolved id
     * @throws NotAliveException propagated from the id lookup or the info lookup
     * @throws TException propagated from the id lookup or the info lookup
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public TopologyInfo getTopologyInfoByName(String str) throws NotAliveException, TException {
        String topologyId = getTopologyId(str);
        return getTopologyInfo(topologyId);
    }

    /**
     * Returns the Nimbus configuration serialized as JSON.
     *
     * @return the JSON form of {@code this.data.getConf()}
     * @throws TException if serialization fails; the original failure is attached as the cause
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public String getNimbusConf() throws TException {
        try {
            return JStormUtils.to_json(this.data.getConf());
        } catch (Exception e) {
            String message = "Failed to generate Nimbus configuration";
            LOG.error(message, e);
            // Keep the cause so the failure can be diagnosed from the propagated exception.
            throw new TException(message, e);
        }
    }

    /**
     * Returns the configuration Nimbus stores for a topology, serialized as JSON.
     *
     * @param str the topology id
     * @return the JSON form of the stored topology configuration
     * @throws TException wrapping the {@link IOException} raised while reading or serializing
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public String getTopologyConf(String str) throws NotAliveException, TException {
        try {
            Map topologyConf = StormConfig.read_nimbus_topology_conf(this.conf, str);
            return JStormUtils.to_json(topologyConf);
        } catch (IOException ioe) {
            LOG.info("Failed to get configuration of " + str, ioe);
            throw new TException(ioe);
        }
    }

    /**
     * Resolves a topology name to its topology id.
     *
     * @param str the topology name
     * @return the id reported by the cluster state, never {@code null}
     * @throws NotAliveException if the cluster state knows no topology with that name
     * @throws TException if the lookup itself fails; the original failure is attached as the cause
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public String getTopologyId(String str) throws NotAliveException, TException {
        String topologyId;
        try {
            topologyId = Cluster.get_topology_id(this.data.getStormClusterState(), str);
        } catch (Exception e) {
            LOG.info("Failed to get getTopologyId " + str, e);
            throw new TException("Failed to get getTopologyId " + str, e);
        }
        // Thrown outside the try so the declared NotAliveException is not rewrapped as a plain TException.
        if (topologyId == null) {
            throw new NotAliveException("No topology of " + str);
        }
        return topologyId;
    }

    /**
     * Returns the system topology of a topology: the stored topology code passed through
     * {@code Common.system_topology} together with the stored topology configuration.
     *
     * @param str the topology id
     * @return the system topology
     * @throws NotAliveException if Nimbus has no stored topology code for the id
     * @throws TException if reading or building fails; the original failure is attached as the cause
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public StormTopology getTopology(String str) throws NotAliveException, TException {
        try {
            StormTopology userTopology = StormConfig.read_nimbus_topology_code(this.conf, str);
            if (userTopology == null) {
                throw new NotAliveException("No topology of " + str);
            }
            return Common.system_topology(StormConfig.read_nimbus_topology_conf(this.conf, str), userTopology);
        } catch (NotAliveException e) {
            // Let the declared exception through instead of rewrapping it in the generic handler below.
            throw e;
        } catch (Exception e) {
            LOG.error("Failed to get topology " + str + ",", e);
            throw new TException("Failed to get system_topology", e);
        }
    }

    /**
     * Returns the topology code Nimbus stores for a topology, without the additions made by
     * {@code Common.system_topology}.
     *
     * @param str the topology id
     * @return the stored topology code, never {@code null}
     * @throws NotAliveException if Nimbus has no stored topology code for the id
     * @throws TException if reading fails; the original failure is attached as the cause
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public StormTopology getUserTopology(String str) throws NotAliveException, TException {
        try {
            StormTopology userTopology = StormConfig.read_nimbus_topology_code(this.conf, str);
            if (userTopology == null) {
                throw new NotAliveException("No topology of " + str);
            }
            // Return what was read; previously the method always returned null.
            return userTopology;
        } catch (NotAliveException e) {
            // Let the declared exception through instead of rewrapping it in the generic handler below.
            throw e;
        } catch (Exception e) {
            LOG.error("Failed to get topology " + str + ",", e);
            throw new TException("Failed to get user topology " + str, e);
        }
    }

    /**
     * Verifies that a topology's liveness matches what the caller expects.
     *
     * @param nimbusData source of the cluster state used for the lookup
     * @param str the topology name
     * @param z the expected liveness: {@code true} if the topology must be active
     * @throws NotAliveException if the topology was expected to be active but is not
     * @throws AlreadyAliveException if the topology was expected to be inactive but is active
     * @throws Exception if the liveness lookup fails
     */
    public void checkTopologyActive(NimbusData nimbusData, String str, boolean z) throws Exception {
        boolean active = isTopologyActive(nimbusData.getStormClusterState(), str);
        if (active == z) {
            return;
        }
        if (z) {
            throw new NotAliveException(str + " is not alive");
        }
        throw new AlreadyAliveException(str + " is already active");
    }

    /**
     * Tells whether the cluster state resolves the given topology name to an id.
     *
     * @param stormClusterState the cluster state to query
     * @param str the topology name
     * @return {@code true} if {@code Cluster.get_topology_id} returns a non-null id
     * @throws Exception if the lookup fails
     */
    public boolean isTopologyActive(StormClusterState stormClusterState, String str) throws Exception {
        return Cluster.get_topology_id(stormClusterState, str) != null;
    }

    /**
     * Prepares the master distribution directory of a topology: creates and empties it, installs
     * the jar(s), writes the serialized topology and configuration, and records a timestamp.
     *
     * @param map configuration used to locate the distribution root
     * @param str the topology id
     * @param str2 directory holding the uploaded jar(s), handed to {@code setupJar}
     * @param map2 topology configuration to serialize
     * @param stormTopology topology to serialize
     * @throws IOException if any file operation fails
     */
    private void setupStormCode(Map<Object, Object> map, String str, String str2, Map<Object, Object> map2, StormTopology stormTopology) throws IOException {
        String distRoot = StormConfig.masterStormdistRoot(map, str);
        File distDir = new File(distRoot);
        FileUtils.forceMkdir(distDir);
        FileUtils.cleanDirectory(distDir);

        setupJar(map, str2, distRoot);

        File codeFile = new File(StormConfig.stormcode_path(distRoot));
        FileUtils.writeByteArrayToFile(codeFile, Utils.serialize(stormTopology));

        File confFile = new File(StormConfig.stormconf_path(distRoot));
        FileUtils.writeByteArrayToFile(confFile, Utils.serialize(map2));

        long now = System.currentTimeMillis();
        StormConfig.write_nimbus_topology_timestamp(this.data.getConf(), str, now);
    }

    /**
     * Copies the lib directory found under the source root to the lib directory under the
     * destination root, then removes the source lib directory.
     *
     * @param str source root
     * @param str2 destination root
     * @return {@code true} if a lib directory existed and was copied, {@code false} if none exists
     * @throws IOException if the copy or the removal fails
     */
    private boolean copyLibJars(String str, String str2) throws IOException {
        String sourceLibPath = StormConfig.stormlib_path(str);
        String targetLibPath = StormConfig.stormlib_path(str2);
        LOG.info("Begin to copy from " + sourceLibPath + " to " + targetLibPath);

        File sourceLibDir = new File(sourceLibPath);
        if (sourceLibDir.exists()) {
            FileUtils.copyDirectory(sourceLibDir, new File(targetLibPath));
            PathUtils.rmr(sourceLibPath);
            LOG.info("Successfully copy libs " + targetLibPath);
            return true;
        }
        LOG.info("No lib jars " + sourceLibPath);
        return false;
    }

    /**
     * Installs the submitted jar(s) of a topology into its distribution directory. Does nothing in
     * local mode. Otherwise copies the lib jars and then the first {@code .jar} file listed in the
     * upload directory, which is deleted afterwards.
     *
     * @param map configuration, consulted for local mode
     * @param str upload directory containing the submitted jar and/or lib directory
     * @param str2 distribution directory of the topology
     * @throws IOException if a copy fails
     * @throws IllegalArgumentException if there is neither a submitted jar nor a lib directory, or
     *         the selected jar does not exist
     */
    private void setupJar(Map<Object, Object> map, String str, String str2) throws IOException {
        if (StormConfig.local_mode(map)) {
            return;
        }
        boolean libsCopied = copyLibJars(str, str2);

        // Pick the first entry of the upload directory whose name ends with ".jar".
        String jarPath = null;
        for (String entryName : PathUtils.read_dir_contents(str)) {
            if (entryName.endsWith(".jar")) {
                jarPath = str + "/" + entryName;
                break;
            }
        }

        if (jarPath == null) {
            if (!libsCopied) {
                throw new IllegalArgumentException("No jar under " + str);
            }
            // Lib jars alone are accepted.
            LOG.info("No submit jar");
            return;
        }

        File jarFile = new File(jarPath);
        if (!jarFile.exists()) {
            throw new IllegalArgumentException(jarPath + " to copy to " + str2 + " does not exist!");
        }
        FileUtils.copyFile(jarFile, new File(StormConfig.stormjar_path(str2)));
        // The copy already succeeded, so a failed cleanup is only reported, not fatal.
        if (!jarFile.delete()) {
            LOG.warn("Failed to delete submitted jar " + jarPath);
        }
    }

    /**
     * Generates the task assignments of a topology, registers an initial heartbeat record for it
     * (in memory and in the cluster state) and stores the task information in the cluster state.
     *
     * @param map configuration used to read the stored topology
     * @param str the topology id
     * @param stormClusterState cluster state to write to
     * @throws InvalidTopologyException if no task could be generated for the topology
     * @throws Exception if reading the topology or writing the cluster state fails
     */
    public void setupZkTaskInfo(Map<Object, Object> map, String str, StormClusterState stormClusterState) throws Exception {
        Map<Integer, TaskInfo> taskInfos = mkTaskComponentAssignments(map, str);
        // Validate before the map is used, so an invalid topology leaves no heartbeat state behind.
        if (taskInfos == null || taskInfos.isEmpty()) {
            throw new InvalidTopologyException("Failed to generate TaskIDs map");
        }

        TopologyTaskHbInfo heartbeatInfo = new TopologyTaskHbInfo(str, NimbusUtils.getTopologyMasterId(taskInfos));
        this.data.getTasksHeartbeat().put(str, heartbeatInfo);
        stormClusterState.topology_heartbeat(str, heartbeatInfo);

        stormClusterState.set_task(str, taskInfos);
    }

    /**
     * Builds the task-id to {@code TaskInfo} map of a topology from its stored configuration and
     * the system topology derived from its stored code.
     *
     * @param map configuration used to read the stored topology
     * @param str the topology id
     * @return the map produced by {@code Common.mkTaskInfo}
     * @throws IOException if the stored configuration or code cannot be read
     * @throws InvalidTopologyException if the topology is rejected while being processed
     */
    public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> map, String str) throws IOException, InvalidTopologyException {
        Map topologyConf = StormConfig.read_nimbus_topology_conf(map, str);
        StormTopology userTopology = StormConfig.read_nimbus_topology_code(map, str);
        StormTopology systemTopology = Common.system_topology(topologyConf, userTopology);
        return Common.mkTaskInfo(topologyConf, systemTopology, str);
    }

    /**
     * Switches the monitor flag of a topology on or off in the cluster state.
     *
     * @param str the topology name
     * @param monitorOptions carries the desired flag value
     * @throws TException wrapping any failure, including the topology not being alive
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void metricMonitor(String str, MonitorOptions monitorOptions) throws TException {
        boolean enable = monitorOptions.is_isEnable();
        StormClusterState clusterState = this.data.getStormClusterState();
        try {
            String topologyId = Cluster.get_topology_id(clusterState, str);
            if (topologyId != null) {
                clusterState.set_storm_monitor(topologyId, enable);
                return;
            }
            throw new NotAliveException("Failed to update metricsMonitor status as " + str + " is not alive");
        } catch (Exception ex) {
            LOG.error("Failed to update metricsMonitor " + str, ex);
            throw new TException(ex);
        }
    }

    /**
     * Returns the metrics of a topology as provided by the metric runnable, and reports the
     * elapsed time of the call to the {@code getTopologyMetric} Nimbus histogram.
     *
     * @param str the topology id
     * @return the topology metrics
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public TopologyMetric getTopologyMetrics(String str) throws TException {
        LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: " + str);
        long startNanos = System.nanoTime();
        try {
            return this.data.getMetricRunnable().getTopologyMetric(str);
        } finally {
            // Recorded on both the normal and the exceptional path.
            long elapsedMicros = (System.nanoTime() - startNanos) / 1000;
            SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", Long.valueOf(elapsedMicros));
        }
    }

    /**
     * Wraps uploaded topology metrics in an update event stamped with the current time and pushes
     * it to the metric runnable.
     *
     * @param str the topology id
     * @param topologyMetric the uploaded metrics
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void uploadTopologyMetrics(String str, TopologyMetric topologyMetric) throws TException {
        LOG.info("Received topology metrics:{}", str);

        TopologyMetricsRunnable.Update event = new TopologyMetricsRunnable.Update();
        event.topologyId = str;
        event.topologyMetrics = topologyMetric;
        event.timestamp = System.currentTimeMillis();

        this.data.getMetricRunnable().pushEvent(event);
    }

    /**
     * Registers metric names for a topology through the metric runnable.
     *
     * @param str the topology id
     * @param set the metric names to register
     * @return the map returned by the metric runnable, or {@code null} if registration failed
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public Map<String, Long> registerMetrics(String str, Set<String> set) throws TException {
        try {
            return this.data.getMetricRunnable().registerMetrics(str, set);
        } catch (Exception e) {
            // The null result is kept for compatibility, but the failure is no longer silent.
            LOG.error("Failed to register metrics of " + str, e);
            return null;
        }
    }

    /**
     * Accepts new credentials for a topology. The body is empty, so calling this method has no
     * effect.
     *
     * @param str not used by this implementation (NOTE(review): presumably the topology name - confirm)
     * @param credentials not used by this implementation
     */
    public void uploadNewCredentials(String str, Credentials credentials) {
        // No-op: nothing is stored or forwarded.
    }

    /**
     * Returns the metric data the metric cache holds for a key and a meta type.
     *
     * @param str the cache key (callers in this class pass a topology id)
     * @param i the numeric meta type, converted with {@code MetaType.parse}
     * @return whatever the metric cache returns for that key and meta type
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public List<MetricInfo> getMetrics(String str, int i) throws TException {
        List<MetricInfo> cachedMetrics = this.data.getMetricCache().getMetricData(str, MetaType.parse(i));
        return cachedMetrics;
    }

    /**
     * Returns the first netty {@link MetricInfo} the metric cache holds for a key.
     *
     * @param str the cache key
     * @return the first cached element, or a new empty {@link MetricInfo} if nothing is cached
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public MetricInfo getNettyMetrics(String str) throws TException {
        List<MetricInfo> nettyData = this.data.getMetricCache().getMetricData(str, MetaType.NETTY);
        if (nettyData != null && nettyData.size() > 0) {
            return nettyData.get(0);
        }
        return new MetricInfo();
    }

    /**
     * Collects the cached netty metrics whose metric name contains the given host string.
     *
     * @param str the cache key
     * @param str2 the substring a metric name must contain to be included
     * @return a new {@link MetricInfo} holding the matching entries of the first cached element
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public MetricInfo getNettyMetricsByHost(String str, String str2) throws TException {
        MetricInfo result = new MetricInfo();
        List<MetricInfo> nettyData = this.data.getMetricCache().getMetricData(str, MetaType.NETTY);
        boolean hasData = nettyData != null && nettyData.size() > 0;
        if (hasData) {
            Map<String, Map<Integer, MetricSnapshot>> allMetrics = nettyData.get(0).get_metrics();
            for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : allMetrics.entrySet()) {
                String metricName = metric.getKey();
                Map<Integer, MetricSnapshot> snapshots = metric.getValue();
                if (!metricName.contains(str2)) {
                    continue;
                }
                result.put_to_metrics(metricName, snapshots);
            }
        }
        LOG.info("getNettyMetricsByHost, total size:{}", Integer.valueOf(result.get_metrics_size()));
        return result;
    }

    /**
     * Counts the cached netty metrics whose metric name contains the given host string.
     *
     * @param str the cache key
     * @param str2 the substring a metric name must contain to be counted
     * @return the size of the result of {@link #getNettyMetricsByHost(String, String)}
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public int getNettyMetricSizeByHost(String str, String str2) throws TException {
        MetricInfo hostMetrics = getNettyMetricsByHost(str, str2);
        return hostMetrics.get_metrics_size();
    }

    /**
     * Returns one page of the cached netty metrics whose metric name contains the given host
     * string. Pages hold up to 200 matching entries and are numbered from 1, in the iteration
     * order of the cached metric map.
     *
     * @param str the cache key
     * @param str2 the substring a metric name must contain to be included
     * @param i the 1-based page number; values below 1 yield an empty result
     * @return a new {@link MetricInfo} holding the matching entries of the requested page
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public MetricInfo getPagingNettyMetrics(String str, String str2, int i) throws TException {
        final int pageSize = 200;
        MetricInfo page = new MetricInfo();
        int firstIndex = (i - 1) * pageSize;
        int endIndex = i * pageSize;
        // Zero-based index of the most recent matching entry.
        int matchIndex = -1;
        List<MetricInfo> nettyData = this.data.getMetricCache().getMetricData(str, MetaType.NETTY);
        if (nettyData != null && !nettyData.isEmpty()) {
            for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : nettyData.get(0).get_metrics().entrySet()) {
                String metricName = metric.getKey();
                if (!metricName.contains(str2)) {
                    continue;
                }
                matchIndex++;
                if (matchIndex >= endIndex) {
                    // Past the requested page; nothing further can belong to it.
                    break;
                }
                if (matchIndex >= firstIndex) {
                    page.put_to_metrics(metricName, metric.getValue());
                }
            }
        }
        // Labelled with this method's own name (it used to be logged as getNettyMetricsByHost).
        LOG.info("getPagingNettyMetrics, total size:{}", Integer.valueOf(page.get_metrics_size()));
        return page;
    }

    /**
     * Returns the cached task metrics whose metric name, split on {@code MetricUtils.DELIM}, has
     * at least 7 parts and carries the given value in part index 2.
     *
     * <p>The result is built in a new {@link MetricInfo}; the instance obtained from the metric
     * cache is left untouched, because filtering it in place would also strip entries from the
     * cache if the cache hands out shared instances.
     *
     * @param str the cache key
     * @param str2 the value part index 2 of the metric name must equal
     *        (NOTE(review): presumably the component name - confirm against the metric name format)
     * @return a new {@link MetricInfo} holding the matching entries of the first cached element
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public MetricInfo getTaskMetrics(String str, String str2) throws TException {
        List<MetricInfo> taskData = getMetrics(str, MetaType.TASK.getT());
        MetricInfo filtered = MetricUtils.mkMetricInfo();
        if (taskData == null || taskData.isEmpty()) {
            return filtered;
        }
        Map<String, Map<Integer, MetricSnapshot>> cached = taskData.get(0).get_metrics();
        if (cached != null) {
            for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : cached.entrySet()) {
                String[] parts = metric.getKey().split(MetricUtils.DELIM);
                if (parts.length >= 7 && parts[2].equals(str2)) {
                    filtered.put_to_metrics(metric.getKey(), metric.getValue());
                }
            }
        }
        LOG.info("taskMetric, total size:{}", Integer.valueOf(filtered.get_metrics_size()));
        return filtered;
    }

    /**
     * Returns the cached task metrics and stream metrics that belong to one task. A metric
     * belongs to the task when its name, split on {@code MetricUtils.DELIM}, has at least 7 parts
     * and part index 3 equals the decimal task id.
     *
     * <p>Both results are built in new {@link MetricInfo} objects; the instances obtained from the
     * metric cache are left untouched, because filtering them in place would also strip entries
     * from the cache if the cache hands out shared instances.
     *
     * @param str the cache key
     * @param i the task id
     * @return a two-element list: filtered task metrics first, filtered stream metrics second
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public List<MetricInfo> getTaskAndStreamMetrics(String str, int i) throws TException {
        List<MetricInfo> taskData = getMetrics(str, MetaType.TASK.getT());
        List<MetricInfo> streamData = getMetrics(str, MetaType.STREAM.getT());
        String taskId = String.valueOf(i);
        MetricInfo taskMetrics = copyMetricsOfTask(taskData, taskId);
        MetricInfo streamMetrics = copyMetricsOfTask(streamData, taskId);
        return Lists.newArrayList(taskMetrics, streamMetrics);
    }

    /** Copies, from the first element of {@code cachedData}, the metrics whose name part index 3 equals {@code taskId}. */
    private MetricInfo copyMetricsOfTask(List<MetricInfo> cachedData, String taskId) {
        MetricInfo filtered = MetricUtils.mkMetricInfo();
        if (cachedData == null || cachedData.isEmpty()) {
            return filtered;
        }
        Map<String, Map<Integer, MetricSnapshot>> cached = cachedData.get(0).get_metrics();
        if (cached == null) {
            return filtered;
        }
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : cached.entrySet()) {
            String[] parts = metric.getKey().split(MetricUtils.DELIM);
            if (parts.length >= 7 && parts[3].equals(taskId)) {
                filtered.put_to_metrics(metric.getKey(), metric.getValue());
            }
        }
        return filtered;
    }

    /**
     * Returns the topology-level metric data the metric cache holds for a key.
     *
     * @param str the cache key
     * @return whatever the metric cache returns for the key and {@code MetaType.TOPOLOGY}
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public List<MetricInfo> getSummarizedTopologyMetrics(String str) throws TException {
        List<MetricInfo> topologyData = this.data.getMetricCache().getMetricData(str, MetaType.TOPOLOGY);
        return topologyData;
    }

    /**
     * Updates a running topology. When a jar path is given, the uploaded jar is renamed to the
     * standard storm jar name and its directory is moved (or copied) over the topology's
     * distribution directory. The supplied JSON configuration is then merged into the stored
     * topology configuration, the {@code update_topology} transition is triggered and the
     * topology action listener is notified.
     *
     * @param str the topology name
     * @param str2 path of the uploaded jar, or {@code null} to update the configuration only
     * @param str3 JSON object whose entries are merged into the stored topology configuration
     * @throws NotAliveException if the topology is not alive
     * @throws TException if anything else fails; the original failure is attached as the cause
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void updateTopology(String str, String str2, String str3) throws NotAliveException, InvalidTopologyException, TException {
        try {
            checkTopologyActive(this.data, str, true);
            String topologyId = Cluster.get_topology_id(this.data.getStormClusterState(), str);
            if (topologyId == null) {
                throw new NotAliveException(str);
            }
            if (str2 != null) {
                String distRoot = StormConfig.masterStormdistRoot(this.conf, topologyId);
                String uploadDirPath = str2.substring(0, str2.lastIndexOf("/"));
                String stormJarPath = StormConfig.stormjar_path(uploadDirPath);
                File uploadedJar = new File(str2);
                if (!uploadedJar.exists()) {
                    throw new FileNotFoundException("Source '" + str2 + "' does not exist");
                }
                // A failed rename would leave the jar under the wrong name and still report success.
                if (!uploadedJar.renameTo(new File(stormJarPath))) {
                    throw new IOException("Failed to rename '" + str2 + "' to '" + stormJarPath + "'");
                }
                File uploadDir = new File(uploadDirPath);
                File distDir = new File(distRoot);
                try {
                    FileUtils.moveDirectory(uploadDir, distDir);
                } catch (FileExistsException e) {
                    // The distribution directory already exists: copy into it and drop the upload directory.
                    FileUtils.copyDirectory(uploadDir, distDir);
                    FileUtils.deleteQuietly(uploadDir);
                }
                StormConfig.write_nimbus_topology_timestamp(this.data.getConf(), topologyId, System.currentTimeMillis());
                LOG.info("update jar of " + str + " successfully");
            }
            Map topologyConf = StormConfig.read_nimbus_topology_conf(this.data.getConf(), topologyId);
            Map confUpdate = (Map) JStormUtils.from_json(str3);
            topologyConf.putAll(confUpdate);
            StormConfig.write_nimbus_topology_conf(this.data.getConf(), topologyId, topologyConf);
            NimbusUtils.transitionName(this.data, str, true, StatusType.update_topology, confUpdate);
            LOG.info("update topology " + str + " successfully");
            notifyTopologyActionListener(str, "updateTopology");
        } catch (NotAliveException e) {
            String message = "Error, no this topology " + str;
            LOG.error(message, e);
            throw new NotAliveException(message);
        } catch (Exception e) {
            String message = "Failed to update topology " + str;
            LOG.error(message, e);
            throw new TException(message, e);
        }
    }

    /**
     * Merges reported task heartbeats into the heartbeat record Nimbus keeps for the topology,
     * creating the record and its heartbeat map if they do not exist yet.
     *
     * @param topologyTaskHbInfo the reported heartbeats, with topology id and topology master id
     * @throws TException declared by the Thrift interface
     */
    @Override // backtype.storm.generated.Nimbus.Iface
    public void updateTaskHeartbeat(TopologyTaskHbInfo topologyTaskHbInfo) throws TException {
        String topologyId = topologyTaskHbInfo.get_topologyId();
        int masterId = topologyTaskHbInfo.get_topologyMasterId();

        TopologyTaskHbInfo stored = this.data.getTasksHeartbeat().get(topologyId);
        if (stored == null) {
            stored = new TopologyTaskHbInfo(topologyId, masterId);
            this.data.getTasksHeartbeat().put(topologyId, stored);
        }

        Map<Integer, TaskHeartbeat> storedHbs = stored.get_taskHbs();
        if (storedHbs == null) {
            storedHbs = new ConcurrentHashMap<Integer, TaskHeartbeat>();
            stored.set_taskHbs(storedHbs);
        }

        Map<Integer, TaskHeartbeat> reportedHbs = topologyTaskHbInfo.get_taskHbs();
        if (reportedHbs == null) {
            return;
        }
        // Reported entries overwrite the stored entries with the same task id.
        storedHbs.putAll(reportedHbs);
    }

    /**
     * Forwards a topology action to the configured notifier plugin, if there is one.
     *
     * @param str the topology name
     * @param str2 the name of the action that was performed
     */
    private void notifyTopologyActionListener(String str, String str2) {
        ITopologyActionNotifierPlugin notifier = this.data.getNimbusNotify();
        if (notifier == null) {
            return;
        }
        notifier.notify(str, str2);
    }
}
