package com.hazelcast.jet.core;

import com.hazelcast.jet.Job;
import com.hazelcast.jet.TestInClusterSupport;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.map.IMap;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Verifies what {@link Job#getSuspensionCause()} reports for jobs submitted
 * with {@code suspendOnFailure} enabled, and that a job suspended because of
 * a processing failure can be resumed afterwards.
 */
@Category({QuickTest.class, ParallelJVMTest.class})
public class SuspendExecutionOnFailureTest extends TestInClusterSupport {

    /** Error injected into the faulty processor's {@code complete()} phase. */
    private static final Throwable MOCK_ERROR = new AssertionError("mock error");

    /** Fresh per-test configuration with {@code suspendOnFailure} turned on. */
    private JobConfig jobConfig;

    /** Creates the job configuration and resets the shared {@link TestProcessors} state. */
    @Before
    public void before() {
        this.jobConfig = new JobConfig().setSuspendOnFailure(true);
        TestProcessors.reset(0);
    }

    /** A job that is still running must refuse to report a suspension cause. */
    @Test
    public void when_jobRunning_then_suspensionCauseThrows() {
        DAG dag = new DAG().vertex(new Vertex("test", () -> new TestProcessors.NoOutputSourceP()));
        Job job = hz().getJet().newJob(dag, this.jobConfig);
        assertJobStatusEventually(job, JobStatus.RUNNING);

        assertSuspensionCauseThrows(job);

        cancelAndJoin(job);
    }

    /** A job that completed normally must refuse to report a suspension cause. */
    @Test
    public void when_jobCompleted_then_suspensionCauseThrows() {
        DAG dag = new DAG().vertex(new Vertex("test", Processors.noopP()));
        Job job = hz().getJet().newJob(dag, this.jobConfig);
        job.join();
        Assert.assertEquals(JobStatus.COMPLETED, job.getStatus());

        assertSuspensionCauseThrows(job);
    }

    /**
     * A job that ended up in {@code FAILED} state (reached here by cancelling a
     * running job) must refuse to report a suspension cause.
     */
    @Test
    public void when_jobFailed_then_suspensionCauseThrows() {
        DAG dag = new DAG().vertex(new Vertex("test", () -> new TestProcessors.NoOutputSourceP()));
        Job job = hz().getJet().newJob(dag, this.jobConfig);
        assertJobStatusEventually(job, JobStatus.RUNNING);

        job.cancel();
        assertJobStatusEventually(job, JobStatus.FAILED);

        assertSuspensionCauseThrows(job);
    }

    /**
     * After an explicit {@link Job#suspend()} the cause must be flagged as
     * requested by the user, carry the matching description, and must not
     * expose an error cause.
     */
    @Test
    public void when_jobSuspendedByUser_then_suspensionCauseSaysSo() {
        DAG dag = new DAG().vertex(
                new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 2)));
        Job job = hz().getJet().newJob(dag, this.jobConfig);
        assertJobStatusEventually(job, JobStatus.RUNNING);

        job.suspend();
        assertJobStatusEventually(job, JobStatus.SUSPENDED);

        Assertions.assertThat(job.getSuspensionCause()).matches(JobSuspensionCause::requestedByUser);
        Assertions.assertThat(job.getSuspensionCause().description()).isEqualTo("Requested by user");
        JobSuspensionCause cause = job.getSuspensionCause();
        Assertions.assertThatThrownBy(cause::errorCause)
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("Suspension not caused by an error");

        cancelAndJoin(job);
    }

    /**
     * When a processor fails, the job must become {@code SUSPENDED} and the
     * suspension cause must be flagged as an error whose text names the
     * failing tasklet and the original exception.
     */
    @Test
    public void when_jobSuspendedDueToFailure_then_suspensionCauseDescribeProblem() {
        DAG dag = new DAG();
        dag.newVertex("faulty", () -> new TestProcessors.MockP().setCompleteError(MOCK_ERROR));
        // The job name is part of the tasklet name matched by the regex below.
        this.jobConfig.setName("faultyJob");

        Job job = hz().getJet().newJob(dag, this.jobConfig);
        assertJobStatusEventually(job, JobStatus.SUSPENDED);

        Assertions.assertThat(job.getSuspensionCause()).matches(JobSuspensionCause::dueToError);
        Assertions.assertThat(job.getSuspensionCause().errorCause())
                .isNotNull()
                .matches(errorCause -> errorCause.matches("(?s)Execution failure:\n"
                        + "com.hazelcast.jet.JetException: Exception in ProcessorTasklet\\{faultyJob/faulty#[0-9]+}: "
                        + "java.lang.AssertionError: mock error.*"));

        cancelAndJoin(job);
    }

    /**
     * Runs a snapshotting streaming job whose mapping stage throws on one
     * specific item while a flag in an {@link IMap} is set. Once the job is
     * suspended, the flag is cleared and the job resumed; all items must
     * eventually reach the sink map and the job must be running again.
     */
    @Test
    public void when_jobSuspendedDueToFailure_then_canBeResumed() {
        int numItems = 100;
        int interruptItem = 50;

        // Source state is a one-element counter array; it emits 0..numItems-1,
        // one item per fillBuffer call, and saves the counter in snapshots.
        StreamSource<Integer> source = SourceBuilder.stream("src", procCtx -> new int[1])
                .<Integer>fillBufferFn((counter, buffer) -> {
                    if (counter[0] < numItems) {
                        buffer.add(counter[0]++);
                        sleepMillis(5);
                    }
                })
                .createSnapshotFn(counter -> counter[0])
                .restoreSnapshotFn((counter, saved) -> counter[0] = saved.get(0))
                .build();

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(source)
                .withoutTimestamps()
                .<String, Boolean, java.util.Map.Entry<Integer, Integer>>mapUsingIMap(
                        "SuspendExecutionOnFailureTest_failureMap",
                        item -> "key",
                        (item, shouldFail) -> {
                            if (shouldFail && item == interruptItem) {
                                throw new RuntimeException("Fail deliberately");
                            }
                            return Util.entry(item, item);
                        })
                .setLocalParallelism(1)
                .writeTo(Sinks.map("SuspendExecutionOnFailureTest_sinkMap"));

        // Arm the failure before the job starts.
        IMap<String, Boolean> failureMap = hz().getMap("SuspendExecutionOnFailureTest_failureMap");
        failureMap.put("key", true);

        this.jobConfig
                .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
                .setSnapshotIntervalMillis(50L);
        Job job = hz().getJet().newJob(pipeline, this.jobConfig);
        assertJobStatusEventually(job, JobStatus.SUSPENDED);

        IMap<Integer, Integer> sinkMap = hz().getMap("SuspendExecutionOnFailureTest_sinkMap");
        // Disarm the failure so the resumed job can get past the failing item.
        failureMap.put("key", false);
        job.resume();

        // Entries are keyed by the item value, so an item written more than
        // once occupies a single entry and the size can be compared exactly.
        assertTrueEventually(() -> Assert.assertEquals(numItems, sinkMap.size()));
        assertTrueEventually(() -> Assert.assertEquals(JobStatus.RUNNING, job.getStatus()));

        cancelAndJoin(job);
    }

    /**
     * Asserts that asking {@code job} for its suspension cause fails with an
     * {@link IllegalStateException} carrying the message "Job not suspended".
     */
    private static void assertSuspensionCauseThrows(Job job) {
        Assertions.assertThatThrownBy(job::getSuspensionCause)
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("Job not suspended");
    }
}
