/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskManagerServices {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
    @VisibleForTesting
    public static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState";
    private final TaskManagerLocation taskManagerLocation;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;
    private final KvStateService kvStateService;
    private final BroadcastVariableManager broadcastVariableManager;
    private final TaskSlotTable taskSlotTable;
    private final JobManagerTable jobManagerTable;
    private final JobLeaderService jobLeaderService;
    private final TaskExecutorLocalStateStoresManager taskManagerStateStore;
    private final TaskEventDispatcher taskEventDispatcher;

    TaskManagerServices(TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, ShuffleEnvironment<?, ?> shuffleEnvironment, KvStateService kvStateService, BroadcastVariableManager broadcastVariableManager, TaskSlotTable taskSlotTable, JobManagerTable jobManagerTable, JobLeaderService jobLeaderService, TaskExecutorLocalStateStoresManager taskManagerStateStore, TaskEventDispatcher taskEventDispatcher) {
        this.taskManagerLocation = (TaskManagerLocation)Preconditions.checkNotNull((Object)taskManagerLocation);
        this.memoryManager = (MemoryManager)Preconditions.checkNotNull((Object)memoryManager);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.shuffleEnvironment = (ShuffleEnvironment)Preconditions.checkNotNull(shuffleEnvironment);
        this.kvStateService = (KvStateService)Preconditions.checkNotNull((Object)kvStateService);
        this.broadcastVariableManager = (BroadcastVariableManager)Preconditions.checkNotNull((Object)broadcastVariableManager);
        this.taskSlotTable = (TaskSlotTable)Preconditions.checkNotNull((Object)taskSlotTable);
        this.jobManagerTable = (JobManagerTable)Preconditions.checkNotNull((Object)jobManagerTable);
        this.jobLeaderService = (JobLeaderService)Preconditions.checkNotNull((Object)jobLeaderService);
        this.taskManagerStateStore = (TaskExecutorLocalStateStoresManager)Preconditions.checkNotNull((Object)taskManagerStateStore);
        this.taskEventDispatcher = (TaskEventDispatcher)Preconditions.checkNotNull((Object)taskEventDispatcher);
    }

    public MemoryManager getMemoryManager() {
        return this.memoryManager;
    }

    public IOManager getIOManager() {
        return this.ioManager;
    }

    public ShuffleEnvironment<?, ?> getShuffleEnvironment() {
        return this.shuffleEnvironment;
    }

    public KvStateService getKvStateService() {
        return this.kvStateService;
    }

    public TaskManagerLocation getTaskManagerLocation() {
        return this.taskManagerLocation;
    }

    public BroadcastVariableManager getBroadcastVariableManager() {
        return this.broadcastVariableManager;
    }

    public TaskSlotTable getTaskSlotTable() {
        return this.taskSlotTable;
    }

    public JobManagerTable getJobManagerTable() {
        return this.jobManagerTable;
    }

    public JobLeaderService getJobLeaderService() {
        return this.jobLeaderService;
    }

    public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
        return this.taskManagerStateStore;
    }

    public TaskEventDispatcher getTaskEventDispatcher() {
        return this.taskEventDispatcher;
    }

    public void shutDown() throws FlinkException {
        Exception exception = null;
        try {
            this.taskManagerStateStore.shutdown();
        }
        catch (Exception e) {
            exception = e;
        }
        try {
            this.memoryManager.shutdown();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.ioManager.close();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.shuffleEnvironment.close();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.kvStateService.shutdown();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.taskSlotTable.stop();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.jobLeaderService.stop();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        this.taskEventDispatcher.clearAll();
        if (exception != null) {
            throw new FlinkException("Could not properly shut down the TaskManager services.", (Throwable)exception);
        }
    }

    public static TaskManagerServices fromConfiguration(TaskManagerServicesConfiguration taskManagerServicesConfiguration, MetricGroup taskManagerMetricGroup, Executor taskIOExecutor) throws Exception {
        TaskManagerServices.checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        IOManagerAsync ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
        ShuffleEnvironment<?, ?> shuffleEnvironment = TaskManagerServices.createShuffleEnvironment(taskManagerServicesConfiguration, taskEventDispatcher, taskManagerMetricGroup);
        int dataPort = shuffleEnvironment.start();
        KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
        kvStateService.start();
        TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerServicesConfiguration.getResourceID(), taskManagerServicesConfiguration.getTaskManagerAddress(), dataPort);
        MemoryManager memoryManager = TaskManagerServices.createMemoryManager(taskManagerServicesConfiguration);
        long managedMemorySize = memoryManager.getMemorySize();
        BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
        int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
        List<ResourceProfile> resourceProfiles = Collections.nCopies(numOfSlots, TaskManagerServices.computeSlotResourceProfile(numOfSlots, managedMemorySize));
        TimerService<AllocationID> timerService = new TimerService<AllocationID>(new ScheduledThreadPoolExecutor(1), taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
        TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
        String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
        File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
        TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(taskManagerServicesConfiguration.isLocalRecoveryEnabled(), stateRootDirectoryFiles, taskIOExecutor);
        return new TaskManagerServices(taskManagerLocation, memoryManager, ioManager, shuffleEnvironment, kvStateService, broadcastVariableManager, taskSlotTable, jobManagerTable, jobLeaderService, taskStateManager, taskEventDispatcher);
    }

    private static ShuffleEnvironment<?, ?> createShuffleEnvironment(TaskManagerServicesConfiguration taskManagerServicesConfiguration, TaskEventDispatcher taskEventDispatcher, MetricGroup taskManagerMetricGroup) throws FlinkException {
        ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(taskManagerServicesConfiguration.getConfiguration(), taskManagerServicesConfiguration.getResourceID(), taskManagerServicesConfiguration.getMaxJvmHeapMemory(), taskManagerServicesConfiguration.isLocalCommunicationOnly(), taskManagerServicesConfiguration.getTaskManagerAddress(), taskEventDispatcher, taskManagerMetricGroup);
        return ShuffleServiceLoader.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration()).createShuffleEnvironment(shuffleEnvironmentContext);
    }

    private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception {
        MemoryManager memoryManager;
        long memorySize;
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();
        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();
        if (configuredMemory > 0L) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory.", (Object)configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily.", (Object)configuredMemory);
            }
            memorySize = configuredMemory << 20;
        } else {
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
            if (memType == MemoryType.HEAP) {
                long freeHeapMemoryWithDefrag = taskManagerServicesConfiguration.getFreeHeapMemoryWithDefrag();
                long relativeMemSize = (long)((float)freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB).", (Object)Float.valueOf(memoryFraction), (Object)(relativeMemSize >> 20));
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), memory will be allocated lazily.", (Object)Float.valueOf(memoryFraction), (Object)(relativeMemSize >> 20));
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                long maxJvmHeapMemory = taskManagerServicesConfiguration.getMaxJvmHeapMemory();
                long directMemorySize = (long)((double)maxJvmHeapMemory / (1.0 - (double)memoryFraction) * (double)memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB).", (Object)Float.valueOf(memoryFraction), (Object)(directMemorySize >> 20));
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB), memory will be allocated lazily.", (Object)Float.valueOf(memoryFraction), (Object)(directMemorySize >> 20));
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }
        try {
            memoryManager = new MemoryManager(memorySize, taskManagerServicesConfiguration.getNumberOfSlots(), taskManagerServicesConfiguration.getPageSize(), memType, preAllocateMemory);
        }
        catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() + ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            }
            if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() + ") while allocating the TaskManager off-heap memory (" + memorySize + " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            }
            throw e;
        }
        return memoryManager;
    }

    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument((totalJavaMemorySizeMB > 0L ? 1 : 0) != 0);
        long totalProcessMemory = TaskManagerServices.megabytesToBytes(totalJavaMemorySizeMB);
        long networkReservedMemory = TaskManagerServices.getReservedNetworkMemory(config, totalProcessMemory);
        long heapAndManagedMemory = totalProcessMemory - networkReservedMemory;
        if (config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            long managedMemorySize = TaskManagerServices.getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);
            ConfigurationParserUtils.checkConfigParameter(managedMemorySize < heapAndManagedMemory, managedMemorySize, TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "Managed memory size too large for " + (networkReservedMemory >> 20) + " MB network buffer memory and a total of " + totalJavaMemorySizeMB + " MB JVM memory");
            return TaskManagerServices.bytesToMegabytes(heapAndManagedMemory - managedMemorySize);
        }
        return TaskManagerServices.bytesToMegabytes(heapAndManagedMemory);
    }

    public static long getManagedMemoryFromProcessMemory(Configuration config, long totalProcessMemory) {
        long heapAndManagedMemory = totalProcessMemory - TaskManagerServices.getReservedNetworkMemory(config, totalProcessMemory);
        return TaskManagerServices.getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);
    }

    public static long getManagedMemoryFromHeapAndManaged(Configuration config, long heapAndManagedMemory) {
        if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
            String sizeValue = config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE);
            try {
                return MemorySize.parse((String)sizeValue, (MemorySize.MemoryUnit)MemorySize.MemoryUnit.MEGA_BYTES).getBytes();
            }
            catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), (Throwable)e);
            }
        }
        float fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        return (long)(fraction * (float)heapAndManagedMemory);
    }

    public static long getReservedNetworkMemory(Configuration config, long totalProcessMemory) {
        return NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(totalProcessMemory, config);
    }

    private static void checkTempDirs(String[] tmpDirs) throws IOException {
        for (String dir : tmpDirs) {
            if (dir != null && !dir.equals("")) {
                File file = new File(dir);
                if (!file.exists() && !file.mkdirs()) {
                    throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist and could not be created.");
                }
                if (!file.isDirectory()) {
                    throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
                }
                if (!file.canWrite()) {
                    throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
                }
                if (!LOG.isInfoEnabled()) continue;
                long totalSpaceGb = file.getTotalSpace() >> 30;
                long usableSpaceGb = file.getUsableSpace() >> 30;
                double usablePercentage = (double)usableSpaceGb / (double)totalSpaceGb * 100.0;
                String path = file.getAbsolutePath();
                LOG.info(String.format("Temporary file directory '%s': total %d GB, usable %d GB (%.2f%% usable)", path, totalSpaceGb, usableSpaceGb, usablePercentage));
                continue;
            }
            throw new IllegalArgumentException("Temporary file directory #$id is null.");
        }
    }

    public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
        int managedMemoryPerSlotMB = (int)TaskManagerServices.bytesToMegabytes(managedMemorySize / (long)numOfSlots);
        return new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, managedMemoryPerSlotMB, Collections.emptyMap());
    }

    private static long bytesToMegabytes(long bytes) {
        return bytes >> 20;
    }

    private static long megabytesToBytes(long megabytes) {
        return megabytes << 20;
    }
}

