package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.scheduler.WorkerSlot;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/WorkerShutdown.class */
public class WorkerShutdown implements ShutdownableDameon {
    private static Logger LOG = LoggerFactory.getLogger(WorkerShutdown.class);
    private List<TaskShutdownDameon> shutdownTasks;
    private AtomicBoolean shutdown;
    private ConcurrentHashMap<WorkerSlot, IConnection> nodePortToSocket;
    private IContext context;
    private List<AsyncLoopThread> threads;
    private StormClusterState zkCluster;
    private ClusterState cluster_state;
    private FlusherPool flusherPool;
    private ScheduledExecutorService threadPool;
    private IConnection recvConnection;
    private Map conf;

    public WorkerShutdown(WorkerData workerData, List<AsyncLoopThread> list) {
        this.shutdownTasks = workerData.getShutdownTasks();
        this.threads = list;
        this.shutdown = workerData.getShutdown();
        this.nodePortToSocket = workerData.getNodeportSocket();
        this.context = workerData.getContext();
        this.zkCluster = workerData.getZkCluster();
        this.threadPool = workerData.getThreadPool();
        this.cluster_state = workerData.getZkClusterstate();
        this.flusherPool = workerData.getFlusherPool();
        this.recvConnection = workerData.getRecvConnection();
        this.conf = workerData.getStormConf();
        Runtime.getRuntime().addShutdownHook(new Thread(this));
    }

    @Override // backtype.storm.daemon.Shutdownable
    public void shutdown() {
        if (this.shutdown.getAndSet(true)) {
            LOG.info("Worker has been shutdown already");
            return;
        }
        if (ConfigExtension.isOutworkerDump(this.conf)) {
            workerDumpInfoOutput();
        }
        ArrayList arrayList = new ArrayList();
        Iterator<TaskShutdownDameon> it = this.shutdownTasks.iterator();
        while (it.hasNext()) {
            arrayList.add(this.flusherPool.submit(it.next()));
        }
        JStormServerUtils.checkFutures(arrayList);
        if (this.recvConnection != null) {
            this.recvConnection.close();
        }
        AsyncLoopRunnable.getShutdown().set(true);
        this.threadPool.shutdown();
        this.flusherPool.shutdown();
        try {
            this.flusherPool.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.error("Error when shutting down client scheduler", e);
        }
        for (AsyncLoopThread asyncLoopThread : this.threads) {
            LOG.info("Begin to shutdown " + asyncLoopThread.getThread().getName());
            asyncLoopThread.cleanup();
            JStormUtils.sleepMs(100L);
            asyncLoopThread.interrupt();
            LOG.info("Successfully " + asyncLoopThread.getThread().getName());
        }
        Iterator<WorkerSlot> it2 = this.nodePortToSocket.keySet().iterator();
        while (it2.hasNext()) {
            this.nodePortToSocket.get(it2.next()).close();
        }
        this.context.term();
        try {
            this.zkCluster.disconnect();
            this.cluster_state.close();
        } catch (Exception e2) {
            LOG.info("Shutdown error,", e2);
        }
        JStormUtils.halt_process(0, "!!!Shutdown!!!");
    }

    public void join() throws InterruptedException {
        Iterator<TaskShutdownDameon> it = this.shutdownTasks.iterator();
        while (it.hasNext()) {
            it.next().join();
        }
        Iterator<AsyncLoopThread> it2 = this.threads.iterator();
        while (it2.hasNext()) {
            it2.next().join();
        }
    }

    @Override // com.alibaba.jstorm.cluster.DaemonCommon
    public boolean waiting() {
        Boolean bool = false;
        Iterator<TaskShutdownDameon> it = this.shutdownTasks.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().waiting()) {
                bool = true;
                break;
            }
        }
        Iterator<AsyncLoopThread> it2 = this.threads.iterator();
        while (true) {
            if (!it2.hasNext()) {
                break;
            }
            if (it2.next().isSleeping().booleanValue()) {
                bool = true;
                break;
            }
        }
        return bool.booleanValue();
    }

    @Override // java.lang.Runnable
    public void run() {
        shutdown();
    }

    private void workerDumpInfoOutput() {
        String str = ManagementFactory.getRuntimeMXBean().getName().split(MetricUtils.DELIM)[0];
        LOG.debug("worker's pid is " + str);
        String logFileName = JStormUtils.getLogFileName();
        if (logFileName == null) {
            return;
        }
        String str2 = logFileName + ".dump";
        try {
            if (!new File(str2).exists()) {
                PathUtils.touch(str2);
            }
            try {
                PrintWriter printWriter = new PrintWriter(new FileWriter(str2, true));
                StringBuilder sb = new StringBuilder();
                sb.append("jstack ");
                sb.append(str);
                LOG.debug("Begin to execute " + sb.toString());
                printWriter.println(JStormUtils.launchProcess(sb.toString(), new HashMap(), false));
                StringBuilder sb2 = new StringBuilder();
                sb2.append("jmap -heap ");
                sb2.append(str);
                LOG.debug("Begin to execute " + sb2.toString());
                printWriter.println(JStormUtils.launchProcess(sb2.toString(), new HashMap(), false));
            } catch (Exception e) {
                LOG.error("can't excute jstack and jmap about " + str);
                LOG.error(String.valueOf(e));
            }
        } catch (Exception e2) {
            LOG.warn("Failed to touch " + str2, e2);
        }
    }
}
