package org.apache.flink.runtime.profiling.impl;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.profiling.ProfilingException;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.runtime.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/runtime/profiling/impl/TaskManagerProfilerImpl.class */
public class TaskManagerProfilerImpl extends TimerTask implements TaskManagerProfiler {
    private static final Log LOG = LogFactory.getLog(TaskManagerProfilerImpl.class);
    private final ProfilerImplProtocol jobManagerProfiler;
    private final Timer timer;
    private final ThreadMXBean tmx;
    private final long timerInterval;
    private final InstanceProfiler instanceProfiler;
    private final ProfilingDataContainer profilingDataContainer = new ProfilingDataContainer();
    private final Map<Environment, EnvironmentThreadSet> monitoredThreads = new HashMap();

    public TaskManagerProfilerImpl(InetAddress inetAddress, InstanceConnectionInfo instanceConnectionInfo) throws ProfilingException {
        try {
            this.jobManagerProfiler = (ProfilerImplProtocol) RPC.getProxy(ProfilerImplProtocol.class, new InetSocketAddress(inetAddress, GlobalConfiguration.getInteger(ProfilingUtils.JOBMANAGER_RPC_PORT_KEY, ProfilingUtils.JOBMANAGER_DEFAULT_RPC_PORT)), NetUtils.getSocketFactory());
            this.tmx = ManagementFactory.getThreadMXBean();
            if (!this.tmx.isThreadContentionMonitoringSupported()) {
                throw new ProfilingException("The thread contention monitoring is not supported.");
            }
            this.tmx.setThreadContentionMonitoringEnabled(true);
            this.instanceProfiler = new InstanceProfiler(instanceConnectionInfo);
            this.timerInterval = GlobalConfiguration.getInteger(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY, 2) * 1000;
            long random = (long) (Math.random() * this.timerInterval);
            this.timer = new Timer(true);
            this.timer.schedule(this, random, this.timerInterval);
        } catch (IOException e) {
            throw new ProfilingException(StringUtils.stringifyException(e));
        }
    }

    @Override // org.apache.flink.runtime.profiling.TaskManagerProfiler
    public void registerExecutionListener(Task task, Configuration configuration) {
        task.registerExecutionListener(new EnvironmentListenerImpl(this, task.getRuntimeEnvironment()));
    }

    @Override // org.apache.flink.runtime.profiling.TaskManagerProfiler
    public void unregisterExecutionListener(ExecutionVertexID executionVertexID) {
    }

    @Override // org.apache.flink.runtime.profiling.TaskManagerProfiler
    public void shutdown() {
        this.timer.cancel();
    }

    @Override // java.util.TimerTask, java.lang.Runnable
    public void run() {
        long currentTimeMillis = System.currentTimeMillis();
        InternalInstanceProfilingData internalInstanceProfilingData = null;
        synchronized (this.monitoredThreads) {
            for (Environment environment : this.monitoredThreads.keySet()) {
                InternalExecutionVertexThreadProfilingData captureCPUUtilization = this.monitoredThreads.get(environment).captureCPUUtilization(environment.getJobID(), this.tmx, currentTimeMillis);
                if (captureCPUUtilization != null) {
                    this.profilingDataContainer.addProfilingData(captureCPUUtilization);
                }
            }
            if (!this.monitoredThreads.isEmpty()) {
                try {
                    internalInstanceProfilingData = this.instanceProfiler.generateProfilingData(currentTimeMillis);
                } catch (ProfilingException e) {
                    LOG.error("Error while retrieving instance profiling data: ", e);
                }
            }
        }
        synchronized (this.profilingDataContainer) {
            if (internalInstanceProfilingData != null) {
                this.profilingDataContainer.addProfilingData(internalInstanceProfilingData);
            }
            if (!this.profilingDataContainer.isEmpty()) {
                try {
                    this.jobManagerProfiler.reportProfilingData(this.profilingDataContainer);
                    this.profilingDataContainer.clear();
                } catch (IOException e2) {
                    LOG.error(e2);
                }
            }
        }
    }

    public void registerMainThreadForCPUProfiling(Environment environment, Thread thread, ExecutionVertexID executionVertexID) {
        synchronized (this.monitoredThreads) {
            LOG.debug("Registering thread " + thread.getName() + " for CPU monitoring");
            if (this.monitoredThreads.containsKey(environment)) {
                LOG.error("There is already a main thread registered for environment object " + environment.getTaskName());
            }
            this.monitoredThreads.put(environment, new EnvironmentThreadSet(this.tmx, thread, executionVertexID));
        }
    }

    public void registerUserThreadForCPUProfiling(Environment environment, Thread thread) {
        synchronized (this.monitoredThreads) {
            EnvironmentThreadSet environmentThreadSet = this.monitoredThreads.get(environment);
            if (environmentThreadSet == null) {
                LOG.error("Trying to register " + thread.getName() + " but no main thread found!");
            } else {
                environmentThreadSet.addUserThread(this.tmx, thread);
            }
        }
    }

    public void unregisterMainThreadFromCPUProfiling(Environment environment, Thread thread) {
        synchronized (this.monitoredThreads) {
            LOG.debug("Unregistering thread " + thread.getName() + " from CPU monitoring");
            EnvironmentThreadSet remove = this.monitoredThreads.remove(environment);
            if (remove != null) {
                if (remove.getMainThread() != thread) {
                    LOG.error("The thread " + thread.getName() + " is not the main thread of this environment");
                }
                if (remove.getNumberOfUserThreads() > 0) {
                    LOG.error("Thread " + remove.getMainThread().getName() + " has still unfinished user threads!");
                }
            }
        }
    }

    public void unregisterUserThreadFromCPUProfiling(Environment environment, Thread thread) {
        synchronized (this.monitoredThreads) {
            EnvironmentThreadSet environmentThreadSet = this.monitoredThreads.get(environment);
            if (environmentThreadSet == null) {
                LOG.error("Trying to unregister " + thread.getName() + " but no main thread found!");
            } else {
                environmentThreadSet.removeUserThread(thread);
            }
        }
    }
}
