/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.supervisor;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.supervisor.ContainerMemoryTracker;
import org.apache.storm.daemon.supervisor.ContainerRecoveryException;
import org.apache.storm.daemon.supervisor.Killable;
import org.apache.storm.daemon.supervisor.OnlyLatestExecutor;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerMetricList;
import org.apache.storm.generated.WorkerMetricPoint;
import org.apache.storm.generated.WorkerMetrics;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.metricstore.MetricException;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

public abstract class Container
implements Killable {
    private static final Logger LOG = LoggerFactory.getLogger(Container.class);
    private static final String MEMORY_USED_METRIC = "UsedMemory";
    private static final String SYSTEM_COMPONENT_ID = "System";
    private static final String INVALID_EXECUTOR_ID = "-1";
    private static final String INVALID_STREAM_ID = "None";
    private final Meter numCleanupExceptions;
    private final Meter numKillExceptions;
    private final Meter numForceKillExceptions;
    private final Meter numForceKill;
    private final Timer shutdownDuration;
    private final Timer cleanupDuration;
    protected final Map<String, Object> _conf;
    protected final Map<String, Object> _topoConf;
    protected final String _topologyId;
    protected final String _supervisorId;
    protected final int _supervisorPort;
    protected final int _port;
    protected final LocalAssignment _assignment;
    protected final AdvancedFSOps _ops;
    protected final ResourceIsolationInterface _resourceIsolationManager;
    protected final boolean _symlinksDisabled;
    protected String _workerId;
    protected ContainerType _type;
    protected ContainerMemoryTracker containerMemoryTracker;
    private long lastMetricProcessTime = 0L;
    private Timer.Context shutdownTimer = null;

    protected Container(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, StormMetricsRegistry metricsRegistry, ContainerMemoryTracker containerMemoryTracker) throws IOException {
        assert (type != null);
        assert (conf != null);
        assert (supervisorId != null);
        this._symlinksDisabled = (Boolean)conf.getOrDefault("storm.disable.symlinks", false);
        if (ops == null) {
            ops = AdvancedFSOps.make(conf);
        }
        this._workerId = workerId;
        this._type = type;
        this._port = port;
        this._ops = ops;
        this._conf = conf;
        this._supervisorId = supervisorId;
        this._supervisorPort = supervisorPort;
        this._resourceIsolationManager = resourceIsolationManager;
        this._assignment = assignment;
        if (this._type.isOnlyKillable()) {
            assert (this._assignment == null);
            assert (this._port <= 0);
            assert (this._workerId != null);
            this._topologyId = null;
            this._topoConf = null;
        } else {
            assert (assignment != null);
            assert (port > 0);
            this._topologyId = assignment.get_topology_id();
            if (!this._ops.doRequiredTopoFilesExist(this._conf, this._topologyId)) {
                LOG.info("Missing topology storm code, so can't launch  worker with assignment {} for this supervisor {} on port {} with id {}", new Object[]{this._assignment, this._supervisorId, this._port, this._workerId});
                throw new ContainerRecoveryException("Missing required topology files...");
            }
            this._topoConf = topoConf == null ? this.readTopoConf() : topoConf;
        }
        this.numCleanupExceptions = metricsRegistry.registerMeter("supervisor:num-cleanup-exceptions");
        this.numKillExceptions = metricsRegistry.registerMeter("supervisor:num-kill-exceptions");
        this.numForceKillExceptions = metricsRegistry.registerMeter("supervisor:num-force-kill-exceptions");
        this.numForceKill = metricsRegistry.registerMeter("supervisor:num-workers-force-kill");
        this.shutdownDuration = metricsRegistry.registerTimer("supervisor:worker-shutdown-duration-ns");
        this.cleanupDuration = metricsRegistry.registerTimer("supervisor:worker-per-call-clean-up-duration-ns");
        this.containerMemoryTracker = containerMemoryTracker;
    }

    public String toString() {
        return "topo:" + this._topologyId + " worker:" + this._workerId;
    }

    protected Map<String, Object> readTopoConf() throws IOException {
        assert (this._topologyId != null);
        return ConfigUtils.readSupervisorStormConf(this._conf, (String)this._topologyId);
    }

    protected void kill(long pid) throws IOException {
        ServerUtils.killProcessWithSigTerm(String.valueOf(pid));
    }

    protected void forceKill(long pid) throws IOException {
        ServerUtils.forceKillProcess(String.valueOf(pid));
    }

    @Override
    public void kill() throws IOException {
        LOG.info("Killing {}:{}", (Object)this._supervisorId, (Object)this._workerId);
        if (this.shutdownTimer == null) {
            this.shutdownTimer = this.shutdownDuration.time();
        }
        try {
            Set<Long> pids = this.getAllPids();
            for (Long pid : pids) {
                this.kill(pid);
            }
        }
        catch (IOException e) {
            this.numKillExceptions.mark();
            throw e;
        }
    }

    @Override
    public void forceKill() throws IOException {
        LOG.info("Force Killing {}:{}", (Object)this._supervisorId, (Object)this._workerId);
        this.numForceKill.mark();
        try {
            Set<Long> pids = this.getAllPids();
            for (Long pid : pids) {
                this.forceKill(pid);
            }
        }
        catch (IOException e) {
            this.numForceKillExceptions.mark();
            throw e;
        }
    }

    public LSWorkerHeartbeat readHeartbeat() throws IOException {
        LocalState localState = ConfigUtils.workerState(this._conf, (String)this._workerId);
        LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
        LOG.trace("{}: Reading heartbeat {}", (Object)this._workerId, (Object)hb);
        return hb;
    }

    protected boolean isProcessAlive(long pid, String user) throws IOException {
        if (ServerUtils.IS_ON_WINDOWS) {
            return this.isWindowsProcessAlive(pid, user);
        }
        return this.isPosixProcessAlive(pid, user);
    }

    private boolean isWindowsProcessAlive(long pid, String user) throws IOException {
        boolean ret = false;
        ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
        Process p = pb.start();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));){
            String read;
            while ((read = in.readLine()) != null) {
                if (!read.contains("User Name:")) continue;
                List<String> userNameLineSplitOnWhitespace = Arrays.asList(read.split(":"));
                if (userNameLineSplitOnWhitespace.size() == 2) {
                    String processUser;
                    List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
                    String string = processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
                    if (user.equals(processUser)) {
                        ret = true;
                    } else {
                        LOG.info("Found {} running as {}, but expected it to be {}", new Object[]{pid, processUser, user});
                    }
                } else {
                    LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", (Object)read);
                }
                break;
            }
        }
        return ret;
    }

    private boolean isPosixProcessAlive(long pid, String user) throws IOException {
        boolean ret = false;
        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
        Process p = pb.start();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));){
            String processUser;
            String first = in.readLine();
            assert ("USER".equals(first));
            while ((processUser = in.readLine()) != null) {
                if (user.equals(processUser)) {
                    ret = true;
                    break;
                }
                LOG.info("Found {} running as {}, but expected it to be {}", new Object[]{pid, processUser, user});
            }
        }
        return ret;
    }

    @Override
    public boolean areAllProcessesDead() throws IOException {
        Set<Long> pids = this.getAllPids();
        String user = this.getRunWorkerAsUser();
        boolean allDead = true;
        for (Long pid : pids) {
            LOG.debug("Checking if pid {} owner {} is alive", (Object)pid, (Object)user);
            if (!this.isProcessAlive(pid, user)) {
                LOG.debug("{}: PID {} is dead", (Object)this._workerId, (Object)pid);
                continue;
            }
            allDead = false;
            break;
        }
        if (allDead && this.shutdownTimer != null) {
            this.shutdownTimer.stop();
            this.shutdownTimer = null;
        }
        return allDead;
    }

    @Override
    public void cleanUp() throws IOException {
        try (Timer.Context t = this.cleanupDuration.time();){
            this.containerMemoryTracker.remove(this._port);
            this.cleanUpForRestart();
        }
        catch (IOException e) {
            this.numCleanupExceptions.mark();
            throw e;
        }
    }

    protected void setup() throws IOException {
        this._type.assertFull();
        if (!this._ops.doRequiredTopoFilesExist(this._conf, this._topologyId)) {
            LOG.info("Missing topology storm code, so can't launch  worker with assignment {} for this supervisor {} on port {} with id {}", new Object[]{this._assignment, this._supervisorId, this._port, this._workerId});
            throw new IllegalStateException("Not all needed files are here!!!!");
        }
        LOG.info("Setting up {}:{}", (Object)this._supervisorId, (Object)this._workerId);
        this._ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(this._conf, (String)this._workerId)));
        this._ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(this._conf, (String)this._workerId)));
        this._ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(this._conf, (String)this._workerId)));
        File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(this._conf, (String)this._topologyId, (Integer)this._port));
        if (!this._ops.fileExists(workerArtifacts)) {
            this._ops.forceMkdir(workerArtifacts);
            this._ops.setupWorkerArtifactsDir(this._assignment.get_owner(), workerArtifacts);
        }
        String user = this.getWorkerUser();
        this.writeLogMetadata(user);
        this.saveWorkerUser(user);
        this.createArtifactsLink();
        this.createBlobstoreLinks();
    }

    protected void writeLogMetadata(String user) throws IOException {
        this._type.assertFull();
        HashMap<String, Object> data = new HashMap<String, Object>();
        data.put("topology.submitter.user", user);
        data.put("worker-id", this._workerId);
        HashSet<String> logsGroups = new HashSet<String>();
        if (this._topoConf.get("logs.groups") != null) {
            List groups = (List)this._topoConf.get("logs.groups");
            for (Iterator group : groups) {
                logsGroups.add((String)((Object)group));
            }
        }
        if (this._topoConf.get("topology.groups") != null) {
            List topGroups = (List)this._topoConf.get("topology.groups");
            logsGroups.addAll(topGroups);
        }
        data.put("logs.groups", logsGroups.toArray());
        HashSet<String> logsUsers = new HashSet<String>();
        if (this._topoConf.get("logs.users") != null) {
            List logUsers = (List)this._topoConf.get("logs.users");
            for (String logUser : logUsers) {
                logsUsers.add(logUser);
            }
        }
        if (this._topoConf.get("topology.users") != null) {
            List topUsers = (List)this._topoConf.get("topology.users");
            for (String logUser : topUsers) {
                logsUsers.add(logUser);
            }
        }
        data.put("logs.users", logsUsers.toArray());
        File file = ServerConfigUtils.getLogMetaDataFile(this._conf, this._topologyId, this._port);
        Yaml yaml = new Yaml();
        try (Writer writer = this._ops.getWriter(file);){
            yaml.dump(data, writer);
        }
    }

    protected void createArtifactsLink() throws IOException {
        this._type.assertFull();
        if (!this._symlinksDisabled) {
            File workerDir = new File(ConfigUtils.workerRoot(this._conf, (String)this._workerId));
            File topoDir = new File(ConfigUtils.workerArtifactsRoot(this._conf, (String)this._topologyId, (Integer)this._port));
            if (this._ops.fileExists(workerDir)) {
                LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", (Object)this._workerId, (Object)this._topologyId);
                this._ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
            }
        }
    }

    protected void createBlobstoreLinks() throws IOException {
        this._type.assertFull();
        String stormRoot = ConfigUtils.supervisorStormDistRoot(this._conf, (String)this._topologyId);
        String workerRoot = ConfigUtils.workerRoot(this._conf, (String)this._workerId);
        Map blobstoreMap = (Map)this._topoConf.get("topology.blobstore.map");
        ArrayList<String> blobFileNames = new ArrayList<String>();
        if (blobstoreMap != null) {
            for (Map.Entry entry : blobstoreMap.entrySet()) {
                String key = (String)entry.getKey();
                Map blobInfo = (Map)entry.getValue();
                String ret = null;
                ret = blobInfo != null && blobInfo.containsKey("localname") ? (String)blobInfo.get("localname") : key;
                blobFileNames.add(ret);
            }
        }
        File targetResourcesDir = new File(stormRoot, "resources");
        ArrayList<String> resourceFileNames = new ArrayList<String>();
        if (targetResourcesDir.exists()) {
            resourceFileNames.add("resources");
        }
        resourceFileNames.addAll(blobFileNames);
        if (!this._symlinksDisabled) {
            LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", new Object[]{this._workerId, this._topologyId, resourceFileNames.size(), resourceFileNames});
            if (targetResourcesDir.exists()) {
                this._ops.createSymlink(new File(workerRoot, "resources"), targetResourcesDir);
            } else {
                LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}.", new Object[]{this._workerId, this._topologyId, targetResourcesDir.toString()});
            }
            for (String fileName : blobFileNames) {
                this._ops.createSymlink(new File(workerRoot, fileName), new File(stormRoot, fileName));
            }
        } else if (blobFileNames.size() > 0) {
            LOG.warn("Symlinks are disabled, no symlinks created for blobs {}", blobFileNames);
        }
    }

    protected Set<Long> getAllPids() throws IOException {
        HashSet<Long> ret = new HashSet<Long>();
        for (String listing : ConfigUtils.readDirContents((String)ConfigUtils.workerPidsRoot(this._conf, (String)this._workerId))) {
            ret.add(Long.valueOf(listing));
        }
        if (this._resourceIsolationManager != null) {
            Set<Long> morePids = this._resourceIsolationManager.getRunningPids(this._workerId);
            assert (morePids != null);
            ret.addAll(morePids);
        }
        return ret;
    }

    protected String getWorkerUser() throws IOException {
        LOG.info("GET worker-user for {}", (Object)this._workerId);
        File file = new File(ConfigUtils.workerUserFile(this._conf, (String)this._workerId));
        if (this._ops.fileExists(file)) {
            return this._ops.slurpString(file).trim();
        }
        if (this._assignment != null && this._assignment.is_set_owner()) {
            return this._assignment.get_owner();
        }
        if (ConfigUtils.isLocalMode(this._conf)) {
            return System.getProperty("user.name");
        }
        File f = new File(ConfigUtils.workerArtifactsRoot(this._conf));
        if (f.exists()) {
            return Files.getOwner(f.toPath(), new LinkOption[0]).getName();
        }
        throw new IllegalStateException("Could not recover the user for " + this._workerId);
    }

    protected String getRunWorkerAsUser() {
        return System.getProperty("user.name");
    }

    protected void saveWorkerUser(String user) throws IOException {
        this._type.assertFull();
        LOG.info("SET worker-user {} {}", (Object)this._workerId, (Object)user);
        this._ops.dump(new File(ConfigUtils.workerUserFile(this._conf, (String)this._workerId)), user);
    }

    protected void deleteSavedWorkerUser() throws IOException {
        LOG.info("REMOVE worker-user {}", (Object)this._workerId);
        this._ops.deleteIfExists(new File(ConfigUtils.workerUserFile(this._conf, (String)this._workerId)));
    }

    public void cleanUpForRestart() throws IOException {
        LOG.info("Cleaning up {}:{}", (Object)this._supervisorId, (Object)this._workerId);
        Set<Long> pids = this.getAllPids();
        String user = this.getWorkerUser();
        for (Long pid : pids) {
            File path = new File(ConfigUtils.workerPidPath(this._conf, (String)this._workerId, (long)pid));
            this._ops.deleteIfExists(path, user, this._workerId);
        }
        if (this._resourceIsolationManager != null) {
            this._resourceIsolationManager.releaseResourcesForWorker(this._workerId);
        }
        this._ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(this._conf, (String)this._workerId)), user, this._workerId);
        this._ops.deleteIfExists(new File(ConfigUtils.workerPidsRoot(this._conf, (String)this._workerId)), user, this._workerId);
        this._ops.deleteIfExists(new File(ConfigUtils.workerTmpRoot(this._conf, (String)this._workerId)), user, this._workerId);
        this._ops.deleteIfExists(new File(ConfigUtils.workerRoot(this._conf, (String)this._workerId)), user, this._workerId);
        this.deleteSavedWorkerUser();
        this._workerId = null;
    }

    public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
        this.updateMemoryAccounting();
        return false;
    }

    protected void updateMemoryAccounting() {
        this._type.assertFull();
        long used = this.getMemoryUsageMb();
        long reserved = this.getMemoryReservationMb();
        this.containerMemoryTracker.setUsedMemoryMb(this._port, this._topologyId, used);
        this.containerMemoryTracker.setReservedMemoryMb(this._port, this._topologyId, reserved);
    }

    public long getTotalTopologyMemoryUsed() {
        this.updateMemoryAccounting();
        return this.containerMemoryTracker.getUsedMemoryMb(this._topologyId);
    }

    public long getTotalTopologyMemoryReserved(LocalAssignment withUpdatedLimits) {
        this.updateMemoryAccounting();
        long ret = this.containerMemoryTracker.getReservedMemoryMb(this._topologyId);
        if (withUpdatedLimits.is_set_total_node_shared()) {
            ret = (long)((double)ret + withUpdatedLimits.get_total_node_shared());
        }
        return ret;
    }

    public long getTotalWorkersForThisTopology() {
        return this.containerMemoryTracker.getAssignedWorkerCount(this._topologyId);
    }

    public long getMemoryUsageMb() {
        return 0L;
    }

    public long getMemoryReservationMb() {
        return 0L;
    }

    public abstract void launch() throws IOException;

    public abstract void relaunch() throws IOException;

    public abstract boolean didMainProcessExit();

    public abstract boolean runProfiling(ProfileRequest var1, boolean var2) throws IOException, InterruptedException;

    public String getWorkerId() {
        return this._workerId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void processMetrics(OnlyLatestExecutor<Integer> exec, WorkerMetricsProcessor processor) {
        try {
            Optional<Long> usedMemoryForPort = this.containerMemoryTracker.getUsedMemoryMb(this._port);
            if (usedMemoryForPort.isPresent()) {
                long nextMetricProcessTime = this.lastMetricProcessTime + 60000L;
                long currentTimeMsec = System.currentTimeMillis();
                if (currentTimeMsec < nextMetricProcessTime) {
                    return;
                }
                String hostname = Utils.hostname();
                long timestamp = System.currentTimeMillis();
                WorkerMetricPoint workerMetric = new WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, (double)usedMemoryForPort.get().longValue(), SYSTEM_COMPONENT_ID, INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
                WorkerMetricList metricList = new WorkerMetricList();
                metricList.add_to_metrics(workerMetric);
                WorkerMetrics metrics = new WorkerMetrics(this._topologyId, this._port, hostname, metricList);
                exec.execute(this._port, () -> {
                    try {
                        processor.processWorkerMetrics(this._conf, metrics);
                    }
                    catch (MetricException e) {
                        LOG.error("Failed to process metrics", (Throwable)e);
                    }
                });
            }
        }
        catch (Exception e) {
            LOG.error("Failed to process metrics", (Throwable)e);
        }
        finally {
            this.lastMetricProcessTime = System.currentTimeMillis();
        }
    }

    public static enum ContainerType {
        LAUNCH(false, false),
        RECOVER_FULL(true, false),
        RECOVER_PARTIAL(true, true);

        private final boolean _recovery;
        private final boolean _onlyKillable;

        private ContainerType(boolean recovery, boolean onlyKillable) {
            this._recovery = recovery;
            this._onlyKillable = onlyKillable;
        }

        public boolean isRecovery() {
            return this._recovery;
        }

        public void assertFull() {
            if (this._onlyKillable) {
                throw new IllegalStateException("Container is only Killable.");
            }
        }

        public boolean isOnlyKillable() {
            return this._onlyKillable;
        }
    }
}

