/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException;
import org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import scala.PartialFunction;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public class JobClientActor
extends FlinkUntypedActor
implements LeaderRetrievalListener {
    private final LeaderRetrievalService leaderRetrievalService;
    private final FiniteDuration timeout;
    private final boolean sysoutUpdates;
    private boolean jobSuccessfullySubmitted = false;
    private boolean terminated = false;
    private ActorRef jobManager;
    private UUID leaderSessionID;
    private ActorRef submitter;
    private JobGraph jobGraph;

    public JobClientActor(LeaderRetrievalService leaderRetrievalService, FiniteDuration submissionTimeout, boolean sysoutUpdates) {
        this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
        this.timeout = Preconditions.checkNotNull(submissionTimeout);
        this.sysoutUpdates = sysoutUpdates;
    }

    public void preStart() {
        try {
            this.leaderRetrievalService.start(this);
        }
        catch (Exception e) {
            this.LOG.error("Could not start the leader retrieval service.");
            throw new RuntimeException("Could not start the leader retrieval service.", e);
        }
    }

    public void postStop() {
        try {
            this.leaderRetrievalService.stop();
        }
        catch (Exception e) {
            this.LOG.warn("Could not properly stop the leader retrieval service.");
        }
    }

    @Override
    protected void handleMessage(Object message) {
        if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
            this.logAndPrintMessage(message);
        } else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
            this.logAndPrintMessage(message);
        } else if (message instanceof JobClientMessages.JobManagerLeaderAddress) {
            JobClientMessages.JobManagerLeaderAddress msg = (JobClientMessages.JobManagerLeaderAddress)message;
            this.disconnectFromJobManager();
            this.leaderSessionID = msg.leaderSessionID();
            if (msg.address() != null) {
                AkkaUtils.getActorRefFuture(msg.address(), this.getContext().system(), this.timeout).onSuccess((PartialFunction)new OnSuccess<ActorRef>(){

                    public void onSuccess(ActorRef result) throws Throwable {
                        JobClientActor.this.getSelf().tell(JobClientActor.this.decorateMessage(new JobClientMessages.JobManagerActorRef(result)), ActorRef.noSender());
                    }
                }, (ExecutionContext)this.getContext().dispatcher());
            }
        } else if (message instanceof JobClientMessages.JobManagerActorRef) {
            JobClientMessages.JobManagerActorRef msg = (JobClientMessages.JobManagerActorRef)message;
            this.connectToJobManager(msg.jobManager());
            if (this.jobGraph != null && !this.jobSuccessfullySubmitted) {
                this.tryToSubmitJob(this.jobGraph);
            }
        } else if (message instanceof JobClientMessages.SubmitJobAndWait) {
            if (!this.terminated) {
                if (this.submitter == null) {
                    this.jobGraph = ((JobClientMessages.SubmitJobAndWait)message).jobGraph();
                    if (this.jobGraph == null) {
                        this.LOG.error("Received null JobGraph");
                        this.sender().tell(this.decorateMessage(new Status.Failure((Throwable)new Exception("JobGraph is null"))), this.getSelf());
                    } else {
                        this.LOG.info("Received job {} ({}).", (Object)this.jobGraph.getName(), (Object)this.jobGraph.getJobID());
                        this.submitter = this.getSender();
                        this.tryToSubmitJob(this.jobGraph);
                    }
                } else {
                    String msg = "Received repeated 'SubmitJobAndWait'";
                    this.LOG.error(msg);
                    this.getSender().tell(this.decorateMessage(new Status.Failure((Throwable)new Exception(msg))), ActorRef.noSender());
                    this.terminate();
                }
            } else {
                String msg = this.getClass().getName() + " is about to be terminated. Therefore, the " + "job submission cannot be executed.";
                this.LOG.error(msg);
                this.getSender().tell(this.decorateMessage(new Status.Failure((Throwable)new Exception(msg))), ActorRef.noSender());
            }
        } else if (message instanceof JobManagerMessages.JobResultSuccess || message instanceof JobManagerMessages.JobResultFailure) {
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("Received {} message from JobManager", (Object)message.getClass().getSimpleName());
            }
            if (this.hasJobBeenSubmitted()) {
                this.submitter.tell(this.decorateMessage(message), this.getSelf());
            }
            this.terminate();
        } else if (message instanceof JobManagerMessages.JobSubmitSuccess) {
            this.LOG.info("Job was successfully submitted to the JobManager {}.", (Object)this.getSender().path());
            this.jobSuccessfullySubmitted = true;
        } else if (message instanceof Terminated) {
            ActorRef target = ((Terminated)message).getActor();
            if (this.jobManager.equals((Object)target)) {
                this.LOG.info("Lost connection to JobManager {}. Triggering connection timeout.", (Object)this.jobManager.path());
                this.disconnectFromJobManager();
                if (this.hasJobBeenSubmitted()) {
                    this.getContext().system().scheduler().scheduleOnce(this.timeout, this.getSelf(), this.decorateMessage(JobClientMessages.getConnectionTimeout()), (ExecutionContext)this.getContext().dispatcher(), ActorRef.noSender());
                }
            } else {
                this.LOG.warn("Received 'Terminated' for unknown actor " + target);
            }
        } else if (JobClientMessages.getConnectionTimeout().equals(message)) {
            if (!this.isConnected()) {
                if (this.hasJobBeenSubmitted()) {
                    this.submitter.tell(this.decorateMessage(new Status.Failure((Throwable)new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."))), this.getSelf());
                }
                this.terminate();
            }
        } else if (JobClientMessages.getSubmissionTimeout().equals(message)) {
            if (!this.jobSuccessfullySubmitted) {
                if (this.hasJobBeenSubmitted()) {
                    this.submitter.tell(this.decorateMessage(new Status.Failure((Throwable)new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out."))), this.getSelf());
                }
                this.terminate();
            }
        } else {
            this.LOG.error("JobClient received unknown message: " + message);
        }
    }

    @Override
    protected UUID getLeaderSessionID() {
        return this.leaderSessionID;
    }

    private void logAndPrintMessage(Object message) {
        this.LOG.info(message.toString());
        if (this.sysoutUpdates) {
            System.out.println(message.toString());
        }
    }

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        this.getSelf().tell(this.decorateMessage(new JobClientMessages.JobManagerLeaderAddress(leaderAddress, leaderSessionID)), this.getSelf());
    }

    @Override
    public void handleError(Exception exception) {
        this.LOG.error("Error occurred in the LeaderRetrievalService.", exception);
        this.getSelf().tell(this.decorateMessage(PoisonPill.getInstance()), this.getSelf());
    }

    private void disconnectFromJobManager() {
        if (this.jobManager != ActorRef.noSender()) {
            this.getContext().unwatch(this.jobManager);
            this.jobManager = ActorRef.noSender();
        }
    }

    private void connectToJobManager(ActorRef jobManager) {
        if (jobManager != ActorRef.noSender()) {
            this.getContext().unwatch(jobManager);
        }
        this.LOG.info("Connected to new JobManager {}.", (Object)jobManager.path());
        this.jobManager = jobManager;
        this.getContext().watch(jobManager);
    }

    private void tryToSubmitJob(final JobGraph jobGraph) {
        this.jobGraph = jobGraph;
        if (this.isConnected()) {
            this.LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress", this.jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
            Futures.future((Callable)new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    AkkaActorGateway jobManagerGateway = new AkkaActorGateway(JobClientActor.this.jobManager, JobClientActor.this.leaderSessionID);
                    JobClientActor.this.LOG.info("Upload jar files to job manager {}.", (Object)JobClientActor.this.jobManager.path());
                    try {
                        JobClient.uploadJarFiles(jobGraph, jobManagerGateway, JobClientActor.this.timeout);
                    }
                    catch (IOException exception) {
                        JobClientActor.this.getSelf().tell(JobClientActor.this.decorateMessage(new JobManagerMessages.JobResultFailure(new SerializedThrowable(new JobSubmissionException(jobGraph.getJobID(), "Could not upload the jar files to the job manager.", exception)))), ActorRef.noSender());
                    }
                    JobClientActor.this.LOG.info("Submit job to the job manager {}.", (Object)JobClientActor.this.jobManager.path());
                    JobClientActor.this.jobManager.tell(JobClientActor.this.decorateMessage(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), JobClientActor.this.getSelf());
                    JobClientActor.this.getContext().system().scheduler().scheduleOnce(JobClientActor.this.timeout, JobClientActor.this.getSelf(), JobClientActor.this.decorateMessage(JobClientMessages.getSubmissionTimeout()), (ExecutionContext)JobClientActor.this.getContext().dispatcher(), ActorRef.noSender());
                    return null;
                }
            }, (ExecutionContext)this.getContext().dispatcher());
        } else {
            this.LOG.info("Could not submit job {} ({}), because there is no connection to a JobManager.", (Object)jobGraph.getName(), (Object)jobGraph.getJobID());
            this.getContext().system().scheduler().scheduleOnce(this.timeout, this.getSelf(), this.decorateMessage(JobClientMessages.getConnectionTimeout()), (ExecutionContext)this.getContext().dispatcher(), ActorRef.noSender());
        }
    }

    private void terminate() {
        this.terminated = true;
        this.disconnectFromJobManager();
        this.getSelf().tell(this.decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
    }

    private boolean isConnected() {
        return this.jobManager != ActorRef.noSender();
    }

    private boolean hasJobBeenSubmitted() {
        return this.submitter != ActorRef.noSender();
    }

    public static Props createJobClientActorProps(LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout, boolean sysoutUpdates) {
        return Props.create(JobClientActor.class, (Object[])new Object[]{leaderRetrievalService, timeout, sysoutUpdates});
    }
}

