/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task;

import backtype.storm.hooks.ITaskHook;
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IDynamicComponent;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.worker.ShutdownableDameon;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shutdown handle for a single task.
 *
 * <p>Holds the task's status object, its worker threads, its cluster-state
 * connection and the user component (spout or bolt), and exposes the
 * lifecycle operations on them: {@link #shutdown()}, {@link #active()},
 * {@link #deactive()} and {@link #update(Map)}.
 *
 * <p>{@link #shutdown()} is guarded so that its body executes at most once
 * per instance; the other methods perform no synchronization of their own.
 */
public class TaskShutdownDameon
implements ShutdownableDameon {
    private static final Logger LOG = LoggerFactory.getLogger(TaskShutdownDameon.class);
    public static final byte QUIT_MSG = -1;

    private final Task task;
    private final TaskStatus taskStatus;
    private final String topology_id;
    private final Integer task_id;
    private final List<AsyncLoopThread> all_threads;
    private final StormClusterState zkCluster;
    private final Object task_obj;
    /** Set to {@code true} by the first {@link #shutdown()} call; guarded by {@code this}. */
    private boolean isClosed = false;

    /**
     * Creates the shutdown handle for one task.
     *
     * @param taskStatus  status object updated by shutdown/active/deactive
     * @param topology_id topology id, used in log messages
     * @param task_id     task id, used in log messages and returned by {@link #getTaskId()}
     * @param all_threads threads that are cleaned up and interrupted on shutdown
     * @param zkCluster   cluster-state connection that is disconnected on shutdown
     * @param task_obj    the user component; handled specially when it is an
     *                    {@code ISpout}, {@code IBolt} or {@code IDynamicComponent}
     * @param task        the owning task, source of the user context and its hooks
     */
    public TaskShutdownDameon(TaskStatus taskStatus, String topology_id, Integer task_id, List<AsyncLoopThread> all_threads, StormClusterState zkCluster, Object task_obj, Task task) {
        this.taskStatus = taskStatus;
        this.topology_id = topology_id;
        this.task_id = task_id;
        this.all_threads = all_threads;
        this.zkCluster = zkCluster;
        this.task_obj = task_obj;
        this.task = task;
    }

    /**
     * Shuts the task down: marks the status, stops every thread, runs the
     * task hooks' and the component's cleanup, and disconnects the
     * cluster-state connection.
     *
     * <p>Only the first call does any work; later calls return immediately.
     * The cluster-state connection is disconnected even when a thread, hook
     * or component cleanup throws; that exception still propagates to the
     * caller. If the calling thread is interrupted during the initial pause,
     * its interrupt flag is restored before this method returns.
     */
    @Override
    public void shutdown() {
        synchronized (this) {
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;
        }
        LOG.info("Begin to shut down task {}:{}", this.topology_id, this.task_id);
        // NOTE(review): 2 is presumably the "shutdown" status value of TaskStatus - confirm.
        this.taskStatus.setStatus((byte)2);

        // Short pause before the threads are torn down. An interrupt here is
        // remembered and re-asserted at the end rather than immediately, so
        // the remaining shutdown steps are not cut short by it.
        boolean interrupted = false;
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException e) {
            interrupted = true;
        }

        try {
            this.stopThreads();
            this.cleanupUserCode();
        }
        finally {
            this.disconnectClusterState();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("Successfully shutdown task {}:{}", this.topology_id, this.task_id);
    }

    /** Runs each thread's cleanup, waits briefly, then interrupts it. */
    private void stopThreads() {
        for (AsyncLoopThread thr : this.all_threads) {
            LOG.info("Begin to shutdown " + thr.getThread().getName());
            thr.cleanup();
            JStormUtils.sleepMs(10L);
            thr.interrupt();
            LOG.info("Successfully shutdown " + thr.getThread().getName());
        }
    }

    /** Runs the cleanup of every registered task hook, then closes the component. */
    private void cleanupUserCode() {
        TopologyContext userContext = this.task.getUserContext();
        for (ITaskHook hook : userContext.getHooks()) {
            hook.cleanup();
        }
        this.closeComponent(this.task_obj);
    }

    /** Disconnects the cluster-state connection; a failure is logged, never thrown. */
    private void disconnectClusterState() {
        try {
            this.zkCluster.disconnect();
        }
        catch (Exception e) {
            // Keep the exception in the log so the cause of the failure is visible.
            LOG.error("Failed to disconnect zk for task-" + this.task_id, e);
        }
    }

    /**
     * Waits for every thread of this task to finish.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        for (AsyncLoopThread t : this.all_threads) {
            t.join();
        }
    }

    /**
     * Calls {@code cleanup()} when the component is an {@code IBolt} and
     * {@code close()} when it is an {@code ISpout}. The two checks are
     * independent, so an object implementing both gets both calls.
     */
    private void closeComponent(Object _task_obj) {
        if (_task_obj instanceof IBolt) {
            ((IBolt)_task_obj).cleanup();
        }
        if (_task_obj instanceof ISpout) {
            ((ISpout)_task_obj).close();
        }
    }

    /**
     * @return the result of {@code taskStatus.isRun()}
     */
    @Override
    public boolean waiting() {
        return this.taskStatus.isRun();
    }

    /**
     * Sets the task status to 1 and, when the component is a spout, calls its
     * {@code deactivate()} between a switch and a restore of the worker class
     * loader's thread context.
     */
    public void deactive() {
        // NOTE(review): 1 is presumably the "paused" status value of TaskStatus - confirm.
        this.taskStatus.setStatus((byte)1);
        if (this.task_obj instanceof ISpout) {
            WorkerClassLoader.switchThreadContext();
            try {
                ((ISpout)this.task_obj).deactivate();
            }
            finally {
                WorkerClassLoader.restoreThreadContext();
            }
        }
    }

    /**
     * Sets the task status to 0 and, when the component is a spout, calls its
     * {@code activate()} between a switch and a restore of the worker class
     * loader's thread context.
     */
    public void active() {
        // NOTE(review): 0 is presumably the "running" status value of TaskStatus - confirm.
        this.taskStatus.setStatus((byte)0);
        if (this.task_obj instanceof ISpout) {
            WorkerClassLoader.switchThreadContext();
            try {
                ((ISpout)this.task_obj).activate();
            }
            finally {
                WorkerClassLoader.restoreThreadContext();
            }
        }
    }

    /**
     * Forwards a configuration update to the component when it implements
     * {@code IDynamicComponent}; otherwise does nothing.
     *
     * @param conf configuration passed through unchanged
     */
    public void update(Map conf) {
        if (this.task_obj instanceof IDynamicComponent) {
            ((IDynamicComponent)this.task_obj).update(conf);
        }
    }

    /** Runs {@link #shutdown()}. */
    @Override
    public void run() {
        this.shutdown();
    }

    /**
     * @return the task id given at construction (unboxed)
     */
    public int getTaskId() {
        return this.task_id;
    }

    /**
     * @return the task given at construction
     */
    public Task getTask() {
        return this.task;
    }
}

