package com.alibaba.jstorm.task;

import backtype.storm.hooks.ITaskHook;
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
import backtype.storm.topology.IDynamicComponent;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.worker.ShutdownableDameon;
import com.alibaba.jstorm.daemon.worker.timer.TaskHeartbeatTrigger;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.thrift.protocol.TMultiplexedProtocol;

/* loaded from: input_file:com/alibaba/jstorm/task/TaskShutdownDameon.class */
public class TaskShutdownDameon implements ShutdownableDameon {
    private static Logger LOG = LoggerFactory.getLogger(TaskShutdownDameon.class);
    private Task task;
    private TaskStatus taskStatus;
    private String topology_id;
    private Integer task_id;
    private List<AsyncLoopThread> all_threads;
    private StormClusterState zkCluster;
    private Object task_obj;
    private AtomicBoolean isClosing = new AtomicBoolean(false);
    private TaskHeartbeatTrigger taskHeartbeatTrigger;

    public TaskShutdownDameon(TaskStatus taskStatus, String str, Integer num, List<AsyncLoopThread> list, StormClusterState stormClusterState, Object obj, Task task, TaskHeartbeatTrigger taskHeartbeatTrigger) {
        this.taskStatus = taskStatus;
        this.topology_id = str;
        this.task_id = num;
        this.all_threads = list;
        this.zkCluster = stormClusterState;
        this.task_obj = obj;
        this.task = task;
        this.taskHeartbeatTrigger = taskHeartbeatTrigger;
    }

    @Override // backtype.storm.daemon.Shutdownable
    public void shutdown() {
        if (this.isClosing.compareAndSet(false, true)) {
            LOG.info("Begin to shut down task " + this.topology_id + TMultiplexedProtocol.SEPARATOR + this.task_id);
            Iterator<ITaskHook> it = this.task.getUserContext().getHooks().iterator();
            while (it.hasNext()) {
                it.next().cleanup();
            }
            closeComponent(this.task_obj);
            this.taskHeartbeatTrigger.updateExecutorStatus((byte) 2);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
            this.taskStatus.setStatus((byte) 2);
            for (AsyncLoopThread asyncLoopThread : this.all_threads) {
                LOG.info("Begin to shutdown " + asyncLoopThread.getThread().getName());
                asyncLoopThread.cleanup();
                JStormUtils.sleepMs(10L);
                asyncLoopThread.interrupt();
                LOG.info("Successfully shutdown " + asyncLoopThread.getThread().getName());
            }
            try {
                this.zkCluster.disconnect();
            } catch (Exception e2) {
                LOG.error("Failed to disconnect zk for task-" + this.task_id);
            }
            LOG.info("Successfully shutdown task " + this.topology_id + TMultiplexedProtocol.SEPARATOR + this.task_id);
        }
    }

    public void join() throws InterruptedException {
        Iterator<AsyncLoopThread> it = this.all_threads.iterator();
        while (it.hasNext()) {
            it.next().join();
        }
    }

    private void closeComponent(Object obj) {
        if (obj instanceof IBolt) {
            ((IBolt) obj).cleanup();
        }
        if (obj instanceof ISpout) {
            ((ISpout) obj).close();
        }
    }

    @Override // com.alibaba.jstorm.cluster.DaemonCommon
    public boolean waiting() {
        return this.taskStatus.isRun();
    }

    public void deactive() {
        if (!(this.task_obj instanceof ISpout)) {
            this.taskStatus.setStatus((byte) 1);
            return;
        }
        this.taskStatus.setStatus((byte) 1);
        WorkerClassLoader.switchThreadContext();
        try {
            ((ISpout) this.task_obj).deactivate();
        } finally {
            WorkerClassLoader.restoreThreadContext();
        }
    }

    public void active() {
        if (!(this.task_obj instanceof ISpout)) {
            this.taskStatus.setStatus((byte) 0);
            return;
        }
        this.taskStatus.setStatus((byte) 0);
        WorkerClassLoader.switchThreadContext();
        try {
            ((ISpout) this.task_obj).activate();
        } finally {
            WorkerClassLoader.restoreThreadContext();
        }
    }

    public void update(Map map) {
        if (this.task_obj instanceof IDynamicComponent) {
            ((IDynamicComponent) this.task_obj).update(map);
        } else if (this.task.getBaseExecutors() != null) {
            this.task.getBaseExecutors().update(map);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        shutdown();
    }

    public int getTaskId() {
        return this.task_id.intValue();
    }

    public Task getTask() {
        return this.task;
    }
}
