package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.core.LocalMemberResetException;
import com.hazelcast.function.Functions;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.executor.ManagedExecutorService;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.metrics.JobMetrics;
import com.hazelcast.jet.core.metrics.Measurement;
import com.hazelcast.jet.core.metrics.MetricNames;
import com.hazelcast.jet.core.metrics.MetricTags;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.JobClassLoaderService;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.CancellationByUserException;
import com.hazelcast.jet.impl.exception.ExecutionNotFoundException;
import com.hazelcast.jet.impl.exception.JetDisabledException;
import com.hazelcast.jet.impl.exception.JobTerminateRequestedException;
import com.hazelcast.jet.impl.exception.TerminatedWithSnapshotException;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.ExecutionPlanBuilder;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.operation.GetLocalJobMetricsOperation;
import com.hazelcast.jet.impl.operation.InitExecutionOperation;
import com.hazelcast.jet.impl.operation.StartExecutionOperation;
import com.hazelcast.jet.impl.operation.TerminateExecutionOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.version.Version;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:com/hazelcast/jet/impl/MasterJobContext.class */
public class MasterJobContext {
    public static final int SNAPSHOT_RESTORE_EDGE_PRIORITY = Integer.MIN_VALUE;
    public static final String SNAPSHOT_VERTEX_PREFIX = "__snapshot_";
    private static final int COLLECT_METRICS_RETRY_DELAY_MILLIS = 100;
    private static final Runnable NO_OP;
    private final MasterContext mc;
    private final ILogger logger;
    private final int defaultParallelism;
    private final int defaultQueueSize;
    private volatile ExecutionFailureCallback executionFailureCallback;
    private volatile Set<Vertex> vertices;
    private volatile TerminationRequest terminationRequest;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile long executionStartTime = System.currentTimeMillis();

    @Nonnull
    private volatile List<RawJobMetrics> jobMetrics = Collections.emptyList();

    @Nonnull
    private volatile CompletableFuture<Void> executionCompletionFuture = CompletableFuture.completedFuture(null);
    private final NonCompletableFuture jobCompletionFuture = new NonCompletableFuture();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/MasterJobContext$ExecutionFailureCallback.class */
    public class ExecutionFailureCallback implements BiConsumer<Address, Object> {
        private final AtomicBoolean invocationsCancelled = new AtomicBoolean();
        private final long executionId;
        private final Map<Address, CompletableFuture<Void>> startOperationResponses;

        ExecutionFailureCallback(long j, Map<Address, CompletableFuture<Void>> map) {
            this.executionId = j;
            this.startOperationResponses = map;
        }

        @Override // java.util.function.BiConsumer
        public void accept(Address address, Object obj) {
            LoggingUtil.logFine(MasterJobContext.this.logger, "%s received response to StartExecutionOperation from %s: %s", MasterJobContext.this.mc.jobIdString(), address, obj);
            CompletableFuture<Void> completableFuture = this.startOperationResponses.get(address);
            if (!(obj instanceof Throwable)) {
                completableFuture.complete(null);
                return;
            }
            Throwable th = (Throwable) obj;
            completableFuture.completeExceptionally(th);
            if (ExceptionUtil.peel(th) instanceof TerminatedWithSnapshotException) {
                return;
            }
            cancelInvocations(null);
        }

        void cancelInvocations(TerminationMode terminationMode) {
            if (this.invocationsCancelled.compareAndSet(false, true)) {
                MasterJobContext.this.cancelExecutionInvocations(MasterJobContext.this.mc.jobId(), this.executionId, terminationMode, null);
            }
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/MasterJobContext$SnapshotRestoreEdge.class */
    public static class SnapshotRestoreEdge extends Edge {
        SnapshotRestoreEdge(Vertex vertex, int i, Vertex vertex2, int i2) {
            super(vertex, i, vertex2, i2);
            distributed();
            partitioned(Functions.entryKey());
        }

        @Override // com.hazelcast.jet.core.Edge
        public int getPriority() {
            return MasterJobContext.SNAPSHOT_RESTORE_EDGE_PRIORITY;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/MasterJobContext$TerminationRequest.class */
    public static class TerminationRequest {
        private final TerminationMode mode;
        private final boolean userInitiated;

        public TerminationRequest(@Nonnull TerminationMode terminationMode, boolean z) {
            this.mode = terminationMode;
            this.userInitiated = z;
        }

        TerminationMode getMode() {
            return this.mode;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isUserInitiated() {
            return this.userInitiated;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MasterJobContext(MasterContext masterContext, ILogger iLogger) {
        this.mc = masterContext;
        this.logger = iLogger;
        this.defaultParallelism = this.mc.getJetServiceBackend().getJetConfig().getCooperativeThreadCount();
        this.defaultQueueSize = this.mc.getJetServiceBackend().getJetConfig().getDefaultEdgeConfig().getQueueSize();
    }

    public CompletableFuture<Void> jobCompletionFuture() {
        return this.jobCompletionFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<TerminationRequest> getTerminationRequest() {
        return Optional.ofNullable(this.terminationRequest);
    }

    Optional<TerminationMode> requestedTerminationMode() {
        return getTerminationRequest().map((v0) -> {
            return v0.getMode();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isUserInitiatedTermination() {
        return ((Boolean) getTerminationRequest().map((v0) -> {
            return v0.isUserInitiated();
        }).orElse(false)).booleanValue();
    }

    private Throwable createCancellationException() {
        return isUserInitiatedTermination() ? new CancellationByUserException() : new CancellationException();
    }

    private boolean isCancelled() {
        return ((Boolean) requestedTerminationMode().map(terminationMode -> {
            return Boolean.valueOf(terminationMode == TerminationMode.CANCEL_FORCEFUL);
        }).orElse(false)).booleanValue();
    }

    private boolean isCancelledGracefully() {
        return ((Boolean) requestedTerminationMode().map(terminationMode -> {
            return Boolean.valueOf(terminationMode == TerminationMode.CANCEL_GRACEFUL);
        }).orElse(false)).booleanValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tryStartJob(Supplier<Long> supplier) {
        JobCoordinationService coordinationService = this.mc.coordinationService();
        MembersView membersView = Util.getMembersView(this.mc.nodeEngine());
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            try {
                this.executionStartTime = System.currentTimeMillis();
                JobExecutionRecord jobExecutionRecord = this.mc.jobExecutionRecord();
                jobExecutionRecord.markExecuted();
                DAG resolveDag = resolveDag(supplier);
                if (resolveDag == null) {
                    return;
                }
                String dotString = resolveDag.toDotString(this.defaultParallelism, this.defaultQueueSize);
                long snapshotId = jobExecutionRecord.snapshotId();
                String initialSnapshotName = this.mc.jobConfig().getInitialSnapshotName();
                String successfulSnapshotDataMapName = snapshotId >= 0 ? jobExecutionRecord.successfulSnapshotDataMapName(this.mc.jobId()) : initialSnapshotName != null ? JobRepository.EXPORTED_SNAPSHOTS_PREFIX + initialSnapshotName : null;
                if (successfulSnapshotDataMapName != null) {
                    rewriteDagWithSnapshotRestore(resolveDag, snapshotId, successfulSnapshotDataMapName, initialSnapshotName);
                } else {
                    this.logger.info("Didn't find any snapshot to restore for " + this.mc.jobIdString());
                }
                this.logger.info("Start executing " + this.mc.jobIdString() + ", execution graph in DOT format:\n" + dotString + "\nHINT: You can use graphviz or http://viz-js.com to visualize the printed graph.");
                this.logger.fine("Building execution plan for " + this.mc.jobIdString());
                createExecutionPlans(resolveDag, membersView).thenCompose(map -> {
                    return coordinationService.submitToCoordinatorThread(() -> {
                        initExecution(membersView, map);
                    });
                }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r4, th) -> {
                    if (th != null) {
                        finalizeExecution(ExceptionUtil.peel(th));
                    }
                });
            } catch (Throwable th2) {
                finalizeExecution(th2);
            }
        });
    }

    private CompletableFuture<Map<MemberInfo, ExecutionPlan>> createExecutionPlans(DAG dag, MembersView membersView) {
        return ExecutionPlanBuilder.createExecutionPlans(this.mc.nodeEngine(), membersView.getMembers(), dag, this.mc.jobId(), this.mc.executionId(), this.mc.jobConfig(), this.mc.jobExecutionRecord().ongoingSnapshotId(), false, this.mc.jobRecord().getSubject());
    }

    private void initExecution(MembersView membersView, Map<MemberInfo, ExecutionPlan> map) {
        this.mc.setExecutionPlanMap(map);
        this.logger.fine("Built execution plans for " + this.mc.jobIdString());
        Set<MemberInfo> keySet = this.mc.executionPlanMap().keySet();
        Version asVersion = this.mc.nodeEngine().getLocalMember().getVersion().asVersion();
        this.mc.invokeOnParticipants(executionPlan -> {
            return new InitExecutionOperation(this.mc.jobId(), this.mc.executionId(), membersView.getVersion(), asVersion, keySet, this.mc.nodeEngine().getSerializationService().toData(executionPlan), false);
        }, this::onInitStepCompleted, null, false);
    }

    @Nullable
    private DAG resolveDag(Supplier<Long> supplier) {
        this.mc.lock();
        try {
            if (isCancelled()) {
                this.logger.fine("Skipping init job '" + this.mc.jobName() + "': is already cancelled.");
                throw new CancellationException();
            }
            if (this.mc.jobStatus() != JobStatus.NOT_RUNNING) {
                this.logger.fine("Not starting job '" + this.mc.jobName() + "': status is " + this.mc.jobStatus());
                this.mc.unlock();
                return null;
            }
            if (this.mc.jobExecutionRecord().isSuspended()) {
                this.mc.jobExecutionRecord().clearSuspended();
                this.mc.writeJobExecutionRecord(false);
            }
            if (scheduleRestartIfQuorumAbsent() || scheduleRestartIfClusterIsNotSafe()) {
                return null;
            }
            Version clusterVersion = this.mc.jobRecord().getClusterVersion();
            Version clusterVersion2 = this.mc.nodeEngine().getClusterService().getClusterVersion();
            if (!clusterVersion.equals(clusterVersion2)) {
                throw new JetException("Cancelling job " + this.mc.jobName() + ": the cluster was upgraded since the job was submitted. Submitted to version: " + clusterVersion + ", current cluster version: " + clusterVersion2);
            }
            this.mc.setJobStatus(JobStatus.STARTING);
            this.mc.writeJobExecutionRecord(true);
            if (this.terminationRequest != null) {
                if (this.terminationRequest.mode.actionAfterTerminate() != TerminationMode.ActionAfterTerminate.RESTART) {
                    throw new JobTerminateRequestedException(this.terminationRequest.mode);
                }
                this.terminationRequest = null;
            }
            ClassLoader orCreateClassLoader = this.mc.getJetServiceBackend().getJobClassLoaderService().getOrCreateClassLoader(this.mc.jobConfig(), this.mc.jobId(), JobClassLoaderService.JobPhase.COORDINATOR);
            JobClassLoaderService jobClassLoaderService = this.mc.getJetServiceBackend().getJobClassLoaderService();
            try {
                try {
                    jobClassLoaderService.prepareProcessorClassLoaders(this.mc.jobId());
                    DAG dag = (DAG) CustomClassLoadedObject.deserializeWithCustomClassLoader(this.mc.nodeEngine().getSerializationService(), orCreateClassLoader, this.mc.jobRecord().getDag());
                    jobClassLoaderService.clearProcessorClassLoaders();
                    this.vertices = new HashSet();
                    Iterator<Vertex> it = dag.iterator();
                    Set<Vertex> set = this.vertices;
                    set.getClass();
                    it.forEachRemaining((v1) -> {
                        r1.add(v1);
                    });
                    this.mc.setExecutionId(supplier.get().longValue());
                    this.mc.snapshotContext().onExecutionStarted();
                    this.executionCompletionFuture = new CompletableFuture<>();
                    this.mc.unlock();
                    return dag;
                } catch (Throwable th) {
                    jobClassLoaderService.clearProcessorClassLoaders();
                    throw th;
                }
            } catch (Exception e) {
                throw new JetException("DAG deserialization failed", e);
            }
        } finally {
            this.mc.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nonnull
    public Tuple2<CompletableFuture<Void>, String> requestTermination(TerminationMode terminationMode, boolean z, boolean z2) {
        this.mc.coordinationService().assertOnCoordinatorThread();
        if (this.mc.jobConfig().getProcessingGuarantee() == ProcessingGuarantee.NONE && terminationMode != TerminationMode.CANCEL_GRACEFUL) {
            terminationMode = terminationMode.withoutTerminalSnapshot();
        }
        this.mc.lock();
        try {
            JobStatus jobStatus = this.mc.jobStatus();
            if (jobStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT && !z) {
                Tuple2<CompletableFuture<Void>, String> tuple2 = Tuple2.tuple2(this.executionCompletionFuture, "Cannot cancel when job status is " + JobStatus.SUSPENDED_EXPORTING_SNAPSHOT);
                this.mc.unlock();
                return tuple2;
            }
            if (jobStatus == JobStatus.SUSPENDED && terminationMode != TerminationMode.CANCEL_FORCEFUL) {
                Tuple2<CompletableFuture<Void>, String> tuple22 = Tuple2.tuple2(this.executionCompletionFuture, "Job is " + JobStatus.SUSPENDED);
                this.mc.unlock();
                return tuple22;
            }
            if (this.terminationRequest != null) {
                Tuple2<CompletableFuture<Void>, String> tuple23 = Tuple2.tuple2(this.executionCompletionFuture, (this.terminationRequest.mode == TerminationMode.CANCEL_FORCEFUL && terminationMode == TerminationMode.CANCEL_FORCEFUL) ? null : "Job is already terminating in mode: " + this.terminationRequest.mode.name());
                this.mc.unlock();
                return tuple23;
            }
            this.terminationRequest = new TerminationRequest(terminationMode, z2);
            if (jobStatus == JobStatus.SUSPENDED || jobStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT) {
                this.mc.setJobStatus(JobStatus.FAILED, terminationMode.actionAfterTerminate().description(), true);
                setFinalResult(createCancellationException());
            }
            if (terminationMode.isWithTerminalSnapshot()) {
                this.mc.snapshotContext().enqueueSnapshot(null, true, null);
            }
            Tuple2<CompletableFuture<Void>, String> tuple24 = Tuple2.tuple2(this.executionCompletionFuture, null);
            this.mc.unlock();
            if (jobStatus == JobStatus.SUSPENDED || jobStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT) {
                try {
                    this.mc.coordinationService().completeJob(this.mc, createCancellationException(), System.currentTimeMillis(), z2).get();
                } catch (Exception e) {
                    throw ExceptionUtil.rethrow(e);
                }
            } else if (jobStatus == JobStatus.RUNNING || jobStatus == JobStatus.STARTING) {
                handleTermination(terminationMode);
            }
            return tuple24;
        } catch (Throwable th) {
            this.mc.unlock();
            throw th;
        }
    }

    private void rewriteDagWithSnapshotRestore(DAG dag, long j, String str, String str2) {
        long validateSnapshot = SnapshotValidator.validateSnapshot(j, this.mc.nodeEngine().getHazelcastInstance().getMap(str), this.mc.jobIdString(), str2);
        this.logger.info(String.format("About to restore the state of %s from snapshot %d, mapName = %s", this.mc.jobIdString(), Long.valueOf(validateSnapshot), str));
        ArrayList<Vertex> arrayList = new ArrayList();
        Iterator<Vertex> it = dag.iterator();
        arrayList.getClass();
        it.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        HashMap hashMap = new HashMap();
        Vertex newVertex = dag.newVertex("__snapshot_read", SourceProcessors.readMapP(str));
        Vertex newVertex2 = dag.newVertex("__snapshot_explode", () -> {
            return new ExplodeSnapshotP(hashMap, validateSnapshot);
        });
        dag.edge(Edge.between(newVertex, newVertex2).isolated());
        int i = 0;
        for (Vertex vertex : arrayList) {
            hashMap.put(vertex.getName(), Integer.valueOf(i));
            dag.edge(new SnapshotRestoreEdge(newVertex2, i, vertex, dag.getInboundEdges(vertex.getName()).size()));
            i++;
        }
    }

    private boolean scheduleRestartIfQuorumAbsent() {
        int quorumSize = this.mc.jobExecutionRecord().getQuorumSize();
        if (this.mc.coordinationService().isQuorumPresent(quorumSize)) {
            return false;
        }
        this.logger.fine("Rescheduling restart of '" + this.mc.jobName() + "': quorum size " + quorumSize + " is not met");
        scheduleRestart("Quorum is absent");
        return true;
    }

    private boolean scheduleRestartIfClusterIsNotSafe() {
        if (this.mc.coordinationService().shouldStartJobs()) {
            return false;
        }
        this.logger.fine("Rescheduling restart of '" + this.mc.jobName() + "': cluster is not safe");
        scheduleRestart("Cluster is not safe");
        return true;
    }

    private void scheduleRestart(String str) {
        this.mc.assertLockHeld();
        JobStatus jobStatus = this.mc.jobStatus();
        if (jobStatus != JobStatus.NOT_RUNNING && jobStatus != JobStatus.STARTING && jobStatus != JobStatus.RUNNING) {
            throw new IllegalStateException("Restart scheduled in an unexpected state: " + jobStatus);
        }
        this.mc.setJobStatus(JobStatus.NOT_RUNNING, str, false);
        this.mc.coordinationService().scheduleRestart(this.mc.jobId());
    }

    private void onInitStepCompleted(Collection<Map.Entry<MemberInfo, Object>> collection) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            Throwable errorFromResponses = getErrorFromResponses("Init", collection);
            JobStatus jobStatus = this.mc.jobStatus();
            if (errorFromResponses == null && jobStatus == JobStatus.STARTING) {
                invokeStartExecution();
            } else {
                cancelExecutionInvocations(this.mc.jobId(), this.mc.executionId(), null, () -> {
                    onStartExecutionComplete(errorFromResponses != null ? errorFromResponses : new IllegalStateException("Cannot execute " + this.mc.jobIdString() + ": status is " + jobStatus), Collections.emptyList());
                });
            }
        });
    }

    private void invokeStartExecution() {
        this.logger.fine("Executing " + this.mc.jobIdString());
        long executionId = this.mc.executionId();
        this.mc.resetStartOperationResponses();
        this.executionFailureCallback = new ExecutionFailureCallback(executionId, this.mc.startOperationResponses());
        getTerminationRequest().ifPresent(terminationRequest -> {
            handleTermination(terminationRequest.getMode());
        });
        boolean isStoreMetricsAfterJobCompletion = this.mc.jobConfig().isStoreMetricsAfterJobCompletion();
        Function<ExecutionPlan, Operation> function = executionPlan -> {
            return new StartExecutionOperation(this.mc.jobId(), executionId, isStoreMetricsAfterJobCompletion);
        };
        Consumer<Collection<Map.Entry<MemberInfo, Object>>> consumer = collection -> {
            onStartExecutionComplete(getErrorFromResponses("Execution", collection), collection);
        };
        this.mc.setJobStatus(JobStatus.RUNNING);
        this.mc.snapshotContext().tryBeginSnapshot();
        this.mc.invokeOnParticipants(function, consumer, this.executionFailureCallback, false);
        if (this.mc.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE) {
            this.mc.coordinationService().scheduleSnapshot(this.mc, executionId);
        }
    }

    private void handleTermination(@Nonnull TerminationMode terminationMode) {
        if (terminationMode.isWithTerminalSnapshot()) {
            this.mc.snapshotContext().tryBeginSnapshot();
        } else if (this.executionFailureCallback != null) {
            this.executionFailureCallback.cancelInvocations(terminationMode);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setFinalResult(Throwable th) {
        if (th == null) {
            this.jobCompletionFuture.internalComplete();
        } else {
            this.jobCompletionFuture.internalCompleteExceptionally(th);
        }
    }

    private Throwable getErrorFromResponses(String str, Collection<Map.Entry<MemberInfo, Object>> collection) {
        if (isCancelled()) {
            this.logger.fine(this.mc.jobIdString() + " to be cancelled after " + str);
            return createCancellationException();
        }
        Map map = (Map) collection.stream().map(entry -> {
            return com.hazelcast.jet.Util.entry(((MemberInfo) entry.getKey()).getAddress(), entry.getValue());
        }).collect(Collectors.partitioningBy(entry2 -> {
            return entry2.getValue() instanceof Throwable;
        }));
        if (((List) map.getOrDefault(false, Collections.emptyList())).size() == this.mc.executionPlanMap().size()) {
            this.logger.fine(str + " of " + this.mc.jobIdString() + " was successful");
            return null;
        }
        List list = (List) map.getOrDefault(true, Collections.emptyList());
        if (!list.isEmpty()) {
            this.logger.fine(str + " of " + this.mc.jobIdString() + " has failures: " + list);
        }
        if (!list.stream().allMatch(entry3 -> {
            return entry3.getValue() instanceof TerminatedWithSnapshotException;
        })) {
            Map map2 = (Map) list.stream().collect(Collectors.partitioningBy(entry4 -> {
                return (entry4.getValue() instanceof CancellationException) || (entry4.getValue() instanceof TerminatedWithSnapshotException) || ExceptionUtil.isTopologyException((Throwable) entry4.getValue());
            }));
            List list2 = (List) map2.getOrDefault(true, Collections.emptyList());
            List list3 = (List) map2.getOrDefault(false, Collections.emptyList());
            return !list3.isEmpty() ? (Throwable) ((Map.Entry) list3.get(0)).getValue() : new TopologyChangedException("Causes from members: " + list2);
        }
        if (!$assertionsDisabled && !str.equals("Execution")) {
            throw new AssertionError("opName is '" + str + "', expected 'Execution'");
        }
        this.logger.fine(str + " of " + this.mc.jobIdString() + " terminated after a terminal snapshot");
        TerminationMode orElseThrow = requestedTerminationMode().orElseThrow(() -> {
            return new AssertionError("mode is null");
        });
        if ($assertionsDisabled || orElseThrow.isWithTerminalSnapshot()) {
            return orElseThrow == TerminationMode.CANCEL_GRACEFUL ? new CancellationException() : new JobTerminateRequestedException(orElseThrow);
        }
        throw new AssertionError("mode=" + orElseThrow);
    }

    private void logCannotComplete(Throwable th) {
        if (th != null) {
            this.logger.severe("Cannot properly complete failed " + this.mc.jobIdString() + ": status is " + this.mc.jobStatus(), th);
        } else {
            this.logger.severe("Cannot properly complete " + this.mc.jobIdString() + ": status is " + this.mc.jobStatus());
        }
    }

    private void onStartExecutionComplete(Throwable th, Collection<Map.Entry<MemberInfo, Object>> collection) {
        JobStatus jobStatus = this.mc.jobStatus();
        if (jobStatus != JobStatus.STARTING && jobStatus != JobStatus.RUNNING) {
            logCannotComplete(th);
            th = new IllegalStateException("Job coordination failed");
        }
        setJobMetrics((List) collection.stream().filter(entry -> {
            return entry.getValue() instanceof RawJobMetrics;
        }).map(entry2 -> {
            return (RawJobMetrics) entry2.getValue();
        }).collect(Collectors.toList()));
        if ((th instanceof JobTerminateRequestedException) && ((JobTerminateRequestedException) th).mode().isWithTerminalSnapshot()) {
            Throwable th2 = th;
            this.mc.snapshotContext().terminalSnapshotFuture().whenCompleteAsync(ExceptionUtil.withTryCatch(this.logger, (r5, th3) -> {
                finalizeExecution(th2);
            }));
        } else {
            if (th instanceof ExecutionNotFoundException) {
                Throwable th4 = th;
                th = (Throwable) getTerminationRequest().map(terminationRequest -> {
                    return new JobTerminateRequestedException(terminationRequest.getMode()).initCause(th4);
                }).orElse(th4);
            }
            finalizeExecution(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancelExecutionInvocations(long j, long j2, TerminationMode terminationMode, Runnable runnable) {
        this.mc.nodeEngine().getExecutionService().execute(ExecutionService.ASYNC_EXECUTOR, () -> {
            this.mc.invokeOnParticipants(executionPlan -> {
                return new TerminateExecutionOperation(j, j2, terminationMode);
            }, collection -> {
                if (collection.stream().map((v0) -> {
                    return v0.getValue();
                }).filter(obj -> {
                    return !(obj instanceof JetDisabledException);
                }).anyMatch(Objects::nonNull)) {
                    this.logger.severe(this.mc.jobIdString() + ": some TerminateExecutionOperation invocations failed, execution might remain stuck: " + collection);
                }
                if (runnable != null) {
                    runnable.run();
                }
            }, null, true);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void finalizeExecution(@Nullable Throwable th) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            this.mc.lock();
            JobStatus jobStatus = this.mc.jobStatus();
            if (jobStatus == JobStatus.COMPLETED || jobStatus == JobStatus.FAILED) {
                logIgnoredCompletion(th, jobStatus);
            }
            this.mc.unlock();
        }).thenComposeAsync(r5 -> {
            return completeVertices(th);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r6 -> {
            return this.mc.coordinationService().submitToCoordinatorThread(() -> {
                Runnable runnable;
                try {
                    this.mc.lock();
                    this.mc.getJetServiceBackend().getJobClassLoaderService().tryRemoveClassloadersForJob(this.mc.jobId(), JobClassLoaderService.JobPhase.COORDINATOR);
                    TerminationMode.ActionAfterTerminate actionAfterTerminate = th instanceof JobTerminateRequestedException ? ((JobTerminateRequestedException) th).mode().actionAfterTerminate() : null;
                    this.mc.snapshotContext().onExecutionTerminated();
                    String str = (String) requestedTerminationMode().map(terminationMode -> {
                        return terminationMode.actionAfterTerminate().description();
                    }).orElse(th != null ? th.toString() : null);
                    boolean isUserInitiatedTermination = isUserInitiatedTermination();
                    if (actionAfterTerminate == TerminationMode.ActionAfterTerminate.RESTART) {
                        this.mc.setJobStatus(JobStatus.NOT_RUNNING, str, isUserInitiatedTermination);
                        runnable = () -> {
                            this.mc.coordinationService().restartJob(this.mc.jobId());
                        };
                    } else if (!isCancelled() && ExceptionUtil.isRestartableException(th) && this.mc.jobConfig().isAutoScaling()) {
                        scheduleRestart(str);
                        runnable = NO_OP;
                    } else if (actionAfterTerminate == TerminationMode.ActionAfterTerminate.SUSPEND || !(!ExceptionUtil.isRestartableException(th) || isCancelled() || this.mc.jobConfig().isAutoScaling() || this.mc.jobConfig().getProcessingGuarantee() == ProcessingGuarantee.NONE)) {
                        this.mc.setJobStatus(JobStatus.SUSPENDED, str, isUserInitiatedTermination);
                        this.mc.jobExecutionRecord().setSuspended(null);
                        runnable = () -> {
                            this.mc.writeJobExecutionRecord(false);
                        };
                    } else if (th == null || isCancelled() || isCancelledGracefully() || !this.mc.jobConfig().isSuspendOnFailure()) {
                        long currentTimeMillis = System.currentTimeMillis();
                        this.mc.setJobStatus(logExecutionSummary(th, currentTimeMillis) ? JobStatus.COMPLETED : JobStatus.FAILED, str, isUserInitiatedTermination);
                        if (th instanceof LocalMemberResetException) {
                            this.logger.fine("Cancelling job " + this.mc.jobIdString() + " locally: member (local or remote) reset. We don't delete job metadata: job will restart on majority cluster");
                            setFinalResult(new CancellationException());
                        } else {
                            this.mc.coordinationService().completeJob(this.mc, th, currentTimeMillis, isUserInitiatedTermination()).whenComplete(ExceptionUtil.withTryCatch(this.logger, (r6, th2) -> {
                                if (th2 != null) {
                                    this.logger.warning("Completion of " + this.mc.jobIdString() + " failed", th2);
                                } else {
                                    setFinalResult(th);
                                }
                            }));
                        }
                        runnable = NO_OP;
                    } else {
                        this.mc.setJobStatus(JobStatus.SUSPENDED, str, isUserInitiatedTermination);
                        this.mc.jobExecutionRecord().setSuspended("Execution failure:\n" + ExceptionUtil.stackTraceToString(th));
                        runnable = () -> {
                            this.mc.writeJobExecutionRecord(false);
                        };
                    }
                    this.terminationRequest = null;
                    this.executionFailureCallback = null;
                    this.mc.unlock();
                    this.executionCompletionFuture.complete(null);
                    runnable.run();
                } catch (Throwable th3) {
                    this.mc.unlock();
                    throw th3;
                }
            });
        });
    }

    private boolean logExecutionSummary(@Nullable Throwable th, long j) {
        if (th == null) {
            this.logger.info(formatExecutionSummary("completed successfully", j));
            return true;
        }
        if ((th instanceof CancellationException) || (th instanceof JobTerminateRequestedException)) {
            this.logger.info(formatExecutionSummary("got terminated, reason=" + th, j));
            return false;
        }
        if (th instanceof JetDisabledException) {
            this.logger.severe(formatExecutionSummary("failed. This is probably because the Jet engine is not enabled on all cluster members. Please enable the Jet engine for ALL members in the cluster.", j), th);
            return false;
        }
        this.logger.severe(formatExecutionSummary("failed", j), th);
        return false;
    }

    private String formatExecutionSummary(String str, long j) {
        StringBuilder sb = new StringBuilder();
        sb.append("Execution of ").append(this.mc.jobIdString()).append(' ').append(str);
        sb.append("\n\t").append("Start time: ").append(Util.toLocalDateTime(this.executionStartTime));
        sb.append("\n\t").append("Duration: ").append(Util.formatJobDuration(j - this.executionStartTime));
        if (this.jobMetrics.stream().noneMatch(rawJobMetrics -> {
            return rawJobMetrics.getBlob() != null;
        })) {
            sb.append("\n\tTo see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion");
        } else {
            JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(this.jobMetrics);
            Map<String, Long> mergeByVertex = mergeByVertex(jobMetrics.get(MetricNames.RECEIVED_COUNT));
            Map<String, Long> mergeByVertex2 = mergeByVertex(jobMetrics.get(MetricNames.EMITTED_COUNT));
            Map<String, Long> mergeByVertex3 = mergeByVertex(jobMetrics.get(MetricNames.DISTRIBUTED_BYTES_IN));
            Map<String, Long> mergeByVertex4 = mergeByVertex(jobMetrics.get(MetricNames.DISTRIBUTED_BYTES_OUT));
            sb.append("\n\tVertices:");
            for (Vertex vertex : this.vertices) {
                sb.append("\n\t\t").append(vertex.getName());
                sb.append(getValueForVertex("\n\t\t\treceivedCount", vertex, mergeByVertex));
                sb.append(getValueForVertex("\n\t\t\temittedCount", vertex, mergeByVertex2));
                sb.append(getValueForVertex("\n\t\t\tdistributedBytesIn", vertex, mergeByVertex3));
                sb.append(getValueForVertex("\n\t\t\tdistributedBytesOut", vertex, mergeByVertex4));
            }
        }
        return sb.toString();
    }

    private static Map<String, Long> mergeByVertex(List<Measurement> list) {
        return (Map) list.stream().collect(Collectors.toMap(measurement -> {
            return measurement.tag(MetricTags.VERTEX);
        }, (v0) -> {
            return v0.value();
        }, (v0, v1) -> {
            return Long.sum(v0, v1);
        }));
    }

    private static String getValueForVertex(String str, Vertex vertex, Map<String, Long> map) {
        Long l = map.get(vertex.getName());
        return l == null ? "" : str + ": " + l;
    }

    private void logIgnoredCompletion(@Nullable Throwable th, JobStatus jobStatus) {
        if (th != null) {
            this.logger.severe("Ignoring failure completion of " + com.hazelcast.jet.Util.idToString(this.mc.jobId()) + " because status is " + jobStatus, th);
        } else {
            this.logger.severe("Ignoring completion of " + com.hazelcast.jet.Util.idToString(this.mc.jobId()) + " because status is " + jobStatus);
        }
    }

    private CompletableFuture<Void> completeVertices(@Nullable Throwable th) {
        if (this.vertices == null) {
            return CompletableFuture.completedFuture(null);
        }
        ManagedExecutorService executor = this.mc.nodeEngine().getExecutionService().getExecutor(ExecutionService.JOB_OFFLOADABLE_EXECUTOR);
        ArrayList arrayList = new ArrayList(this.vertices.size());
        JobClassLoaderService jobClassLoaderService = this.mc.getJetServiceBackend().getJobClassLoaderService();
        for (Vertex vertex : this.vertices) {
            ProcessorMetaSupplier metaSupplier = vertex.getMetaSupplier();
            arrayList.add(CompletableFuture.runAsync(() -> {
                try {
                    Util.doWithClassLoader(jobClassLoaderService.getProcessorClassLoader(this.mc.jobId(), vertex.getName()), () -> {
                        metaSupplier.close(th);
                    });
                } catch (Throwable th2) {
                    this.logger.severe(this.mc.jobIdString() + " encountered an exception in ProcessorMetaSupplier.close(), ignoring it", th2);
                }
            }, metaSupplier.closeIsCooperative() ? ConcurrencyUtil.CALLER_RUNS : executor));
        }
        return CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0]));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void resumeJob(Supplier<Long> supplier) {
        this.mc.lock();
        try {
            if (this.mc.jobStatus() != JobStatus.SUSPENDED) {
                this.logger.info("Not resuming " + this.mc.jobIdString() + ": not " + JobStatus.SUSPENDED + ", but " + this.mc.jobStatus());
                return;
            }
            this.mc.setJobStatus(JobStatus.NOT_RUNNING, "Resume", true);
            this.logger.fine("Resuming job " + this.mc.jobName());
            tryStartJob(supplier);
        } finally {
            this.mc.unlock();
        }
    }

    private boolean hasParticipant(UUID uuid) {
        Map<MemberInfo, ExecutionPlan> executionPlanMap = this.mc.executionPlanMap();
        return this.mc.nodeEngine().getLocalMember().getUuid().equals(uuid) || (executionPlanMap != null && executionPlanMap.keySet().stream().anyMatch(memberInfo -> {
            return memberInfo.getUuid().equals(uuid);
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nonnull
    public CompletableFuture<Void> onParticipantGracefulShutdown(UUID uuid) {
        return hasParticipant(uuid) ? gracefullyTerminate() : CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nonnull
    public CompletableFuture<Void> gracefullyTerminate() {
        return this.mc.coordinationService().submitToCoordinatorThread(() -> {
            return requestTermination(TerminationMode.RESTART_GRACEFUL, false, false).f0();
        }).thenCompose(Function.identity());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean maybeScaleUp(int i) {
        this.mc.coordinationService().assertOnCoordinatorThread();
        if (!this.mc.jobConfig().isAutoScaling()) {
            return true;
        }
        if (this.mc.executionPlanMap() == null || this.mc.executionPlanMap().size() == i) {
            LoggingUtil.logFine(this.logger, "Not scaling up %s: not running or already running on all members", this.mc.jobIdString());
            return true;
        }
        if (this.mc.jobStatus() != JobStatus.RUNNING || requestTermination(TerminationMode.RESTART_GRACEFUL, false, false).f1() != null) {
            return false;
        }
        this.logger.info("Requested restart of " + this.mc.jobIdString() + " to make use of added member(s). Job was running on " + this.mc.executionPlanMap().size() + " members, cluster now has " + i + " data members with assigned partitions");
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<RawJobMetrics> jobMetrics() {
        return this.jobMetrics;
    }

    private void setJobMetrics(List<RawJobMetrics> list) {
        if (!$assertionsDisabled && !list.stream().allMatch((v0) -> {
            return Objects.nonNull(v0);
        })) {
            throw new AssertionError("responses=" + list);
        }
        this.jobMetrics = (List) Objects.requireNonNull(list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void collectMetrics(CompletableFuture<List<RawJobMetrics>> completableFuture) {
        if (this.mc.jobStatus() != JobStatus.RUNNING) {
            completableFuture.complete(this.jobMetrics);
            return;
        }
        long jobId = this.mc.jobId();
        long executionId = this.mc.executionId();
        this.mc.invokeOnParticipants(executionPlan -> {
            return new GetLocalJobMetricsOperation(jobId, executionId);
        }, collection -> {
            completeWithMetrics(completableFuture, collection);
        }, null, false);
    }

    private void completeWithMetrics(CompletableFuture<List<RawJobMetrics>> completableFuture, Collection<Map.Entry<MemberInfo, Object>> collection) {
        if (collection.stream().anyMatch(entry -> {
            return entry.getValue() instanceof ExecutionNotFoundException;
        })) {
            LoggingUtil.logFinest(this.logger, "Rescheduling collectMetrics for %s, some members threw %s", this.mc.jobIdString(), ExecutionNotFoundException.class.getSimpleName());
            this.mc.nodeEngine().getExecutionService().schedule(() -> {
                collectMetrics(completableFuture);
            }, 100L, TimeUnit.MILLISECONDS);
            return;
        }
        Stream<R> map = collection.stream().map((v0) -> {
            return v0.getValue();
        });
        Class<Throwable> cls = Throwable.class;
        Throwable.class.getClass();
        Throwable th = (Throwable) map.filter(cls::isInstance).findFirst().orElse(null);
        if (th != null) {
            completableFuture.completeExceptionally(th);
        } else {
            completableFuture.complete(Util.toList(collection, entry2 -> {
                return (RawJobMetrics) entry2.getValue();
            }));
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -654404436:
                if (implMethodName.equals("lambda$rewriteDagWithSnapshotRestore$15d4b860$1")) {
                    z = true;
                    break;
                }
                break;
            case 832077641:
                if (implMethodName.equals("lambda$completeVertices$4a03f629$1")) {
                    z = false;
                    break;
                }
                break;
            case 1237055423:
                if (implMethodName.equals("lambda$null$ef0fe15d$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/MasterJobContext") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/impl/JobClassLoaderService;Lcom/hazelcast/jet/core/Vertex;Lcom/hazelcast/jet/core/ProcessorMetaSupplier;Ljava/lang/Throwable;)V")) {
                    MasterJobContext masterJobContext = (MasterJobContext) serializedLambda.getCapturedArg(0);
                    JobClassLoaderService jobClassLoaderService = (JobClassLoaderService) serializedLambda.getCapturedArg(1);
                    Vertex vertex = (Vertex) serializedLambda.getCapturedArg(2);
                    ProcessorMetaSupplier processorMetaSupplier = (ProcessorMetaSupplier) serializedLambda.getCapturedArg(3);
                    Throwable th = (Throwable) serializedLambda.getCapturedArg(4);
                    return () -> {
                        try {
                            Util.doWithClassLoader(jobClassLoaderService.getProcessorClassLoader(this.mc.jobId(), vertex.getName()), () -> {
                                processorMetaSupplier.close(th);
                            });
                        } catch (Throwable th2) {
                            this.logger.severe(this.mc.jobIdString() + " encountered an exception in ProcessorMetaSupplier.close(), ignoring it", th2);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/MasterJobContext") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;J)Lcom/hazelcast/jet/core/Processor;")) {
                    Map map = (Map) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    return () -> {
                        return new ExplodeSnapshotP(map, longValue);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/MasterJobContext") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/ProcessorMetaSupplier;Ljava/lang/Throwable;)V")) {
                    ProcessorMetaSupplier processorMetaSupplier2 = (ProcessorMetaSupplier) serializedLambda.getCapturedArg(0);
                    Throwable th2 = (Throwable) serializedLambda.getCapturedArg(1);
                    return () -> {
                        processorMetaSupplier2.close(th2);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !MasterJobContext.class.desiredAssertionStatus();
        NO_OP = () -> {
        };
    }
}
