package com.alibaba.jstorm.task.master.heartbeat;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.NimbusClientWrapper;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.error.ErrorConstants;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.master.TMHandler;
import com.alibaba.jstorm.task.master.TopologyMasterContext;
import com.alibaba.jstorm.task.master.ctrlevent.TopoMasterCtrlEvent;
import com.alibaba.jstorm.task.master.ctrlevent.UpdateConfigEvent;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.commons.cli.HelpFormatter;

/* loaded from: input_file:com/alibaba/jstorm/task/master/heartbeat/TaskHeartbeatUpdater.class */
public class TaskHeartbeatUpdater implements TMHandler {
    private static final Logger LOG = LoggerFactory.getLogger(TaskHeartbeatUpdater.class);
    // Upper bound on the number of heartbeats packed into one upload batch (see uploadHB);
    // read from the topology configuration in init().
    private int MAX_NUM_TASK_HB_SEND;
    // Topology id and task id of this handler, both taken from the TopologyMasterContext in init().
    private String topologyId;
    private int taskId;
    // Raw topology configuration map supplied by the TopologyMasterContext.
    private Map stormConf;
    // Nimbus client; created lazily in process() and dropped (set to null) after a failed upload.
    private NimbusClientWrapper client;
    // Heartbeats collected since the last upload, keyed by source task id.
    // uploadHB() swaps the referenced map for a fresh empty one before sending.
    private AtomicReference<ConcurrentHashMap<Integer, TaskHeartbeat>> taskHbMap;
    // Heartbeat info object built in initTaskHb(); it shares the map initially stored in taskHbMap.
    private TopologyTaskHbInfo taskHbs;
    // Cluster state handle used to read leftover heartbeats and to report task errors.
    private StormClusterState zkCluster;
    private TopologyContext context;
    // Last reported TaskStatus per bolt task / spout task; rebuilt by initExecutorsStatus().
    private ConcurrentHashMap<Integer, TaskStatus> boltsExecutorStatusMap;
    private ConcurrentHashMap<Integer, TaskStatus> spoutsExecutorStatusMap;
    private OutputCollector collector;
    // Monitor held while process() handles an event and while update() rebuilds the status maps.
    private final Object _lock = new Object();

    /**
     * Handles one event delivered to this handler while holding {@code _lock}.
     *
     * <p>An {@link UpdateConfigEvent} is forwarded to {@link #update(Map)}. Anything else is
     * treated as a heartbeat {@link Tuple}: value 0 is the sender's uptime and the optional
     * value 1 is its status byte (status 0 is assumed when the tuple has fewer than two values).
     * The status is recorded for the sending spout/bolt task, the heartbeat is stored for the
     * next upload, and a heartbeat originating from this task itself triggers {@link #uploadHB()}.
     *
     * <p>NOTE(review): the numeric status codes 0, 2 and 3 presumably correspond to constants
     * declared on {@code TaskStatus}; they are kept as literals because that class is not
     * visible here. From the log message below, 3 appears to mean "initialising" - confirm.
     *
     * @param obj either an {@link UpdateConfigEvent} or a heartbeat {@link Tuple}
     * @throws Exception if creating the Nimbus client or uploading heartbeats fails
     */
    @Override // com.alibaba.jstorm.task.master.TMHandler
    public void process(Object obj) throws Exception {
        synchronized (this._lock) {
            if (obj instanceof UpdateConfigEvent) {
                UpdateConfigEvent configEvent = (UpdateConfigEvent) obj;
                update(configEvent.getConf());
                return;
            }

            final Tuple heartbeat = (Tuple) obj;
            final int sourceTask = heartbeat.getSourceTask();
            final Integer sourceKey = Integer.valueOf(sourceTask);
            final int uptime = ((Integer) heartbeat.getValue(0)).intValue();

            // Tuples without a status field fall back to status 0.
            final TaskStatus reported = new TaskStatus();
            byte statusCode = 0;
            if (heartbeat.getValues().size() >= 2) {
                statusCode = ((Byte) heartbeat.getValue(1)).byteValue();
            }
            reported.setStatus(statusCode);

            // Record the status against whichever kind of task sent it.
            if (this.spoutsExecutorStatusMap.containsKey(sourceKey)) {
                this.spoutsExecutorStatusMap.put(sourceKey, reported);
            } else if (this.boltsExecutorStatusMap.containsKey(sourceKey)) {
                this.boltsExecutorStatusMap.put(sourceKey, reported);
            } else if (sourceTask != this.taskId) {
                LOG.warn("received invalid task heartbeat {}", heartbeat);
            }

            // A spout reporting status 3 is notified only once no bolt is in status 3 or 2.
            boolean notifySpout = false;
            if (reported.getStatus() == 3 && this.spoutsExecutorStatusMap.get(sourceKey) != null) {
                notifySpout = true;
                Iterator<TaskStatus> boltStatuses = this.boltsExecutorStatusMap.values().iterator();
                while (notifySpout && boltStatuses.hasNext()) {
                    int boltStatus = boltStatuses.next().getStatus();
                    notifySpout = boltStatus != 3 && boltStatus != 2;
                }
            }

            if (this.client == null) {
                this.client = new NimbusClientWrapper();
                this.client.init(this.stormConf);
            }

            if (reported.getStatus() != 3 && uptime > 0) {
                ConcurrentHashMap<Integer, TaskHeartbeat> pending = this.taskHbMap.get();
                TaskHeartbeat current = pending.get(sourceKey);
                if (current == null) {
                    TaskHeartbeat fresh = new TaskHeartbeat(TimeUtils.current_time_secs(), uptime);
                    TaskHeartbeat existing = pending.putIfAbsent(sourceKey, fresh);
                    current = (existing == null) ? fresh : existing;
                }
                current.set_time(TimeUtils.current_time_secs());
                current.set_uptime(uptime);
            } else if (notifySpout) {
                BoltCollector delegate = (BoltCollector) this.collector.getDelegate();
                Values payload = new Values(new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.topologyFinishInit));
                delegate.emitDirectCtrl(sourceTask, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, null, payload);
                LOG.info("all bolts' task finish init operation, so tm will notify the spout task-{}", sourceKey);
            }

            // This handler's own heartbeat acts as the trigger for flushing to Nimbus.
            if (sourceTask == this.taskId) {
                uploadHB();
            }
        }
    }

    /**
     * Drains the pending heartbeat map and uploads its entries in batches.
     *
     * <p>The pending map is atomically replaced with an empty one, then its entries are sent
     * through {@code uploadTaskHeatbeat} in groups of at most {@code MAX_NUM_TASK_HB_SEND};
     * any remainder is sent as a final, smaller batch.
     *
     * @throws Exception propagated from {@code uploadTaskHeatbeat}
     */
    public void uploadHB() throws Exception {
        ConcurrentHashMap<Integer, TaskHeartbeat> drained =
                this.taskHbMap.getAndSet(new ConcurrentHashMap<Integer, TaskHeartbeat>());
        ConcurrentHashMap<Integer, TaskHeartbeat> batch = new ConcurrentHashMap<>();

        Iterator<Map.Entry<Integer, TaskHeartbeat>> pending = drained.entrySet().iterator();
        while (pending.hasNext()) {
            Map.Entry<Integer, TaskHeartbeat> heartbeat = pending.next();
            batch.put(heartbeat.getKey(), heartbeat.getValue());
            // Keys come from a map, so the batch size equals the number of entries added.
            if (batch.size() >= this.MAX_NUM_TASK_HB_SEND) {
                uploadTaskHeatbeat(batch);
                batch = new ConcurrentHashMap<>();
            }
        }

        if (!batch.isEmpty()) {
            uploadTaskHeatbeat(batch);
        }
    }

    /**
     * Seeds the heartbeat state, restoring any heartbeats still stored in zk for this topology.
     *
     * <p>A failure to read from zk is logged and otherwise ignored, leaving the state empty.
     * The same map instance is installed in both {@code taskHbMap} and {@code taskHbs}.
     */
    public void initTaskHb() {
        ConcurrentHashMap<Integer, TaskHeartbeat> restored = new ConcurrentHashMap<>();
        this.taskHbs = new TopologyTaskHbInfo(this.topologyId, this.taskId);

        try {
            TopologyTaskHbInfo leftover = this.zkCluster.topology_heartbeat(this.topologyId);
            if (leftover != null) {
                LOG.info("Found task heartbeat info left in zk for " + this.topologyId + ": " + leftover.toString());
                Map<Integer, TaskHeartbeat> leftoverHbs = leftover.get_taskHbs();
                if (leftoverHbs != null) {
                    restored.putAll(leftoverHbs);
                }
            }
        } catch (Exception e) {
            // Best effort: start from an empty heartbeat map when zk cannot be read.
            LOG.warn("Failed to get topology heartbeat from zk", e);
        }

        this.taskHbMap.set(restored);
        this.taskHbs.set_taskHbs(restored);
    }

    /**
     * Captures everything this handler needs from the topology master context, then builds
     * the executor status maps and the initial heartbeat state.
     *
     * @param topologyMasterContext source of the topology id, task id, configuration,
     *                              collector, topology context and zk cluster state
     */
    @Override // com.alibaba.jstorm.task.master.TMHandler
    public void init(TopologyMasterContext topologyMasterContext) {
        // Configuration and cluster handles.
        final Map conf = topologyMasterContext.getConf();
        this.stormConf = conf;
        this.zkCluster = topologyMasterContext.getZkCluster();

        // Identity of this topology master task and its runtime collaborators.
        this.topologyId = topologyMasterContext.getTopologyId();
        this.taskId = topologyMasterContext.getTaskId();
        this.context = topologyMasterContext.getContext();
        this.collector = topologyMasterContext.getCollector();

        // The holder must exist before initTaskHb() stores the restored map in it.
        this.taskHbMap = new AtomicReference<ConcurrentHashMap<Integer, TaskHeartbeat>>();

        initExecutorsStatus();
        initTaskHb();
        this.MAX_NUM_TASK_HB_SEND = ConfigExtension.getTopologyTaskHbSendNumber(conf);
    }

    /**
     * Rebuilds the bolt and spout status maps from the system topology.
     *
     * <p>Every task of every bolt and spout component gets a fresh {@link TaskStatus}; this
     * handler's own task id is removed from the bolt map so it is not tracked as a bolt.
     *
     * <p>Fix: the spout section previously null-checked the bolt map before dereferencing the
     * spout map. A null spout map therefore caused a {@link NullPointerException}, and a null
     * bolt map silently skipped all spouts. Each section now checks its own map.
     *
     * @throws RuntimeException wrapping {@link InvalidTopologyException} when the system
     *                          topology cannot be built
     */
    private void initExecutorsStatus() {
        this.boltsExecutorStatusMap = new ConcurrentHashMap<>();
        this.spoutsExecutorStatusMap = new ConcurrentHashMap<>();
        try {
            StormTopology sysTopology = Common.system_topology(this.stormConf, this.context.getRawTopology());

            Map<String, Bolt> bolts = sysTopology.get_bolts();
            if (bolts != null) {
                for (Integer boltTask : this.context.getComponentsTasks(bolts.keySet())) {
                    this.boltsExecutorStatusMap.put(boltTask, new TaskStatus());
                }
                // This handler's own task must not be counted among the bolts.
                this.boltsExecutorStatusMap.remove(Integer.valueOf(this.taskId));
            }

            Map<String, SpoutSpec> spouts = sysTopology.get_spouts();
            if (spouts != null) {
                for (Integer spoutTask : this.context.getComponentsTasks(spouts.keySet())) {
                    this.spoutsExecutorStatusMap.put(spoutTask, new TaskStatus());
                }
            }
        } catch (InvalidTopologyException e) {
            LOG.error("Failed to build system topology", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends one batch of task heartbeats to Nimbus.
     *
     * <p>On any failure the Nimbus client is cleaned up and dropped, and a warning-level task
     * error is reported through zk; the exception itself is not rethrown.
     *
     * <p>Fix: the client is now (re)created here when it is {@code null}. Previously a failed
     * batch nulled the client, so every remaining batch of the same {@code uploadHB()} round
     * failed on a {@link NullPointerException} instead of attempting a fresh connection.
     * The log summary is also built with a {@link StringBuilder} rather than repeated string
     * concatenation in the loop.
     *
     * @param map heartbeats to send, keyed by task id; {@code null} or empty is a no-op
     * @throws Exception if reporting the task error to zk fails
     */
    private void uploadTaskHeatbeat(Map<Integer, TaskHeartbeat> map) throws Exception {
        if (map == null || map.isEmpty()) {
            return;
        }
        try {
            // Reconnect lazily: an earlier failed batch may have discarded the client.
            if (this.client == null) {
                this.client = new NimbusClientWrapper();
                this.client.init(this.stormConf);
            }

            TopologyTaskHbInfo batchInfo = new TopologyTaskHbInfo(this.topologyId, this.taskId);
            batchInfo.set_taskHbs(map);
            this.client.getClient().updateTaskHeartbeat(batchInfo);

            // Summarise the batch as " <taskId><prefix><time>" pairs for the log line.
            StringBuilder summary = new StringBuilder();
            for (Map.Entry<Integer, TaskHeartbeat> entry : map.entrySet()) {
                summary.append(' ')
                        .append(entry.getKey())
                        .append(HelpFormatter.DEFAULT_OPT_PREFIX)
                        .append(entry.getValue().get_time());
            }
            LOG.info("Update task heartbeat:" + summary);
        } catch (Exception e) {
            LOG.error("Failed to update task heartbeat info ", e);
            if (this.client != null) {
                this.client.cleanup();
                this.client = null;
            }
            this.zkCluster.report_task_error(this.context.getTopologyId(), this.context.getThisTaskId(), "Failed to update task heartbeat info", ErrorConstants.WARN, ErrorConstants.CODE_USER);
        }
    }

    /**
     * Waits for the other tasks to shut down and then tells Nimbus that this topology's tasks
     * are dead.
     *
     * <p>The notification is sent only when the topology's storm base is missing from zk or its
     * status type is {@code killed}. Before notifying, the method polls
     * {@code checkAllTasksShutdown()} every 100 ms until it returns {@code true}, the configured
     * task cleanup timeout elapses, or the thread is interrupted. Any failure is logged, and the
     * Nimbus client is cleaned up and dropped.
     *
     * <p>Fix: the decompiler could not restructure this method and left a stub that always threw
     * {@link UnsupportedOperationException}. The body below is rebuilt from the bytecode listing
     * the decompiler emitted in place of the method. The timeout is computed in {@code long}
     * arithmetic so that large configured values cannot overflow {@code int}.
     */
    @Override // com.alibaba.jstorm.task.master.TMHandler
    public void cleanup() {
        try {
            com.alibaba.jstorm.cluster.StormBase stormBase = this.zkCluster.storm_base(this.topologyId, null);
            boolean isKilled = stormBase != null
                    && stormBase.getStatus().getStatusType().equals(com.alibaba.jstorm.daemon.nimbus.StatusType.killed);
            if (stormBase != null && !isKilled) {
                // Topology is still registered and not being killed: nothing to report.
                return;
            }

            long timeoutMs = 1000L * ConfigExtension.getTaskCleanupTimeoutSec(this.stormConf);
            long startMs = System.currentTimeMillis();
            try {
                while (true) {
                    long elapsedMs = System.currentTimeMillis() - startMs;
                    if (elapsedMs > timeoutMs) {
                        LOG.warn("Timeout when waiting others' tasks shutdown");
                        break;
                    }
                    if (checkAllTasksShutdown()) {
                        break;
                    }
                    Thread.sleep(100L);
                }
            } catch (InterruptedException ignored) {
                // As in the original bytecode, an interrupt only ends the wait;
                // Nimbus is still notified below.
            }

            LOG.info("notify nimbus that all tasks's status is: {} {} ",
                    this.spoutsExecutorStatusMap.toString(), this.boltsExecutorStatusMap.toString());
            if (this.client == null) {
                this.client = new NimbusClientWrapper();
                this.client.init(this.stormConf);
            }
            this.client.getClient().notifyThisTopologyTasksIsDead(this.topologyId);
        } catch (Exception e) {
            if (this.client != null) {
                this.client.cleanup();
                this.client = null;
            }
            LOG.warn("Failed to get topology stormbase from zk", e);
        }
    }

    /**
     * Reports whether every tracked spout and bolt task has left status 0.
     *
     * <p>NOTE(review): status 0 presumably denotes a still-running task - confirm against
     * {@code TaskStatus}.
     *
     * @return {@code false} as soon as any tracked task has status 0, {@code true} otherwise
     */
    private boolean checkAllTasksShutdown() {
        // Snapshot spout statuses first, then bolt statuses, and scan the copy.
        ArrayList<TaskStatus> snapshot = new ArrayList<>(this.spoutsExecutorStatusMap.values());
        snapshot.addAll(this.boltsExecutorStatusMap.values());

        for (TaskStatus status : snapshot) {
            if (status.getStatus() == 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reacts to a configuration update by logging the new configuration and rebuilding the
     * executor status maps under {@code _lock}. The passed map is only logged here; it is not
     * stored.
     *
     * @param map the new configuration received by the topology master
     */
    public void update(Map map) {
        final String notice = "Topology master received new conf:" + map;
        LOG.info(notice);
        synchronized (this._lock) {
            this.initExecutorsStatus();
        }
    }
}
