package com.alibaba.jstorm.task.heartbeat;

import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.NimbusClient;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.class */
public class TaskHeartbeatUpdater {
    private static final Logger LOG = LoggerFactory.getLogger(TaskHeartbeatUpdater.class);
    private int MAX_NUM_TASK_HB_SEND;
    private String topologyId;
    private int taskId;
    private Map conf;
    private NimbusClient client;
    private Map<Integer, TaskHeartbeat> taskHbMap;
    private TopologyTaskHbInfo taskHbs;
    private StormClusterState zkCluster;

    public TaskHeartbeatUpdater(Map map, String str, int i, StormClusterState stormClusterState) {
        this.topologyId = str;
        this.taskId = i;
        this.conf = map;
        this.client = NimbusClient.getConfiguredClient(map);
        this.zkCluster = stormClusterState;
        try {
            TopologyTaskHbInfo topologyTaskHbInfo = stormClusterState.topology_heartbeat(str);
            if (topologyTaskHbInfo != null) {
                LOG.info("Found task heartbeat info left in zk for " + str + ": " + topologyTaskHbInfo.toString());
                this.taskHbs = topologyTaskHbInfo;
                this.taskHbMap = topologyTaskHbInfo.get_taskHbs();
                if (this.taskHbMap == null) {
                    this.taskHbMap = new ConcurrentHashMap();
                    this.taskHbs.set_taskHbs(this.taskHbMap);
                }
                this.taskHbs.set_topologyId(str);
                this.taskHbs.set_topologyMasterId(this.taskId);
            } else {
                LOG.info("There is not any previous task heartbeat info left in zk for " + str);
                this.taskHbMap = new ConcurrentHashMap();
                this.taskHbs = new TopologyTaskHbInfo(this.topologyId, this.taskId);
                this.taskHbs.set_taskHbs(this.taskHbMap);
            }
        } catch (Exception e) {
            LOG.warn("Failed to get topology heartbeat from zk", e);
        }
        this.MAX_NUM_TASK_HB_SEND = ConfigExtension.getTopologyTaskHbSendNumber(map);
    }

    public void process(Tuple tuple) {
        int sourceTask = tuple.getSourceTask();
        int intValue = ((Integer) tuple.getValue(0)).intValue();
        TaskHeartbeat taskHeartbeat = this.taskHbMap.get(Integer.valueOf(sourceTask));
        if (taskHeartbeat == null) {
            this.taskHbMap.put(Integer.valueOf(sourceTask), new TaskHeartbeat(TimeUtils.current_time_secs(), intValue));
        } else {
            taskHeartbeat.set_time(TimeUtils.current_time_secs());
            taskHeartbeat.set_uptime(intValue);
        }
        if (sourceTask == this.taskId) {
            TopologyTaskHbInfo topologyTaskHbInfo = new TopologyTaskHbInfo(this.topologyId, this.taskId);
            Map<Integer, TaskHeartbeat> concurrentHashMap = new ConcurrentHashMap<>();
            topologyTaskHbInfo.set_taskHbs(concurrentHashMap);
            int i = 0;
            for (Map.Entry<Integer, TaskHeartbeat> entry : this.taskHbMap.entrySet()) {
                concurrentHashMap.put(entry.getKey(), entry.getValue());
                i++;
                if (i >= this.MAX_NUM_TASK_HB_SEND) {
                    setTaskHeatbeat(topologyTaskHbInfo);
                    concurrentHashMap.clear();
                    i = 0;
                }
            }
            if (concurrentHashMap.size() > 0) {
                setTaskHeatbeat(topologyTaskHbInfo);
            }
        }
    }

    private void setTaskHeatbeat(TopologyTaskHbInfo topologyTaskHbInfo) {
        if (topologyTaskHbInfo == null) {
            return;
        }
        try {
            if (topologyTaskHbInfo.get_taskHbs() == null) {
                return;
            }
            this.client.getClient().updateTaskHeartbeat(topologyTaskHbInfo);
            String str = "";
            for (Map.Entry<Integer, TaskHeartbeat> entry : topologyTaskHbInfo.get_taskHbs().entrySet()) {
                str = str + " " + entry.getKey() + "-" + entry.getValue().get_time();
            }
            LOG.info("Update task heartbeat:" + str);
        } catch (TException e) {
            LOG.error("Failed to update task heartbeat info", e);
            if (this.client != null) {
                this.client.close();
                this.client = NimbusClient.getConfiguredClient(this.conf);
            }
        }
    }
}
