package com.alibaba.jstorm.daemon.worker.timer;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.TupleImplExt;
import backtype.storm.tuple.Values;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.UptimeComputer;
import com.alibaba.jstorm.task.error.ErrorConstants;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.execute.spout.SpoutCollector;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.class */
public class TaskHeartbeatTrigger extends TimerTrigger {
    private static final Logger LOG = LoggerFactory.getLogger(TaskHeartbeatTrigger.class);
    private int taskId;
    private String componentId;
    private TopologyContext sysTopologyCtx;
    private BoltCollector boltCollector = null;
    private SpoutCollector spoutCollector = null;
    private long executeThreadHbTime;
    private int taskHbTimeout;
    private ITaskReportErr reportError;
    private IntervalCheck intervalCheck;
    private UptimeComputer uptime;
    protected volatile TaskStatus executorStatus;

    public TaskHeartbeatTrigger(Map map, String str, DisruptorQueue disruptorQueue, int i, String str2, TopologyContext topologyContext, ITaskReportErr iTaskReportErr, TaskStatus taskStatus) {
        this.name = str;
        this.queue = disruptorQueue;
        this.opCode = 3;
        this.taskId = i;
        this.componentId = str2;
        this.sysTopologyCtx = topologyContext;
        this.frequence = JStormUtils.parseInt(map.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10).intValue();
        this.firstTime = 0;
        this.executeThreadHbTime = TimeUtils.current_time_secs();
        this.taskHbTimeout = JStormUtils.parseInt(map.get(Config.NIMBUS_TASK_TIMEOUT_SECS), 180).intValue();
        this.intervalCheck = new IntervalCheck();
        this.intervalCheck.setInterval(this.taskHbTimeout);
        this.intervalCheck.start();
        this.reportError = iTaskReportErr;
        this.uptime = new UptimeComputer();
        this.executorStatus = taskStatus;
    }

    @Override // com.alibaba.jstorm.daemon.worker.timer.TimerTrigger
    public void updateObject() {
        this.object = Integer.valueOf(this.taskId);
    }

    @Override // com.alibaba.jstorm.daemon.worker.timer.TimerTrigger, java.lang.Runnable
    public void run() {
        try {
            updateObject();
            if (this.object == null) {
                LOG.info("Timer " + this.name + " 's object is null ");
                return;
            }
            if (this.intervalCheck.check()) {
                checkExecuteThreadHb();
            }
            sendHbMsg();
            this.queue.publish(new TimerTrigger.TimerEvent(this.opCode, this.object), false);
            LOG.debug("Offer task HB event to controlQueue, taskId=" + this.taskId);
            LOG.debug(" Trigger timer event to " + this.name);
        } catch (Exception e) {
            LOG.warn("Failed to publish timer event to " + this.name, e);
        }
    }

    public void setSpoutOutputCollector(SpoutCollector spoutCollector) {
        this.spoutCollector = spoutCollector;
    }

    public void setBoltOutputCollector(BoltCollector boltCollector) {
        this.boltCollector = boltCollector;
    }

    public void setExeThreadHbTime(long j) {
        this.executeThreadHbTime = j;
    }

    public void sendHbMsg() {
        if (this.componentId.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
            this.queue.publish(new TupleImplExt(this.sysTopologyCtx, new Values(Integer.valueOf(this.uptime.uptime()), Byte.valueOf(this.executorStatus.getStatus())), this.taskId, Common.TOPOLOGY_MASTER_HB_STREAM_ID));
            return;
        }
        List<Object> mk_list = JStormUtils.mk_list(Integer.valueOf(this.uptime.uptime()), Byte.valueOf(this.executorStatus.getStatus()));
        if (this.spoutCollector != null) {
            this.spoutCollector.emitCtrl(Common.TOPOLOGY_MASTER_HB_STREAM_ID, mk_list, null);
        } else if (this.boltCollector != null) {
            this.boltCollector.emitCtrl(Common.TOPOLOGY_MASTER_HB_STREAM_ID, null, mk_list);
        } else {
            LOG.warn("Failed to send hearbeat msg. OutputCollector has not been initialized!");
        }
    }

    public void updateExecutorStatus(byte b) {
        LOG.info("due to task-{} status changed: {}, so we notify the TopologyMaster", Integer.valueOf(this.taskId), Byte.valueOf(b));
        this.executorStatus.setStatus(b);
        sendHbMsg();
    }

    private void checkExecuteThreadHb() {
        if (TimeUtils.current_time_secs() - this.executeThreadHbTime > this.taskHbTimeout) {
            this.reportError.report("No response from Task-" + this.taskId + ", last report time(sec) is " + this.executeThreadHbTime, ErrorConstants.WARN, 120, 600);
        }
    }
}
