/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.heartbeat;

import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.NimbusClient;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects per-task heartbeats delivered as tuples and forwards them to Nimbus in batches.
 *
 * <p>On construction the updater asks the cluster state for heartbeat info already stored for the
 * topology and reuses it when present; otherwise (or when the lookup fails) it starts from an
 * empty state. Every {@link #process(Tuple)} call records the heartbeat of the tuple's source
 * task. When the source task is this updater's own task, all recorded heartbeats are pushed
 * through the Nimbus client, split into batches whose size is taken from
 * {@code ConfigExtension.getTopologyTaskHbSendNumber(conf)}.
 *
 * <p>This class performs no synchronization of its own.
 * NOTE(review): presumably it is driven by a single thread -- confirm against the caller.
 */
public class TaskHeartbeatUpdater {
    private static final Logger LOG = LoggerFactory.getLogger(TaskHeartbeatUpdater.class);

    /** Upper bound on the number of heartbeat entries sent in one Nimbus call. */
    private final int maxNumTaskHbSend;
    private final String topologyId;
    /** Id of the task that owns this updater; its own heartbeat triggers a flush. */
    private final int taskId;
    private final Map conf;
    /** Latest heartbeat per source task id; this is the map held by {@link #taskHbs}. */
    private final Map<Integer, TaskHeartbeat> taskHbMap;
    private final TopologyTaskHbInfo taskHbs;
    private final StormClusterState zkCluster;
    /** Current Nimbus client; {@code null} only while a reconnect attempt has failed. */
    private NimbusClient client;

    /**
     * Creates the updater and restores any heartbeat info stored for the topology.
     *
     * <p>A failure to read the stored info is logged and treated like "nothing stored", so the
     * updater is always left with a usable (possibly empty) heartbeat map.
     *
     * @param conf       configuration used to create the Nimbus client and to read the batch size
     * @param topologyId id of the topology whose task heartbeats are tracked
     * @param taskId     id of the task owning this updater
     * @param zkCluster  cluster state used to look up previously stored heartbeat info
     */
    public TaskHeartbeatUpdater(Map conf, String topologyId, int taskId, StormClusterState zkCluster) {
        this.topologyId = topologyId;
        this.taskId = taskId;
        this.conf = conf;
        this.client = NimbusClient.getConfiguredClient(conf);
        this.zkCluster = zkCluster;

        TopologyTaskHbInfo info = null;
        try {
            info = zkCluster.topology_heartbeat(topologyId);
            if (info != null) {
                LOG.info("Found task heartbeat info left in zk for {}: {}", topologyId, info);
            } else {
                LOG.info("There is not any previous task heartbeat info left in zk for {}", topologyId);
            }
        } catch (Exception e) {
            // Fall through with whatever was obtained; the state is completed below so that
            // process() never sees a null map.
            LOG.warn("Failed to get topology heartbeat from zk", e);
        }

        Map<Integer, TaskHeartbeat> hbMap = null;
        if (info == null) {
            info = new TopologyTaskHbInfo(topologyId, taskId);
        } else {
            // Restored info is re-stamped with the current topology id and owning task id.
            info.set_topologyId(topologyId);
            info.set_topologyMasterId(taskId);
            hbMap = info.get_taskHbs();
        }
        if (hbMap == null) {
            hbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
            info.set_taskHbs(hbMap);
        }
        this.taskHbs = info;
        this.taskHbMap = hbMap;

        this.maxNumTaskHbSend = ConfigExtension.getTopologyTaskHbSendNumber(conf);
    }

    /**
     * Records the heartbeat carried by {@code input} and, if it originates from this updater's
     * own task, sends every recorded heartbeat to Nimbus in batches.
     *
     * @param input tuple whose source task identifies the reporting task and whose first value
     *              is that task's uptime as an {@code Integer}
     */
    public void process(Tuple input) {
        int sourceTask = input.getSourceTask();
        int uptime = (Integer) input.getValue(0);

        TaskHeartbeat taskHb = taskHbMap.get(sourceTask);
        if (taskHb == null) {
            taskHbMap.put(sourceTask, new TaskHeartbeat(TimeUtils.current_time_secs(), uptime));
        } else {
            taskHb.set_time(TimeUtils.current_time_secs());
            taskHb.set_uptime(uptime);
        }

        // Only the owning task's own heartbeat triggers a flush to Nimbus.
        if (sourceTask != taskId) {
            return;
        }

        TopologyTaskHbInfo batch = new TopologyTaskHbInfo(topologyId, taskId);
        Map<Integer, TaskHeartbeat> batchMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
        batch.set_taskHbs(batchMap);

        int pending = 0;
        for (Map.Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) {
            batchMap.put(entry.getKey(), entry.getValue());
            pending++;
            if (pending >= maxNumTaskHbSend) {
                sendTaskHeartbeats(batch);
                // The same batch object is reused, so empty it before collecting the next chunk.
                batchMap.clear();
                pending = 0;
            }
        }
        if (!batchMap.isEmpty()) {
            sendTaskHeartbeats(batch);
        }
    }

    /**
     * Sends one batch of heartbeats to Nimbus and logs the task ids and timestamps sent.
     *
     * <p>A {@code TException} from the call is logged and followed by a reconnect attempt; the
     * batch itself is not retried. If no client is available, a reconnect is attempted first and
     * the batch is dropped when that fails.
     *
     * @param topologyTaskHbInfo batch to send; ignored when {@code null} or without a heartbeat map
     */
    private void sendTaskHeartbeats(TopologyTaskHbInfo topologyTaskHbInfo) {
        if (topologyTaskHbInfo == null || topologyTaskHbInfo.get_taskHbs() == null) {
            return;
        }
        // Check the client before dereferencing it, not after a failure.
        if (client == null && !reconnect()) {
            return;
        }

        try {
            client.getClient().updateTaskHeartbeat(topologyTaskHbInfo);
        } catch (TException e) {
            LOG.error("Failed to update task heartbeat info", e);
            reconnect();
            return;
        }

        if (LOG.isInfoEnabled()) {
            StringBuilder info = new StringBuilder();
            for (Map.Entry<Integer, TaskHeartbeat> entry : topologyTaskHbInfo.get_taskHbs().entrySet()) {
                info.append(' ').append(entry.getKey()).append('-').append(entry.getValue().get_time());
            }
            LOG.info("Update task heartbeat:" + info);
        }
    }

    /**
     * Closes the current Nimbus client, if any, and tries to create a new one.
     *
     * <p>Runtime exceptions raised while closing or creating the client are logged instead of
     * propagated; after a failed creation {@link #client} stays {@code null} so the next send
     * attempts to reconnect again.
     *
     * @return {@code true} if a client is available afterwards
     */
    private boolean reconnect() {
        NimbusClient stale = client;
        client = null;
        if (stale != null) {
            try {
                stale.close();
            } catch (RuntimeException e) {
                LOG.warn("Failed to close stale nimbus client", e);
            }
        }
        try {
            client = NimbusClient.getConfiguredClient(conf);
        } catch (RuntimeException e) {
            LOG.error("Failed to re-create nimbus client", e);
        }
        return client != null;
    }
}

