/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.IMap;
import com.hazelcast.core.LocalMemberResetException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.function.Functions;
import com.hazelcast.jet.impl.ExplodeSnapshotP;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.SnapshotValidator;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.JobTerminateRequestedException;
import com.hazelcast.jet.impl.exception.TerminatedWithSnapshotException;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.ExecutionPlanBuilder;
import com.hazelcast.jet.impl.operation.CompleteExecutionOperation;
import com.hazelcast.jet.impl.operation.InitExecutionOperation;
import com.hazelcast.jet.impl.operation.StartExecutionOperation;
import com.hazelcast.jet.impl.operation.TerminateExecutionOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.Operation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class MasterJobContext {
    public static final int SNAPSHOT_RESTORE_EDGE_PRIORITY = Integer.MIN_VALUE;
    public static final String SNAPSHOT_VERTEX_PREFIX = "__snapshot_";
    private final MasterContext mc;
    private final ILogger logger;
    private volatile long executionStartTime;
    private volatile ExecutionInvocationCallback executionInvocationCallback;
    private volatile Set<Vertex> vertices;
    @Nonnull
    private volatile CompletableFuture<Void> executionCompletionFuture = CompletableFuture.completedFuture(null);
    private final NonCompletableFuture jobCompletionFuture = new NonCompletableFuture();
    private volatile TerminationMode requestedTerminationMode;

    MasterJobContext(MasterContext masterContext, ILogger logger) {
        this.mc = masterContext;
        this.logger = logger;
    }

    public CompletableFuture<Void> jobCompletionFuture() {
        return this.jobCompletionFuture;
    }

    TerminationMode requestedTerminationMode() {
        return this.requestedTerminationMode;
    }

    private boolean isCancelled() {
        return this.requestedTerminationMode == TerminationMode.CANCEL_FORCEFUL;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void tryStartJob(Function<Long, Long> executionIdSupplier) {
        String dotString;
        Exception exception;
        DAG dag;
        ClassLoader classLoader;
        block30: {
            block29: {
                block28: {
                    block27: {
                        classLoader = null;
                        dag = null;
                        exception = null;
                        dotString = null;
                        this.mc.lock();
                        if (!this.isCancelled()) break block27;
                        this.logger.fine("Skipping init job '" + this.mc.jobName() + "': is already cancelled.");
                        exception = new CancellationException();
                        this.mc.unlock();
                    }
                    if (this.mc.jobStatus() == JobStatus.NOT_RUNNING) break block28;
                    this.logger.fine("Not starting job '" + this.mc.jobName() + "': status is " + (Object)((Object)this.mc.jobStatus()));
                    this.mc.unlock();
                    return;
                }
                if (this.mc.jobExecutionRecord().isSuspended()) {
                    this.mc.jobExecutionRecord().setSuspended(false);
                    this.mc.writeJobExecutionRecord(false);
                    this.mc.setJobStatus(JobStatus.NOT_RUNNING);
                }
                if (!this.scheduleRestartIfQuorumAbsent() && !this.scheduleRestartIfClusterIsNotSafe()) break block29;
                this.mc.unlock();
                return;
            }
            this.executionStartTime = System.nanoTime();
            this.mc.setJobStatus(JobStatus.STARTING);
            this.mc.writeJobExecutionRecord(true);
            if (this.requestedTerminationMode == null) break block30;
            if (this.requestedTerminationMode.actionAfterTerminate() != TerminationMode.ActionAfterTerminate.RESTART) {
                exception = new JobTerminateRequestedException(this.requestedTerminationMode);
                this.mc.unlock();
            }
            this.requestedTerminationMode = null;
        }
        classLoader = this.mc.getJetService().getClassLoader(this.mc.jobId());
        try {
            dag = (DAG)CustomClassLoadedObject.deserializeWithCustomClassLoader(this.mc.nodeEngine().getSerializationService(), classLoader, this.mc.jobRecord().getDag());
        }
        catch (Exception e) {
            this.logger.warning("DAG deserialization failed", e);
            exception = e;
            this.mc.unlock();
        }
        try {
            this.vertices = new HashSet<Vertex>();
            dotString = dag.toDotString();
            dag.iterator().forEachRemaining(this.vertices::add);
            this.mc.setExecutionId(executionIdSupplier.apply(this.mc.jobId()));
            this.mc.snapshotContext().onExecutionStarted();
            this.executionCompletionFuture = new CompletableFuture();
        }
        finally {
            this.mc.unlock();
        }
        if (exception != null) {
            this.finalizeJob(exception);
            return;
        }
        long snapshotToRestore = this.mc.jobExecutionRecord().snapshotId();
        try {
            this.mc.jobRepository().clearSnapshotData(this.mc.jobId(), this.mc.jobExecutionRecord().ongoingDataMapIndex());
        }
        catch (Exception e) {
            this.logger.warning("Cannot delete old snapshots for " + this.mc.jobName(), e);
        }
        String mapName = null;
        if (snapshotToRestore >= 0L) {
            mapName = this.mc.jobExecutionRecord().successfulSnapshotDataMapName(this.mc.jobId());
        } else if (this.mc.jobConfig().getInitialSnapshotName() != null) {
            mapName = "__jet.exportedSnapshot." + this.mc.jobConfig().getInitialSnapshotName();
        }
        if (mapName != null) {
            try {
                this.rewriteDagWithSnapshotRestore(dag, snapshotToRestore, mapName);
            }
            catch (Exception e) {
                this.finalizeJob(e);
                return;
            }
        } else {
            this.logger.info("No previous snapshot for " + this.mc.jobIdString() + " found.");
        }
        MembersView membersView = this.getMembersView();
        ClassLoader previousCL = MasterJobContext.swapContextClassLoader(classLoader);
        try {
            this.logger.info("Start executing " + this.mc.jobIdString() + ", execution graph in DOT format:\n" + dotString + "\nHINT: You can use graphviz or http://viz-js.com to visualize the printed graph.");
            this.logger.fine("Building execution plan for " + this.mc.jobIdString());
            this.mc.setExecutionPlanMap(ExecutionPlanBuilder.createExecutionPlans(this.mc.nodeEngine(), membersView, dag, this.mc.jobId(), this.mc.executionId(), this.mc.jobConfig(), this.mc.jobExecutionRecord().ongoingSnapshotId()));
        }
        catch (Exception e) {
            this.finalizeJob(e);
            return;
        }
        finally {
            Thread.currentThread().setContextClassLoader(previousCL);
        }
        this.logger.fine("Built execution plans for " + this.mc.jobIdString());
        Set<MemberInfo> participants = this.mc.executionPlanMap().keySet();
        Function<ExecutionPlan, Operation> operationCtor = plan -> new InitExecutionOperation(this.mc.jobId(), this.mc.executionId(), membersView.getVersion(), participants, (Data)this.mc.nodeEngine().getSerializationService().toData(plan));
        this.mc.invokeOnParticipants(operationCtor, this::onInitStepCompleted, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nonnull
    Tuple2<CompletableFuture<Void>, String> requestTermination(TerminationMode mode, boolean allowWhileExportingSnapshot) {
        Tuple2<CompletableFuture<Void>, Object> result;
        JobStatus localStatus;
        if (this.mc.jobConfig().getProcessingGuarantee() == ProcessingGuarantee.NONE && mode != TerminationMode.CANCEL_GRACEFUL) {
            mode = mode.withoutTerminalSnapshot();
        }
        this.mc.lock();
        try {
            localStatus = this.mc.jobStatus();
            if (localStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT && !allowWhileExportingSnapshot) {
                Tuple2<CompletableFuture<Void>, String> tuple2 = Tuple2.tuple2(this.executionCompletionFuture, "Cannot cancel when job status is " + (Object)((Object)JobStatus.SUSPENDED_EXPORTING_SNAPSHOT));
                return tuple2;
            }
            if (localStatus == JobStatus.SUSPENDED && mode != TerminationMode.CANCEL_FORCEFUL) {
                Tuple2<CompletableFuture<Void>, String> tuple2 = Tuple2.tuple2(this.executionCompletionFuture, "Job is " + (Object)((Object)JobStatus.SUSPENDED));
                return tuple2;
            }
            if (this.requestedTerminationMode != null) {
                String message = this.requestedTerminationMode == TerminationMode.CANCEL_FORCEFUL && mode == TerminationMode.CANCEL_FORCEFUL ? null : "Job is already terminating in mode: " + this.requestedTerminationMode.name();
                Tuple2<CompletableFuture<Void>, Object> tuple2 = Tuple2.tuple2(this.executionCompletionFuture, message);
                return tuple2;
            }
            this.requestedTerminationMode = mode;
            if (localStatus == JobStatus.SUSPENDED || localStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT) {
                this.mc.setJobStatus(JobStatus.FAILED);
                this.setFinalResult(new CancellationException());
            }
            if (mode.isWithTerminalSnapshot()) {
                this.mc.snapshotContext().enqueueSnapshot(null, true, null);
            }
            result = Tuple2.tuple2(this.executionCompletionFuture, null);
        }
        finally {
            this.mc.unlock();
        }
        if (localStatus == JobStatus.SUSPENDED) {
            try {
                this.mc.coordinationService().completeJob(this.mc, System.currentTimeMillis(), new CancellationException()).get();
            }
            catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        } else if (localStatus == JobStatus.RUNNING || localStatus == JobStatus.STARTING) {
            this.handleTermination(mode);
        }
        return result;
    }

    private void rewriteDagWithSnapshotRestore(DAG dag, long snapshotId, String mapName) {
        IMap<Object, Object> snapshotMap = this.mc.nodeEngine().getHazelcastInstance().getMap(mapName);
        snapshotId = SnapshotValidator.validateSnapshot(snapshotId, this.mc.jobIdString(), snapshotMap);
        this.logger.info("State of " + this.mc.jobIdString() + " will be restored from snapshot " + snapshotId + ", map=" + mapName);
        ArrayList originalVertices = new ArrayList();
        dag.iterator().forEachRemaining(originalVertices::add);
        HashMap<String, Integer> vertexToOrdinal = new HashMap<String, Integer>();
        Vertex readSnapshotVertex = dag.newVertex("__snapshot_read", SourceProcessors.readMapP(mapName));
        long finalSnapshotId = snapshotId;
        Vertex explodeVertex = dag.newVertex("__snapshot_explode", () -> new ExplodeSnapshotP(vertexToOrdinal, finalSnapshotId));
        dag.edge(Edge.between(readSnapshotVertex, explodeVertex).isolated());
        int index = 0;
        for (Vertex userVertex : originalVertices) {
            vertexToOrdinal.put(userVertex.getName(), index);
            int destOrdinal = dag.getInboundEdges(userVertex.getName()).size();
            dag.edge(new SnapshotRestoreEdge(explodeVertex, index, userVertex, destOrdinal));
            ++index;
        }
    }

    private boolean scheduleRestartIfQuorumAbsent() {
        int quorumSize = this.mc.jobExecutionRecord().getQuorumSize();
        if (this.mc.coordinationService().isQuorumPresent(quorumSize)) {
            return false;
        }
        this.logger.fine("Rescheduling restart of '" + this.mc.jobName() + "': quorum size " + quorumSize + " is not met");
        this.scheduleRestart();
        return true;
    }

    private boolean scheduleRestartIfClusterIsNotSafe() {
        if (this.mc.coordinationService().shouldStartJobs()) {
            return false;
        }
        this.logger.fine("Rescheduling restart of '" + this.mc.jobName() + "': cluster is not safe");
        this.scheduleRestart();
        return true;
    }

    private void scheduleRestart() {
        this.mc.assertLockHeld();
        JobStatus jobStatus = this.mc.jobStatus();
        if (jobStatus != JobStatus.NOT_RUNNING && jobStatus != JobStatus.STARTING && jobStatus != JobStatus.RUNNING) {
            throw new IllegalStateException("Restart scheduled in an unexpected state: " + (Object)((Object)jobStatus));
        }
        this.mc.setJobStatus(JobStatus.NOT_RUNNING);
        this.mc.coordinationService().scheduleRestart(this.mc.jobId());
    }

    private MembersView getMembersView() {
        ClusterServiceImpl clusterService = (ClusterServiceImpl)this.mc.nodeEngine().getClusterService();
        return clusterService.getMembershipManager().getMembersView();
    }

    private void onInitStepCompleted(Map<MemberInfo, Object> responses) {
        JobStatus status;
        Throwable error = this.getResult("Init", responses);
        if (error == null && (status = this.mc.jobStatus()) != JobStatus.STARTING) {
            error = new IllegalStateException("Cannot execute " + this.mc.jobIdString() + ": status is " + (Object)((Object)status));
        }
        if (error == null) {
            this.invokeStartExecution();
        } else {
            this.invokeCompleteExecution(error);
        }
    }

    private void invokeStartExecution() {
        this.logger.fine("Executing " + this.mc.jobIdString());
        long executionId = this.mc.executionId();
        this.executionInvocationCallback = new ExecutionInvocationCallback(executionId);
        if (this.requestedTerminationMode != null) {
            this.handleTermination(this.requestedTerminationMode);
        }
        Function<ExecutionPlan, Operation> operationCtor = plan -> new StartExecutionOperation(this.mc.jobId(), executionId);
        Consumer<Map<MemberInfo, Object>> completionCallback = this::onExecuteStepCompleted;
        this.mc.setJobStatus(JobStatus.RUNNING);
        this.mc.invokeOnParticipants(operationCtor, completionCallback, this.executionInvocationCallback);
        if (this.mc.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE) {
            this.mc.coordinationService().scheduleSnapshot(this.mc, executionId);
        }
    }

    private void handleTermination(@Nonnull TerminationMode mode) {
        if (mode.isWithTerminalSnapshot()) {
            this.mc.snapshotContext().tryBeginSnapshot();
        } else if (this.executionInvocationCallback != null) {
            this.executionInvocationCallback.cancelInvocations(mode);
        }
    }

    private void onExecuteStepCompleted(Map<MemberInfo, Object> responses) {
        this.invokeCompleteExecution(this.getResult("Execution", responses));
    }

    void setFinalResult(Throwable failure) {
        if (failure == null) {
            this.jobCompletionFuture.internalComplete();
        } else {
            this.jobCompletionFuture.internalCompleteExceptionally(failure);
        }
    }

    private Throwable getResult(String opName, Map<MemberInfo, Object> responses) {
        if (this.isCancelled()) {
            this.logger.fine(this.mc.jobIdString() + " to be cancelled after " + opName);
            return new CancellationException();
        }
        Map<Boolean, List<Map.Entry<MemberInfo, Object>>> grouped = this.groupResponses(responses);
        Collection successfulMembers = grouped.get(false).stream().map(Map.Entry::getKey).collect(Collectors.toList());
        List<Map.Entry<MemberInfo, Object>> failures = grouped.get(true);
        if (!failures.isEmpty()) {
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " has failures: " + failures);
        }
        if (successfulMembers.size() == this.mc.executionPlanMap().size()) {
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " was successful");
            return null;
        }
        if (failures.stream().allMatch(entry -> entry.getValue() instanceof TerminatedWithSnapshotException)) {
            assert (opName.equals("Execution")) : "opName is '" + opName + "', expected 'Execution'";
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " terminated after a terminal snapshot");
            TerminationMode mode = this.requestedTerminationMode;
            assert (mode != null && mode.isWithTerminalSnapshot()) : "mode=" + (Object)((Object)mode);
            return mode == TerminationMode.CANCEL_GRACEFUL ? new CancellationException() : new JobTerminateRequestedException(mode);
        }
        return failures.stream().map(entry -> (Throwable)entry.getValue()).filter(e -> !(e instanceof CancellationException) && !(e instanceof TerminatedWithSnapshotException) && !ExceptionUtil.isTopologyException(e)).findFirst().map(ExceptionUtil::peel).orElseGet(TopologyChangedException::new);
    }

    private void invokeCompleteExecution(Throwable error) {
        Throwable finalError;
        JobStatus status = this.mc.jobStatus();
        if (status == JobStatus.STARTING || status == JobStatus.RUNNING) {
            this.logger.fine("Sending CompleteExecutionOperation for " + this.mc.jobIdString());
            finalError = error;
        } else {
            this.logCannotComplete(error);
            finalError = new IllegalStateException("Job coordination failed");
        }
        Function<ExecutionPlan, Operation> operationCtor = plan -> new CompleteExecutionOperation(this.mc.executionId(), finalError);
        this.mc.invokeOnParticipants(operationCtor, responses -> this.onCompleteExecutionCompleted(error), null);
    }

    private void logCannotComplete(Throwable error) {
        if (error != null) {
            this.logger.severe("Cannot properly complete failed " + this.mc.jobIdString() + ": status is " + (Object)((Object)this.mc.jobStatus()), error);
        } else {
            this.logger.severe("Cannot properly complete " + this.mc.jobIdString() + ": status is " + (Object)((Object)this.mc.jobStatus()));
        }
    }

    private void onCompleteExecutionCompleted(Throwable error) {
        if (error instanceof JobTerminateRequestedException && ((JobTerminateRequestedException)error).mode().isWithTerminalSnapshot()) {
            this.mc.snapshotContext().terminalSnapshotFuture().whenCompleteAsync((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, e) -> this.finalizeJob(error)));
        } else {
            this.finalizeJob(error);
        }
    }

    private void cancelExecutionInvocations(long jobId, long executionId, TerminationMode mode) {
        this.mc.nodeEngine().getExecutionService().execute("hz:async", () -> this.mc.invokeOnParticipants(plan -> new TerminateExecutionOperation(jobId, executionId, mode), null, null));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void finalizeJob(@Nullable Throwable failure) {
        Runnable nonSynchronizedAction = () -> {};
        this.mc.lock();
        try {
            JobStatus status = this.mc.jobStatus();
            if (status == JobStatus.COMPLETED || status == JobStatus.FAILED) {
                this.logIgnoredCompletion(failure, status);
                return;
            }
            this.completeVertices(failure);
            boolean isSuccess = this.isSuccess(failure);
            this.requestedTerminationMode = null;
            this.executionInvocationCallback = null;
            TerminationMode.ActionAfterTerminate terminationModeAction = failure instanceof JobTerminateRequestedException ? ((JobTerminateRequestedException)failure).mode().actionAfterTerminate() : null;
            this.mc.snapshotContext().onExecutionTerminated();
            if (terminationModeAction == TerminationMode.ActionAfterTerminate.RESTART) {
                this.mc.setJobStatus(JobStatus.NOT_RUNNING);
                nonSynchronizedAction = () -> this.mc.coordinationService().restartJob(this.mc.jobId());
            } else if (ExceptionUtil.isRestartableException(failure) && this.mc.jobConfig().isAutoScaling()) {
                this.scheduleRestart();
            } else if (terminationModeAction == TerminationMode.ActionAfterTerminate.SUSPEND || ExceptionUtil.isRestartableException(failure) && !this.mc.jobConfig().isAutoScaling() && this.mc.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE) {
                this.mc.setJobStatus(JobStatus.SUSPENDED);
                this.mc.jobExecutionRecord().setSuspended(true);
                nonSynchronizedAction = () -> this.mc.writeJobExecutionRecord(false);
            } else {
                this.mc.setJobStatus(isSuccess ? JobStatus.COMPLETED : JobStatus.FAILED);
                if (failure instanceof LocalMemberResetException) {
                    this.logger.fine("Cancelling job " + this.mc.jobIdString() + " locally: member (local or remote) reset. We don't delete job metadata: job will restart on majority cluster");
                    this.setFinalResult(new CancellationException());
                    return;
                }
                this.mc.coordinationService().completeJob(this.mc, System.currentTimeMillis(), failure).whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, f) -> {
                    if (f != null) {
                        this.logger.warning("Completion of " + this.mc.jobIdString() + " failed", (Throwable)f);
                    } else {
                        this.setFinalResult(failure);
                    }
                }));
            }
        }
        finally {
            this.mc.unlock();
        }
        this.executionCompletionFuture.complete(null);
        nonSynchronizedAction.run();
    }

    private boolean isSuccess(@Nullable Throwable failure) {
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.executionStartTime);
        if (failure == null) {
            this.logger.info(String.format("Execution of %s completed in %,d ms", this.mc.jobIdString(), elapsed));
            return true;
        }
        if (failure instanceof CancellationException || failure instanceof JobTerminateRequestedException) {
            this.logger.info(String.format("Execution of %s completed in %,d ms, reason=%s", this.mc.jobIdString(), elapsed, failure));
            return false;
        }
        this.logger.severe(String.format("Execution of %s failed after %,d ms", this.mc.jobIdString(), elapsed), failure);
        return false;
    }

    private void logIgnoredCompletion(@Nullable Throwable failure, JobStatus status) {
        if (failure != null) {
            this.logger.severe("Ignoring failure completion of " + Util.idToString(this.mc.jobId()) + " because status is " + (Object)((Object)status), failure);
        } else {
            this.logger.severe("Ignoring completion of " + Util.idToString(this.mc.jobId()) + " because status is " + (Object)((Object)status));
        }
    }

    private void completeVertices(@Nullable Throwable failure) {
        if (this.vertices != null) {
            for (Vertex vertex : this.vertices) {
                try {
                    vertex.getMetaSupplier().close(failure);
                }
                catch (Exception e) {
                    this.logger.severe(this.mc.jobIdString() + " encountered an exception in ProcessorMetaSupplier.complete(), ignoring it", e);
                }
            }
        }
    }

    private static ClassLoader swapContextClassLoader(ClassLoader jobClassLoader) {
        Thread currentThread = Thread.currentThread();
        ClassLoader previous = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(jobClassLoader);
        return previous;
    }

    void resumeJob(Function<Long, Long> executionIdSupplier) {
        this.mc.lock();
        try {
            if (this.mc.jobStatus() != JobStatus.SUSPENDED) {
                this.logger.info("Not resuming " + this.mc.jobIdString() + ": not " + (Object)((Object)JobStatus.SUSPENDED) + ", but " + (Object)((Object)this.mc.jobStatus()));
                return;
            }
            this.mc.setJobStatus(JobStatus.NOT_RUNNING);
        }
        finally {
            this.mc.unlock();
        }
        this.logger.fine("Resuming job " + this.mc.jobName());
        this.tryStartJob(executionIdSupplier);
    }

    private boolean hasParticipant(String uuid) {
        Map<MemberInfo, ExecutionPlan> planMap = this.mc.executionPlanMap();
        return this.mc.nodeEngine().getLocalMember().getUuid().equals(uuid) || planMap != null && planMap.keySet().stream().anyMatch(mi -> mi.getUuid().equals(uuid));
    }

    @Nonnull
    CompletableFuture<Void> onParticipantGracefulShutdown(String uuid) {
        return this.hasParticipant(uuid) ? this.gracefullyTerminate() : CompletableFuture.completedFuture(null);
    }

    @Nonnull
    CompletableFuture<Void> gracefullyTerminate() {
        return this.requestTermination(TerminationMode.RESTART_GRACEFUL, false).f0();
    }

    boolean maybeScaleUp(int dataMembersWithPartitionsCount) {
        if (!this.mc.jobConfig().isAutoScaling()) {
            return true;
        }
        if (this.mc.executionPlanMap() == null || this.mc.executionPlanMap().size() == dataMembersWithPartitionsCount) {
            LoggingUtil.logFine(this.logger, "Not scaling up %s: not running or already running on all members", this.mc.jobIdString());
            return true;
        }
        JobStatus localStatus = this.mc.jobStatus();
        if (localStatus == JobStatus.RUNNING && this.requestTermination(TerminationMode.RESTART_GRACEFUL, false).f1() == null) {
            this.logger.info("Requested restart of " + this.mc.jobIdString() + " to make use of added member(s). Job was running on " + this.mc.executionPlanMap().size() + " members, cluster now has " + dataMembersWithPartitionsCount + " data members with assigned partitions");
            return true;
        }
        return false;
    }

    private Map<Boolean, List<Map.Entry<MemberInfo, Object>>> groupResponses(Map<MemberInfo, Object> responses) {
        Map<Boolean, List<Map.Entry<MemberInfo, Object>>> grouped = responses.entrySet().stream().collect(Collectors.partitioningBy(e -> e.getValue() instanceof Throwable));
        grouped.putIfAbsent(true, Collections.emptyList());
        grouped.putIfAbsent(false, Collections.emptyList());
        return grouped;
    }

    private class ExecutionInvocationCallback
    implements ExecutionCallback<Object> {
        private final AtomicBoolean invocationsCancelled = new AtomicBoolean();
        private final long executionId;

        ExecutionInvocationCallback(long executionId) {
            this.executionId = executionId;
        }

        @Override
        public void onResponse(Object response) {
        }

        @Override
        public void onFailure(Throwable t) {
            if (!(ExceptionUtil.peel(t) instanceof TerminatedWithSnapshotException)) {
                this.cancelInvocations(null);
            }
        }

        void cancelInvocations(TerminationMode mode) {
            if (this.invocationsCancelled.compareAndSet(false, true)) {
                MasterJobContext.this.cancelExecutionInvocations(MasterJobContext.this.mc.jobId(), this.executionId, mode);
            }
        }
    }

    private static class SnapshotRestoreEdge
    extends Edge {
        SnapshotRestoreEdge(Vertex source, int sourceOrdinal, Vertex destination, int destOrdinal) {
            super(source, sourceOrdinal, destination, destOrdinal);
            this.distributed();
            this.partitioned(Functions.entryKey());
        }

        @Override
        public int getPriority() {
            return Integer.MIN_VALUE;
        }
    }
}

