/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.supervisor;

import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.cluster.DaemonCommon;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.daemon.supervisor.Httpserver;
import com.alibaba.jstorm.daemon.supervisor.ShutdownWork;
import com.alibaba.jstorm.daemon.supervisor.SupervisorDaemon;
import com.alibaba.jstorm.event.EventManager;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.codahale.metrics.Gauge;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SupervisorManger
extends ShutdownWork
implements SupervisorDaemon,
DaemonCommon,
Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
    private final Map conf;
    private final String supervisorId;
    private final AtomicBoolean shutdown;
    private final Vector<AsyncLoopThread> threads;
    private final EventManager eventManager;
    private final Httpserver httpserver;
    private final StormClusterState stormClusterState;
    private final JStormMetricsReporter metricsReporter;
    private final ConcurrentHashMap<String, String> workerThreadPidsAtom;
    private volatile boolean isFinishShutdown = false;

    public SupervisorManger(Map conf, String supervisorId, Vector<AsyncLoopThread> threads, EventManager eventManager, Httpserver httpserver, StormClusterState stormClusterState, ConcurrentHashMap<String, String> workerThreadPidsAtom) {
        this.conf = conf;
        this.supervisorId = supervisorId;
        this.shutdown = new AtomicBoolean(false);
        this.threads = threads;
        this.eventManager = eventManager;
        this.httpserver = httpserver;
        this.stormClusterState = stormClusterState;
        this.workerThreadPidsAtom = workerThreadPidsAtom;
        this.metricsReporter = new JStormMetricsReporter(this);
        this.metricsReporter.init();
        JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName("CpuUsedRatio", MetricType.GAUGE), new AsmGauge(new Gauge<Double>(){

            public Double getValue() {
                return JStormUtils.getTotalCpuUsage();
            }
        }));
        JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName("DiskUsage", MetricType.GAUGE), new AsmGauge(new Gauge<Double>(){

            public Double getValue() {
                return JStormUtils.getDiskUsage();
            }
        }));
        JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName("MemoryUsage", MetricType.GAUGE), new AsmGauge(new Gauge<Double>(){

            public Double getValue() {
                return JStormUtils.getTotalMemUsage();
            }
        }));
        Runtime.getRuntime().addShutdownHook(new Thread(this));
    }

    @Override
    public void shutdown() {
        if (this.shutdown.getAndSet(true)) {
            LOG.info("Supervisor has been shutdown before " + this.supervisorId);
            return;
        }
        LOG.info("Shutting down supervisor " + this.supervisorId);
        AsyncLoopRunnable.getShutdown().set(true);
        for (AsyncLoopThread thread : this.threads) {
            thread.cleanup();
            JStormUtils.sleepMs(10L);
            thread.interrupt();
            LOG.info("Successfully shutdown thread:" + thread.getThread().getName());
        }
        this.eventManager.shutdown();
        try {
            this.stormClusterState.disconnect();
        }
        catch (Exception e) {
            LOG.error("Failed to shutdown ZK client", (Throwable)e);
        }
        if (this.httpserver != null) {
            this.httpserver.shutdown();
        }
        this.isFinishShutdown = true;
        JStormUtils.halt_process(0, "!!!Shutdown!!!");
    }

    @Override
    public void ShutdownAllWorkers() {
        String path;
        LOG.info("Begin to shutdown all workers");
        try {
            path = StormConfig.worker_root(this.conf);
        }
        catch (IOException e1) {
            LOG.error("Failed to get Local worker dir", (Throwable)e1);
            return;
        }
        List<String> myWorkerIds = PathUtils.read_dir_contents(path);
        HashMap<String, String> workerId2topologyIds = new HashMap<String, String>();
        for (String workerId : myWorkerIds) {
            workerId2topologyIds.put(workerId, null);
        }
        this.shutWorker(this.conf, this.supervisorId, workerId2topologyIds, this.workerThreadPidsAtom, null, true, null, null);
    }

    @Override
    public Map getConf() {
        return this.conf;
    }

    @Override
    public String getId() {
        return this.supervisorId;
    }

    @Override
    public boolean waiting() {
        if (this.shutdown.get()) {
            return true;
        }
        int size = this.threads.size();
        for (int i = 0; i < size; ++i) {
            if (this.threads.elementAt(i).isSleeping().booleanValue()) continue;
            return false;
        }
        return !this.eventManager.waiting();
    }

    @Override
    public void run() {
        this.shutdown();
    }

    public boolean isFinishShutdown() {
        return this.isFinishShutdown;
    }
}

