/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Props$;
import com.typesafe.config.Config;
import grizzled.slf4j.Logger;
import grizzled.slf4j.Logger$;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils$;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManager$;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.NetUtils;
import org.jboss.netty.channel.ChannelException;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple4;
import scala.Tuple9;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.FiniteDuration$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Failure;
import scala.util.Try;
import scala.util.Try$;
import scopt.OptionParser;
import scopt.Read$;

public final class JobManager$ {
    /**
     * Singleton instance slot of this holder class. It is not assigned anywhere in the
     * visible code; presumably the constructor stores {@code this} here - TODO confirm.
     */
    public static final JobManager$ MODULE$;
    /** Backing field of {@link #LOG()}. */
    private final Logger LOG;
    /** Backing field of {@link #STARTUP_FAILURE_RETURN_CODE()}; {@code main} passes it to {@code System.exit}. */
    private final int STARTUP_FAILURE_RETURN_CODE;
    /** Backing field of {@link #RUNTIME_FAILURE_RETURN_CODE()}; handed to the process reaper actors. */
    private final int RUNTIME_FAILURE_RETURN_CODE;

    // Instantiates the holder once when the class is initialized; the result is not
    // stored here, so any publication of the instance must happen in the constructor.
    static {
        new JobManager$();
    }

    /**
     * Accessor for the logger of this object.
     *
     * @return the value of the {@code LOG} field
     */
    public Logger LOG() {
        return LOG;
    }

    /**
     * Accessor for the exit code used when start-up fails.
     *
     * @return the value of the {@code STARTUP_FAILURE_RETURN_CODE} field
     */
    public int STARTUP_FAILURE_RETURN_CODE() {
        return STARTUP_FAILURE_RETURN_CODE;
    }

    /**
     * Accessor for the exit code handed to the process reaper actors.
     *
     * @return the value of the {@code RUNTIME_FAILURE_RETURN_CODE} field
     */
    public int RUNTIME_FAILURE_RETURN_CODE() {
        return RUNTIME_FAILURE_RETURN_CODE;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Entry point of the JobManager process.
     *
     * <p>Logs the environment information, registers the signal handler and the JVM
     * shutdown safeguard, obtains the start-up settings from {@code liftedTree2$1},
     * validates them, installs the security configuration and finally runs the JobManager
     * inside the installed security context. Every validation failure and every failure
     * while running is logged and followed by
     * {@code System.exit(STARTUP_FAILURE_RETURN_CODE())}.
     *
     * @param args command line arguments, forwarded unchanged to {@code liftedTree2$1}
     * @throws MatchError if the settings tuple, the configuration, the execution mode or
     *         the port range is {@code null}
     */
    public void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo(this.LOG().logger(), "JobManager", args);
        SignalHandler.register(this.LOG().logger());
        JvmShutdownSafeguard.installAsShutdownHook(this.LOG().logger());
        Tuple4 parsed = this.liftedTree2$1(args);
        if (parsed == null) throw new MatchError((Object)parsed);
        Configuration configuration = (Configuration)parsed._1();
        JobManagerMode executionMode = (JobManagerMode)((Object)parsed._2());
        String externalHostName = (String)parsed._3();
        Iterator portRange = (Iterator)parsed._4();
        // The host name is deliberately left out of this check: rejecting a null host
        // here would make the dedicated "missing jobmanager.rpc.address" diagnostic
        // below unreachable and surface a bare MatchError instead.
        if (configuration == null || executionMode == null || portRange == null) {
            throw new MatchError((Object)parsed);
        }
        if (externalHostName == null) {
            String message = "Config parameter 'jobmanager.rpc.address' is missing (hostname/address to bind JobManager to).";
            this.LOG().error((Function0)new Serializable(message){
                public static final long serialVersionUID = 0L;
                private final String message$1;

                public final String apply() {
                    return this.message$1;
                }
                {
                    this.message$1 = message$1;
                }
            });
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
        }
        if (!portRange.hasNext()) {
            if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
                String message = "Config parameter 'high-availability.jobmanager.port' does not specify a valid port range.";
                this.LOG().error((Function0)new Serializable(message){
                    public static final long serialVersionUID = 0L;
                    private final String message$2;

                    public final String apply() {
                        return this.message$2;
                    }
                    {
                        this.message$2 = message$2;
                    }
                });
                System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            } else {
                // Names the port option: the problem reported here is an unusable port,
                // not the address.
                String message = "Config parameter 'jobmanager.rpc.port' does not specify a valid port.";
                this.LOG().error((Function0)new Serializable(message){
                    public static final long serialVersionUID = 0L;
                    private final String message$3;

                    public final String apply() {
                        return this.message$3;
                    }
                    {
                        this.message$3 = message$3;
                    }
                });
                System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            }
        }
        SecurityUtils.install(new SecurityUtils.SecurityConfiguration(configuration));
        try {
            SecurityUtils.getInstalledContext().runSecured(new Callable<BoxedUnit>(configuration, executionMode, externalHostName, portRange){
                private final Configuration configuration$1;
                private final JobManagerMode executionMode$1;
                private final String externalHostName$1;
                private final Iterator portRange$1;

                public void call() {
                    JobManager$.MODULE$.runJobManager(this.configuration$1, this.executionMode$1, this.externalHostName$1, this.portRange$1);
                }
                {
                    this.configuration$1 = configuration$1;
                    this.executionMode$1 = executionMode$1;
                    this.externalHostName$1 = externalHostName$1;
                    this.portRange$1 = portRange$1;
                }
            });
            return;
        }
        catch (Throwable throwable) {
            this.LOG().error((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Failed to run JobManager.";
                }
            }, (Function0)new Serializable(throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable t$13;

                public final Throwable apply() {
                    return this.t$13;
                }
                {
                    this.t$13 = t$13;
                }
            });
            throwable.printStackTrace();
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Starts the JobManager on a fixed port and blocks until its actor system terminates.
     *
     * <p>Creates the future and I/O executors (one thread per CPU core each), starts the
     * actor system, creates the high availability services and starts the JobManager
     * components through {@code liftedTree3$1}. After the actor system has terminated the
     * web monitor (if any) is stopped and the high availability services are closed;
     * failures of these two steps are only logged as warnings.
     *
     * <p>The two executors are shut down in a {@code finally} block, so they are also
     * released when start-up fails, for example when the port cannot be bound and the
     * caller retries with another port.
     *
     * @param configuration configuration of the JobManager; updated by {@code startActorSystem}
     * @param executionMode execution mode forwarded to {@code liftedTree3$1}
     * @param listeningAddress address the actor system is started with
     * @param listeningPort port the actor system is started with
     * @throws MatchError if {@code liftedTree3$1} returns {@code null}
     */
    public void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, int listeningPort) {
        // Resolved before any executor exists so that a failure here cannot leak a pool.
        FiniteDuration timeout = AkkaUtils$.MODULE$.getTimeout(configuration);
        int numberProcessors = Hardware.getNumberCPUCores();
        ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(numberProcessors, new ExecutorThreadFactory("jobmanager-future"));
        ExecutorService ioExecutor = Executors.newFixedThreadPool(numberProcessors, new ExecutorThreadFactory("jobmanager-io"));
        try {
            ActorSystem jobManagerSystem = this.startActorSystem(configuration, listeningAddress, listeningPort);
            HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, ioExecutor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
            Tuple4 components = this.liftedTree3$1(configuration, executionMode, listeningAddress, futureExecutor, ioExecutor, jobManagerSystem, highAvailabilityServices);
            if (components == null) throw new MatchError((Object)components);
            // Only the third element (the optional web monitor) is needed here.
            Option webMonitorOption = (Option)components._3();
            jobManagerSystem.awaitTermination();
            webMonitorOption.foreach((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(WebMonitor webMonitor) {
                    try {
                        webMonitor.stop();
                    }
                    catch (Throwable throwable) {
                        JobManager$.MODULE$.LOG().warn((Function0)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final String apply() {
                                return "Could not properly stop the web monitor.";
                            }
                        }, (Function0)new Serializable(this, throwable){
                            public static final long serialVersionUID = 0L;
                            private final Throwable t$14;

                            public final Throwable apply() {
                                return this.t$14;
                            }
                            {
                                this.t$14 = t$14;
                            }
                        });
                    }
                }
            });
            try {
                highAvailabilityServices.close();
            }
            catch (Throwable throwable) {
                this.LOG().warn((Function0)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Could not properly stop the high availability services.";
                    }
                }, (Function0)new Serializable(throwable){
                    public static final long serialVersionUID = 0L;
                    private final Throwable t$15;

                    public final Throwable apply() {
                        return this.t$15;
                    }
                    {
                        this.t$15 = t$15;
                    }
                });
            }
        }
        finally {
            org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(timeout.toMillis(), TimeUnit.MILLISECONDS, futureExecutor, ioExecutor);
        }
    }

    public void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, Iterator<Integer> listeningPortRange) {
        Try result = this.retryOnBindException((Function0)new Serializable(configuration, executionMode, listeningAddress, listeningPortRange){
            public static final long serialVersionUID = 0L;
            private final Configuration configuration$2;
            private final JobManagerMode executionMode$2;
            private final String listeningAddress$1;
            private final Iterator listeningPortRange$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            /*
             * WARNING - void declaration
             */
            public void apply$mcV$sp() {
                int n;
                ServerSocket socket = NetUtils.createSocketFromPorts((Iterator)this.listeningPortRange$1, (NetUtils.SocketFactory)new NetUtils.SocketFactory(this){

                    public ServerSocket createSocket(int port) {
                        return new ServerSocket(port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress()));
                    }
                });
                if (socket == null) {
                    throw new BindException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unable to allocate port for JobManager."})).s((Seq)Nil$.MODULE$));
                }
                try {
                    n = socket.getLocalPort();
                }
                catch (Throwable throwable) {
                    void var1_1;
                    var1_1.close();
                    throw throwable;
                }
                socket.close();
                int port = n;
                JobManager$.MODULE$.runJobManager(this.configuration$2, this.executionMode$2, this.listeningAddress$1, port);
            }
            {
                this.configuration$2 = configuration$2;
                this.executionMode$2 = executionMode$2;
                this.listeningAddress$1 = listeningAddress$1;
                this.listeningPortRange$1 = listeningPortRange$1;
            }
        }, (Function0<Object>)new Serializable(listeningPortRange){
            public static final long serialVersionUID = 0L;
            private final Iterator listeningPortRange$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return !this.listeningPortRange$1.hasNext();
            }
            {
                this.listeningPortRange$1 = listeningPortRange$1;
            }
        }, 5000L);
        Try try_ = result;
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable f = failure.exception();
            throw f;
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    /**
     * Evaluates {@code fn} repeatedly until it succeeds, fails with a non-retryable
     * error, or {@code stopCond} reports that no further retries are possible.
     *
     * <p>A failure is retried when it is a {@link BindException}, or an {@link Exception}
     * whose cause is a {@code ChannelException}. Before each retry
     * {@code sleepBeforeRetry$1} is called with {@code maxSleepBetweenRetries}; the same
     * value is used for every retry instead of being reset to the default after the
     * first one.
     *
     * @param fn the action to evaluate
     * @param stopCond evaluated after a retryable failure; {@code true} ends the retries
     * @param maxSleepBetweenRetries value handed to {@code sleepBeforeRetry$1} before each
     *        retry (presumably an upper bound on the sleep time - the helper is defined
     *        elsewhere in the file)
     * @param <T> result type of {@code fn}
     * @return the successful result, the non-retryable failure, or a failure wrapping a
     *         {@link RuntimeException} when {@code stopCond} ended the retries
     */
    public <T> Try<T> retryOnBindException(Function0<T> fn, Function0<Object> stopCond, long maxSleepBetweenRetries) {
        while (true) {
            Try attempt = Try$.MODULE$.apply(fn);
            if (!(attempt instanceof Failure)) {
                return attempt;
            }
            Throwable error = ((Failure)attempt).exception();
            if (!(error instanceof Exception)) {
                // Errors and other non-Exception throwables are handed back untouched.
                return attempt;
            }
            boolean retryable = error instanceof BindException || error.getCause() instanceof ChannelException;
            if (!retryable) {
                return new Failure(error);
            }
            if (stopCond.apply$mcZ$sp()) {
                return new Failure((Throwable)new RuntimeException("Unable to do further retries starting the actor system"));
            }
            this.sleepBeforeRetry$1(maxSleepBetweenRetries);
        }
    }

    /**
     * Default value of the third parameter ({@code maxSleepBetweenRetries}) of
     * {@code retryOnBindException}.
     *
     * @return always {@code 0}
     */
    public <T> long retryOnBindException$default$3() {
        final long defaultMaxSleep = 0L;
        return defaultMaxSleep;
    }

    /*
     * WARNING - void declaration
     */
    public ActorSystem startActorSystem(Configuration configuration, String externalHostname, int port) {
        Address address;
        ActorSystem jobManagerSystem;
        String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString((String)externalHostname, (int)port);
        this.LOG().info((Function0)new Serializable(hostPort){
            public static final long serialVersionUID = 0L;
            private final String hostPort$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting JobManager actor system reachable at ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.hostPort$1}));
            }
            {
                this.hostPort$1 = hostPort$1;
            }
        });
        try {
            Config akkaConfig = AkkaUtils$.MODULE$.getAkkaConfig(configuration, (Option<scala.Tuple2<String, Object>>)new Some((Object)new scala.Tuple2((Object)externalHostname, (Object)BoxesRunTime.boxToInteger((int)port))));
            if (this.LOG().isDebugEnabled()) {
                this.LOG().debug((Function0)new Serializable(akkaConfig){
                    public static final long serialVersionUID = 0L;
                    private final Config akkaConfig$1;

                    public final String apply() {
                        return new StringBuilder().append((Object)"Using akka configuration\n ").append((Object)this.akkaConfig$1).toString();
                    }
                    {
                        this.akkaConfig$1 = akkaConfig$1;
                    }
                });
            }
            jobManagerSystem = AkkaUtils$.MODULE$.createActorSystem(akkaConfig);
            address = AkkaUtils$.MODULE$.getAddress(jobManagerSystem);
        }
        catch (Throwable throwable) {
            Throwable cause;
            if (throwable instanceof ChannelException && (cause = throwable.getCause()) != null && throwable.getCause() instanceof BindException) {
                void var4_4;
                throw new Exception(new StringBuilder().append((Object)"Unable to create JobManager at address ").append((Object)var4_4).append((Object)" - ").append((Object)cause.getMessage()).toString(), throwable);
            }
            throw new Exception("Could not create JobManager actor system", throwable);
        }
        configuration.setString("jobmanager.rpc.address", (String)address.host().get());
        configuration.setInteger("jobmanager.rpc.port", BoxesRunTime.unboxToInt((Object)address.port().get()));
        return jobManagerSystem;
    }

    /**
     * Starts the JobManager actors and their companions inside the given actor system.
     *
     * <p>In order: optionally starts the web frontend (when {@code jobmanager.web.port} is
     * not negative) and writes its server port back into the configuration, starts the
     * JobManager and archive actors through the seven-argument overload, registers a
     * process reaper for the JobManager, starts an embedded TaskManager plus reaper when
     * the execution mode equals {@code LOCAL}, starts the web monitor with the JobManager
     * RPC URL, and starts the resource manager actor when a class was provided.
     *
     * <p>If any step after the web frontend creation throws, the error is logged, the
     * actor system is shut down (failures of the shutdown are only logged) and the
     * original throwable is rethrown.
     *
     * @param jobManagerSystem actor system to start the actors in
     * @param configuration configuration; {@code jobmanager.web.port} is updated when a web monitor is created
     * @param executionMode execution mode; {@code LOCAL} additionally starts a TaskManager
     * @param externalHostname host name passed to the embedded TaskManager
     * @param futureExecutor executor forwarded to the JobManager actor start-up
     * @param ioExecutor executor forwarded to the JobManager actor start-up
     * @param highAvailabilityServices high availability services shared by all components
     * @param jobManagerClass class of the JobManager actor
     * @param archiveClass class of the archive actor
     * @param resourceManagerClass optional class of the resource manager actor
     * @return tuple of (JobManager actor, archive actor, optional web monitor, optional resource manager actor)
     */
    public Tuple4<ActorRef, ActorRef, Option<WebMonitor>, Option<ActorRef>> startJobManagerActors(ActorSystem jobManagerSystem, Configuration configuration, JobManagerMode executionMode, String externalHostname, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass, Option<Class<? extends FlinkResourceManager<?>>> resourceManagerClass) {
        // NOTE: the None$ static type of these locals is a decompiler artifact; they hold an Option.
        None$ none$;
        if (configuration.getInteger("jobmanager.web.port", 0) >= 0) {
            this.LOG().info((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Starting JobManager web frontend";
                }
            });
            WebMonitor webServer = WebMonitorUtils.startWebRuntimeMonitor(configuration, highAvailabilityServices, jobManagerSystem);
            none$ = Option$.MODULE$.apply((Object)webServer);
        } else {
            none$ = None$.MODULE$;
        }
        None$ webMonitor = none$;
        // Record the port the web monitor reports as its server port.
        webMonitor.foreach((Function1)new Serializable(configuration){
            public static final long serialVersionUID = 0L;
            private final Configuration configuration$4;

            public final void apply(WebMonitor monitor) {
                this.configuration$4.setInteger("jobmanager.web.port", monitor.getServerPort());
            }
            {
                this.configuration$4 = configuration$4;
            }
        });
        try {
            scala.Tuple2<ActorRef, ActorRef> tuple2;
            block11: {
                Option<Class<? extends FlinkResourceManager<?>>> option;
                block14: {
                    None$ none$2;
                    ActorRef archive;
                    ActorRef jobManager;
                    block13: {
                        block12: {
                            BoxedUnit boxedUnit;
                            scala.Tuple2 tuple22;
                            this.LOG().info((Function0)new Serializable(){
                                public static final long serialVersionUID = 0L;

                                public final String apply() {
                                    return "Starting JobManager actor";
                                }
                            });
                            tuple2 = this.startJobManagerActors(configuration, jobManagerSystem, futureExecutor, ioExecutor, highAvailabilityServices, jobManagerClass, archiveClass);
                            // A null result leaves block11 and ends in the MatchError below.
                            if (tuple2 == null) break block11;
                            ActorRef jobManager2 = (ActorRef)tuple2._1();
                            ActorRef archive2 = (ActorRef)tuple2._2();
                            scala.Tuple2 tuple23 = tuple22 = new scala.Tuple2((Object)jobManager2, (Object)archive2);
                            jobManager = (ActorRef)tuple23._1();
                            archive = (ActorRef)tuple23._2();
                            this.LOG().debug((Function0)new Serializable(){
                                public static final long serialVersionUID = 0L;

                                public final String apply() {
                                    return "Starting JobManager process reaper";
                                }
                            });
                            jobManagerSystem.actorOf(Props$.MODULE$.apply(ProcessReaper.class, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobManager, this.LOG().logger(), BoxesRunTime.boxToInteger((int)this.RUNTIME_FAILURE_RETURN_CODE())})), "JobManager_Process_Reaper");
                            JobManagerMode jobManagerMode = executionMode;
                            JobManagerMode jobManagerMode2 = JobManagerMode.LOCAL;
                            // True exactly when executionMode equals JobManagerMode.LOCAL.
                            if (!(jobManagerMode != null ? !((Object)((Object)jobManagerMode)).equals((Object)jobManagerMode2) : jobManagerMode2 != null)) {
                                this.LOG().info((Function0)new Serializable(){
                                    public static final long serialVersionUID = 0L;

                                    public final String apply() {
                                        return "Starting embedded TaskManager for JobManager's LOCAL execution mode";
                                    }
                                });
                                ActorRef taskManagerActor = TaskManager$.MODULE$.startTaskManagerComponentsAndActor(configuration, ResourceID.generate(), jobManagerSystem, highAvailabilityServices, externalHostname, (Option<String>)new Some((Object)TaskExecutor.TASK_MANAGER_NAME), true, TaskManager.class);
                                this.LOG().debug((Function0)new Serializable(){
                                    public static final long serialVersionUID = 0L;

                                    public final String apply() {
                                        return "Starting TaskManager process reaper";
                                    }
                                });
                                boxedUnit = jobManagerSystem.actorOf(Props$.MODULE$.apply(ProcessReaper.class, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{taskManagerActor, this.LOG().logger(), BoxesRunTime.boxToInteger((int)this.RUNTIME_FAILURE_RETURN_CODE())})), "TaskManager_Process_Reaper");
                            } else {
                                boxedUnit = BoxedUnit.UNIT;
                            }
                            // Start the web monitor with the RPC URL built from the JobManager address in the configuration.
                            webMonitor.foreach((Function1)new Serializable(configuration){
                                public static final long serialVersionUID = 0L;
                                private final Configuration configuration$4;

                                public final void apply(WebMonitor monitor) {
                                    Tuple2<String, Integer> hostnamePort = HighAvailabilityServicesUtils.getJobManagerAddress(this.configuration$4);
                                    String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl((String)hostnamePort.f0, Predef$.MODULE$.Integer2int((Integer)hostnamePort.f1), JobMaster.JOB_MANAGER_NAME, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION, this.configuration$4);
                                    monitor.start(jobManagerAkkaUrl);
                                }
                                {
                                    this.configuration$4 = configuration$4;
                                }
                            });
                            option = resourceManagerClass;
                            // Some(rmClass): start the resource manager; otherwise fall through to the None check.
                            if (!(option instanceof Some)) break block12;
                            Some some = (Some)option;
                            Class rmClass = (Class)some.x();
                            this.LOG().debug((Function0)new Serializable(){
                                public static final long serialVersionUID = 0L;

                                public final String apply() {
                                    return "Starting Resource manager actor";
                                }
                            });
                            none$2 = Option$.MODULE$.apply((Object)FlinkResourceManager.startResourceManagerActors(configuration, jobManagerSystem, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), rmClass));
                            break block13;
                        }
                        None$ none$3 = None$.MODULE$;
                        Option<Class<? extends FlinkResourceManager<?>>> option2 = option;
                        // Anything that is neither Some nor None leaves block14 and ends in a MatchError.
                        if (none$3 != null ? !none$3.equals(option2) : option2 != null) break block14;
                        this.LOG().info((Function0)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final String apply() {
                                return "Resource Manager class not provided. No resource manager will be started.";
                            }
                        });
                        none$2 = None$.MODULE$;
                    }
                    None$ resourceManager = none$2;
                    return new Tuple4((Object)jobManager, (Object)archive, (Object)webMonitor, (Object)resourceManager);
                }
                throw new MatchError(option);
            }
            throw new MatchError(tuple2);
        }
        catch (Throwable throwable) {
            this.LOG().error((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Error while starting up JobManager";
                }
            }, (Function0)new Serializable(throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable t$16;

                public final Throwable apply() {
                    return this.t$16;
                }
                {
                    this.t$16 = t$16;
                }
            });
            // Best-effort cleanup: a failing shutdown must not hide the original error.
            try {
                jobManagerSystem.shutdown();
            }
            catch (Throwable throwable2) {
                this.LOG().warn((Function0)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Could not cleanly shut down actor system";
                    }
                }, (Function0)new Serializable(throwable2){
                    public static final long serialVersionUID = 0L;
                    private final Throwable tt$2;

                    public final Throwable apply() {
                        return this.tt$2;
                    }
                    {
                        this.tt$2 = tt$2;
                    }
                });
            }
            throw throwable;
        }
    }

    /**
     * Parses the JobManager command line and derives the startup settings from it.
     *
     * <p>Recognised options are {@code configDir}, {@code executionMode}, {@code host} (optional)
     * and {@code webui-port} (optional). The configuration is loaded from the given configuration
     * directory; a non-negative web UI port overrides {@code jobmanager.web.port} and a non-null
     * host overrides {@code jobmanager.rpc.address}.
     *
     * <p>When {@code ZooKeeperUtils.isZooKeeperRecoveryMode} reports true, {@code jobmanager.rpc.port}
     * is set to 0 and the port range is read from
     * {@code HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE}. Otherwise the single port from
     * {@code jobmanager.rpc.port} (default 6123) is used; a value outside 1..65535 is logged as an
     * error and {@code System.exit} is called with {@code STARTUP_FAILURE_RETURN_CODE}.
     *
     * <p>An {@code Exception} is thrown when the arguments cannot be parsed, when
     * {@code --configDir} or {@code --executionMode} is missing, or when setting the default file
     * system scheme fails with an {@code IOException}.
     *
     * @param args the raw command line arguments
     * @return a tuple of (configuration, execution mode, configured RPC host which may be
     *         {@code null}, iterator over the ports produced by
     *         {@code NetUtils.getPortRangeFromString})
     */
    public Tuple4<Configuration, JobManagerMode, String, Iterator<Integer>> parseArgs(String[] args) {
        String string;
        Configuration configuration;
        JobManagerCliOptions cliOptions;
        // Decompiler-generated label: leaving block8 early skips only the web UI port override below.
        block8: {
            // Option parser; each action copies the parsed value into the JobManagerCliOptions object.
            OptionParser<JobManagerCliOptions> parser = new OptionParser<JobManagerCliOptions>(){
                {
                    this.head((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Flink JobManager"}));
                    this.opt("configDir", Read$.MODULE$.stringRead()).action((Function2)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final JobManagerCliOptions apply(String arg, JobManagerCliOptions conf) {
                            conf.setConfigDir(arg);
                            return conf;
                        }
                    }).text("The configuration directory.");
                    this.opt("executionMode", Read$.MODULE$.stringRead()).action((Function2)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final JobManagerCliOptions apply(String arg, JobManagerCliOptions conf) {
                            conf.setJobManagerMode(arg);
                            return conf;
                        }
                    }).text("The execution mode of the JobManager (CLUSTER / LOCAL)");
                    this.opt("host", Read$.MODULE$.stringRead()).optional().action((Function2)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final JobManagerCliOptions apply(String arg, JobManagerCliOptions conf) {
                            conf.setHost(arg);
                            return conf;
                        }
                    }).text("Network address for communication with the job manager");
                    this.opt("webui-port", Read$.MODULE$.intRead()).optional().action((Function2)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final JobManagerCliOptions apply(int arg, JobManagerCliOptions conf) {
                            conf.setWebUIPort(arg);
                            return conf;
                        }
                    }).text("Port for the UI web server");
                }
            };
            // A failed parse yields an empty Option; the fallback throws with the offending arguments and the usage text.
            cliOptions = (JobManagerCliOptions)parser.parse((Seq)Predef$.MODULE$.wrapRefArray((Object[])args), (Object)new JobManagerCliOptions()).getOrElse((Function0)new Serializable(args, (OptionParser)parser){
                public static final long serialVersionUID = 0L;
                private final String[] args$2;
                private final OptionParser parser$1;

                public final Nothing$ apply() {
                    throw new Exception(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Invalid command line arguments: ", ". Usage: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.refArrayOps((Object[])this.args$2).mkString(" "), this.parser$1.usage()})));
                }
                {
                    this.args$2 = args$2;
                    this.parser$1 = parser$1;
                }
            });
            // Both the configuration directory and the execution mode are mandatory.
            String configDir = cliOptions.getConfigDir();
            if (configDir == null) {
                throw new Exception("Missing parameter '--configDir'");
            }
            if (cliOptions.getJobManagerMode() == null) {
                throw new Exception("Missing parameter '--executionMode'");
            }
            this.LOG().info((Function0)new Serializable(configDir){
                public static final long serialVersionUID = 0L;
                private final String configDir$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Loading configuration from ").append((Object)this.configDir$1).toString();
                }
                {
                    this.configDir$1 = configDir$1;
                }
            });
            configuration = GlobalConfiguration.loadConfiguration((String)configDir);
            try {
                FileSystem.setDefaultScheme((Configuration)configuration);
                // A negative web UI port means "not overridden": skip the setInteger call after the try.
                if (cliOptions.getWebUIPort() < 0) break block8;
            }
            catch (IOException iOException) {
                throw new Exception("Error while setting the default filesystem scheme from configuration.", iOException);
            }
            configuration.setInteger("jobmanager.web.port", cliOptions.getWebUIPort());
        }
        // A host given on the command line takes precedence over the configured RPC address.
        if (cliOptions.getHost() != null) {
            configuration.setString("jobmanager.rpc.address", cliOptions.getHost());
        }
        String host = configuration.getString("jobmanager.rpc.address", null);
        if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
            this.LOG().info((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Starting JobManager with high-availability";
                }
            });
            // In this mode the configured RPC port is reset to 0 and the HA port range option is used instead.
            configuration.setInteger("jobmanager.rpc.port", 0);
            string = configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE);
        } else {
            this.LOG().info((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Starting JobManager without high-availability";
                }
            });
            int listeningPort = configuration.getInteger("jobmanager.rpc.port", 6123);
            // Reject ports outside 1..65535: log the problem and terminate the process.
            if (listeningPort <= 0 || listeningPort >= 65536) {
                String message = "Config parameter 'jobmanager.rpc.port' is invalid, it must be greater than 0 and less than 65536.";
                this.LOG().error((Function0)new Serializable(message){
                    public static final long serialVersionUID = 0L;
                    private final String message$4;

                    public final String apply() {
                        return this.message$4;
                    }
                    {
                        this.message$4 = message$4;
                    }
                });
                System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            }
            // The "range" is the single configured port.
            string = String.valueOf(listeningPort);
        }
        String portRange = string;
        JobManagerMode executionMode = cliOptions.getJobManagerMode();
        this.LOG().info((Function0)new Serializable(host, portRange, executionMode){
            public static final long serialVersionUID = 0L;
            private final String host$1;
            private final String portRange$2;
            private final JobManagerMode executionMode$4;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting JobManager on ", ":", " with execution mode ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.host$1, this.portRange$2, this.executionMode$4}));
            }
            {
                this.host$1 = host$1;
                this.portRange$2 = portRange$2;
                this.executionMode$4 = executionMode$4;
            }
        });
        // Turn the port range string into an iterator over the ports.
        Iterator portRangeIterator = NetUtils.getPortRangeFromString((String)portRange);
        return new Tuple4((Object)configuration, (Object)executionMode, (Object)host, (Object)portRangeIterator);
    }

    /*
     * Loose catch block
     */
    public Tuple9<InstanceManager, Scheduler, BlobLibraryCacheManager, RestartStrategyFactory, FiniteDuration, Object, Option<Path>, FiniteDuration, Option<MetricRegistry>> createJobManagerComponents(Configuration configuration, ScheduledExecutorService futureExecutor, Executor ioExecutor, BlobStore blobStore) {
        FiniteDuration finiteDuration;
        Option option;
        FiniteDuration timeout = AkkaUtils$.MODULE$.getTimeout(configuration);
        long cleanupInterval = configuration.getLong("library-cache-manager.cleanup.interval", 3600L) * 1000L;
        RestartStrategyFactory restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration);
        int archiveCount = configuration.getInteger("jobmanager.web.history", 5);
        String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
        if (archiveDir == null) {
            this.LOG().debug((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "No archive directory was configured. Jobs will not be archived.";
                }
            });
            option = Option$.MODULE$.empty();
        } else {
            try {
                option = Option$.MODULE$.apply((Object)WebMonitorUtils.validateAndNormalizeUri(new Path(archiveDir).toUri()));
            }
            catch (Exception exception) {
                this.LOG().warn((Function0)new Serializable(archiveDir){
                    public static final long serialVersionUID = 0L;
                    private final String archiveDir$1;

                    public final String apply() {
                        return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to validate specified archive directory in '", "'. "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.archiveDir$1}))).append((Object)"Jobs will not be archived for the HistoryServer.").toString();
                    }
                    {
                        this.archiveDir$1 = archiveDir$1;
                    }
                }, (Function0)new Serializable(exception){
                    public static final long serialVersionUID = 0L;
                    private final Exception e$10;

                    public final Exception apply() {
                        return this.e$10;
                    }
                    {
                        this.e$10 = e$10;
                    }
                });
                option = Option$.MODULE$.empty();
            }
        }
        Option archivePath = option;
        BlobServer blobServer = null;
        InstanceManager instanceManager = null;
        Scheduler scheduler = null;
        BlobLibraryCacheManager libraryCacheManager = null;
        try {
            blobServer = new BlobServer(configuration, blobStore);
            instanceManager = new InstanceManager();
            scheduler = new Scheduler((Executor)ExecutionContext$.MODULE$.fromExecutor((Executor)futureExecutor));
            libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval);
            instanceManager.addInstanceListener(scheduler);
        }
        catch (Throwable throwable) {
            if (libraryCacheManager != null) {
                libraryCacheManager.shutdown();
            }
            if (scheduler != null) {
                scheduler.shutdown();
            }
            if (instanceManager != null) {
                instanceManager.shutdown();
            }
            if (blobServer != null) {
                blobServer.close();
            }
            throw throwable;
        }
        String jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY);
        if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty()) {
            finiteDuration = timeout;
        } else {
            finiteDuration = FiniteDuration$.MODULE$.apply(Duration$.MODULE$.apply(jobRecoveryTimeoutStr).toMillis(), TimeUnit.MILLISECONDS);
        }
        FiniteDuration jobRecoveryTimeout = finiteDuration;
        None$ none$ = Option$.MODULE$.apply((Object)new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)));
        catch (NumberFormatException numberFormatException) {
            throw new Exception(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Invalid config value for ", ": "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{HighAvailabilityOptions.HA_JOB_DELAY.key()}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ". Value must be a valid duration (such as '10 s' or '1 min')"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobRecoveryTimeoutStr}))).toString());
        }
        catch (Exception exception) {
            none$ = None$.MODULE$;
        }
        None$ metricRegistry = none$;
        return new Tuple9((Object)instanceManager, (Object)scheduler, (Object)libraryCacheManager, (Object)restartStrategy, (Object)timeout, (Object)BoxesRunTime.boxToInteger((int)archiveCount), (Object)archivePath, (Object)jobRecoveryTimeout, (Object)metricRegistry);
    }

    /**
     * Starts the JobManager and archive actors under their default names,
     * {@code JobMaster.JOB_MANAGER_NAME} and {@code JobMaster.ARCHIVE_NAME}, by delegating to the
     * overload that takes explicit actor names.
     *
     * @return a tuple of (JobManager actor, archive actor)
     */
    public scala.Tuple2<ActorRef, ActorRef> startJobManagerActors(Configuration configuration, ActorSystem actorSystem, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass) {
        Option<String> defaultJobManagerName = (Option<String>)new Some((Object)JobMaster.JOB_MANAGER_NAME);
        Option<String> defaultArchiveName = (Option<String>)new Some((Object)JobMaster.ARCHIVE_NAME);
        return this.startJobManagerActors(configuration, actorSystem, futureExecutor, ioExecutor, highAvailabilityServices, defaultJobManagerName, defaultArchiveName, jobManagerClass, archiveClass);
    }

    /**
     * Creates the JobManager components and starts the archive actor followed by the JobManager
     * actor in the given actor system.
     *
     * <p>Each actor is started under the supplied name when the corresponding option is a
     * {@code Some}, and without an explicit name when it is {@code None}. If a metric registry
     * was created, its query service is started on the same actor system.
     *
     * @param jobManagerActorName optional name for the JobManager actor
     * @param archiveActorName optional name for the archive actor
     * @return a tuple of (JobManager actor, archive actor)
     * @throws MatchError if the component tuple is {@code null} or one of the options is neither
     *         a {@code Some} nor {@code None}
     */
    public scala.Tuple2<ActorRef, ActorRef> startJobManagerActors(Configuration configuration, ActorSystem actorSystem, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, Option<String> jobManagerActorName, Option<String> archiveActorName, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass) {
        Tuple9<InstanceManager, Scheduler, BlobLibraryCacheManager, RestartStrategyFactory, FiniteDuration, Object, Option<Path>, FiniteDuration, Option<MetricRegistry>> components = this.createJobManagerComponents(configuration, futureExecutor, ioExecutor, highAvailabilityServices.createBlobStore());
        if (components == null) {
            throw new MatchError(components);
        }

        // Unpack the component tuple.
        InstanceManager instanceManager = (InstanceManager)components._1();
        Scheduler scheduler = (Scheduler)components._2();
        BlobLibraryCacheManager libraryCacheManager = (BlobLibraryCacheManager)components._3();
        RestartStrategyFactory restartStrategy = (RestartStrategyFactory)components._4();
        FiniteDuration timeout = (FiniteDuration)components._5();
        int archiveCount = BoxesRunTime.unboxToInt((Object)components._6());
        Option archivePath = (Option)components._7();
        FiniteDuration jobRecoveryTimeout = (FiniteDuration)components._8();
        Option metricsRegistry = (Option)components._9();

        // The archive actor is started first because the JobManager props take its reference.
        Props archiveProps = this.getArchiveProps(archiveClass, archiveCount, (Option<Path>)archivePath);
        ActorRef archive;
        if (archiveActorName instanceof Some) {
            String archiveName = (String)((Some)archiveActorName).x();
            archive = actorSystem.actorOf(archiveProps, archiveName);
        } else if (None$.MODULE$.equals(archiveActorName)) {
            archive = actorSystem.actorOf(archiveProps);
        } else {
            throw new MatchError(archiveActorName);
        }

        Props jobManagerProps = this.getJobManagerProps(jobManagerClass, configuration, futureExecutor, ioExecutor, instanceManager, scheduler, libraryCacheManager, archive, restartStrategy, timeout, highAvailabilityServices.getJobManagerLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID), highAvailabilityServices.getSubmittedJobGraphStore(), highAvailabilityServices.getCheckpointRecoveryFactory(), jobRecoveryTimeout, (Option<MetricRegistry>)metricsRegistry);
        ActorRef jobManager;
        if (jobManagerActorName instanceof Some) {
            String jobManagerName = (String)((Some)jobManagerActorName).x();
            jobManager = actorSystem.actorOf(jobManagerProps, jobManagerName);
        } else if (None$.MODULE$.equals(jobManagerActorName)) {
            jobManager = actorSystem.actorOf(jobManagerProps);
        } else {
            throw new MatchError(jobManagerActorName);
        }

        // Start the metric query service only when a registry is present.
        if (metricsRegistry instanceof Some) {
            MetricRegistry registry = (MetricRegistry)((Some)metricsRegistry).x();
            registry.startQueryService(actorSystem, null);
        } else if (!None$.MODULE$.equals(metricsRegistry)) {
            throw new MatchError((Object)metricsRegistry);
        }

        return new scala.Tuple2((Object)jobManager, (Object)archive);
    }

    /**
     * Builds the {@code Props} for the archive actor.
     *
     * @param archiveClass actor class to instantiate
     * @param archiveCount first constructor argument, passed boxed
     * @param archivePath second constructor argument
     * @return props for {@code archiveClass} with the two constructor arguments above
     */
    public Props getArchiveProps(Class<? extends MemoryArchivist> archiveClass, int archiveCount, Option<Path> archivePath) {
        Object[] constructorArgs = new Object[]{BoxesRunTime.boxToInteger((int)archiveCount), archivePath};
        return Props$.MODULE$.apply(archiveClass, (Seq)Predef$.MODULE$.genericWrapArray((Object)constructorArgs));
    }

    /**
     * Builds the {@code Props} for the JobManager actor.
     *
     * <p>All parameters after {@code jobManagerClass} are forwarded, in declaration order, as the
     * constructor arguments of the actor.
     *
     * @param jobManagerClass actor class to instantiate
     * @return props for {@code jobManagerClass} with the remaining parameters as constructor arguments
     */
    public Props getJobManagerProps(Class<? extends JobManager> jobManagerClass, Configuration configuration, ScheduledExecutorService futureExecutor, Executor ioExecutor, InstanceManager instanceManager, Scheduler scheduler, LibraryCacheManager libraryCacheManager, ActorRef archive, RestartStrategyFactory restartStrategyFactory, FiniteDuration timeout, LeaderElectionService leaderElectionService, SubmittedJobGraphStore submittedJobGraphStore, CheckpointRecoveryFactory checkpointRecoveryFactory, FiniteDuration jobRecoveryTimeout, Option<MetricRegistry> metricsRegistry) {
        Object[] constructorArgs = new Object[]{
            configuration,
            futureExecutor,
            ioExecutor,
            instanceManager,
            scheduler,
            libraryCacheManager,
            archive,
            restartStrategyFactory,
            timeout,
            leaderElectionService,
            submittedJobGraphStore,
            checkpointRecoveryFactory,
            jobRecoveryTimeout,
            metricsRegistry
        };
        return Props$.MODULE$.apply(jobManagerClass, (Seq)Predef$.MODULE$.genericWrapArray((Object)constructorArgs));
    }

    /**
     * Runs {@code parseArgs} and turns any failure into a process exit.
     *
     * <p>On failure the throwable's message and the throwable itself are logged at error level,
     * the stack trace is printed, and {@code System.exit} is called with
     * {@code STARTUP_FAILURE_RETURN_CODE}.
     *
     * @param args$1 the raw command line arguments
     * @return the parsed tuple, or {@code null} on the failure path should {@code System.exit} return
     */
    private final Tuple4 liftedTree2$1(String[] args$1) {
        try {
            return this.parseArgs(args$1);
        }
        catch (Throwable failure) {
            this.LOG().error((Function0)new Serializable(failure){
                public static final long serialVersionUID = 0L;
                private final Throwable t$12;

                public final String apply() {
                    return this.t$12.getMessage();
                }
                {
                    this.t$12 = t$12;
                }
            }, (Function0)new Serializable(failure){
                public static final long serialVersionUID = 0L;
                private final Throwable t$12;

                public final Throwable apply() {
                    return this.t$12;
                }
                {
                    this.t$12 = t$12;
                }
            });
            failure.printStackTrace();
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            return null;
        }
    }

    /**
     * Starts the JobManager actors through the ten-argument {@code startJobManagerActors}
     * overload, using {@code JobManager}, {@code MemoryArchivist} and
     * {@code StandaloneResourceManager} as the classes to start.
     *
     * <p>If that call throws, both executors are stopped with {@code shutdownNow()} and the
     * throwable is rethrown.
     *
     * @return the tuple produced by {@code startJobManagerActors}
     */
    private final Tuple4 liftedTree3$1(Configuration configuration$3, JobManagerMode executionMode$3, String listeningAddress$2, ScheduledExecutorService futureExecutor$1, ExecutorService ioExecutor$1, ActorSystem jobManagerSystem$1, HighAvailabilityServices highAvailabilityServices$1) {
        Tuple4 startedActors;
        try {
            startedActors = this.startJobManagerActors(jobManagerSystem$1, configuration$3, executionMode$3, listeningAddress$2, futureExecutor$1, ioExecutor$1, highAvailabilityServices$1, JobManager.class, MemoryArchivist.class, Option$.MODULE$.apply(StandaloneResourceManager.class));
        }
        catch (Throwable startupFailure) {
            // Stop the executors handed to the actors before propagating the failure.
            futureExecutor$1.shutdownNow();
            ioExecutor$1.shutdownNow();
            throw startupFailure;
        }
        return startedActors;
    }

    /**
     * Sleeps for a random time below the given maximum and logs the chosen duration first.
     * Does nothing when the maximum is zero or negative.
     *
     * @param maxSleepBetweenRetries$1 exclusive upper bound of the sleep time, in milliseconds
     */
    private final void sleepBeforeRetry$1(long maxSleepBetweenRetries$1) {
        if (maxSleepBetweenRetries$1 <= 0L) {
            return;
        }
        // Math.random() is in [0, 1), so the pause is always below the maximum.
        long pauseMillis = (long)(Math.random() * (double)maxSleepBetweenRetries$1);
        this.LOG().info((Function0)new Serializable(pauseMillis){
            public static final long serialVersionUID = 0L;
            private final long sleepTime$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Retrying after bind exception. Sleeping for ", " ms."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)this.sleepTime$1)}));
            }
            {
                this.sleepTime$1 = sleepTime$1;
            }
        });
        Thread.sleep(pauseMillis);
    }

    /**
     * Private constructor: stores this instance in {@code MODULE$}, creates the logger for
     * {@code JobManager.class} and sets the two process exit codes.
     */
    private JobManager$() {
        // Make the instance reachable through MODULE$ before anything else is initialised.
        MODULE$ = this;
        this.LOG = Logger$.MODULE$.apply(JobManager.class);
        // Exit code used when startup fails (passed to System.exit by the startup helpers).
        this.STARTUP_FAILURE_RETURN_CODE = 1;
        // Exit code for failures at runtime — NOTE(review): its use is not visible in this part of the file.
        this.RUNTIME_FAILURE_RETURN_CODE = 2;
    }
}

