package org.apache.storm.daemon.supervisor;

import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.Config;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/supervisor/Supervisor.class */
public class Supervisor implements DaemonCommon, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
    private final Map<String, Object> conf;
    private final IContext sharedContext;
    private volatile boolean active;
    private final ISupervisor iSupervisor;
    private final Utils.UptimeComputer upTime;
    private final String stormVersion;
    private final IStormClusterState stormClusterState;
    private final LocalState localState;
    private final String supervisorId;
    private final String assignmentId;
    private final String hostName;
    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
    private final StormTimer heartbeatTimer;
    private final StormTimer eventTimer;
    private final StormTimer blobUpdateTimer;
    private final Localizer localizer;
    private final AsyncLocalizer asyncLocalizer;
    private EventManager eventManager;
    private ReadClusterState readState;

    private Supervisor(ISupervisor iSupervisor) throws IOException {
        this(Utils.readStormConfig(), null, iSupervisor);
    }

    public Supervisor(Map<String, Object> map, IContext iContext, ISupervisor iSupervisor) throws IOException {
        this.conf = map;
        this.iSupervisor = iSupervisor;
        this.active = true;
        this.upTime = Utils.makeUptimeComputer();
        this.stormVersion = VersionInfo.getVersion();
        this.sharedContext = iContext;
        iSupervisor.prepare(map, ConfigUtils.supervisorIsupervisorDir(map));
        try {
            this.stormClusterState = ClusterUtils.mkStormClusterState(map, Utils.isZkAuthenticationConfiguredStormServer(map) ? SupervisorUtils.supervisorZkAcls() : null, new ClusterStateContext(DaemonType.SUPERVISOR));
            try {
                this.localState = ConfigUtils.supervisorState(map);
                this.localizer = Utils.createLocalizer(map, ConfigUtils.supervisorLocalDir(map));
                this.asyncLocalizer = new AsyncLocalizer(map, this.localizer);
                this.supervisorId = iSupervisor.getSupervisorId();
                this.assignmentId = iSupervisor.getAssignmentId();
                try {
                    this.hostName = Utils.hostname();
                    this.currAssignment = new AtomicReference<>(new HashMap());
                    this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
                    this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
                    this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
                } catch (UnknownHostException e) {
                    throw Utils.wrapInRuntime(e);
                }
            } catch (IOException e2) {
                throw Utils.wrapInRuntime(e2);
            }
        } catch (Exception e3) {
            LOG.error("supervisor can't create stormClusterState");
            throw Utils.wrapInRuntime(e3);
        }
    }

    public String getId() {
        return this.supervisorId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IContext getSharedContext() {
        return this.sharedContext;
    }

    public Map<String, Object> getConf() {
        return this.conf;
    }

    public ISupervisor getiSupervisor() {
        return this.iSupervisor;
    }

    public Utils.UptimeComputer getUpTime() {
        return this.upTime;
    }

    public String getStormVersion() {
        return this.stormVersion;
    }

    public IStormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LocalState getLocalState() {
        return this.localState;
    }

    public String getAssignmentId() {
        return this.assignmentId;
    }

    public String getHostName() {
        return this.hostName;
    }

    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
        return this.currAssignment;
    }

    public Localizer getLocalizer() {
        return this.localizer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ILocalizer getAsyncLocalizer() {
        return this.asyncLocalizer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventManager getEventManger() {
        return this.eventManager;
    }

    public void launch() throws Exception {
        LOG.info("Starting Supervisor with conf {}", this.conf);
        FileUtils.cleanDirectory(new File(ConfigUtils.supervisorTmpDir(this.conf)));
        Localizer localizer = getLocalizer();
        SupervisorHeartbeat supervisorHeartbeat = new SupervisorHeartbeat(this.conf, this);
        supervisorHeartbeat.run();
        this.heartbeatTimer.scheduleRecurring(0, Utils.getInt(this.conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)).intValue(), supervisorHeartbeat);
        this.eventManager = new EventManagerImp(false);
        this.readState = new ReadClusterState(this);
        Set<String> readDownloadedTopologyIds = SupervisorUtils.readDownloadedTopologyIds(this.conf);
        if (this.localState.getLocalAssignmentsMap() != null) {
            HashMap hashMap = new HashMap();
            for (LocalAssignment localAssignment : this.localState.getLocalAssignmentsMap().values()) {
                hashMap.put(localAssignment.get_topology_id(), localAssignment);
            }
            for (String str : readDownloadedTopologyIds) {
                LocalAssignment localAssignment2 = (LocalAssignment) hashMap.get(str);
                if (localAssignment2 != null) {
                    SupervisorUtils.addBlobReferences(localizer, str, this.conf, localAssignment2.get_owner());
                } else {
                    LOG.warn("Could not find an owner for topo {}", str);
                }
            }
        }
        localizer.startCleaner();
        UpdateBlobs updateBlobs = new UpdateBlobs(this);
        if (((Boolean) this.conf.get(Config.SUPERVISOR_ENABLE)).booleanValue()) {
            this.eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(this.readState, this.eventManager));
            this.blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobs, this.eventManager));
            this.eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
        }
        LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
    }

    public void launchDaemon() {
        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
        try {
            Map<String, Object> conf = getConf();
            if (ConfigUtils.isLocalMode(conf)) {
                throw new IllegalArgumentException("Cannot start server in local mode!");
            }
            launch();
            Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() { // from class: org.apache.storm.daemon.supervisor.Supervisor.1
                @Override // java.lang.Runnable
                public void run() {
                    Supervisor.this.close();
                }
            });
            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
            StormMetricsRegistry.startMetricsReporters(conf);
        } catch (Exception e) {
            LOG.error("Failed to start supervisor\n", e);
            System.exit(1);
        }
    }

    private void registerWorkerNumGauge(String str, final Map<String, Object> map) {
        StormMetricsRegistry.registerGauge(str, new Callable<Integer>() { // from class: org.apache.storm.daemon.supervisor.Supervisor.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(SupervisorUtils.supervisorWorkerIds(map).size());
            }
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            LOG.info("Shutting down supervisor {}", getId());
            this.active = false;
            this.heartbeatTimer.close();
            this.eventTimer.close();
            this.blobUpdateTimer.close();
            if (this.eventManager != null) {
                this.eventManager.close();
            }
            if (this.readState != null) {
                this.readState.close();
            }
            this.asyncLocalizer.shutdown();
            this.localizer.shutdown();
            getStormClusterState().disconnect();
        } catch (Exception e) {
            LOG.error("Error Shutting down", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void killWorkers(Collection<String> collection, ContainerLauncher containerLauncher) throws InterruptedException, IOException {
        HashSet hashSet = new HashSet();
        for (String str : collection) {
            try {
                Killable recoverContainer = containerLauncher.recoverContainer(str, this.localState);
                if (recoverContainer.areAllProcessesDead()) {
                    recoverContainer.cleanUp();
                } else {
                    recoverContainer.kill();
                    hashSet.add(recoverContainer);
                }
            } catch (Exception e) {
                LOG.error("Error trying to kill {}", str, e);
            }
        }
        int intValue = Utils.getInt(this.conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS), 1).intValue();
        if (!hashSet.isEmpty()) {
            Time.sleepSecs(intValue);
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            Killable killable = (Killable) it.next();
            try {
                killable.forceKill();
                long currentTimeMillis = Time.currentTimeMillis();
                while (!killable.areAllProcessesDead()) {
                    if (Time.currentTimeMillis() - currentTimeMillis > 10000) {
                        throw new RuntimeException("Giving up on killing " + killable + " after " + (Time.currentTimeMillis() - currentTimeMillis) + " ms");
                        break;
                    } else {
                        Time.sleep(100L);
                        killable.forceKill();
                    }
                }
                killable.cleanUp();
            } catch (Exception e2) {
                LOG.error("Error trying to clean up {}", killable, e2);
            }
        }
    }

    public void shutdownAllWorkers(UniFunc<Slot> uniFunc, UniFunc<Slot> uniFunc2) {
        if (this.readState != null) {
            this.readState.shutdownAllWorkers(uniFunc, uniFunc2);
            return;
        }
        try {
            killWorkers(SupervisorUtils.supervisorWorkerIds(this.conf), ContainerLauncher.make(getConf(), getId(), getSharedContext()));
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    @Override // org.apache.storm.daemon.DaemonCommon
    public boolean isWaiting() {
        if (this.active) {
            return this.heartbeatTimer.isTimerWaiting() && this.eventTimer.isTimerWaiting() && this.eventManager.waiting();
        }
        return true;
    }

    public static void main(String[] strArr) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        new Supervisor(new StandaloneSupervisor()).launchDaemon();
    }
}
