/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Props$;
import grizzled.slf4j.Logger;
import grizzled.slf4j.Logger$;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils$;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManager$;
import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManager$;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.NetUtils;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple10;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.FiniteDuration$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.Nothing$;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scopt.OptionParser;
import scopt.Read$;

public final class JobManager$ {
    public static JobManager$ MODULE$;
    private final Logger LOG;
    private final int STARTUP_FAILURE_RETURN_CODE;
    private final int RUNTIME_FAILURE_RETURN_CODE;

    static {
        new JobManager$();
    }

    public Logger LOG() {
        return this.LOG;
    }

    public int STARTUP_FAILURE_RETURN_CODE() {
        return this.STARTUP_FAILURE_RETURN_CODE;
    }

    public int RUNTIME_FAILURE_RETURN_CODE() {
        return this.RUNTIME_FAILURE_RETURN_CODE;
    }

    public void main(String[] args) {
        String string;
        JobManagerMode jobManagerMode;
        Configuration configuration;
        Iterator portRange;
        block10: {
            Tuple4 tuple4;
            block9: {
                EnvironmentInformation.logEnvironmentInfo(this.LOG().logger(), "JobManager", args);
                SignalHandler.register(this.LOG().logger());
                JvmShutdownSafeguard.installAsShutdownHook(this.LOG().logger());
                tuple4 = this.liftedTree1$1(args);
                if (tuple4 == null) break block9;
                Configuration configuration2 = (Configuration)tuple4._1();
                JobManagerMode executionMode = (JobManagerMode)((Object)tuple4._2());
                String externalHostName = (String)tuple4._3();
                portRange = (Iterator)tuple4._4();
                if (configuration2 == null) break block9;
                configuration = configuration2;
                if (executionMode == null) break block9;
                jobManagerMode = executionMode;
                if (externalHostName == null) break block9;
                string = externalHostName;
                if (portRange != null) break block10;
            }
            throw new MatchError((Object)tuple4);
        }
        Iterator iterator = portRange;
        Tuple4 tuple4 = new Tuple4((Object)configuration, (Object)jobManagerMode, (Object)string, (Object)iterator);
        Tuple4 tuple42 = tuple4;
        Configuration configuration3 = (Configuration)tuple42._1();
        JobManagerMode executionMode = (JobManagerMode)((Object)tuple42._2());
        String externalHostName = (String)tuple42._3();
        Iterator portRange2 = (Iterator)tuple42._4();
        if (externalHostName == null) {
            String message = new StringBuilder(72).append("Config parameter '").append(JobManagerOptions.ADDRESS.key()).append("' is missing (hostname/address to bind JobManager to).").toString();
            this.LOG().error((Function0 & Serializable & scala.Serializable)() -> message);
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
        }
        if (!portRange2.hasNext()) {
            if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration3)) {
                String message = "Config parameter 'high-availability.jobmanager.port' does not specify a valid port range.";
                this.LOG().error((Function0 & Serializable & scala.Serializable)() -> message);
                System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            } else {
                String message = new StringBuilder(50).append("Config parameter '").append(JobManagerOptions.ADDRESS.key()).append("' does not specify a valid port.").toString();
                this.LOG().error((Function0 & Serializable & scala.Serializable)() -> message);
                System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            }
        }
        SecurityUtils.install(new SecurityConfiguration(configuration3));
        try {
            SecurityUtils.getInstalledContext().runSecured(new Callable<BoxedUnit>(configuration3, executionMode, externalHostName, portRange2){
                private final Configuration configuration$1;
                private final JobManagerMode executionMode$1;
                private final String externalHostName$1;
                private final Iterator portRange$1;

                public void call() {
                    JobManager$.MODULE$.runJobManager(this.configuration$1, this.executionMode$1, this.externalHostName$1, this.portRange$1);
                }
                {
                    this.configuration$1 = configuration$1;
                    this.executionMode$1 = executionMode$1;
                    this.externalHostName$1 = externalHostName$1;
                    this.portRange$1 = portRange$1;
                }
            });
        }
        catch (Throwable t) {
            this.LOG().error((Function0 & Serializable & scala.Serializable)() -> "Failed to run JobManager.", (Function0 & Serializable & scala.Serializable)() -> t);
            t.printStackTrace();
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
        }
    }

    public void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, int listeningPort) {
        Void void_;
        Option webMonitorOption;
        int numberProcessors = Hardware.getNumberCPUCores();
        ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(numberProcessors, new ExecutorThreadFactory("jobmanager-future"));
        ExecutorService ioExecutor = Executors.newFixedThreadPool(numberProcessors, new ExecutorThreadFactory("jobmanager-io"));
        FiniteDuration timeout = AkkaUtils$.MODULE$.getTimeout(configuration);
        ActorSystem jobManagerSystem = this.startActorSystem(configuration, listeningAddress, listeningPort);
        HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, ioExecutor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
        MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
        metricRegistry.startQueryService(jobManagerSystem, null);
        Tuple4 tuple4 = this.liftedTree2$1(configuration, executionMode, listeningAddress, futureExecutor, ioExecutor, jobManagerSystem, highAvailabilityServices, metricRegistry);
        if (tuple4 == null) {
            throw new MatchError((Object)tuple4);
        }
        Option option = webMonitorOption = (Option)tuple4._3();
        Option webMonitorOption2 = option;
        Await$.MODULE$.ready((Awaitable)jobManagerSystem.whenTerminated(), (Duration)Duration$.MODULE$.Inf());
        webMonitorOption2.foreach((Function1 & Serializable & scala.Serializable)webMonitor -> {
            JobManager$.$anonfun$runJobManager$1(webMonitor);
            return BoxedUnit.UNIT;
        });
        try {
            highAvailabilityServices.close();
        }
        catch (Throwable t) {
            this.LOG().warn((Function0 & Serializable & scala.Serializable)() -> "Could not properly stop the high availability services.", (Function0 & Serializable & scala.Serializable)() -> t);
        }
        try {
            void_ = metricRegistry.shutdown().get();
        }
        catch (Throwable t) {
            this.LOG().warn((Function0 & Serializable & scala.Serializable)() -> "Could not properly shut down the metric registry.", (Function0 & Serializable & scala.Serializable)() -> t);
            void_ = BoxedUnit.UNIT;
        }
        ExecutorUtils.gracefulShutdown((long)timeout.toMillis(), (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{futureExecutor, ioExecutor});
    }

    public void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, Iterator<Integer> listeningPortRange) {
        Try result = AkkaUtils$.MODULE$.retryOnBindException((JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            int n;
            ServerSocket socket = NetUtils.createSocketFromPorts((Iterator)listeningPortRange, (NetUtils.SocketFactory)new NetUtils.SocketFactory(){

                public ServerSocket createSocket(int port) {
                    return new ServerSocket(port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress()));
                }
            });
            if (socket == null) {
                throw new BindException("Unable to allocate port for JobManager.");
            }
            try {
                n = socket.getLocalPort();
            }
            finally {
                socket.close();
            }
            int port = n;
            MODULE$.runJobManager(configuration, executionMode, listeningAddress, port);
        }, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> !listeningPortRange.hasNext(), 5000L);
        Try try_ = result;
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable f = failure.exception();
            throw f;
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public ActorSystem startActorSystem(Configuration configuration, String externalHostname, int port) {
        ActorSystem jobManagerSystem = BootstrapTools.startActorSystem(configuration, externalHostname, port, this.LOG().logger());
        Address address = AkkaUtils$.MODULE$.getAddress(jobManagerSystem);
        configuration.setString(JobManagerOptions.ADDRESS, (String)address.host().get());
        configuration.setInteger(JobManagerOptions.PORT, BoxesRunTime.unboxToInt((Object)address.port().get()));
        return jobManagerSystem;
    }

    public Tuple4<ActorRef, ActorRef, Option<WebMonitor>, Option<ActorRef>> startJobManagerActors(ActorSystem jobManagerSystem, Configuration configuration, JobManagerMode executionMode, String externalHostname, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricRegistry, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass, Option<Class<? extends FlinkResourceManager<?>>> resourceManagerClass) {
        Tuple4 tuple4;
        None$ none$;
        if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Starting JobManager web frontend");
            Time timeout = FutureUtils.toTime(AkkaUtils$.MODULE$.getTimeout(configuration));
            WebMonitor webServer = WebMonitorUtils.startWebRuntimeMonitor(configuration, highAvailabilityServices, new AkkaJobManagerRetriever(jobManagerSystem, timeout, 10, Time.milliseconds((long)50L)), new AkkaQueryServiceRetriever(jobManagerSystem, timeout), timeout, new ScheduledExecutorServiceAdapter(futureExecutor));
            none$ = Option$.MODULE$.apply((Object)webServer);
        } else {
            none$ = None$.MODULE$;
        }
        None$ webMonitor = none$;
        webMonitor.foreach((Function1 & Serializable & scala.Serializable)monitor -> {
            configuration.setInteger(WebOptions.PORT, monitor.getServerPort());
            return BoxedUnit.UNIT;
        });
        try {
            None$ none$2;
            BoxedUnit boxedUnit;
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Starting JobManager actor");
            Tuple2<ActorRef, ActorRef> tuple2 = this.startJobManagerActors(configuration, jobManagerSystem, futureExecutor, ioExecutor, highAvailabilityServices, metricRegistry, (Option<String>)webMonitor.map((Function1 & Serializable & scala.Serializable)x$3 -> x$3.getRestAddress()), jobManagerClass, archiveClass);
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            ActorRef jobManager = (ActorRef)tuple2._1();
            ActorRef archive = (ActorRef)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)jobManager, (Object)archive);
            Tuple2 tuple23 = tuple22;
            ActorRef jobManager2 = (ActorRef)tuple23._1();
            ActorRef archive2 = (ActorRef)tuple23._2();
            this.LOG().debug((Function0 & Serializable & scala.Serializable)() -> "Starting JobManager process reaper");
            jobManagerSystem.actorOf(Props$.MODULE$.apply(ProcessReaper.class, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobManager2, this.LOG().logger(), BoxesRunTime.boxToInteger((int)this.RUNTIME_FAILURE_RETURN_CODE())})), "JobManager_Process_Reaper");
            JobManagerMode jobManagerMode = executionMode;
            JobManagerMode jobManagerMode2 = JobManagerMode.LOCAL;
            if (!(jobManagerMode != null ? !((Object)((Object)jobManagerMode)).equals((Object)jobManagerMode2) : jobManagerMode2 != null)) {
                this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Starting embedded TaskManager for JobManager's LOCAL execution mode");
                ResourceID resourceId = ResourceID.generate();
                ActorRef taskManagerActor = TaskManager$.MODULE$.startTaskManagerComponentsAndActor(configuration, resourceId, jobManagerSystem, highAvailabilityServices, metricRegistry, externalHostname, (Option<String>)new Some((Object)"taskmanager"), true, TaskManager.class);
                this.LOG().debug((Function0 & Serializable & scala.Serializable)() -> "Starting TaskManager process reaper");
                boxedUnit = jobManagerSystem.actorOf(Props$.MODULE$.apply(ProcessReaper.class, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{taskManagerActor, this.LOG().logger(), BoxesRunTime.boxToInteger((int)this.RUNTIME_FAILURE_RETURN_CODE())})), "TaskManager_Process_Reaper");
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            webMonitor.foreach((Function1 & Serializable & scala.Serializable)x$5 -> {
                x$5.start();
                return BoxedUnit.UNIT;
            });
            Option<Class<? extends FlinkResourceManager<?>>> option = resourceManagerClass;
            if (option instanceof Some) {
                Some some = (Some)option;
                Class rmClass = (Class)some.value();
                this.LOG().debug((Function0 & Serializable & scala.Serializable)() -> "Starting Resource manager actor");
                none$2 = Option$.MODULE$.apply((Object)FlinkResourceManager.startResourceManagerActors(configuration, jobManagerSystem, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), rmClass));
            } else if (None$.MODULE$.equals(option)) {
                this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Resource Manager class not provided. No resource manager will be started.");
                none$2 = None$.MODULE$;
            } else {
                throw new MatchError(option);
            }
            None$ resourceManager = none$2;
            tuple4 = new Tuple4((Object)jobManager2, (Object)archive2, (Object)webMonitor, (Object)resourceManager);
        }
        catch (Throwable t) {
            this.LOG().error((Function0 & Serializable & scala.Serializable)() -> "Error while starting up JobManager", (Function0 & Serializable & scala.Serializable)() -> t);
            jobManagerSystem.terminate().onComplete((Function1 & Serializable & scala.Serializable)x0$5 -> {
                JobManager$.$anonfun$startJobManagerActors$13(x0$5);
                return BoxedUnit.UNIT;
            }, org.apache.flink.runtime.concurrent.Executors.directExecutionContext());
            throw t;
        }
        return tuple4;
    }

    public Tuple4<Configuration, JobManagerMode, String, Iterator<Integer>> parseArgs(String[] args) {
        String string;
        OptionParser<JobManagerCliOptions> parser = new OptionParser<JobManagerCliOptions>(){

            public static final /* synthetic */ JobManagerCliOptions $anonfun$new$4(int arg, JobManagerCliOptions conf) {
                conf.setWebUIPort(arg);
                return conf;
            }
            {
                this.head((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Flink JobManager"}));
                this.opt("configDir", Read$.MODULE$.stringRead()).action((Function2 & Serializable & scala.Serializable)(arg, conf) -> {
                    conf.setConfigDir((String)arg);
                    return conf;
                }).text("The configuration directory.");
                this.opt("executionMode", Read$.MODULE$.stringRead()).action((Function2 & Serializable & scala.Serializable)(arg, conf) -> {
                    conf.setJobManagerMode((String)arg);
                    return conf;
                }).text("The execution mode of the JobManager (CLUSTER / LOCAL)");
                this.opt("host", Read$.MODULE$.stringRead()).optional().action((Function2 & Serializable & scala.Serializable)(arg, conf) -> {
                    conf.setHost((String)arg);
                    return conf;
                }).text("Network address for communication with the job manager");
                this.opt("webui-port", Read$.MODULE$.intRead()).optional().action((Function2 & Serializable & scala.Serializable)(arg, conf) -> anon.1.$anonfun$new$4(BoxesRunTime.unboxToInt((Object)arg), conf)).text("Port for the UI web server");
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$new$1(java.lang.String org.apache.flink.runtime.jobmanager.JobManagerCliOptions ), $anonfun$new$2(java.lang.String org.apache.flink.runtime.jobmanager.JobManagerCliOptions ), $anonfun$new$3(java.lang.String org.apache.flink.runtime.jobmanager.JobManagerCliOptions ), $anonfun$new$4$adapted(java.lang.Object org.apache.flink.runtime.jobmanager.JobManagerCliOptions )}, serializedLambda);
            }
        };
        JobManagerCliOptions cliOptions = (JobManagerCliOptions)parser.parse((Seq)Predef$.MODULE$.wrapRefArray((Object[])args), (Object)new JobManagerCliOptions()).getOrElse(() -> JobManager$.$anonfun$parseArgs$1(args, (OptionParser)parser));
        String configDir = cliOptions.getConfigDir();
        if (configDir == null) {
            throw new Exception("Missing parameter '--configDir'");
        }
        if (cliOptions.getJobManagerMode() == null) {
            throw new Exception("Missing parameter '--executionMode'");
        }
        this.LOG().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(27).append("Loading configuration from ").append(configDir).toString());
        Configuration configuration = GlobalConfiguration.loadConfiguration((String)configDir);
        try {
            FileSystem.initialize((Configuration)configuration);
        }
        catch (IOException e) {
            throw new Exception("Error while setting the default filesystem scheme from configuration.", e);
        }
        if (cliOptions.getWebUIPort() >= 0) {
            configuration.setInteger(WebOptions.PORT, cliOptions.getWebUIPort());
        }
        if (cliOptions.getHost() != null) {
            configuration.setString(JobManagerOptions.ADDRESS, cliOptions.getHost());
        }
        String host = configuration.getString(JobManagerOptions.ADDRESS);
        if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Starting JobManager with high-availability");
            configuration.setInteger(JobManagerOptions.PORT, 0);
            string = configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE);
        } else {
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Starting JobManager without high-availability");
            int listeningPort = configuration.getInteger(JobManagerOptions.PORT);
            if (listeningPort <= 0 || listeningPort >= 65536) {
                String message = new StringBuilder(78).append("Config parameter '").append(JobManagerOptions.PORT.key()).append("' is invalid, it must be greater than 0 and less than 65536.").toString();
                this.LOG().error((Function0 & Serializable & scala.Serializable)() -> message);
                System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            }
            string = String.valueOf(listeningPort);
        }
        String portRange = string;
        JobManagerMode executionMode = cliOptions.getJobManagerMode();
        this.LOG().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(45).append("Starting JobManager on ").append(host).append(":").append(portRange).append(" with execution mode ").append((Object)executionMode).toString());
        Iterator portRangeIterator = NetUtils.getPortRangeFromString((String)portRange);
        return new Tuple4((Object)configuration, (Object)executionMode, (Object)host, (Object)portRangeIterator);
    }

    public Tuple10<InstanceManager, Scheduler, BlobServer, BlobLibraryCacheManager, RestartStrategyFactory, FiniteDuration, Object, Option<Path>, FiniteDuration, JobManagerMetricGroup> createJobManagerComponents(Configuration configuration, ScheduledExecutorService futureExecutor, Executor ioExecutor, BlobStore blobStore, MetricRegistry metricRegistry) {
        FiniteDuration finiteDuration;
        Option option;
        FiniteDuration timeout = AkkaUtils$.MODULE$.getTimeout(configuration);
        String classLoaderResolveOrder = configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
        String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns((Configuration)configuration);
        RestartStrategyFactory restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration);
        int archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT);
        String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
        if (archiveDir != null) {
            try {
                option = Option$.MODULE$.apply((Object)WebMonitorUtils.validateAndNormalizeUri(new Path(archiveDir).toUri()));
            }
            catch (Exception e) {
                this.LOG().warn((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(102).append("Failed to validate specified archive directory in '").append(archiveDir).append("'. ").append("Jobs will not be archived for the HistoryServer.").toString(), (Function0 & Serializable & scala.Serializable)() -> e);
                option = Option$.MODULE$.empty();
            }
        } else {
            this.LOG().debug((Function0 & Serializable & scala.Serializable)() -> "No archive directory was configured. Jobs will not be archived.");
            option = Option$.MODULE$.empty();
        }
        Option archivePath = option;
        BlobServer blobServer = null;
        InstanceManager instanceManager = null;
        Scheduler scheduler = null;
        BlobLibraryCacheManager libraryCacheManager = null;
        try {
            blobServer = new BlobServer(configuration, blobStore);
            blobServer.start();
            instanceManager = new InstanceManager();
            scheduler = new Scheduler((Executor)ExecutionContext$.MODULE$.fromExecutor((Executor)futureExecutor));
            libraryCacheManager = new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), alwaysParentFirstLoaderPatterns);
            instanceManager.addInstanceListener(scheduler);
        }
        catch (Throwable t) {
            if (scheduler != null) {
                scheduler.shutdown();
            }
            if (instanceManager != null) {
                instanceManager.shutdown();
            }
            if (libraryCacheManager != null) {
                libraryCacheManager.shutdown();
            }
            if (blobServer != null) {
                blobServer.close();
            }
            throw t;
        }
        String jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY);
        if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty()) {
            finiteDuration = timeout;
        } else {
            try {
                finiteDuration = FiniteDuration$.MODULE$.apply(Duration$.MODULE$.apply(jobRecoveryTimeoutStr).toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (NumberFormatException n) {
                throw new Exception(new StringBuilder(27).append("Invalid config value for ").append(HighAvailabilityOptions.HA_JOB_DELAY.key()).append(": ").append(new StringBuilder(60).append(jobRecoveryTimeoutStr).append(". Value must be a valid duration (such as '10 s' or '1 min')").toString()).toString());
            }
        }
        FiniteDuration jobRecoveryTimeout = finiteDuration;
        JobManagerMetricGroup jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, configuration.getString(JobManagerOptions.ADDRESS), ConfigurationUtils.getSystemResourceMetricsProbingInterval((Configuration)configuration));
        return new Tuple10((Object)instanceManager, (Object)scheduler, (Object)blobServer, (Object)libraryCacheManager, (Object)restartStrategy, (Object)timeout, (Object)BoxesRunTime.boxToInteger((int)archiveCount), (Object)archivePath, (Object)jobRecoveryTimeout, (Object)jobManagerMetricGroup);
    }

    public Tuple2<ActorRef, ActorRef> startJobManagerActors(Configuration configuration, ActorSystem actorSystem, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricRegistry, Option<String> optRestAddress, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass) {
        return this.startJobManagerActors(configuration, actorSystem, futureExecutor, ioExecutor, highAvailabilityServices, metricRegistry, optRestAddress, (Option<String>)new Some((Object)"jobmanager"), (Option<String>)new Some((Object)"archive"), jobManagerClass, archiveClass);
    }

    public Tuple2<ActorRef, ActorRef> startJobManagerActors(Configuration configuration, ActorSystem actorSystem, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricRegistry, Option<String> optRestAddress, Option<String> jobManagerActorName, Option<String> archiveActorName, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass) {
        ActorRef actorRef;
        ActorRef actorRef2;
        Tuple10<InstanceManager, Scheduler, BlobServer, BlobLibraryCacheManager, RestartStrategyFactory, FiniteDuration, Object, Option<Path>, FiniteDuration, JobManagerMetricGroup> tuple10 = this.createJobManagerComponents(configuration, futureExecutor, ioExecutor, highAvailabilityServices.createBlobStore(), metricRegistry);
        if (tuple10 == null) {
            throw new MatchError(tuple10);
        }
        InstanceManager instanceManager = (InstanceManager)tuple10._1();
        Scheduler scheduler = (Scheduler)tuple10._2();
        BlobServer blobServer = (BlobServer)tuple10._3();
        BlobLibraryCacheManager libraryCacheManager = (BlobLibraryCacheManager)tuple10._4();
        RestartStrategyFactory restartStrategy = (RestartStrategyFactory)tuple10._5();
        FiniteDuration timeout = (FiniteDuration)tuple10._6();
        int archiveCount = BoxesRunTime.unboxToInt((Object)tuple10._7());
        Option archivePath = (Option)tuple10._8();
        FiniteDuration jobRecoveryTimeout = (FiniteDuration)tuple10._9();
        JobManagerMetricGroup jobManagerMetricGroup = (JobManagerMetricGroup)tuple10._10();
        Tuple10 tuple102 = new Tuple10((Object)instanceManager, (Object)scheduler, (Object)blobServer, (Object)libraryCacheManager, (Object)restartStrategy, (Object)timeout, (Object)BoxesRunTime.boxToInteger((int)archiveCount), (Object)archivePath, (Object)jobRecoveryTimeout, (Object)jobManagerMetricGroup);
        Tuple10 tuple103 = tuple102;
        InstanceManager instanceManager2 = (InstanceManager)tuple103._1();
        Scheduler scheduler2 = (Scheduler)tuple103._2();
        BlobServer blobServer2 = (BlobServer)tuple103._3();
        BlobLibraryCacheManager libraryCacheManager2 = (BlobLibraryCacheManager)tuple103._4();
        RestartStrategyFactory restartStrategy2 = (RestartStrategyFactory)tuple103._5();
        FiniteDuration timeout2 = (FiniteDuration)tuple103._6();
        int archiveCount2 = BoxesRunTime.unboxToInt((Object)tuple103._7());
        Option archivePath2 = (Option)tuple103._8();
        FiniteDuration jobRecoveryTimeout2 = (FiniteDuration)tuple103._9();
        JobManagerMetricGroup jobManagerMetricGroup2 = (JobManagerMetricGroup)tuple103._10();
        Props archiveProps = this.getArchiveProps(archiveClass, archiveCount2, (Option<Path>)archivePath2);
        Option<String> option = archiveActorName;
        if (option instanceof Some) {
            Some some = (Some)option;
            String actorName = (String)some.value();
            actorRef2 = actorSystem.actorOf(archiveProps, actorName);
        } else if (None$.MODULE$.equals(option)) {
            actorRef2 = actorSystem.actorOf(archiveProps);
        } else {
            throw new MatchError(option);
        }
        ActorRef archive = actorRef2;
        Props jobManagerProps = this.getJobManagerProps(jobManagerClass, configuration, futureExecutor, ioExecutor, instanceManager2, scheduler2, blobServer2, libraryCacheManager2, archive, restartStrategy2, timeout2, highAvailabilityServices.getJobManagerLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID), highAvailabilityServices.getSubmittedJobGraphStore(), highAvailabilityServices.getCheckpointRecoveryFactory(), jobRecoveryTimeout2, jobManagerMetricGroup2, optRestAddress);
        Option<String> option2 = jobManagerActorName;
        if (option2 instanceof Some) {
            Some some = (Some)option2;
            String actorName = (String)some.value();
            actorRef = actorSystem.actorOf(jobManagerProps, actorName);
        } else if (None$.MODULE$.equals(option2)) {
            actorRef = actorSystem.actorOf(jobManagerProps);
        } else {
            throw new MatchError(option2);
        }
        ActorRef jobManager = actorRef;
        return new Tuple2((Object)jobManager, (Object)archive);
    }

    public Props getArchiveProps(Class<? extends MemoryArchivist> archiveClass, int archiveCount, Option<Path> archivePath) {
        return Props$.MODULE$.apply(archiveClass, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)archiveCount), archivePath}));
    }

    public Props getJobManagerProps(Class<? extends JobManager> jobManagerClass, Configuration configuration, ScheduledExecutorService futureExecutor, Executor ioExecutor, InstanceManager instanceManager, Scheduler scheduler, BlobServer blobServer, LibraryCacheManager libraryCacheManager, ActorRef archive, RestartStrategyFactory restartStrategyFactory, FiniteDuration timeout, LeaderElectionService leaderElectionService, SubmittedJobGraphStore submittedJobGraphStore, CheckpointRecoveryFactory checkpointRecoveryFactory, FiniteDuration jobRecoveryTimeout, JobManagerMetricGroup jobManagerMetricGroup, Option<String> optRestAddress) {
        return Props$.MODULE$.apply(jobManagerClass, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{configuration, futureExecutor, ioExecutor, instanceManager, scheduler, blobServer, libraryCacheManager, archive, restartStrategyFactory, timeout, leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory, jobRecoveryTimeout, jobManagerMetricGroup, optRestAddress}));
    }

    private final Tuple4 liftedTree1$1(String[] args$1) {
        Tuple4<Configuration, JobManagerMode, String, Iterator<Integer>> tuple4;
        try {
            tuple4 = this.parseArgs(args$1);
        }
        catch (Throwable t) {
            this.LOG().error((Function0 & Serializable & scala.Serializable)() -> t.getMessage(), (Function0 & Serializable & scala.Serializable)() -> t);
            t.printStackTrace();
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            tuple4 = null;
        }
        return tuple4;
    }

    private final Tuple4 liftedTree2$1(Configuration configuration$3, JobManagerMode executionMode$3, String listeningAddress$2, ScheduledExecutorService futureExecutor$1, ExecutorService ioExecutor$1, ActorSystem jobManagerSystem$1, HighAvailabilityServices highAvailabilityServices$1, MetricRegistryImpl metricRegistry$1) {
        Tuple4<ActorRef, ActorRef, Option<WebMonitor>, Option<ActorRef>> tuple4;
        try {
            tuple4 = this.startJobManagerActors(jobManagerSystem$1, configuration$3, executionMode$3, listeningAddress$2, futureExecutor$1, ioExecutor$1, highAvailabilityServices$1, metricRegistry$1, JobManager.class, MemoryArchivist.class, Option$.MODULE$.apply(StandaloneResourceManager.class));
        }
        catch (Throwable t) {
            futureExecutor$1.shutdownNow();
            ioExecutor$1.shutdownNow();
            throw t;
        }
        return tuple4;
    }

    public static final /* synthetic */ void $anonfun$runJobManager$1(WebMonitor webMonitor) {
        try {
            webMonitor.stop();
        }
        catch (Throwable t) {
            MODULE$.LOG().warn((Function0 & Serializable & scala.Serializable)() -> "Could not properly stop the web monitor.", (Function0 & Serializable & scala.Serializable)() -> t);
        }
    }

    public static final /* synthetic */ void $anonfun$startJobManagerActors$13(Try x0$5) {
        Try try_ = x0$5;
        if (try_ instanceof Success) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable tt = failure.exception();
            MODULE$.LOG().warn((Function0 & Serializable & scala.Serializable)() -> "Could not cleanly shut down actor system", (Function0 & Serializable & scala.Serializable)() -> tt);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            throw new MatchError((Object)try_);
        }
    }

    public static final /* synthetic */ Nothing$ $anonfun$parseArgs$1(String[] args$2, OptionParser parser$1) {
        throw new Exception(new StringBuilder(41).append("Invalid command line arguments: ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])args$2)).mkString(" ")).append(". Usage: ").append(parser$1.usage()).toString());
    }

    private JobManager$() {
        MODULE$ = this;
        this.LOG = Logger$.MODULE$.apply(JobManager.class);
        this.STARTUP_FAILURE_RETURN_CODE = 1;
        this.RUNTIME_FAILURE_RETURN_CODE = 2;
    }
}

