package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/client/JobClient.class */
/**
 * Static helpers for submitting a {@link JobGraph} to a JobManager, attaching to a job that is
 * already running, and waiting for the outcome of a job.
 *
 * <p>The class keeps no state apart from its logger; every method is static.
 */
public class JobClient {
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    /**
     * Starts the actor system used by the job client.
     *
     * @param configuration configuration handed to {@code BootstrapTools.startActorSystem}
     * @param str second argument of {@code BootstrapTools.startActorSystem}
     *     (NOTE(review): presumably the address/hostname to bind to - confirm)
     * @return the started actor system
     * @throws Exception whatever {@code BootstrapTools.startActorSystem} throws
     */
    public static ActorSystem startJobClientActorSystem(Configuration configuration, String str) throws Exception {
        LOG.info("Starting JobClient actor system");
        // The literal 0 is the third argument of startActorSystem
        // (NOTE(review): presumably the port, 0 = pick a free one - confirm).
        return BootstrapTools.startActorSystem(configuration, str, 0, LOG);
    }

    /**
     * Creates a {@code JobSubmissionClientActor}, asks it to submit the job and wait for it, and
     * returns a context through which the result can be awaited (see {@link #awaitJobResult}).
     *
     * @param actorSystem actor system in which the client actor is created; must not be null
     * @param configuration forwarded to the client actor's props factory
     * @param highAvailabilityServices source of the JobManager leader retriever; must not be null
     * @param jobGraph job to submit; must not be null
     * @param finiteDuration timeout forwarded to the client actor and stored in the context;
     *     must not be null
     * @param z forwarded to the client actor's props factory
     *     (NOTE(review): presumably toggles printing of status updates - confirm)
     * @param classLoader class loader stored in the returned context
     * @return the listening context for the submitted job
     */
    public static JobListeningContext submitJob(ActorSystem actorSystem, Configuration configuration, HighAvailabilityServices highAvailabilityServices, JobGraph jobGraph, FiniteDuration finiteDuration, boolean z, ClassLoader classLoader) {
        Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
        Preconditions.checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(finiteDuration, "The timeout must not be null.");

        ActorRef clientActor = actorSystem.actorOf(
                JobSubmissionClientActor.createActorProps(
                        highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                        finiteDuration,
                        z,
                        configuration));

        JobID submittedJobId = jobGraph.getJobID();
        Future<Object> resultFuture = Patterns.ask(
                clientActor,
                new JobClientMessages.SubmitJobAndWait(jobGraph),
                new Timeout(AkkaUtils.INF_TIMEOUT()));

        return new JobListeningContext(submittedJobId, resultFuture, clientActor, finiteDuration, classLoader, highAvailabilityServices);
    }

    /**
     * Creates a {@code JobAttachmentClientActor}, asks it to attach to the given job and wait for
     * it, and returns a context through which the result can be awaited.
     *
     * @param jobID id of the job to attach to; must not be null
     * @param configuration stored in the returned context; must not be null
     * @param actorSystem actor system in which the client actor is created; must not be null
     * @param highAvailabilityServices source of the JobManager leader retriever; must not be null
     * @param finiteDuration timeout forwarded to the client actor and stored in the context;
     *     must not be null
     * @param z forwarded to the client actor's props factory
     *     (NOTE(review): presumably toggles printing of status updates - confirm)
     * @return the listening context for the job
     */
    public static JobListeningContext attachToRunningJob(JobID jobID, Configuration configuration, ActorSystem actorSystem, HighAvailabilityServices highAvailabilityServices, FiniteDuration finiteDuration, boolean z) {
        Preconditions.checkNotNull(jobID, "The jobID must not be null.");
        Preconditions.checkNotNull(configuration, "The configuration must not be null.");
        Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
        Preconditions.checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
        Preconditions.checkNotNull(finiteDuration, "The timeout must not be null.");

        ActorRef clientActor = actorSystem.actorOf(
                JobAttachmentClientActor.createActorProps(
                        highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                        finiteDuration,
                        z));

        Future<Object> resultFuture = Patterns.ask(
                clientActor,
                new JobClientMessages.AttachToJobAndWait(jobID),
                new Timeout(AkkaUtils.INF_TIMEOUT()));

        return new JobListeningContext(jobID, resultFuture, clientActor, finiteDuration, actorSystem, configuration, highAvailabilityServices);
    }

    /**
     * Builds a class loader for the user code of a job by asking the JobManager for the job's
     * class loading properties and fetching the required JAR files through a
     * {@link PermanentBlobCache}.
     *
     * <p>Each step is guarded separately so that the exception message names the step that
     * actually failed.
     *
     * @param jobID id of the job whose class loader is requested
     * @param jobManagerGateway gateway used to request the class loading properties; its hostname
     *     is also used as the blob server host
     * @param configuration configuration for the blob cache
     * @param highAvailabilityServices used to create the blob store backing the blob cache
     * @param time timeout for the class loading properties request
     * @return a parent-first class loader over the job's JAR files and classpath URLs
     * @throws JobRetrievalException if the properties cannot be fetched, the job is not known,
     *     the blob cache cannot be set up, or a JAR file cannot be downloaded
     */
    public static ClassLoader retrieveClassLoader(JobID jobID, JobManagerGateway jobManagerGateway, Configuration configuration, HighAvailabilityServices highAvailabilityServices, Time time) throws JobRetrievalException {
        // Step 1: fetch the class loading properties from the JobManager.
        final Optional<JobManagerMessages.ClassloadingProps> optionalProps;
        try {
            optionalProps = jobManagerGateway.requestClassloadingProps(jobID, time).get(time.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new JobRetrievalException(jobID, "Could not retrieve the class loading properties from JobManager.", e);
        }

        if (!optionalProps.isPresent()) {
            throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
        }
        final JobManagerMessages.ClassloadingProps props = optionalProps.get();

        // Step 2: set up the blob cache. Caught broadly so that any failure of this step keeps
        // surfacing as a JobRetrievalException, now with a message that matches the step.
        final PermanentBlobCache blobCache;
        try {
            InetSocketAddress blobServerAddress = new InetSocketAddress(jobManagerGateway.getHostname(), props.blobManagerPort().intValue());
            blobCache = new PermanentBlobCache(configuration, highAvailabilityServices.createBlobStore(), blobServerAddress);
        } catch (Exception e) {
            throw new JobRetrievalException(jobID, "Failed to setup BlobCache.", e);
        }

        // Step 3: collect JAR file URLs first, then the additional classpath URLs.
        final Collection<PermanentBlobKey> jarKeys = props.requiredJarFiles();
        final Collection<URL> classpaths = props.requiredClasspaths();
        final URL[] urls = new URL[jarKeys.size() + classpaths.size()];
        int next = 0;

        for (PermanentBlobKey jarKey : jarKeys) {
            try {
                urls[next++] = blobCache.getFile(jobID, jarKey).toURI().toURL();
            } catch (Exception e) {
                // The cache is only closed on failure.
                // NOTE(review): on success it stays open, presumably because the returned
                // class loader still refers to the fetched files - confirm the intended lifecycle.
                try {
                    blobCache.close();
                } catch (IOException closeFailure) {
                    LOG.warn("Could not properly close the BlobClient.", closeFailure);
                }
                throw new JobRetrievalException(jobID, "Failed to download BlobKey " + jarKey, e);
            }
        }
        for (URL classpath : classpaths) {
            urls[next++] = classpath;
        }

        return FlinkUserCodeClassLoaders.parentFirst(urls, JobClient.class.getClassLoader());
    }

    /**
     * Blocks until the job tracked by the given context has produced an answer and converts that
     * answer into a {@link JobExecutionResult} or an exception.
     *
     * <p>While waiting, every time the wait times out the client actor is sent an
     * {@link Identify} message; if that probe fails and there is still no result, the method
     * gives up. Once an answer has been read, the client actor is sent a {@link PoisonPill}
     * exactly once.
     *
     * @param jobListeningContext context returned by {@link #submitJob} or
     *     {@link #attachToRunningJob}
     * @return the deserialized result of a successful job
     * @throws JobExecutionException if waiting is interrupted, the client actor stops responding,
     *     the answer cannot be read or deserialized, the job failed, the job was not found
     *     (as {@link JobRetrievalException}), or the answer is of an unknown type
     */
    public static JobExecutionResult awaitJobResult(JobListeningContext jobListeningContext) throws JobExecutionException {
        final JobID jobID = jobListeningContext.getJobID();
        final ActorRef jobClientActor = jobListeningContext.getJobClientActor();
        final Future<Object> jobResultFuture = jobListeningContext.getJobResultFuture();
        final FiniteDuration timeout = jobListeningContext.getTimeout();
        final ClassLoader classLoader = jobListeningContext.getClassLoader();

        while (!jobResultFuture.isCompleted()) {
            try {
                Await.ready(jobResultFuture, timeout);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new JobExecutionException(jobID, "Interrupted while waiting for job completion.", e);
            } catch (TimeoutException e) {
                try {
                    // Probe the client actor; a reply means we simply keep waiting.
                    Await.result(Patterns.ask(jobClientActor, new Identify(true), Timeout.durationToTimeout(timeout)), timeout);
                } catch (Exception probeFailure) {
                    // A result may have arrived while the probe was failing; only give up
                    // if there is still nothing to read.
                    if (!jobResultFuture.isCompleted()) {
                        throw new JobExecutionException(jobID, "JobClientActor seems to have died before the JobExecutionResult could be retrieved.", probeFailure);
                    }
                }
            }
        }

        final Object answer;
        try {
            // The future is completed at this point, so this does not block.
            answer = Await.result(jobResultFuture, Duration.Zero());
        } catch (JobExecutionException e) {
            throw e;
        } catch (Exception e) {
            // Await.result declares Exception; wrap it so the declared contract holds.
            throw new JobExecutionException(jobID, "Couldn't retrieve the JobExecutionResult from the JobManager.", e);
        } finally {
            // Shut the client actor down once, whether or not reading the answer succeeded.
            jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }

        if (answer instanceof JobManagerMessages.JobResultSuccess) {
            LOG.info("Job execution complete");
            SerializedJobExecutionResult serializedResult = ((JobManagerMessages.JobResultSuccess) answer).result();
            if (serializedResult == null) {
                throw new JobExecutionException(jobID, "Job was successfully executed but result contained a null JobExecutionResult.");
            }
            try {
                return serializedResult.toJobExecutionResult(classLoader);
            } catch (Throwable t) {
                throw new JobExecutionException(jobID, "Job was successfully executed but JobExecutionResult could not be deserialized.", t);
            }
        }

        if (answer instanceof JobManagerMessages.JobResultFailure) {
            LOG.info("Job execution failed");
            SerializedThrowable serializedCause = ((JobManagerMessages.JobResultFailure) answer).cause();
            if (serializedCause == null) {
                throw new JobExecutionException(jobID, "Job execution failed with null as failure cause.");
            }
            Throwable failureCause = serializedCause.deserializeError(classLoader);
            if (failureCause instanceof JobExecutionException) {
                throw (JobExecutionException) failureCause;
            }
            throw new JobExecutionException(jobID, "Job execution failed", failureCause);
        }

        if (answer instanceof JobManagerMessages.JobNotFound) {
            throw new JobRetrievalException(((JobManagerMessages.JobNotFound) answer).jobID(), "Couldn't retrieve Job " + jobID + " because it was not running.");
        }

        throw new JobExecutionException(jobID, "Unknown answer from JobManager after submitting the job: " + answer);
    }

    /**
     * Submits the job via {@link #submitJob} and blocks in {@link #awaitJobResult} until its
     * result is available.
     *
     * @return the result of the successfully executed job
     * @throws JobExecutionException see {@link #awaitJobResult}
     */
    public static JobExecutionResult submitJobAndWait(ActorSystem actorSystem, Configuration configuration, HighAvailabilityServices highAvailabilityServices, JobGraph jobGraph, FiniteDuration finiteDuration, boolean z, ClassLoader classLoader) throws JobExecutionException {
        JobListeningContext listeningContext = submitJob(actorSystem, configuration, highAvailabilityServices, jobGraph, finiteDuration, z, classLoader);
        return awaitJobResult(listeningContext);
    }

    /**
     * Uploads the job's JAR files to the blob server and submits the job with
     * {@link ListeningBehaviour#DETACHED}, waiting only for the submission call to complete.
     *
     * <p>The three steps (resolve blob server address, upload JARs, submit) are guarded
     * separately so that each failure is reported with the exception type and message of the
     * step that failed.
     *
     * @param jobManagerGateway gateway to the JobManager; must not be null
     * @param configuration configuration passed to {@code JobGraph.uploadUserJars}
     * @param jobGraph job to submit; must not be null
     * @param time timeout applied to the address lookup and to the submission; must not be null
     * @param classLoader class loader used to deserialize a serialized submission failure
     * @throws JobSubmissionException if the blob server address cannot be retrieved or the JAR
     *     upload fails
     * @throws JobTimeoutException if the submission call does not complete within {@code time}
     * @throws JobExecutionException if the submission fails for any other reason
     */
    public static void submitJobDetached(JobManagerGateway jobManagerGateway, Configuration configuration, JobGraph jobGraph, Time time, ClassLoader classLoader) throws JobExecutionException {
        Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(time, "The timeout must not be null.");
        LOG.info("Checking and uploading JAR files");

        // Step 1: resolve the blob server address.
        final InetSocketAddress blobServerAddress;
        try {
            blobServerAddress = retrieveBlobServerAddress(jobManagerGateway, time).get(time.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new JobSubmissionException(jobGraph.getJobID(), "Could not retrieve BlobServer address.", e);
        }

        // Step 2: upload the user JARs. Caught broadly so that any failure of this step keeps
        // surfacing as a JobSubmissionException, now with the upload message.
        try {
            jobGraph.uploadUserJars(blobServerAddress, configuration);
        } catch (Exception e) {
            throw new JobSubmissionException(jobGraph.getJobID(), "Could not upload the program's JAR files to the JobManager.", e);
        }

        // Step 3: submit and wait for the submission call to complete.
        try {
            jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, time).get(time.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "JobManager did not respond within " + time, e);
        } catch (Throwable submissionFailure) {
            Throwable stripped = ExceptionUtils.stripExecutionException(submissionFailure);
            try {
                ExceptionUtils.tryDeserializeAndThrow(stripped, classLoader);
            } catch (JobExecutionException e) {
                throw e;
            } catch (Throwable t) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobSubmission failed.", t);
            }
        }
    }

    /**
     * Asks the JobManager for its blob server port and combines it with the gateway's hostname.
     *
     * @param jobManagerGateway gateway providing the hostname and the blob server port
     * @param time timeout passed to the port request
     * @return a future completing with the blob server's socket address
     */
    public static CompletableFuture<InetSocketAddress> retrieveBlobServerAddress(JobManagerGateway jobManagerGateway, Time time) {
        CompletableFuture<Integer> portFuture = jobManagerGateway.requestBlobServerPort(time);
        String hostname = jobManagerGateway.getHostname();
        return portFuture.thenApply(port -> new InetSocketAddress(hostname, port.intValue()));
    }
}
