package org.apache.flink.runtime.jobmanager;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobCancelResult;
import org.apache.flink.runtime.client.JobProgressResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Hardware;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManager.class */
public class JobManager implements ExtendedManagementProtocol, InputSplitProviderProtocol, JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol {
    private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
    private static final int FAILURE_RETURN_CODE = 1;
    private final Server jobManagerServer;
    private final InstanceManager instanceManager;
    private final Scheduler scheduler;
    private final ConcurrentHashMap<JobID, ExecutionGraph> currentJobs;
    private final EventCollector eventCollector;
    private final ArchiveListener archive;
    private final AccumulatorManager accumulatorManager;
    private final int recommendedClientPollingInterval;
    private volatile boolean isShutDown;
    private WebInfoServer server;
    private BlobLibraryCacheManager libraryCacheManager;
    private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);

    public JobManager(ExecutionMode executionMode) throws Exception {
        String string = GlobalConfiguration.getString("jobmanager.rpc.address", (String) null);
        InetAddress inetAddress = null;
        if (string != null) {
            try {
                inetAddress = InetAddress.getByName(string);
            } catch (UnknownHostException e) {
                throw new Exception("Cannot convert " + string + " to an IP address: " + e.getMessage(), e);
            }
        }
        int integer = GlobalConfiguration.getInteger("jobmanager.rpc.port", 6123);
        this.recommendedClientPollingInterval = GlobalConfiguration.getInteger("jobclient.polling.interval", 2);
        this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
        this.libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), GlobalConfiguration.getConfiguration());
        int integer2 = GlobalConfiguration.getInteger("jobmanager.web.history", 5);
        if (integer2 > 0) {
            this.archive = new MemoryArchivist(integer2);
            this.eventCollector.registerArchivist(this.archive);
        } else {
            this.archive = null;
        }
        this.currentJobs = new ConcurrentHashMap<>();
        this.accumulatorManager = new AccumulatorManager(Math.min(FAILURE_RETURN_CODE, integer2));
        InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, integer);
        try {
            this.jobManagerServer = RPC.getServer(this, inetSocketAddress.getHostName(), inetSocketAddress.getPort(), GlobalConfiguration.getInteger("jobmanager.rpc.numhandler", 8));
            this.jobManagerServer.start();
            LOG.info("Starting job manager in " + executionMode + " mode");
            if (executionMode == ExecutionMode.LOCAL) {
                this.instanceManager = new LocalInstanceManager(GlobalConfiguration.getInteger("localinstancemanager.numtaskmanager", FAILURE_RETURN_CODE));
            } else {
                if (executionMode != ExecutionMode.CLUSTER) {
                    throw new IllegalArgumentException("ExecutionMode");
                }
                this.instanceManager = new InstanceManager();
            }
            this.scheduler = new Scheduler(this.executorService);
            this.instanceManager.addInstanceListener(this.scheduler);
        } catch (IOException e2) {
            throw new Exception("Cannot start RPC server: " + e2.getMessage(), e2);
        }
    }

    public void shutdown() {
        if (this.isShutdownInProgress.compareAndSet(false, true)) {
            Iterator<ExecutionGraph> it = this.currentJobs.values().iterator();
            while (it.hasNext()) {
                it.next().fail(new Exception("The JobManager is shutting down."));
            }
            if (this.executorService != null) {
                this.executorService.shutdown();
                try {
                    this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    LOG.debug("Shutdown of executor thread pool interrupted", e);
                }
            }
            if (this.instanceManager != null) {
                this.instanceManager.shutdown();
            }
            if (this.libraryCacheManager != null) {
                try {
                    this.libraryCacheManager.shutdown();
                } catch (IOException e2) {
                    LOG.warn("Could not properly shutdown the library cache manager.", e2);
                }
            }
            if (this.jobManagerServer != null) {
                this.jobManagerServer.stop();
            }
            if (this.eventCollector != null) {
                this.eventCollector.shutdown();
            }
            if (this.scheduler != null) {
                this.scheduler.shutdown();
            }
            if (this.server != null) {
                try {
                    this.server.stop();
                } catch (Exception e3) {
                    throw new RuntimeException("Error shtopping the web-info-server.", e3);
                }
            }
            this.isShutDown = true;
            LOG.debug("Shutdown of job manager completed");
        }
    }

    @Override // org.apache.flink.runtime.protocols.JobManagementProtocol
    public JobSubmissionResult submitJob(JobGraph jobGraph) throws IOException {
        if (jobGraph == null) {
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
        }
        if (jobGraph.getNumberOfVertices() == 0) {
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Job is empty.");
        }
        ExecutionGraph executionGraph = null;
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Received job %s (%s)", jobGraph.getJobID(), jobGraph.getName()));
            }
            this.libraryCacheManager.register(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys());
            ExecutionGraph executionGraph2 = this.currentJobs.get(jobGraph.getJobID());
            if (executionGraph2 == null) {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Creating new execution graph for job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ')');
                }
                executionGraph2 = new ExecutionGraph(jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), jobGraph.getUserJarBlobKeys(), this.executorService);
                if (this.currentJobs.putIfAbsent(jobGraph.getJobID(), executionGraph2) != null) {
                    throw new JobException("Concurrent submission of a job with the same jobId: " + jobGraph.getJobID());
                }
            } else if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Found existing execution graph for id %s, attaching this job.", jobGraph.getJobID()));
            }
            executionGraph2.registerJobStatusListener(this);
            ClassLoader classLoader = this.libraryCacheManager.getClassLoader(jobGraph.getJobID());
            if (classLoader == null) {
                throw new JobException("The user code class loader could not be initialized.");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Running master initialization of job %s (%s)", jobGraph.getJobID(), jobGraph.getName()));
            }
            for (AbstractJobVertex abstractJobVertex : jobGraph.getVertices()) {
                String invokableClassName = abstractJobVertex.getInvokableClassName();
                if (invokableClassName == null || invokableClassName.length() == 0) {
                    throw new JobException(String.format("The vertex %s (%s) has no invokable class.", abstractJobVertex.getID(), abstractJobVertex.getName()));
                }
                abstractJobVertex.initializeOnMaster(classLoader);
            }
            List<AbstractJobVertex> verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Adding %d vertices from job graph %s (%s)", Integer.valueOf(verticesSortedTopologicallyFromSources.size()), jobGraph.getJobID(), jobGraph.getName()));
            }
            executionGraph2.attachJobGraph(verticesSortedTopologicallyFromSources);
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", jobGraph.getJobID(), jobGraph.getName()));
            }
            executionGraph2.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
            if (this.eventCollector != null) {
                this.eventCollector.registerJob(executionGraph2, false, System.currentTimeMillis());
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("Scheduling job " + jobGraph.getName());
            }
            executionGraph2.scheduleForExecution(this.scheduler);
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null);
        } catch (Throwable th) {
            LOG.error("Job submission failed.", th);
            if (0 != 0) {
                executionGraph.fail(th);
                try {
                    executionGraph.waitForJobEnd(10000L);
                } catch (InterruptedException e) {
                    LOG.error("Interrupted while waiting for job to finish canceling.");
                }
            }
            if (this.currentJobs.contains(jobGraph.getJobID())) {
                this.currentJobs.remove(jobGraph.getJobID());
                this.libraryCacheManager.unregister(jobGraph.getJobID());
            }
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(th));
        }
    }

    @Override // org.apache.flink.runtime.protocols.JobManagementProtocol
    public JobCancelResult cancelJob(JobID jobID) throws IOException {
        LOG.info("Trying to cancel job with ID " + jobID);
        final ExecutionGraph executionGraph = this.currentJobs.get(jobID);
        if (executionGraph == null) {
            LOG.info("No job found with ID " + jobID);
            return new JobCancelResult(AbstractJobResult.ReturnCode.ERROR, "Cannot find job with ID " + jobID);
        }
        executionGraph.execute(new Runnable() { // from class: org.apache.flink.runtime.jobmanager.JobManager.1
            @Override // java.lang.Runnable
            public void run() {
                executionGraph.cancel();
            }
        });
        return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
    }

    @Override // org.apache.flink.runtime.protocols.JobManagerProtocol
    public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) throws IOException {
        Preconditions.checkNotNull(taskExecutionState);
        ExecutionGraph executionGraph = this.currentJobs.get(taskExecutionState.getJobID());
        if (executionGraph != null) {
            return executionGraph.updateState(taskExecutionState);
        }
        if (!LOG.isDebugEnabled()) {
            return false;
        }
        LOG.debug("Orphaned execution task: UpdateTaskExecutionState call cannot find execution graph for ID " + taskExecutionState.getJobID() + " to change state to " + taskExecutionState.getExecutionState());
        return false;
    }

    @Override // org.apache.flink.runtime.protocols.InputSplitProviderProtocol
    public InputSplit requestNextInputSplit(JobID jobID, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) throws IOException {
        ExecutionGraph executionGraph = this.currentJobs.get(jobID);
        if (executionGraph == null) {
            LOG.error("Cannot find execution graph to job ID " + jobID);
            return null;
        }
        ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
        if (jobVertex == null) {
            LOG.error("Cannot find execution vertex for vertex ID " + jobVertexID);
            return null;
        }
        InputSplitAssigner splitAssigner = jobVertex.getSplitAssigner();
        if (splitAssigner == null) {
            LOG.error("No InputSplitAssigner for vertex ID " + jobVertexID);
            return null;
        }
        String str = null;
        Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptID);
        if (execution == null) {
            LOG.error("Can not find Execution for attempt " + executionAttemptID);
        } else {
            AllocatedSlot assignedResource = execution.getAssignedResource();
            if (assignedResource != null) {
                str = assignedResource.getInstance().getInstanceConnectionInfo().getHostname();
            }
        }
        return splitAssigner.getNextInputSplit(str);
    }

    @Override // org.apache.flink.runtime.executiongraph.JobStatusListener
    public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus jobStatus, String str) {
        JobID jobID = executionGraph.getJobID();
        if (LOG.isInfoEnabled()) {
            LOG.info(String.format("Job %s (%s) switched to %s%s", jobID, executionGraph.getJobName(), jobStatus, str == null ? "." : ": " + str));
        }
        if (jobStatus.isTerminalState()) {
            this.currentJobs.remove(jobID);
            try {
                this.libraryCacheManager.unregister(jobID);
            } catch (Throwable th) {
                LOG.warn("Could not properly unregister job " + jobID + " from the library cache.");
            }
        }
    }

    @Override // org.apache.flink.runtime.protocols.JobManagementProtocol
    public JobProgressResult getJobProgress(JobID jobID) throws IOException {
        if (this.eventCollector == null) {
            return new JobProgressResult(AbstractJobResult.ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null);
        }
        SerializableArrayList serializableArrayList = new SerializableArrayList();
        this.eventCollector.getEventsForJob(jobID, serializableArrayList, false);
        return new JobProgressResult(AbstractJobResult.ReturnCode.SUCCESS, null, serializableArrayList);
    }

    @Override // org.apache.flink.runtime.protocols.ChannelLookupProtocol
    public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo instanceConnectionInfo, JobID jobID, ChannelID channelID) {
        ExecutionGraph executionGraph = this.currentJobs.get(jobID);
        if (executionGraph != null) {
            return executionGraph.lookupConnectionInfoAndDeployReceivers(instanceConnectionInfo, channelID);
        }
        LOG.error("Cannot find execution graph to job ID " + jobID);
        return ConnectionInfoLookupResponse.createReceiverNotFound();
    }

    public boolean isShutDown() {
        return this.isShutDown;
    }

    public InstanceManager getInstanceManager() {
        return this.instanceManager;
    }

    @Override // org.apache.flink.runtime.protocols.JobManagementProtocol
    public IntegerRecord getRecommendedPollingInterval() throws IOException {
        return new IntegerRecord(this.recommendedClientPollingInterval);
    }

    @Override // org.apache.flink.runtime.protocols.ExtendedManagementProtocol
    public List<RecentJobEvent> getRecentJobs() throws IOException {
        SerializableArrayList serializableArrayList = new SerializableArrayList();
        if (this.eventCollector == null) {
            throw new IOException("No instance of the event collector found");
        }
        this.eventCollector.getRecentJobs(serializableArrayList);
        return serializableArrayList;
    }

    @Override // org.apache.flink.runtime.protocols.ExtendedManagementProtocol
    public List<AbstractEvent> getEvents(JobID jobID) throws IOException {
        SerializableArrayList serializableArrayList = new SerializableArrayList();
        if (this.eventCollector == null) {
            throw new IOException("No instance of the event collector found");
        }
        this.eventCollector.getEventsForJob(jobID, serializableArrayList, true);
        return serializableArrayList;
    }

    @Override // org.apache.flink.runtime.protocols.ExtendedManagementProtocol
    public int getTotalNumberOfRegisteredSlots() {
        return getInstanceManager().getTotalNumberOfSlots();
    }

    @Override // org.apache.flink.runtime.protocols.ExtendedManagementProtocol
    public int getNumberOfSlotsAvailableToScheduler() {
        return this.scheduler.getNumberOfAvailableSlots();
    }

    public void startInfoServer() {
        Configuration configuration = GlobalConfiguration.getConfiguration();
        try {
            this.server = new WebInfoServer(configuration, configuration.getInteger("jobmanager.web.port", 8081), this);
            this.server.start();
        } catch (FileNotFoundException e) {
            LOG.error(e.getMessage(), e);
        } catch (Exception e2) {
            LOG.error("Cannot instantiate info server: " + e2.getMessage(), e2);
        }
    }

    public List<RecentJobEvent> getOldJobs() throws IOException {
        if (this.archive == null) {
            throw new IOException("No instance of the event collector found");
        }
        return this.archive.getJobs();
    }

    public ArchiveListener getArchive() {
        return this.archive;
    }

    public int getNumberOfTaskManagers() {
        return this.instanceManager.getNumberOfRegisteredTaskManagers();
    }

    public Map<InstanceID, Instance> getInstances() {
        return this.instanceManager.getAllRegisteredInstances();
    }

    @Override // org.apache.flink.runtime.protocols.AccumulatorProtocol
    public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) throws IOException {
        this.accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID(), accumulatorEvent.getAccumulators(this.libraryCacheManager.getClassLoader(accumulatorEvent.getJobID())));
    }

    @Override // org.apache.flink.runtime.protocols.AccumulatorProtocol
    public AccumulatorEvent getAccumulatorResults(JobID jobID) throws IOException {
        return new AccumulatorEvent(jobID, this.accumulatorManager.getJobAccumulators(jobID));
    }

    public Map<String, Accumulator<?, ?>> getAccumulators(JobID jobID) {
        return this.accumulatorManager.getJobAccumulators(jobID);
    }

    public Map<JobID, ExecutionGraph> getCurrentJobs() {
        return Collections.unmodifiableMap(this.currentJobs);
    }

    public ExecutionGraph getRecentExecutionGraph(JobID jobID) throws IOException {
        ExecutionGraph executionGraph = this.currentJobs.get(jobID);
        if (executionGraph == null) {
            executionGraph = this.eventCollector.getManagementGraph(jobID);
            if (executionGraph == null && this.archive != null) {
                executionGraph = this.archive.getExecutionGraph(jobID);
            }
        }
        if (executionGraph == null) {
            throw new IOException("Cannot find execution graph for job with ID " + jobID);
        }
        return executionGraph;
    }

    @Override // org.apache.flink.runtime.protocols.JobManagerProtocol
    public boolean sendHeartbeat(InstanceID instanceID) {
        return this.instanceManager.reportHeartBeat(instanceID);
    }

    @Override // org.apache.flink.runtime.protocols.JobManagerProtocol
    public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int i) {
        if (this.instanceManager == null || this.scheduler == null) {
            return null;
        }
        return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, i);
    }

    public static void main(String[] strArr) {
        try {
            initialize(strArr).startInfoServer();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            System.exit(FAILURE_RETURN_CODE);
        }
        Object obj = new Object();
        synchronized (obj) {
            try {
                obj.wait();
            } catch (InterruptedException e2) {
            }
        }
    }

    public static JobManager initialize(String[] strArr) throws Exception {
        OptionBuilder.withArgName("config directory");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify configuration directory.");
        Option create = OptionBuilder.create("configDir");
        OptionBuilder.withArgName("execution mode");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify execution mode.");
        Option create2 = OptionBuilder.create("executionMode");
        Options options = new Options();
        options.addOption(create);
        options.addOption(create2);
        CommandLine commandLine = null;
        try {
            commandLine = new GnuParser().parse(options, strArr);
        } catch (ParseException e) {
            LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
            System.exit(FAILURE_RETURN_CODE);
        }
        String optionValue = commandLine.getOptionValue(create.getOpt(), (String) null);
        String optionValue2 = commandLine.getOptionValue(create2.getOpt(), "local");
        ExecutionMode executionMode = null;
        if ("local".equals(optionValue2)) {
            executionMode = ExecutionMode.LOCAL;
        } else if ("cluster".equals(optionValue2)) {
            executionMode = ExecutionMode.CLUSTER;
        } else {
            System.err.println("Unrecognized execution mode: " + optionValue2);
            System.exit(FAILURE_RETURN_CODE);
        }
        EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
        GlobalConfiguration.loadConfiguration(optionValue);
        JobManager jobManager = new JobManager(executionMode);
        Configuration configuration = GlobalConfiguration.getConfiguration();
        if (optionValue != null && new File(optionValue).isDirectory()) {
            configuration.setString("flink.base.dir.path", optionValue + "/..");
        }
        GlobalConfiguration.includeConfiguration(configuration);
        return jobManager;
    }

    @Override // org.apache.flink.runtime.protocols.ServiceDiscoveryProtocol
    public int getBlobServerPort() {
        return this.libraryCacheManager.getBlobServerPort();
    }
}
