package org.apache.flink.client.program;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/client/program/ClusterClient.class */
public abstract class ClusterClient {
    final Optimizer compiler;
    protected final LazyActorSystemLoader actorSystemLoader;
    protected final Configuration flinkConfig;
    protected final FiniteDuration timeout;
    private final FiniteDuration lookupTimeout;
    private JobExecutionResult lastJobExecutionResult;
    private final Logger LOG = LoggerFactory.getLogger(getClass());
    private boolean printStatusDuringExecution = true;
    private boolean detachedJobSubmission = false;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/client/program/ClusterClient$LazyActorSystemLoader.class */
    public static class LazyActorSystemLoader {
        private final Logger LOG;
        private final Configuration flinkConfig;
        private ActorSystem actorSystem;

        private LazyActorSystemLoader(Configuration configuration, Logger logger) {
            this.flinkConfig = configuration;
            this.LOG = logger;
        }

        public boolean isLoaded() {
            return this.actorSystem != null;
        }

        public void shutdown() {
            if (isLoaded()) {
                this.actorSystem.shutdown();
                this.actorSystem.awaitTermination();
                this.actorSystem = null;
            }
        }

        public ActorSystem get() {
            if (!isLoaded()) {
                this.LOG.info("Starting client actor system.");
                String string = this.flinkConfig.getString("jobmanager.rpc.address", (String) null);
                int integer = this.flinkConfig.getInteger("jobmanager.rpc.port", -1);
                if (string == null || integer == -1) {
                    throw new RuntimeException("The initial JobManager address has not been set correctly.");
                }
                InetSocketAddress inetSocketAddress = new InetSocketAddress(string, integer);
                try {
                    this.actorSystem = AkkaUtils.createActorSystem(this.flinkConfig, new Some(new Tuple2(ConnectionUtils.findConnectingAddress(inetSocketAddress, 2000L, 400L).getCanonicalHostName(), 0)));
                } catch (IOException e) {
                    throw new RuntimeException("Failed to resolve JobManager address at " + inetSocketAddress, e);
                }
            }
            return this.actorSystem;
        }
    }

    public ClusterClient(Configuration configuration) throws IOException {
        this.flinkConfig = (Configuration) Preconditions.checkNotNull(configuration);
        this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
        this.timeout = AkkaUtils.getClientTimeout(configuration);
        this.lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
        this.actorSystemLoader = new LazyActorSystemLoader(configuration, this.LOG);
    }

    public void shutdown() {
        synchronized (this) {
            try {
                finalizeCluster();
                this.actorSystemLoader.shutdown();
            } catch (Throwable th) {
                this.actorSystemLoader.shutdown();
                throw th;
            }
        }
    }

    public void setPrintStatusDuringExecution(boolean z) {
        this.printStatusDuringExecution = z;
    }

    public boolean getPrintStatusDuringExecution() {
        return this.printStatusDuringExecution;
    }

    public InetSocketAddress getJobManagerAddress() {
        try {
            return AkkaUtils.getInetSockeAddressFromAkkaURL(LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.createLeaderRetrievalService(this.flinkConfig), this.timeout).getAddress());
        } catch (Exception e) {
            throw new RuntimeException("Failed to retrieve JobManager address", e);
        }
    }

    public static String getOptimizedPlanAsJson(Optimizer optimizer, PackagedProgram packagedProgram, int i) throws CompilerException, ProgramInvocationException {
        return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(getOptimizedPlan(optimizer, packagedProgram, i));
    }

    public static FlinkPlan getOptimizedPlan(Optimizer optimizer, PackagedProgram packagedProgram, int i) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        if (packagedProgram.isUsingProgramEntryPoint()) {
            return getOptimizedPlan(optimizer, packagedProgram.getPlanWithJars(), i);
        }
        if (!packagedProgram.isUsingInteractiveMode()) {
            throw new RuntimeException("Couldn't determine program mode.");
        }
        OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
        if (i > 0) {
            optimizerPlanEnvironment.setParallelism(i);
        }
        return optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
    }

    public static OptimizedPlan getOptimizedPlan(Optimizer optimizer, Plan plan, int i) throws CompilerException {
        Logger logger = LoggerFactory.getLogger(ClusterClient.class);
        if (i > 0 && plan.getDefaultParallelism() <= 0) {
            logger.debug("Changing plan default parallelism from {} to {}", Integer.valueOf(plan.getDefaultParallelism()), Integer.valueOf(i));
            plan.setDefaultParallelism(i);
        }
        logger.debug("Set parallelism {}, plan default parallelism {}", Integer.valueOf(i), Integer.valueOf(plan.getDefaultParallelism()));
        return optimizer.compile(plan);
    }

    public JobSubmissionResult run(PackagedProgram packagedProgram, int i) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        if (packagedProgram.isUsingProgramEntryPoint()) {
            return run(packagedProgram.getPlanWithJars(), i, packagedProgram.getSavepointPath());
        }
        if (!packagedProgram.isUsingInteractiveMode()) {
            throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
        }
        this.LOG.info("Starting program in interactive mode");
        ContextEnvironmentFactory contextEnvironmentFactory = new ContextEnvironmentFactory(this, packagedProgram.getAllLibraries(), packagedProgram.getClasspaths(), packagedProgram.getUserCodeClassLoader(), i, isDetached(), packagedProgram.getSavepointPath());
        ContextEnvironment.setAsContext(contextEnvironmentFactory);
        try {
            packagedProgram.invokeInteractiveModeForExecution();
            if (this.lastJobExecutionResult == null && contextEnvironmentFactory.getLastEnvCreated() == null) {
                throw new ProgramInvocationException("The program didn't contain Flink jobs. Perhaps you forgot to call execute() on the execution environment.");
            }
            if (isDetached()) {
                JobSubmissionResult finalizeExecute = ((DetachedEnvironment) contextEnvironmentFactory.getLastEnvCreated()).finalizeExecute();
                ContextEnvironment.unsetContext();
                return finalizeExecute;
            }
            JobExecutionResult jobExecutionResult = this.lastJobExecutionResult;
            ContextEnvironment.unsetContext();
            return jobExecutionResult;
        } catch (Throwable th) {
            ContextEnvironment.unsetContext();
            throw th;
        }
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int i) throws ProgramInvocationException {
        return run(jobWithJars, i, null);
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int i, String str) throws CompilerException, ProgramInvocationException {
        ClassLoader userCodeClassLoader = jobWithJars.getUserCodeClassLoader();
        if (userCodeClassLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }
        return run(getOptimizedPlan(this.compiler, jobWithJars, i), jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), userCodeClassLoader, str);
    }

    public JobSubmissionResult run(FlinkPlan flinkPlan, List<URL> list, List<URL> list2, ClassLoader classLoader) throws ProgramInvocationException {
        return run(flinkPlan, list, list2, classLoader, null);
    }

    public JobSubmissionResult run(FlinkPlan flinkPlan, List<URL> list, List<URL> list2, ClassLoader classLoader, String str) throws ProgramInvocationException {
        return submitJob(getJobGraph(flinkPlan, list, list2, str), classLoader);
    }

    public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        waitForClusterToBeReady();
        try {
            LeaderRetrievalService createLeaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(this.flinkConfig);
            try {
                logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
                this.lastJobExecutionResult = JobClient.submitJobAndWait(this.actorSystemLoader.get(), createLeaderRetrievalService, jobGraph, this.timeout, this.printStatusDuringExecution, classLoader);
                return this.lastJobExecutionResult;
            } catch (JobExecutionException e) {
                throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
            }
        } catch (Exception e2) {
            throw new ProgramInvocationException("Could not create the leader retrieval service", e2);
        }
    }

    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        waitForClusterToBeReady();
        try {
            ActorGateway jobManagerGateway = getJobManagerGateway();
            try {
                logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
                JobClient.submitJobDetached(jobManagerGateway, jobGraph, this.timeout, classLoader);
                return new JobSubmissionResult(jobGraph.getJobID());
            } catch (JobExecutionException e) {
                throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
            }
        } catch (Exception e2) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e2);
        }
    }

    public void cancel(JobID jobID) throws Exception {
        try {
            Object result = Await.result(getJobManagerGateway().ask(new JobManagerMessages.CancelJob(jobID), this.timeout), this.timeout);
            if (result instanceof JobManagerMessages.CancellationSuccess) {
                this.LOG.info("Job cancellation with ID " + jobID + " succeeded.");
            } else {
                if (!(result instanceof JobManagerMessages.CancellationFailure)) {
                    throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
                }
                Throwable cause = ((JobManagerMessages.CancellationFailure) result).cause();
                this.LOG.info("Job cancellation with ID " + jobID + " failed.", cause);
                throw new Exception("Failed to cancel the job because of \n" + cause.getMessage());
            }
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
        }
    }

    public void stop(JobID jobID) throws Exception {
        try {
            Object result = Await.result(getJobManagerGateway().ask(new JobManagerMessages.StopJob(jobID), this.timeout), this.timeout);
            if (result instanceof JobManagerMessages.StoppingSuccess) {
                this.LOG.info("Job stopping with ID " + jobID + " succeeded.");
            } else {
                if (!(result instanceof JobManagerMessages.StoppingFailure)) {
                    throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
                }
                Throwable cause = ((JobManagerMessages.StoppingFailure) result).cause();
                this.LOG.info("Job stopping with ID " + jobID + " failed.", cause);
                throw new Exception("Failed to stop the job because of \n" + cause.getMessage());
            }
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
        }
    }

    public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
        return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    }

    public Map<String, Object> getAccumulators(JobID jobID, ClassLoader classLoader) throws Exception {
        try {
            Object result = Await.result(getJobManagerGateway().ask(new RequestAccumulatorResults(jobID), this.timeout), this.timeout);
            if (result instanceof AccumulatorResultsFound) {
                return AccumulatorHelper.deserializeAccumulators(((AccumulatorResultsFound) result).result(), classLoader);
            }
            if (result instanceof AccumulatorResultsErroneous) {
                throw ((AccumulatorResultsErroneous) result).cause();
            }
            throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
        } catch (Exception e) {
            throw new Exception("Failed to query the job manager gateway for accumulators.", e);
        }
    }

    public void endSession(JobID jobID) throws Exception {
        if (jobID == null) {
            throw new IllegalArgumentException("The JobID must not be null.");
        }
        endSessions(Collections.singletonList(jobID));
    }

    public void endSessions(List<JobID> list) throws Exception {
        if (list == null) {
            throw new IllegalArgumentException("The JobIDs must not be null");
        }
        ActorGateway jobManagerGateway = getJobManagerGateway();
        for (JobID jobID : list) {
            if (jobID != null) {
                this.LOG.info("Telling job manager to end the session {}.", jobID);
                jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jobID));
            }
        }
    }

    private static OptimizedPlan getOptimizedPlan(Optimizer optimizer, JobWithJars jobWithJars, int i) throws CompilerException, ProgramInvocationException {
        return getOptimizedPlan(optimizer, jobWithJars.getPlan(), i);
    }

    public JobGraph getJobGraph(PackagedProgram packagedProgram, FlinkPlan flinkPlan) throws ProgramInvocationException {
        return getJobGraph(flinkPlan, packagedProgram.getAllLibraries(), packagedProgram.getClasspaths(), null);
    }

    public JobGraph getJobGraph(PackagedProgram packagedProgram, FlinkPlan flinkPlan, String str) throws ProgramInvocationException {
        return getJobGraph(flinkPlan, packagedProgram.getAllLibraries(), packagedProgram.getClasspaths(), str);
    }

    private JobGraph getJobGraph(FlinkPlan flinkPlan, List<URL> list, List<URL> list2, String str) {
        JobGraph compileJobGraph;
        if (flinkPlan instanceof StreamingPlan) {
            compileJobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
            compileJobGraph.setSavepointPath(str);
        } else {
            compileJobGraph = new JobGraphGenerator(this.flinkConfig).compileJobGraph((OptimizedPlan) flinkPlan);
        }
        Iterator<URL> it = list.iterator();
        while (it.hasNext()) {
            try {
                compileJobGraph.addJar(new Path(it.next().toURI()));
            } catch (URISyntaxException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }
        compileJobGraph.setClasspaths(list2);
        return compileJobGraph;
    }

    public ActorGateway getJobManagerGateway() throws Exception {
        this.LOG.debug("Looking up JobManager");
        return LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.createLeaderRetrievalService(this.flinkConfig), this.actorSystemLoader.get(), this.lookupTimeout);
    }

    protected void logAndSysout(String str) {
        this.LOG.info(str);
        if (this.printStatusDuringExecution) {
            System.out.println(str);
        }
    }

    public abstract void waitForClusterToBeReady();

    public abstract String getWebInterfaceURL();

    public abstract GetClusterStatusResponse getClusterStatus();

    protected abstract List<String> getNewMessages();

    public abstract String getClusterIdentifier();

    protected abstract void finalizeCluster();

    public void setDetached(boolean z) {
        this.detachedJobSubmission = z;
    }

    public boolean isDetached() {
        return this.detachedJobSubmission;
    }

    public Configuration getFlinkConfiguration() {
        return this.flinkConfig.clone();
    }

    public abstract int getMaxSlots();

    protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;
}
