package org.apache.flink.runtime.taskmanager;

import java.io.File;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.Hardware;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManager.class */
public class TaskManager implements TaskOperationProtocol {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
    private static final int STARTUP_FAILURE_RETURN_CODE = 1;
    private static final int MAX_LOST_HEART_BEATS = 3;
    private static final int DELAY_AFTER_LOST_CONNECTION = 10000;
    public static final String ARG_CONF_DIR = "tempDir";
    private final InstanceConnectionInfo localInstanceConnectionInfo;
    private final HardwareDescription hardwareDescription;
    private final ExecutionMode executionMode;
    private final JobManagerProtocol jobManager;
    private final InputSplitProviderProtocol globalInputSplitProvider;
    private final ChannelLookupProtocol lookupService;
    private final AccumulatorProtocol accumulatorProtocolProxy;
    private final LibraryCacheManager libraryCacheManager;
    private final Server taskManagerServer;
    private final ChannelManager channelManager;
    private final TaskManagerProfiler profiler;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final int numberOfSlots;
    private final Thread heartbeatThread;
    private volatile InstanceID registeredId;
    private volatile boolean shutdownComplete;
    private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
    private final FileCache fileCache = new FileCache();
    private final ConcurrentHashMap<ExecutionAttemptID, Task> runningTasks = new ConcurrentHashMap<>();
    private final AtomicBoolean shutdownStarted = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.taskmanager.TaskManager$4, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManager$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$ExecutionMode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$taskmanager$TaskManager$AddressDetectionState = new int[AddressDetectionState.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$taskmanager$TaskManager$AddressDetectionState[AddressDetectionState.ADDRESS.ordinal()] = TaskManager.STARTUP_FAILURE_RETURN_CODE;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$taskmanager$TaskManager$AddressDetectionState[AddressDetectionState.FAST_CONNECT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$taskmanager$TaskManager$AddressDetectionState[AddressDetectionState.SLOW_CONNECT.ordinal()] = TaskManager.MAX_LOST_HEART_BEATS;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$flink$runtime$ExecutionMode = new int[ExecutionMode.values().length];
            try {
                $SwitchMap$org$apache$flink$runtime$ExecutionMode[ExecutionMode.LOCAL.ordinal()] = TaskManager.STARTUP_FAILURE_RETURN_CODE;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$ExecutionMode[ExecutionMode.CLUSTER.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManager$AddressDetectionState.class */
    public enum AddressDetectionState {
        ADDRESS(50),
        FAST_CONNECT(50),
        SLOW_CONNECT(1000);

        private int timeout;

        AddressDetectionState(int i) {
            this.timeout = i;
        }

        public int getTimeout() {
            return this.timeout;
        }
    }

    public TaskManager(ExecutionMode executionMode, JobManagerProtocol jobManagerProtocol, InputSplitProviderProtocol inputSplitProviderProtocol, ChannelLookupProtocol channelLookupProtocol, AccumulatorProtocol accumulatorProtocol, InetSocketAddress inetSocketAddress, InetAddress inetAddress) throws Exception {
        long j;
        if (executionMode == null || jobManagerProtocol == null || inputSplitProviderProtocol == null || channelLookupProtocol == null || accumulatorProtocol == null) {
            throw new NullPointerException();
        }
        LOG.info("TaskManager execution mode: " + executionMode);
        this.executionMode = executionMode;
        this.jobManager = jobManagerProtocol;
        this.lookupService = channelLookupProtocol;
        this.globalInputSplitProvider = inputSplitProviderProtocol;
        this.accumulatorProtocolProxy = accumulatorProtocol;
        int integer = GlobalConfiguration.getInteger("taskmanager.numberOfTaskSlots", -1);
        if (integer == -1) {
            integer = STARTUP_FAILURE_RETURN_CODE;
            LOG.info("Number of task slots not configured. Creating one task slot.");
        } else {
            if (integer <= 0) {
                throw new Exception("Illegal value for the number of task slots: " + integer);
            }
            LOG.info("Creating " + integer + " task slot(s).");
        }
        this.numberOfSlots = integer;
        int integer2 = GlobalConfiguration.getInteger("taskmanager.rpc.port", -1);
        int integer3 = GlobalConfiguration.getInteger("taskmanager.data.port", -1);
        integer2 = integer2 == -1 ? getAvailablePort() : integer2;
        this.localInstanceConnectionInfo = new InstanceConnectionInfo(inetAddress, integer2, integer3 == -1 ? getAvailablePort() : integer3);
        LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
        try {
            this.taskManagerServer = RPC.getServer(this, inetAddress.getHostAddress(), integer2, Math.min(this.numberOfSlots, 2 * Hardware.getNumberCPUCores()));
            this.taskManagerServer.start();
            if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
                this.profiler = ProfilingUtils.loadTaskManagerProfiler(GlobalConfiguration.getString(ProfilingUtils.TASKMANAGER_CLASSNAME_KEY, "org.apache.flink.runtime.profiling.impl.TaskManagerProfilerImpl"), inetSocketAddress.getAddress(), this.localInstanceConnectionInfo);
                if (this.profiler == null) {
                    LOG.error("Cannot find class name for the profiler.");
                } else {
                    LOG.info("Profiling of jobs is enabled.");
                }
            } else {
                this.profiler = null;
                LOG.info("Profiling of jobs is disabled.");
            }
            String[] split = GlobalConfiguration.getString("taskmanager.tmp.dirs", ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
            checkTempDirs(split);
            int integer4 = GlobalConfiguration.getInteger("taskmanager.network.numberOfBuffers", 2048);
            int integer5 = GlobalConfiguration.getInteger("taskmanager.network.bufferSizeInBytes", DefaultMemoryManager.DEFAULT_PAGE_SIZE);
            try {
                NetworkConnectionManager networkConnectionManager = null;
                switch (AnonymousClass4.$SwitchMap$org$apache$flink$runtime$ExecutionMode[executionMode.ordinal()]) {
                    case STARTUP_FAILURE_RETURN_CODE /* 1 */:
                        networkConnectionManager = new LocalConnectionManager();
                        break;
                    case ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL /* 2 */:
                        networkConnectionManager = new NettyConnectionManager(this.localInstanceConnectionInfo.address(), this.localInstanceConnectionInfo.dataPort(), integer5, GlobalConfiguration.getInteger("taskmanager.net.numInThreads", -1), GlobalConfiguration.getInteger("taskmanager.net.numOutThreads", -1), GlobalConfiguration.getInteger("taskmanager.net.nettyLowWaterMark", -1), GlobalConfiguration.getInteger("taskmanager.net.nettyHighWaterMark", -1));
                        break;
                }
                this.channelManager = new ChannelManager(this.lookupService, this.localInstanceConnectionInfo, integer4, integer5, networkConnectionManager);
                long integer6 = GlobalConfiguration.getInteger("taskmanager.memory.size", -1);
                if (integer6 == -1) {
                    float f = GlobalConfiguration.getFloat("taskmanager.memory.fraction", 0.7f);
                    j = ((float) EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()) * f;
                    LOG.info("Using " + f + " of the free heap space for managed memory.");
                } else {
                    if (integer6 <= 0) {
                        throw new Exception("Invalid value for Memory Manager memory size: " + integer6);
                    }
                    j = integer6 << 20;
                }
                int integer7 = GlobalConfiguration.getInteger("taskmanager.network.bufferSizeInBytes", DefaultMemoryManager.DEFAULT_PAGE_SIZE);
                LOG.info("Initializing memory manager with " + (j >>> 20) + " megabytes of memory. Page size is " + integer7 + " bytes.");
                try {
                    GlobalConfiguration.getBoolean("taskmanager.memory.lazyalloc", false);
                    this.memoryManager = new DefaultMemoryManager(j, this.numberOfSlots, integer7);
                    this.hardwareDescription = HardwareDescription.extractFromSystem(this.memoryManager.getMemorySize());
                    int blobServerPort = this.jobManager.getBlobServerPort();
                    if (blobServerPort == -1) {
                        LOG.warn("Unable to determine BLOB server address: User library download will not be available");
                        this.libraryCacheManager = new FallbackLibraryCacheManager();
                    } else {
                        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(inetSocketAddress.getAddress(), blobServerPort);
                        LOG.info("Determined BLOB server address to be " + inetSocketAddress2);
                        this.libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(inetSocketAddress2), GlobalConfiguration.getConfiguration());
                    }
                    this.ioManager = new IOManagerAsync(split);
                    final long integer8 = GlobalConfiguration.getInteger("taskmanager.heartbeat-interval", 5000);
                    this.heartbeatThread = new Thread() { // from class: org.apache.flink.runtime.taskmanager.TaskManager.1
                        @Override // java.lang.Thread, java.lang.Runnable
                        public void run() {
                            TaskManager.this.registerAndRunHeartbeatLoop(integer8, TaskManager.MAX_LOST_HEART_BEATS);
                        }
                    };
                    this.heartbeatThread.setName("Heartbeat Thread");
                    this.heartbeatThread.start();
                    final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
                    final List garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
                    LOG.info(getMemoryUsageStatsAsString(memoryMXBean));
                    if (GlobalConfiguration.getBoolean("taskmanager.debug.memory.startLogThread", false)) {
                        final int integer9 = GlobalConfiguration.getInteger("taskmanager.debug.memory.logIntervalMs", 5000);
                        new Thread(new Runnable() { // from class: org.apache.flink.runtime.taskmanager.TaskManager.2
                            @Override // java.lang.Runnable
                            public void run() {
                                while (!TaskManager.this.isShutDown()) {
                                    try {
                                        Thread.sleep(integer9);
                                        TaskManager.LOG.info(TaskManager.this.getMemoryUsageStatsAsString(memoryMXBean));
                                        TaskManager.LOG.info(TaskManager.this.getGarbageCollectorStatsAsString(garbageCollectorMXBeans));
                                    } catch (InterruptedException e) {
                                        TaskManager.LOG.warn("Unexpected interruption of memory usage logger thread.");
                                        return;
                                    }
                                }
                            }
                        }).start();
                    }
                } catch (Throwable th) {
                    LOG.error("Unable to initialize memory manager with " + (j >>> 20) + " megabytes of memory.", th);
                    throw new Exception("Unable to initialize memory manager.", th);
                }
            } catch (IOException e) {
                LOG.error(StringUtils.stringifyException(e));
                throw new Exception("Failed to instantiate ChannelManager.", e);
            }
        } catch (IOException e2) {
            LOG.error("Failed to start TaskManager server. " + e2.getMessage(), e2);
            throw new Exception("Failed to start taskmanager server. " + e2.getMessage(), e2);
        }
    }

    public void shutdown() {
        if (this.shutdownStarted.compareAndSet(false, true)) {
            LOG.info("Shutting down TaskManager");
            cancelAndClearEverything(new Exception("Task Manager is shutting down"));
            this.heartbeatThread.interrupt();
            try {
                this.heartbeatThread.join(1000L);
            } catch (InterruptedException e) {
            }
            this.registeredId = null;
            stopProxy(this.jobManager);
            stopProxy(this.globalInputSplitProvider);
            stopProxy(this.lookupService);
            stopProxy(this.accumulatorProtocolProxy);
            try {
                this.taskManagerServer.stop();
            } catch (Throwable th) {
                LOG.warn("TaskManager RPC server did not shut down properly.", th);
            }
            if (this.profiler != null) {
                this.profiler.shutdown();
            }
            try {
                this.channelManager.shutdown();
            } catch (Throwable th2) {
                LOG.warn("ChannelManager did not shutdown properly: " + th2.getMessage(), th2);
            }
            if (this.ioManager != null) {
                this.ioManager.shutdown();
            }
            if (this.memoryManager != null) {
                this.memoryManager.shutdown();
            }
            if (this.libraryCacheManager != null) {
                try {
                    this.libraryCacheManager.shutdown();
                } catch (IOException e2) {
                    LOG.warn("Could not properly shutdown the library cache manager.", e2);
                }
            }
            this.fileCache.shutdown();
            if (this.executorService != null) {
                this.executorService.shutdown();
                try {
                    this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e3) {
                    LOG.debug("Shutdown of executor thread pool interrupted", e3);
                }
            }
            this.shutdownComplete = true;
        }
    }

    public boolean isShutDown() {
        return this.shutdownComplete;
    }

    public InstanceConnectionInfo getConnectionInfo() {
        return this.localInstanceConnectionInfo;
    }

    public ExecutionMode getExecutionMode() {
        return this.executionMode;
    }

    public InstanceID getRegisteredId() {
        return this.registeredId;
    }

    public boolean isRegistered() {
        return this.registeredId != null;
    }

    public Map<ExecutionAttemptID, Task> getAllRunningTasks() {
        return Collections.unmodifiableMap(this.runningTasks);
    }

    public ChannelManager getChannelManager() {
        return this.channelManager;
    }

    public BroadcastVariableManager getBroadcastVariableManager() {
        return this.bcVarManager;
    }

    @Override // org.apache.flink.runtime.protocols.TaskOperationProtocol
    public TaskOperationResult cancelTask(ExecutionAttemptID executionAttemptID) throws IOException {
        final Task task = this.runningTasks.get(executionAttemptID);
        if (task == null) {
            return new TaskOperationResult(executionAttemptID, false, "No task with that execution ID was found.");
        }
        this.executorService.execute(new Runnable() { // from class: org.apache.flink.runtime.taskmanager.TaskManager.3
            @Override // java.lang.Runnable
            public void run() {
                task.cancelExecution();
            }
        });
        return new TaskOperationResult(executionAttemptID, true);
    }

    @Override // org.apache.flink.runtime.protocols.TaskOperationProtocol
    public TaskOperationResult submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
        String stringifyException;
        JobID jobID = taskDeploymentDescriptor.getJobID();
        JobVertexID vertexID = taskDeploymentDescriptor.getVertexID();
        ExecutionAttemptID executionId = taskDeploymentDescriptor.getExecutionId();
        int indexInSubtaskGroup = taskDeploymentDescriptor.getIndexInSubtaskGroup();
        int currentNumberOfSubtasks = taskDeploymentDescriptor.getCurrentNumberOfSubtasks();
        Task task = null;
        if (this.shutdownStarted.get()) {
            return new TaskOperationResult(executionId, false, "TaskManager is shut down.");
        }
        if (this.registeredId == null) {
            return new TaskOperationResult(executionId, false, "TaskManager lost connection to JobManager.");
        }
        try {
            this.libraryCacheManager.registerTask(jobID, executionId, taskDeploymentDescriptor.getRequiredJarFiles());
            ClassLoader classLoader = this.libraryCacheManager.getClassLoader(jobID);
            if (classLoader == null) {
                throw new Exception("No user code ClassLoader available.");
            }
            Task task2 = new Task(jobID, vertexID, indexInSubtaskGroup, currentNumberOfSubtasks, executionId, taskDeploymentDescriptor.getTaskName(), this);
            if (this.runningTasks.putIfAbsent(executionId, task2) != null) {
                throw new Exception("TaskManager contains already a task with executionId " + executionId);
            }
            RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment(task2, taskDeploymentDescriptor, classLoader, this.memoryManager, this.ioManager, new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexID, executionId), this.accumulatorProtocolProxy, this.bcVarManager);
            task2.setEnvironment(runtimeEnvironment);
            this.channelManager.register(task2);
            Configuration jobConfiguration = taskDeploymentDescriptor.getJobConfiguration();
            if (this.profiler != null && jobConfiguration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
                task2.registerProfiler(this.profiler, jobConfiguration);
            }
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : DistributedCache.readFileInfoFromConfig(taskDeploymentDescriptor.getJobConfiguration())) {
                hashMap.put(entry.getKey(), this.fileCache.createTmpFile((String) entry.getKey(), (DistributedCache.DistributedCacheEntry) entry.getValue(), jobID));
            }
            runtimeEnvironment.addCopyTasksForCacheFile(hashMap);
            if (!task2.startExecution()) {
                throw new CancelTaskException();
            }
            if (this.shutdownStarted.get() || this.registeredId == null) {
                throw new Exception("Task Manager is shut down or is not connected to a JobManager.");
            }
            return new TaskOperationResult(executionId, true);
        } catch (Throwable th) {
            if (th instanceof CancelTaskException) {
                stringifyException = "Task was canceled";
            } else {
                LOG.error("Could not instantiate task", th);
                stringifyException = ExceptionUtils.stringifyException(th);
            }
            try {
                try {
                    task.failExternally(th);
                } catch (Throwable th2) {
                    LOG.error("Error during cleanup of task deployment", th2);
                }
                this.runningTasks.remove(executionId);
                if (0 != 0) {
                    removeAllTaskResources(null);
                }
                this.libraryCacheManager.unregisterTask(jobID, executionId);
            } catch (Throwable th3) {
                LOG.error("Error during cleanup of task deployment", th3);
                return new TaskOperationResult(executionId, false, stringifyException);
            }
            return new TaskOperationResult(executionId, false, stringifyException);
        }
    }

    private void unregisterTask(ExecutionAttemptID executionAttemptID) {
        Task remove = this.runningTasks.remove(executionAttemptID);
        if (remove != null) {
            removeAllTaskResources(remove);
            this.libraryCacheManager.unregisterTask(remove.getJobID(), executionAttemptID);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Cannot find task with ID " + executionAttemptID + " to unregister");
        }
    }

    private void removeAllTaskResources(Task task) {
        this.channelManager.unregister(task.getExecutionId(), task);
        task.unregisterProfiler(this.profiler);
        task.unregisterMemoryManager(this.memoryManager);
        try {
            if (task.getEnvironment() != null) {
                for (Map.Entry entry : DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
                    this.fileCache.deleteTmpFile((String) entry.getKey(), (DistributedCache.DistributedCacheEntry) entry.getValue(), task.getJobID());
                }
            }
        } catch (Throwable th) {
            LOG.error("Error cleaning up local files from the distributed cache.", th);
        }
    }

    public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionAttemptID, ExecutionState executionState, Throwable th) {
        Task task;
        boolean z = false;
        try {
            try {
                z = this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, executionAttemptID, executionState, th));
                if (!z && (task = this.runningTasks.get(executionAttemptID)) != null) {
                    task.failExternally(new IllegalStateException("Task has been disposed on JobManager."));
                }
                if (!z || executionState == ExecutionState.FINISHED || executionState == ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
                    unregisterTask(executionAttemptID);
                }
            } catch (Throwable th2) {
                LOG.error("Error sending task state update to JobManager.", th2);
                ExceptionUtils.rethrow(th2, "Error sending task state update to JobManager.");
                if (!z || executionState == ExecutionState.FINISHED || executionState == ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
                    unregisterTask(executionAttemptID);
                }
            }
        } catch (Throwable th3) {
            if (!z || executionState == ExecutionState.FINISHED || executionState == ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
                unregisterTask(executionAttemptID);
            }
            throw th3;
        }
    }

    public void cancelAndClearEverything(Throwable th) {
        if (this.runningTasks.size() > 0) {
            LOG.info("Cancelling all computations and discarding all cached data.");
            for (Task task : this.runningTasks.values()) {
                task.failExternally(th);
                this.runningTasks.remove(task.getExecutionId());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerAndRunHeartbeatLoop(long j, int i) {
        loop0: while (!this.shutdownStarted.get()) {
            InstanceID instanceID = null;
            long j2 = 100;
            while (true) {
                long j3 = j2;
                if (this.shutdownStarted.get()) {
                    break;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Trying to register at Jobmanager...");
                }
                try {
                    instanceID = this.jobManager.registerTaskManager(this.localInstanceConnectionInfo, this.hardwareDescription, this.numberOfSlots);
                } catch (Exception e) {
                    if (j3 >= 5000) {
                        LOG.error("Connection to JobManager failed.", e);
                    } else if (LOG.isDebugEnabled()) {
                        LOG.debug("Could not connect to JobManager.", e);
                    }
                }
                if (instanceID == null) {
                    throw new Exception("Registration attempt refused by JobManager.");
                    break loop0;
                } else {
                    if (instanceID != null) {
                        this.registeredId = instanceID;
                        break;
                    }
                    try {
                        Thread.sleep(j3);
                    } catch (InterruptedException e2) {
                        if (!this.shutdownStarted.get()) {
                            LOG.error("TaskManager's registration loop was interrupted without shutdown.");
                        }
                    }
                    j2 = Math.min(2 * j3, 10000L);
                }
            }
            int i2 = 0;
            while (true) {
                if (!this.shutdownStarted.get()) {
                    try {
                        Thread.sleep(j);
                    } catch (InterruptedException e3) {
                        if (!this.shutdownStarted.get()) {
                            LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
                        }
                    }
                    try {
                        if (this.jobManager.sendHeartbeat(instanceID)) {
                            i2 = 0;
                        } else {
                            i2 += STARTUP_FAILURE_RETURN_CODE;
                            LOG.error("JobManager rejected heart beat.");
                        }
                    } catch (IOException e4) {
                        if (!this.shutdownStarted.get()) {
                            i2 += STARTUP_FAILURE_RETURN_CODE;
                            LOG.error("Sending the heart beat failed on I/O error: " + e4.getMessage(), e4);
                        }
                    }
                    if (i2 == i) {
                        LOG.error("TaskManager has lost connection to JobManager.");
                        this.registeredId = null;
                        cancelAndClearEverything(new Exception("TaskManager lost heartbeat connection to JobManager"));
                        try {
                            Thread.sleep(10000L);
                            break;
                        } catch (InterruptedException e5) {
                            if (!this.shutdownStarted.get()) {
                                LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
                            }
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
        MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
        MemoryUsage nonHeapMemoryUsage = memoryMXBean.getNonHeapMemoryUsage();
        return String.format("Memory usage stats: [HEAP: %d/%d/%d MB, NON HEAP: %d/%d/%d MB (used/comitted/max)]", Long.valueOf(heapMemoryUsage.getUsed() >> 20), Long.valueOf(heapMemoryUsage.getCommitted() >> 20), Long.valueOf(heapMemoryUsage.getMax() >> 20), Long.valueOf(nonHeapMemoryUsage.getUsed() >> 20), Long.valueOf(nonHeapMemoryUsage.getCommitted() >> 20), Long.valueOf(nonHeapMemoryUsage.getMax() >> 20));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getGarbageCollectorStatsAsString(List<GarbageCollectorMXBean> list) {
        StringBuilder sb = new StringBuilder();
        sb.append("Garbage collector stats: ");
        int i = 0;
        while (i < list.size()) {
            GarbageCollectorMXBean garbageCollectorMXBean = list.get(i);
            sb.append(String.format("[%s, GC TIME (ms): %d, GC COUNT: %d]", garbageCollectorMXBean.getName(), Long.valueOf(garbageCollectorMXBean.getCollectionTime()), Long.valueOf(garbageCollectorMXBean.getCollectionCount())));
            sb.append(i < list.size() - STARTUP_FAILURE_RETURN_CODE ? ", " : "");
            i += STARTUP_FAILURE_RETURN_CODE;
        }
        return sb.toString();
    }

    public static TaskManager createTaskManager(ExecutionMode executionMode) throws Exception {
        LOG.info("Reading location of job manager from configuration");
        String string = GlobalConfiguration.getString("jobmanager.rpc.address", (String) null);
        int integer = GlobalConfiguration.getInteger("jobmanager.rpc.port", 6123);
        if (string == null) {
            throw new Exception("Job manager address not configured in the GlobalConfiguration.");
        }
        try {
            return createTaskManager(executionMode, new InetSocketAddress(InetAddress.getByName(string), integer));
        } catch (UnknownHostException e) {
            LOG.error("Could not resolve JobManager host name.");
            throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
        }
    }

    public static TaskManager createTaskManager(ExecutionMode executionMode, InetSocketAddress inetSocketAddress) throws Exception {
        try {
            return createTaskManager(executionMode, inetSocketAddress, getTaskManagerAddress(inetSocketAddress));
        } catch (IOException e) {
            throw new Exception("The TaskManager failed to determine the IP address of the interface that connects to the JobManager.", e);
        }
    }

    public static TaskManager createTaskManager(ExecutionMode executionMode, InetSocketAddress inetSocketAddress, InetAddress inetAddress) throws Exception {
        LOG.info("Connecting to JobManager at: " + inetSocketAddress);
        JobManagerProtocol jobManagerProtocol = null;
        InputSplitProviderProtocol inputSplitProviderProtocol = null;
        ChannelLookupProtocol channelLookupProtocol = null;
        AccumulatorProtocol accumulatorProtocol = null;
        try {
            try {
                jobManagerProtocol = (JobManagerProtocol) RPC.getProxy(JobManagerProtocol.class, inetSocketAddress, NetUtils.getSocketFactory());
                try {
                    inputSplitProviderProtocol = (InputSplitProviderProtocol) RPC.getProxy(InputSplitProviderProtocol.class, inetSocketAddress, NetUtils.getSocketFactory());
                    try {
                        channelLookupProtocol = (ChannelLookupProtocol) RPC.getProxy(ChannelLookupProtocol.class, inetSocketAddress, NetUtils.getSocketFactory());
                        try {
                            accumulatorProtocol = (AccumulatorProtocol) RPC.getProxy(AccumulatorProtocol.class, inetSocketAddress, NetUtils.getSocketFactory());
                            TaskManager taskManager = new TaskManager(executionMode, jobManagerProtocol, inputSplitProviderProtocol, channelLookupProtocol, accumulatorProtocol, inetSocketAddress, inetAddress);
                            if (STARTUP_FAILURE_RETURN_CODE == 0) {
                                stopProxy(jobManagerProtocol);
                                stopProxy(inputSplitProviderProtocol);
                                stopProxy(channelLookupProtocol);
                                stopProxy(accumulatorProtocol);
                            }
                            return taskManager;
                        } catch (IOException e) {
                            LOG.error("Failed to initialize accumulator protocol: " + e.getMessage(), e);
                            throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
                        }
                    } catch (IOException e2) {
                        LOG.error(e2.getMessage(), e2);
                        throw new Exception("Failed to initialize channel lookup protocol. " + e2.getMessage(), e2);
                    }
                } catch (IOException e3) {
                    LOG.error(e3.getMessage(), e3);
                    throw new Exception("Failed to initialize connection to global input split provider: " + e3.getMessage(), e3);
                }
            } catch (IOException e4) {
                LOG.error("Could not connect to the JobManager: " + e4.getMessage(), e4);
                throw new Exception("Failed to initialize connection to JobManager: " + e4.getMessage(), e4);
            }
        } catch (Throwable th) {
            if (0 == 0) {
                stopProxy(jobManagerProtocol);
                stopProxy(inputSplitProviderProtocol);
                stopProxy(channelLookupProtocol);
                stopProxy(accumulatorProtocol);
            }
            throw th;
        }
    }

    public static void main(String[] strArr) throws IOException {
        OptionBuilder.withArgName("config directory");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify configuration directory.");
        Option create = OptionBuilder.create("configDir");
        OptionBuilder.withArgName("temporary directory (overwrites configured option)");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify temporary directory.");
        Option create2 = OptionBuilder.create(ARG_CONF_DIR);
        create.setRequired(true);
        create2.setRequired(false);
        Options options = new Options();
        options.addOption(create);
        options.addOption(create2);
        CommandLine commandLine = null;
        try {
            commandLine = new GnuParser().parse(options, strArr);
        } catch (ParseException e) {
            System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
        String optionValue = commandLine.getOptionValue(create.getOpt(), (String) null);
        String optionValue2 = commandLine.getOptionValue(create2.getOpt(), (String) null);
        GlobalConfiguration.loadConfiguration(optionValue);
        if (optionValue2 != null && GlobalConfiguration.getString("taskmanager.tmp.dirs", (String) null) == null) {
            Configuration configuration = GlobalConfiguration.getConfiguration();
            configuration.setString("taskmanager.tmp.dirs", optionValue2);
            LOG.info("Setting temporary directory to " + optionValue2);
            GlobalConfiguration.includeConfiguration(configuration);
        }
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
        try {
            createTaskManager(ExecutionMode.CLUSTER);
        } catch (Throwable th) {
            LOG.error("Taskmanager startup failed: " + th.getMessage(), th);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
        Object obj = new Object();
        synchronized (obj) {
            try {
                obj.wait();
            } catch (InterruptedException e2) {
            }
        }
    }

    private static final void checkTempDirs(String[] strArr) throws Exception {
        for (int i = 0; i < strArr.length; i += STARTUP_FAILURE_RETURN_CODE) {
            File file = new File((String) Preconditions.checkNotNull(strArr[i], "Temporary file directory #" + (i + STARTUP_FAILURE_RETURN_CODE) + " is null."));
            Preconditions.checkArgument(file.exists(), "Temporary file directory '" + file.getAbsolutePath() + "' does not exist.");
            Preconditions.checkArgument(file.isDirectory(), "Temporary file directory '" + file.getAbsolutePath() + "' is not a directory.");
            Preconditions.checkArgument(file.canWrite(), "Temporary file directory '" + file.getAbsolutePath() + "' is not writable.");
            if (LOG.isInfoEnabled()) {
                long totalSpace = file.getTotalSpace() >> 30;
                long usableSpace = file.getUsableSpace() >> 30;
                LOG.info(String.format("Temporary file directory '%s': total %d GB, usable %d GB [%.2f%% usable]", file.getAbsolutePath(), Long.valueOf(totalSpace), Long.valueOf(usableSpace), Double.valueOf((usableSpace / totalSpace) * 100.0d)));
            }
        }
    }

    private static final void stopProxy(VersionedProtocol versionedProtocol) {
        if (versionedProtocol != null) {
            try {
                RPC.stopProxy(versionedProtocol);
            } catch (Throwable th) {
                LOG.error("Error while shutting down RPC proxy.", th);
            }
        }
    }

    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:36:0x00fe. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:41:0x0153 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:45:0x0004 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private static java.net.InetAddress getTaskManagerAddress(java.net.InetSocketAddress r5) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 354
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.taskmanager.TaskManager.getTaskManagerAddress(java.net.InetSocketAddress):java.net.InetAddress");
    }

    private static int getAvailablePort() {
        int localPort;
        for (int i = 0; i < 50; i += STARTUP_FAILURE_RETURN_CODE) {
            ServerSocket serverSocket = null;
            try {
                try {
                    serverSocket = new ServerSocket(0);
                    localPort = serverSocket.getLocalPort();
                } catch (Throwable th) {
                    if (serverSocket != null) {
                        try {
                            serverSocket.close();
                        } catch (Throwable th2) {
                        }
                    }
                    throw th;
                }
            } catch (IOException e) {
                LOG.debug("Unable to allocate port with exception {}", e);
                if (serverSocket != null) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th3) {
                    }
                }
            }
            if (localPort != 0) {
                if (serverSocket != null) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                    }
                }
                return localPort;
            }
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (Throwable th5) {
                }
            }
        }
        throw new RuntimeException("Could not find a free permitted port on the machine.");
    }

    private static boolean hasCommonPrefix(byte[] bArr, byte[] bArr2) {
        return bArr[0] == bArr2[0] && bArr[STARTUP_FAILURE_RETURN_CODE] == bArr2[STARTUP_FAILURE_RETURN_CODE];
    }

    private static boolean tryToConnect(InetAddress inetAddress, SocketAddress socketAddress, int i) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Trying to connect to JobManager (" + socketAddress + ") from local address " + inetAddress + " with timeout " + i);
        }
        boolean z = STARTUP_FAILURE_RETURN_CODE;
        Socket socket = null;
        try {
            try {
                socket = new Socket();
                socket.bind(new InetSocketAddress(inetAddress, 0));
                socket.connect(socketAddress, i);
                if (socket != null) {
                    socket.close();
                }
            } catch (Exception e) {
                LOG.info("Failed to connect to JobManager from address '" + inetAddress + "': " + e.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed with exception", e);
                }
                z = false;
                if (socket != null) {
                    socket.close();
                }
            }
            return z;
        } catch (Throwable th) {
            if (socket != null) {
                socket.close();
            }
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.protocols.TaskOperationProtocol
    public void killTaskManager() throws IOException {
        LOG.info("Killing TaskManager");
        System.exit(0);
    }
}
