/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.client.JobClientActor;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobTimeoutException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Static helpers for talking to a JobManager from the client side: starting the
 * client actor system, submitting a job (blocking or detached) and uploading
 * the job's user-code jar files.
 */
public class JobClient {
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    /**
     * Creates the actor system used by the job client and logs the address it was started at.
     *
     * <p>The remoting settings handed to {@code AkkaUtils.createActorSystem} are an empty host
     * name and port {@code 0} (presumably "let Akka choose" - confirm against AkkaUtils).
     *
     * @param config configuration passed through to {@code AkkaUtils.createActorSystem}
     * @return the newly created actor system
     * @throws IOException if the host name reported by the actor system cannot be resolved
     */
    public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException {
        LOG.info("Starting JobClient actor system");
        Option<Tuple2<String, Object>> remoting = new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
        ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
        Address address = system.provider().getDefaultAddress();
        String hostAddress = address.host().isDefined()
                ? NetUtils.ipAddressToUrlString(InetAddress.getByName((String) address.host().get()))
                : "(unknown)";
        int port = address.port().isDefined() ? (Integer) address.port().get() : -1;
        LOG.info("Started JobClient actor system at {}:{}", hostAddress, port);
        return system;
    }

    /**
     * Submits the job through a freshly created {@code JobClientActor} and blocks until an
     * answer arrives.
     *
     * <p>Note that {@code timeout} is only handed to the actor's props; the ask and the wait on
     * its future both use {@code AkkaUtils.INF_TIMEOUT()}. The actor is always sent a
     * {@code PoisonPill} once the wait ends, whether it succeeded or failed.
     *
     * @param actorSystem actor system in which the job client actor is created
     * @param leaderRetrievalService service handed to the job client actor
     * @param jobGraph the job to submit
     * @param timeout timeout handed to the job client actor
     * @param sysoutLogUpdates flag handed to the job client actor
     * @param classLoader class loader used to deserialize the job result or the failure cause
     * @return the deserialized result of the job
     * @throws JobExecutionException if communication fails, the job fails, the result cannot be
     *         deserialized, or the answer is of an unknown type
     */
    public static JobExecutionResult submitJobAndWait(ActorSystem actorSystem, LeaderRetrievalService leaderRetrievalService, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates, ClassLoader classLoader) throws JobExecutionException {
        Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
        Preconditions.checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(timeout, "The timeout must not be null.");

        Props jobClientActorProps = JobClientActor.createJobClientActorProps(leaderRetrievalService, timeout, sysoutLogUpdates);
        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

        Object answer;
        try {
            Future<Object> future = Patterns.ask(jobClientActor, new JobClientMessages.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT()));
            answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
        }
        catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e);
        }
        catch (Throwable throwable) {
            throw new JobExecutionException(jobGraph.getJobID(), "Communication with JobManager failed: " + throwable.getMessage(), throwable);
        }
        finally {
            // The helper actor is single-use: stop it regardless of the outcome.
            jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }

        if (answer instanceof JobManagerMessages.JobResultSuccess) {
            LOG.info("Job execution complete");
            SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
            if (result == null) {
                throw new JobExecutionException(jobGraph.getJobID(), "Job was successfully executed but result contained a null JobExecutionResult.");
            }
            try {
                return result.toJobExecutionResult(classLoader);
            }
            catch (Throwable t) {
                // Keep the deserialization error as the cause so it can be diagnosed.
                throw new JobExecutionException(jobGraph.getJobID(), "Job was successfully executed but JobExecutionResult could not be deserialized.", t);
            }
        }

        if (answer instanceof JobManagerMessages.JobResultFailure) {
            LOG.info("Job execution failed");
            SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
            if (serThrowable == null) {
                throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed with null as failure cause.");
            }
            Throwable cause = serThrowable.deserializeError(classLoader);
            if (cause instanceof JobExecutionException) {
                throw (JobExecutionException) cause;
            }
            throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
        }

        throw new JobExecutionException(jobGraph.getJobID(), "Unknown answer from JobManager after submitting the job: " + answer);
    }

    /**
     * Sends the job to the JobManager with {@code ListeningBehaviour.DETACHED} and waits up to
     * {@code timeout} for the submission acknowledgement only.
     *
     * @param jobManagerGateway gateway used to ask the JobManager
     * @param jobGraph the job to submit
     * @param timeout timeout for both the ask and the wait on its future
     * @param classLoader class loader used to deserialize a failure cause
     * @throws JobExecutionException if the JobManager does not answer in time (as a
     *         {@code JobTimeoutException}), the submission fails, the acknowledged job ID differs
     *         from the submitted one, or the response is of an unexpected type
     */
    public static void submitJobDetached(ActorGateway jobManagerGateway, JobGraph jobGraph, FiniteDuration timeout, ClassLoader classLoader) throws JobExecutionException {
        Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(timeout, "The timeout must not be null.");

        Object result;
        try {
            Future<Object> future = jobManagerGateway.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout);
            result = Await.result(future, timeout);
        }
        catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "JobManager did not respond within " + timeout.toString(), e);
        }
        catch (Throwable t) {
            // Chain the caught throwable itself; its own cause may be null and would hide the error.
            throw new JobExecutionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t);
        }

        if (result instanceof JobManagerMessages.JobSubmitSuccess) {
            JobID respondedID = ((JobManagerMessages.JobSubmitSuccess) result).jobId();
            if (!respondedID.equals(jobGraph.getJobID())) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() + ", response: " + respondedID);
            }
            return;
        }

        if (result instanceof JobManagerMessages.JobResultFailure) {
            SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) result).cause();
            if (serThrowable == null) {
                // Report a missing cause explicitly instead of surfacing a NullPointerException.
                throw new JobExecutionException(jobGraph.getJobID(), "JobSubmission failed with null as failure cause.");
            }
            try {
                throw serThrowable.deserializeError(classLoader);
            }
            catch (JobExecutionException e) {
                throw e;
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobSubmission failed: " + t.getMessage(), t);
            }
        }

        throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result);
    }

    /**
     * Uploads the job's user-code jar files, if it has any.
     *
     * <p>The blob port is requested from the JobManager; the upload target is that port on the
     * host taken from the gateway's actor path, falling back to {@code "localhost"} when the
     * path carries no host.
     *
     * @param jobGraph the job whose jar files are uploaded
     * @param jobManagerGateway gateway used to ask for the blob port and to determine the host
     * @param timeout timeout for both the ask and the wait on its future
     * @throws IOException if the blob port cannot be retrieved or the answer is not an integer
     */
    public static void uploadJarFiles(JobGraph jobGraph, ActorGateway jobManagerGateway, FiniteDuration timeout) throws IOException {
        if (!jobGraph.hasUsercodeJarFiles()) {
            return;
        }

        Future<Object> futureBlobPort = jobManagerGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
        int port;
        try {
            Object result = Await.result(futureBlobPort, timeout);
            if (!(result instanceof Integer)) {
                // Deliberately routed through the catch below so callers see one IOException shape.
                throw new Exception("Expected port number (int) as answer, received " + result);
            }
            port = (Integer) result;
        }
        catch (Exception e) {
            throw new IOException("Could not retrieve the JobManager's blob port.", e);
        }

        Option<String> jmHost = jobManagerGateway.actor().path().address().host();
        String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
        InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, port);
        jobGraph.uploadRequiredJarFiles(serverAddress);
    }
}

