/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.dispatch.OnFailure;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.SerializedValue;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import scala.Function1;
import scala.PartialFunction;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class Execution
implements Serializable {
    private static final long serialVersionUID = 42L;
    private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
    private static final Logger LOG = ExecutionGraph.LOG;
    private static final int NUM_CANCEL_CALL_TRIES = 3;
    private final ExecutionVertex vertex;
    private final ExecutionAttemptID attemptId;
    private final long[] stateTimestamps;
    private final int attemptNumber;
    private final FiniteDuration timeout;
    private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
    private volatile ExecutionState state = ExecutionState.CREATED;
    private volatile SimpleSlot assignedResource;
    private volatile Throwable failureCause;
    private volatile InstanceConnectionInfo assignedResourceLocation;
    private SerializedValue<StateHandle<?>> operatorState;

    public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
        this.vertex = Preconditions.checkNotNull(vertex);
        this.attemptId = new ExecutionAttemptID();
        this.attemptNumber = attemptNumber;
        this.stateTimestamps = new long[ExecutionState.values().length];
        this.markTimestamp(ExecutionState.CREATED, startTimestamp);
        this.timeout = timeout;
        this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue();
    }

    public ExecutionVertex getVertex() {
        return this.vertex;
    }

    public ExecutionAttemptID getAttemptId() {
        return this.attemptId;
    }

    public int getAttemptNumber() {
        return this.attemptNumber;
    }

    public ExecutionState getState() {
        return this.state;
    }

    public SimpleSlot getAssignedResource() {
        return this.assignedResource;
    }

    public InstanceConnectionInfo getAssignedResourceLocation() {
        return this.assignedResourceLocation;
    }

    public Throwable getFailureCause() {
        return this.failureCause;
    }

    public long[] getStateTimestamps() {
        return this.stateTimestamps;
    }

    public long getStateTimestamp(ExecutionState state) {
        return this.stateTimestamps[state.ordinal()];
    }

    public boolean isFinished() {
        return this.state == ExecutionState.FINISHED || this.state == ExecutionState.FAILED || this.state == ExecutionState.CANCELED;
    }

    public void prepareForArchiving() {
        if (this.assignedResource != null && this.assignedResource.isAlive()) {
            throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running.");
        }
        this.assignedResource = null;
        this.partialInputChannelDeploymentDescriptors.clear();
        this.partialInputChannelDeploymentDescriptors = null;
    }

    public void setInitialState(SerializedValue<StateHandle<?>> initialState) {
        if (this.state != ExecutionState.CREATED) {
            throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
        }
        this.operatorState = initialState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
        if (scheduler == null) {
            throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
        }
        SlotSharingGroup sharingGroup = this.vertex.getJobVertex().getSlotSharingGroup();
        CoLocationConstraint locationConstraint = this.vertex.getLocationConstraint();
        if (locationConstraint != null && sharingGroup == null) {
            throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
        }
        if (this.transitionState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
            ScheduledUnit toSchedule;
            ScheduledUnit scheduledUnit = toSchedule = locationConstraint == null ? new ScheduledUnit(this, sharingGroup) : new ScheduledUnit(this, sharingGroup, locationConstraint);
            if (queued) {
                SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
                future.setFutureAction(new SlotAllocationFutureAction(){

                    @Override
                    public void slotAllocated(SimpleSlot slot) {
                        try {
                            Execution.this.deployToSlot(slot);
                        }
                        catch (Throwable t) {
                            try {
                                slot.releaseSlot();
                            }
                            finally {
                                Execution.this.markFailed(t);
                            }
                        }
                    }
                });
            } else {
                SimpleSlot slot = scheduler.scheduleImmediately(toSchedule);
                try {
                    this.deployToSlot(slot);
                }
                catch (Throwable t) {
                    try {
                        slot.releaseSlot();
                    }
                    finally {
                        this.markFailed(t);
                    }
                }
            }
            return true;
        }
        return false;
    }

    public void deployToSlot(final SimpleSlot slot) throws JobException {
        if (slot == null) {
            throw new NullPointerException();
        }
        if (!slot.isAlive()) {
            throw new JobException("Target slot for deployment is not alive.");
        }
        ExecutionState previous = this.state;
        if (previous == ExecutionState.SCHEDULED || previous == ExecutionState.CREATED) {
            if (!this.transitionState(previous, ExecutionState.DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        } else {
            throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + (Object)((Object)previous));
        }
        try {
            if (!slot.setExecutedVertex(this)) {
                throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
            }
            this.assignedResource = slot;
            this.assignedResourceLocation = slot.getInstance().getInstanceConnectionInfo();
            if (this.state != ExecutionState.DEPLOYING) {
                slot.releaseSlot();
                return;
            }
            if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Deploying %s (attempt #%d) to %s", this.vertex.getSimpleName(), this.attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
            }
            final TaskDeploymentDescriptor deployment = this.vertex.createDeploymentDescriptor(this.attemptId, slot, this.operatorState);
            this.vertex.getExecutionGraph().registerExecution(this);
            final Instance instance = slot.getInstance();
            Future deployAction = Patterns.ask((ActorRef)instance.getTaskManager(), (Object)new TaskMessages.SubmitTask(deployment), (Timeout)new Timeout(this.timeout));
            deployAction.onComplete((Function1)new OnComplete<Object>(){

                public void onComplete(Throwable failure, Object success) throws Throwable {
                    if (failure != null) {
                        if (failure instanceof TimeoutException) {
                            String taskname = Task.getTaskNameWithSubtaskAndID(deployment.getTaskName(), deployment.getIndexInSubtaskGroup(), deployment.getNumberOfSubtasks(), Execution.this.attemptId);
                            Execution.this.markFailed(new Exception("Cannot deploy task " + taskname + " - TaskManager (" + instance + ") not responding after a timeout of " + Execution.this.timeout, failure));
                        } else {
                            Execution.this.markFailed(failure);
                        }
                    } else if (!success.equals(Messages.getAcknowledge())) {
                        Execution.this.markFailed(new Exception("Failed to deploy the task to slot " + slot + ": Response was not of type Acknowledge"));
                    }
                }
            }, AkkaUtils.globalExecutionContext());
        }
        catch (Throwable t) {
            this.markFailed(t);
            ExceptionUtils.rethrow((Throwable)t);
        }
    }

    public void cancel() {
        ExecutionState current;
        block8: {
            while (true) {
                if ((current = this.state) == ExecutionState.CANCELING || current == ExecutionState.CANCELED) {
                    return;
                }
                if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                    if (!this.transitionState(current, ExecutionState.CANCELING)) continue;
                    this.sendCancelRpcCall();
                    return;
                }
                if (current == ExecutionState.FINISHED || current == ExecutionState.FAILED) {
                    this.sendFailIntermediateResultPartitionsRPCCall();
                    return;
                }
                if (current != ExecutionState.CREATED && current != ExecutionState.SCHEDULED) break block8;
                if (this.transitionState(current, ExecutionState.CANCELED)) break;
            }
            this.markTimestamp(ExecutionState.CANCELING, this.getStateTimestamp(ExecutionState.CANCELED));
            try {
                this.vertex.getExecutionGraph().deregisterExecution(this);
                if (this.assignedResource != null) {
                    this.assignedResource.releaseSlot();
                }
            }
            finally {
                this.vertex.executionCanceled();
            }
            return;
        }
        throw new IllegalStateException(current.name());
    }

    void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
        int numConsumers = allConsumers.size();
        if (numConsumers > 1) {
            this.fail(new IllegalStateException("Currently, only a single consumer group per partition is supported."));
        } else if (numConsumers == 0) {
            return;
        }
        for (ExecutionEdge edge : allConsumers.get(0)) {
            Execution partitionExecution;
            final ExecutionVertex consumerVertex = edge.getTarget();
            Execution consumer = consumerVertex.getCurrentExecutionAttempt();
            ExecutionState consumerState = consumer.getState();
            IntermediateResultPartition partition = edge.getSource();
            if (consumerState == ExecutionState.CREATED) {
                partitionExecution = partition.getProducer().getCurrentExecutionAttempt();
                consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition, partitionExecution));
                Futures.future((Callable)new Callable<Boolean>(){

                    @Override
                    public Boolean call() throws Exception {
                        try {
                            ExecutionGraph consumerGraph = consumerVertex.getExecutionGraph();
                            consumerVertex.scheduleForExecution(consumerVertex.getExecutionGraph().getScheduler(), consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
                        }
                        catch (Throwable t) {
                            Execution.this.fail(new IllegalStateException("Could not schedule consumer vertex " + consumerVertex, t));
                        }
                        return true;
                    }
                }, (ExecutionContext)AkkaUtils.globalExecutionContext());
                if (consumerVertex.getExecutionState() != ExecutionState.RUNNING) continue;
                consumerVertex.sendPartitionInfos();
                continue;
            }
            if (consumerState == ExecutionState.RUNNING) {
                ResultPartitionLocation partitionLocation;
                SimpleSlot consumerSlot = consumer.getAssignedResource();
                if (consumerSlot == null) continue;
                Instance consumerInstance = consumerSlot.getInstance();
                ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), this.attemptId);
                Instance partitionInstance = partition.getProducer().getCurrentAssignedResource().getInstance();
                if (consumerInstance.equals(partitionInstance)) {
                    partitionLocation = ResultPartitionLocation.createLocal();
                } else {
                    ConnectionID connectionId = new ConnectionID(partitionInstance.getInstanceConnectionInfo(), partition.getIntermediateResult().getConnectionIndex());
                    partitionLocation = ResultPartitionLocation.createRemote(connectionId);
                }
                InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(partitionId, partitionLocation);
                TaskMessages.UpdateTaskSinglePartitionInfo updateTaskMessage = new TaskMessages.UpdateTaskSinglePartitionInfo(consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
                this.sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
                continue;
            }
            if (consumerState != ExecutionState.SCHEDULED && consumerState != ExecutionState.DEPLOYING) continue;
            partitionExecution = partition.getProducer().getCurrentExecutionAttempt();
            consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition, partitionExecution));
            if (consumerVertex.getExecutionState() != ExecutionState.RUNNING) continue;
            consumerVertex.sendPartitionInfos();
        }
    }

    public void fail(Throwable t) {
        this.processFail(t, false);
    }

    void markFailed(Throwable t) {
        this.processFail(t, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void markFinished() {
        ExecutionState current;
        while ((current = this.state) == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
            if (!this.transitionState(current, ExecutionState.FINISHED)) continue;
            try {
                for (IntermediateResultPartition finishedPartition : this.getVertex().finishAllBlockingPartitions()) {
                    IntermediateResultPartition[] allPartitions;
                    for (IntermediateResultPartition partition : allPartitions = finishedPartition.getIntermediateResult().getPartitions()) {
                        this.scheduleOrUpdateConsumers(partition.getConsumers());
                    }
                }
                this.assignedResource.releaseSlot();
                this.vertex.getExecutionGraph().deregisterExecution(this);
            }
            finally {
                this.vertex.executionFinished();
            }
            return;
        }
        if (current == ExecutionState.CANCELING) {
            this.cancelingComplete();
            return;
        }
        if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Task FINISHED, but concurrently went to state " + (Object)((Object)this.state));
            }
            return;
        }
        this.markFailed(new Exception("Vertex received FINISHED message while being in state " + (Object)((Object)this.state)));
    }

    void cancelingComplete() {
        ExecutionState current;
        block6: {
            do {
                if ((current = this.state) == ExecutionState.CANCELED) {
                    return;
                }
                if (current != ExecutionState.CANCELING && current != ExecutionState.RUNNING && current != ExecutionState.DEPLOYING) break block6;
            } while (!this.transitionState(current, ExecutionState.CANCELED));
            try {
                this.assignedResource.releaseSlot();
                this.vertex.getExecutionGraph().deregisterExecution(this);
            }
            finally {
                this.vertex.executionCanceled();
            }
            return;
        }
        if (current != ExecutionState.FAILED) {
            String message = String.format("Asynchronous race: Found state %s after successful cancel call.", new Object[]{this.state});
            LOG.error(message);
            this.vertex.getExecutionGraph().fail(new Exception(message));
        }
    }

    void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
        this.partialInputChannelDeploymentDescriptors.add(partitionInfo);
    }

    void sendPartitionInfos() {
        if (this.partialInputChannelDeploymentDescriptors != null && !this.partialInputChannelDeploymentDescriptors.isEmpty()) {
            PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
            ArrayList<IntermediateDataSetID> resultIDs = new ArrayList<IntermediateDataSetID>();
            ArrayList<InputChannelDeploymentDescriptor> inputChannelDeploymentDescriptors = new ArrayList<InputChannelDeploymentDescriptor>();
            while ((partialInputChannelDeploymentDescriptor = this.partialInputChannelDeploymentDescriptors.poll()) != null) {
                resultIDs.add(partialInputChannelDeploymentDescriptor.getResultId());
                inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
            }
            TaskMessages.UpdateTaskMultiplePartitionInfos updateTaskMessage = TaskMessages.createUpdateTaskMultiplePartitionInfos(this.attemptId, resultIDs, inputChannelDeploymentDescriptors);
            this.sendUpdateTaskRpcCall(this.assignedResource, updateTaskMessage);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processFail(Throwable t, boolean isCallback) {
        ExecutionState current;
        do {
            if ((current = this.state) == ExecutionState.FAILED) {
                return false;
            }
            if (current != ExecutionState.CANCELED) continue;
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s", new Object[]{this.getVertexWithAttempt(), ExecutionState.FAILED, ExecutionState.CANCELED}));
            }
            return false;
        } while (!this.transitionState(current, ExecutionState.FAILED, t));
        this.failureCause = t;
        try {
            if (this.assignedResource != null) {
                this.assignedResource.releaseSlot();
            }
            this.vertex.getExecutionGraph().deregisterExecution(this);
        }
        finally {
            this.vertex.executionFailed(t);
        }
        if (!(isCallback || current != ExecutionState.RUNNING && current != ExecutionState.DEPLOYING)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
            }
            try {
                if (this.assignedResource != null) {
                    this.sendCancelRpcCall();
                }
            }
            catch (Throwable tt) {
                LOG.error("Error triggering cancel call while marking task as failed.", tt);
            }
        }
        return true;
    }

    boolean switchToRunning() {
        if (this.transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            this.sendPartitionInfos();
            return true;
        }
        ExecutionState currentState = this.state;
        if (currentState != ExecutionState.FINISHED && currentState != ExecutionState.CANCELED) {
            if (currentState == ExecutionState.CANCELING || currentState == ExecutionState.FAILED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", this.getVertexWithAttempt()));
                }
                this.sendCancelRpcCall();
            } else {
                String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.", new Object[]{this.getVertexWithAttempt(), currentState});
                if (LOG.isDebugEnabled()) {
                    LOG.debug(message);
                }
                this.sendCancelRpcCall();
                this.markFailed(new Exception(message));
            }
        }
        return false;
    }

    private void sendCancelRpcCall() {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new TaskMessages.CancelTask(this.attemptId), 3, AkkaUtils.globalExecutionContext(), this.timeout);
            cancelResult.onComplete((Function1)new OnComplete<Object>(){

                public void onComplete(Throwable failure, Object success) throws Throwable {
                    if (failure != null) {
                        Execution.this.fail(new Exception("Task could not be canceled.", failure));
                    } else {
                        TaskMessages.TaskOperationResult result = (TaskMessages.TaskOperationResult)success;
                        if (!result.success()) {
                            LOG.debug("Cancel task call did not find task. Probably akka message call race.");
                        }
                    }
                }
            }, AkkaUtils.globalExecutionContext());
        }
    }

    private void sendFailIntermediateResultPartitionsRPCCall() {
        Instance instance;
        SimpleSlot slot = this.assignedResource;
        if (slot != null && (instance = slot.getInstance()).isAlive()) {
            try {
                instance.getTaskManager().tell((Object)new TaskMessages.FailIntermediateResultPartitions(this.attemptId), ActorRef.noSender());
            }
            catch (Throwable t) {
                this.fail(new Exception("Intermediate result partition could not be failed.", t));
            }
        }
    }

    private void sendUpdateTaskRpcCall(SimpleSlot consumerSlot, TaskMessages.UpdatePartitionInfo updateTaskMsg) {
        if (consumerSlot != null) {
            final Instance instance = consumerSlot.getInstance();
            Future futureUpdate = Patterns.ask((ActorRef)instance.getTaskManager(), (Object)updateTaskMsg, (Timeout)new Timeout(this.timeout));
            futureUpdate.onFailure((PartialFunction)new OnFailure(){

                public void onFailure(Throwable failure) throws Throwable {
                    Execution.this.fail(new IllegalStateException("Update task on instance " + instance + " failed due to:", failure));
                }
            }, AkkaUtils.globalExecutionContext());
        }
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
        return this.transitionState(currentState, targetState, null);
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
        if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
            this.markTimestamp(targetState);
            LOG.info(this.getVertex().getTaskNameWithSubtaskIndex() + " (" + (Object)((Object)this.getAttemptId()) + ") switched from " + (Object)((Object)currentState) + " to " + (Object)((Object)targetState));
            try {
                this.vertex.notifyStateTransition(this.attemptId, targetState, error);
            }
            catch (Throwable t) {
                LOG.error("Error while notifying execution graph of execution state transition.", t);
            }
            return true;
        }
        return false;
    }

    private void markTimestamp(ExecutionState state) {
        this.markTimestamp(state, System.currentTimeMillis());
    }

    private void markTimestamp(ExecutionState state, long timestamp) {
        this.stateTimestamps[state.ordinal()] = timestamp;
    }

    public String getVertexWithAttempt() {
        return this.vertex.getSimpleName() + " - execution #" + this.attemptNumber;
    }

    public String toString() {
        return String.format("Attempt #%d (%s) @ %s - [%s]", new Object[]{this.attemptNumber, this.vertex.getSimpleName(), this.assignedResource == null ? "(unassigned)" : this.assignedResource.toString(), this.state});
    }
}

