package com.hazelcast.jet;

import com.hazelcast.client.impl.clientside.ClientTestUtil;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.ExecutionLifecycleTest;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.spi.impl.eventservice.impl.EventServiceTest;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.SlowTest;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({SlowTest.class})
/* loaded from: input_file:com/hazelcast/jet/JobStatusListenerTest.class */
public class JobStatusListenerTest extends SimpleTestInClusterSupport {
    /**
     * Normalizes a logged status line: on each line, the text that follows an opening
     * parenthesis up to and including the last {@code ": "} is removed, leaving only what
     * comes after that separator inside the parentheses.
     */
    private static final Function<String, String> SIMPLIFY =
            line -> line.replaceAll("(?<=\\().*: ", "");

    /** First element of each parameter row; also used as the display name of the run ({@code "{0}"}). */
    @Parameterized.Parameter(0)
    public String mode;

    /** Second element of each parameter row: supplies the Hazelcast instance the jobs are submitted through. */
    @Parameterized.Parameter(1)
    public Supplier<HazelcastInstance> instance;
    /** Id string of the job started by the current test; checked for leftover listeners after the test. */
    protected String jobIdString;
    /** Registration id of the status listener added by the current test; stays {@code null} if none was recorded. */
    protected UUID registrationId;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/hazelcast/jet/JobStatusListenerTest$JobStatusLogger.class */
    public static class JobStatusLogger implements JobStatusListener {
        public final List<String> log = Collections.synchronizedList(new ArrayList());
        public final UUID registrationId;
        final Job job;
        Thread originalThread;

        public JobStatusLogger(Job job) {
            this.job = job;
            this.registrationId = job.addStatusListener(this);
        }

        public void jobStatusChanged(JobStatusEvent jobStatusEvent) {
            String str = "";
            if (this.originalThread == null) {
                this.originalThread = Thread.currentThread();
            } else if (this.originalThread != Thread.currentThread()) {
                str = String.format(" (invoked from different thread: expected %s but invoked on %s)", this.originalThread.getName(), Thread.currentThread().getName());
            }
            List<String> list = this.log;
            Object[] objArr = new Object[5];
            objArr[0] = jobStatusEvent.isUserRequested() ? "User" : "Jet";
            objArr[1] = jobStatusEvent.getPreviousStatus();
            objArr[2] = jobStatusEvent.getNewStatus();
            objArr[3] = jobStatusEvent.getDescription() == null ? "" : " (" + jobStatusEvent.getDescription() + ")";
            objArr[4] = str;
            list.add(String.format("%s: %s -> %s%s%s", objArr));
        }

        public void deregister() {
            this.job.removeStatusListener(this.registrationId);
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/JobStatusListenerTest$TestCase.class */
    /**
     * Small fluent helper describing one listener scenario: a DAG, an optional job config,
     * the actions to perform on the running job ({@code when}) and, optionally, the status
     * lines the listener is expected to have logged ({@code expect}).
     *
     * <p>The decompiled copy of the compiler-synthesized {@code $deserializeLambda$} method
     * is intentionally not declared here: javac generates that method itself for the
     * serializable lambdas in this class.
     */
    protected class TestCase {
        private final DAG dag;
        /** Job config for {@link #runJob()}; a fresh {@link JobConfig} is used when {@code null}. */
        private JobConfig config;
        /** Scenario body; must be set through one of the {@code when} overloads before running. */
        private BiConsumerEx<Job, JobStatusLogger> test;
        /** Expected log lines; no final verification happens when {@code null}. */
        private String[] log;

        /**
         * @param dag the DAG submitted by {@link #runJob()} or {@link #runLightJob()}
         */
        public TestCase(DAG dag) {
            this.dag = dag;
        }

        /**
         * Sets the config used by {@link #runJob()}.
         *
         * @return this test case, for chaining
         */
        public TestCase config(JobConfig jobConfig) {
            this.config = jobConfig;
            return this;
        }

        /**
         * Sets the scenario body, which receives both the job and its status logger.
         *
         * @return this test case, for chaining
         */
        public TestCase when(BiConsumerEx<Job, JobStatusLogger> biConsumerEx) {
            this.test = biConsumerEx;
            return this;
        }

        /**
         * Sets a scenario body that only needs the job; the logger argument is ignored.
         *
         * @return this test case, for chaining
         */
        public TestCase when(ConsumerEx<Job> consumerEx) {
            this.test = (job, ignoredLogger) -> consumerEx.accept(job);
            return this;
        }

        /**
         * Sets the log lines to verify once the scenario body has finished.
         *
         * @return this test case, for chaining
         */
        public TestCase expect(String... strArr) {
            this.log = strArr;
            return this;
        }

        /**
         * Submits the DAG with {@code newJob} and verifies the expected lines with
         * {@link JobStatusListenerTest#assertTailEqualsEventually}.
         */
        public void runJob() {
            run((jet, jobDag) -> jet.newJob(jobDag, this.config != null ? this.config : new JobConfig()),
                    JobStatusListenerTest::assertTailEqualsEventually);
        }

        /**
         * Submits the DAG with {@code newLightJob}, asserts the job is visible from the
         * instance under test, and verifies the expected lines with
         * {@link JobStatusListenerTest#assertEqualsEventually}.
         */
        public void runLightJob() {
            run((jet, jobDag) -> {
                Job lightJob = jet.newLightJob(jobDag);
                JobAssertions.assertThat(lightJob).isVisible(JobStatusListenerTest.this.instance.get());
                return lightJob;
            }, JobStatusListenerTest::assertEqualsEventually);
        }

        /**
         * Shared driver: submits the job, attaches a {@link JobStatusLogger}, records the job
         * id and registration id on the enclosing test, runs the scenario body and finally
         * verifies the log if expectations were given.
         *
         * @param biFunction submits the DAG through the given {@link JetService}
         * @param biConsumer compares the logger's lines with the expected lines
         */
        private void run(BiFunction<JetService, DAG, Job> biFunction, BiConsumer<List<String>, String[]> biConsumer) {
            Assert.assertNotNull("Use when() to specify a test", this.test);
            Job job = biFunction.apply(JobStatusListenerTest.this.instance.get().getJet(), this.dag);
            JobStatusLogger logger = new JobStatusLogger(job);
            // Called only after the listener is attached; presumably this releases the blocked
            // processor supplier so the job can proceed — confirm against TestProcessors.MockPS.
            TestProcessors.MockPS.unblock();
            JobStatusListenerTest.this.jobIdString = job.getIdString();
            JobStatusListenerTest.this.registrationId = logger.registrationId;
            this.test.accept(job, logger);
            if (this.log != null) {
                biConsumer.accept(logger.log, this.log);
            }
        }
    }

    /**
     * Supplies the parameter rows: each row is a display name plus a supplier of the
     * instance the jobs are submitted through (first member, second member, or the client).
     */
    @Parameterized.Parameters(name = "{0}")
    public static Iterable<Object[]> parameters() {
        Object[] onMaster = mode("onMaster", () -> instances()[0]);
        Object[] onNonMaster = mode("onNonMaster", () -> instances()[1]);
        Object[] onClient = mode("onClient", () -> client());
        return Arrays.asList(onMaster, onNonMaster, onClient);
    }

    /**
     * Builds one parameter row.
     *
     * @param str      display name of the mode
     * @param supplier supplies the instance used in that mode
     * @return a two-element array {@code {str, supplier}}
     */
    static Object[] mode(String str, Supplier<HazelcastInstance> supplier) {
        Object[] row = new Object[2];
        row[0] = str;
        row[1] = supplier;
        return row;
    }

    /**
     * One-time class setup: delegates to {@code initializeWithClient(2, null, null)}.
     * NOTE(review): presumably this starts two members plus a client with default configs
     * (the parameter rows use {@code instances()[0]}, {@code instances()[1]} and
     * {@code client()}) — confirm against SimpleTestInClusterSupport.
     */
    @BeforeClass
    public static void setup() {
        initializeWithClient(2, null, null);
    }

    /**
     * Runs before every test and resets the shared {@link TestProcessors} state via
     * {@code TestProcessors.reset(1)}.
     * NOTE(review): the meaning of the argument {@code 1} is not visible here — confirm
     * against TestProcessors.
     */
    @Before
    public void reset() {
        TestProcessors.reset(1);
    }

    /** A batch job that is simply joined must report the start-up transitions and then COMPLETED. */
    @Test
    public void testListener_waitForCompletion() {
        String[] expectedLog = {
                "Jet: NOT_RUNNING -> STARTING",
                "Jet: STARTING -> RUNNING",
                "Jet: RUNNING -> COMPLETED"
        };
        new TestCase(batchSource())
                .when(Job::join)
                .expect(expectedLog)
                .runJob();
    }

    /**
     * Drives a streaming job through suspend, resume, restart and cancel, and expects one
     * logged line per status transition, with user-initiated ones attributed to "User".
     */
    @Test
    public void testListener_suspend_resume_restart_cancelJob() {
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(3);
        String[] expectedLog = {
                "Jet: NOT_RUNNING -> STARTING",
                "Jet: STARTING -> RUNNING",
                "User: RUNNING -> SUSPENDED (Suspend)",
                "User: SUSPENDED -> NOT_RUNNING (Resume)",
                "Jet: NOT_RUNNING -> STARTING",
                "Jet: STARTING -> RUNNING",
                "User: RUNNING -> NOT_RUNNING (Restart)",
                "Jet: NOT_RUNNING -> STARTING",
                "Jet: STARTING -> RUNNING",
                "User: RUNNING -> FAILED (Cancel)"
        };
        new TestCase(streamSource())
                .when(runningJob -> {
                    JobAssertions.assertThat(runningJob).eventuallyHasStatus(JobStatus.RUNNING);

                    runningJob.suspend();
                    JobAssertions.assertThat(runningJob).eventuallyHasStatus(JobStatus.SUSPENDED);

                    runningJob.resume();
                    TestProcessors.MockPS.unblock();
                    JobAssertions.assertThat(runningJob).eventuallyHasStatus(JobStatus.RUNNING);

                    runningJob.restart();
                    TestProcessors.MockPS.unblock();
                    // Wait on the latch installed at the top of this test before cancelling.
                    TestProcessors.NoOutputSourceP.executionStarted.await();
                    cancelAndJoin(runningJob);
                })
                .expect(expectedLog)
                .runJob();
    }

    /**
     * A batch job whose source fails must log a final RUNNING -> FAILED line whose
     * description is the string form of the cause reported by {@code join()}.
     */
    @Test
    public void testListener_jobFails() {
        DAG failingDag = batchSource(new JetException(ExecutionLifecycleTest.MOCK_ERROR_MESSAGE));
        new TestCase(failingDag)
                .when((failedJob, logger) -> {
                    Throwable failure = null;
                    try {
                        failedJob.join();
                    } catch (CompletionException e) {
                        failure = e.getCause();
                    }
                    Assert.assertNotNull(failure);
                    String failedLine = "Jet: RUNNING -> FAILED (" + failure + ")";
                    assertTailEqualsEventually(logger.log,
                            "Jet: NOT_RUNNING -> STARTING",
                            "Jet: STARTING -> RUNNING",
                            failedLine);
                })
                .runJob();
    }

    /**
     * A streaming job that throws a {@link RestartableException} (auto-scaling enabled) is
     * expected to be restarted by Jet and then cancelled by the test. Exception descriptions
     * are normalized with {@code SIMPLIFY} before comparison.
     *
     * <p>The simplified view of the log is rebuilt on every retry of the "eventually"
     * assertion. Building it once up front would freeze a snapshot, so an event delivered
     * after the snapshot could never satisfy the assertion.
     */
    @Test
    public void testListener_restartOnException() {
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(2);
        String[] expected = {
                "Jet: NOT_RUNNING -> STARTING",
                "Jet: STARTING -> RUNNING",
                "Jet: RUNNING -> NOT_RUNNING (com.hazelcast.jet.RestartableException)",
                "Jet: NOT_RUNNING -> STARTING",
                "Jet: STARTING -> RUNNING",
                "User: RUNNING -> FAILED (Cancel)"
        };
        new TestCase(streamSource(new RestartableException())).config(new JobConfig().setAutoScaling(true)).when((job, jobStatusLogger) -> {
            TestProcessors.MockPS.unblock();
            TestProcessors.MockPS.unblock();
            TestProcessors.NoOutputSourceP.executionStarted.await();
            cancelAndJoin(job);
            assertTrueEventually(() -> {
                // Copy through the synchronized list first, then stream the private copy.
                List<String> snapshot = new ArrayList<>(jobStatusLogger.log);
                List<String> simplified = snapshot.stream().map(SIMPLIFY).collect(Collectors.toList());
                // Same tolerance as assertTailEqualsEventually: the first expected line may be missing.
                assertBetween("length", simplified.size(), expected.length - 1, expected.length);
                Assert.assertEquals(
                        Arrays.asList(expected).subList(expected.length - simplified.size(), expected.length),
                        simplified);
            });
        }).runJob();
    }

    /**
     * With suspend-on-failure enabled, a failing batch job must log RUNNING -> SUSPENDED
     * (described by the second line of the suspension error cause) and, after the test
     * cancels it, SUSPENDED -> FAILED.
     */
    @Test
    public void testListener_suspendOnFailure() {
        DAG failingDag = batchSource(new JetException(ExecutionLifecycleTest.MOCK_ERROR_MESSAGE));
        JobConfig suspendingConfig = new JobConfig().setSuspendOnFailure(true);
        new TestCase(failingDag)
                .config(suspendingConfig)
                .when((suspendedJob, logger) -> {
                    JobAssertions.assertThat(suspendedJob).eventuallySuspended();
                    String errorCause = suspendedJob.getSuspensionCause().errorCause();
                    // Only the second line of the error cause appears in the event description.
                    String secondLine = errorCause.split("\n", 3)[1];
                    cancelAndJoin(suspendedJob);
                    assertTailEqualsEventually(logger.log,
                            "Jet: NOT_RUNNING -> STARTING",
                            "Jet: STARTING -> RUNNING",
                            "Jet: RUNNING -> SUSPENDED (" + secondLine + ")",
                            "User: SUSPENDED -> FAILED (Cancel)");
                })
                .runJob();
    }

    /**
     * After the listener is deregistered, cancelling the job must add no further lines
     * and no listener registration may remain for the job.
     */
    @Test
    public void testListenerDeregistration() {
        String[] startupLines = {"Jet: NOT_RUNNING -> STARTING", "Jet: STARTING -> RUNNING"};
        new TestCase(streamSource())
                .when((runningJob, logger) -> {
                    assertTailEqualsEventually(logger.log,
                            "Jet: NOT_RUNNING -> STARTING",
                            "Jet: STARTING -> RUNNING");
                    logger.deregister();
                    cancelAndJoin(runningJob);
                    assertHasNoListenerEventually(runningJob.getIdString(), logger.registrationId);
                })
                .expect(startupLines)
                .runJob();
    }

    /** Late-registration scenario for a job submitted with {@code JetService.newJob(DAG)}. */
    @Test
    public void testListenerLateRegistration() {
        testListenerLateRegistration(JetService::newJob);
    }

    /** A light batch job that is simply joined must log exactly one line: RUNNING -> COMPLETED. */
    @Test
    public void testLightListener_waitForCompletion() {
        TestCase scenario = new TestCase(batchSource());
        scenario.when(Job::join);
        scenario.expect("Jet: RUNNING -> COMPLETED");
        scenario.runLightJob();
    }

    /** Cancelling a light streaming job must log exactly one user-attributed FAILED line. */
    @Test
    public void testLightListener_cancelJob() {
        TestCase scenario = new TestCase(streamSource());
        scenario.when(Job::cancel);
        scenario.expect("User: RUNNING -> FAILED (Cancel)");
        scenario.runLightJob();
    }

    /**
     * A failing light batch job must log exactly one RUNNING -> FAILED line whose
     * description is the string form of the cause carried by the job future's exception.
     */
    @Test
    public void testLightListener_jobFails() {
        DAG failingDag = batchSource(new JetException(ExecutionLifecycleTest.MOCK_ERROR_MESSAGE));
        new TestCase(failingDag)
                .when((failedJob, logger) -> {
                    Throwable failure = null;
                    try {
                        failedJob.getFuture().get();
                    } catch (InterruptedException | ExecutionException e) {
                        failure = e.getCause();
                    }
                    Assert.assertNotNull(failure);
                    String failedLine = "Jet: RUNNING -> FAILED (" + failure + ")";
                    assertEqualsEventually(logger.log, failedLine);
                })
                .runLightJob();
    }

    /**
     * A listener deregistered before the light job is cancelled must leave no registration
     * behind and must not have logged anything.
     */
    @Test
    public void testLightListenerDeregistration() {
        TestCase scenario = new TestCase(streamSource());
        scenario.when((lightJob, logger) -> {
            logger.deregister();
            lightJob.cancel();
            assertHasNoListenerEventually(lightJob.getIdString(), logger.registrationId);
            Assertions.assertThat(logger.log).isEmpty();
        });
        scenario.runLightJob();
    }

    /** Late-registration scenario for a job submitted with {@code JetService.newLightJob(DAG)}. */
    @Test
    public void testLightListenerLateRegistration() {
        testListenerLateRegistration(JetService::newLightJob);
    }

    /**
     * Submits a batch job, waits for it to finish, and then checks that adding a status
     * listener is rejected with the message for a COMPLETED job.
     *
     * @param biFunction submits the given DAG through the given {@link JetService}
     */
    private void testListenerLateRegistration(BiFunction<JetService, DAG, Job> biFunction) {
        JetService jet = this.instance.get().getJet();
        Job finishedJob = biFunction.apply(jet, batchSource());
        TestProcessors.MockPS.unblock();
        // Recorded so the @After hook can check this job for leftover listeners.
        this.jobIdString = finishedJob.getIdString();
        finishedJob.join();
        AssertionsForClassTypes
                .assertThatThrownBy(() -> finishedJob.addStatusListener(ignoredEvent -> { }))
                .hasMessage("Cannot add status listener to a COMPLETED job");
    }

    /**
     * Runs after every test: asserts that no listener registration remains for the job id
     * and the registration id recorded by the test that just ran.
     */
    @After
    public void testListenerDeregistration_onCompletion() {
        assertHasNoListenerEventually(this.jobIdString, this.registrationId);
    }

    /**
     * Eventually asserts that no member holds a registration on the
     * {@code "hz:impl:jobEventService"} service for the given topic and, when a registration
     * id is given, that the client's listener service no longer contains it.
     *
     * @param str  the job id string used as the registration topic
     * @param uuid the client-side registration id, or {@code null} to skip the client check
     */
    protected static void assertHasNoListenerEventually(String str, UUID uuid) {
        assertTrueEventually(() -> {
            boolean noMemberRegistrations = true;
            for (HazelcastInstance member : instances()) {
                boolean empty = EventServiceTest.getEventService(member)
                        .getRegistrations("hz:impl:jobEventService", str)
                        .isEmpty();
                if (!empty) {
                    noMemberRegistrations = false;
                    break;
                }
            }
            Assert.assertTrue(noMemberRegistrations);

            if (uuid != null) {
                Assert.assertFalse(ClientTestUtil.getHazelcastClientInstanceImpl(client())
                        .getListenerService()
                        .getRegistrations()
                        .containsKey(uuid));
            }
        });
    }

    /**
     * Eventually asserts that {@code list} contains exactly the given elements, in order.
     * A fresh copy of {@code list} is taken on every retry.
     *
     * <p>Arguments are passed to JUnit in (expected, actual) order so that failure messages
     * report the two sides correctly, matching {@code assertTailEqualsEventually}.
     *
     * @param list the live list to inspect
     * @param tArr the expected elements
     */
    @SafeVarargs
    protected static <T> void assertEqualsEventually(List<T> list, T... tArr) {
        List<T> expected = Arrays.asList(tArr);
        assertTrueEventually(() -> {
            List<T> actual = new ArrayList<>(list);
            Assert.assertEquals("length", expected.size(), actual.size());
            Assert.assertEquals(expected, actual);
        });
    }

    /**
     * Eventually asserts that {@code list} equals the tail of the expected elements.
     * The list may contain all expected elements or all but the first one; the comparison
     * is made against the last {@code list.size()} expected elements.
     *
     * @param list the live list to inspect; copied on every retry
     * @param tArr the expected elements
     */
    @SafeVarargs
    protected static <T> void assertTailEqualsEventually(List<T> list, T... tArr) {
        assertTrueEventually(() -> {
            List<T> snapshot = new ArrayList<>(list);
            int expectedCount = tArr.length;
            int actualCount = snapshot.size();
            assertBetween("length", actualCount, expectedCount - 1, expectedCount);
            List<T> expectedTail = Arrays.asList(tArr).subList(expectedCount - actualCount, expectedCount);
            Assert.assertEquals(expectedTail, snapshot);
        });
    }

    /** Same as {@link #batchSource(RuntimeException)} with no failure configured. */
    protected static DAG batchSource() {
        return batchSource(null);
    }

    /**
     * Builds a single-vertex DAG named {@code "batchSource"} backed by
     * {@code NoOutputSourceP}. Before building, counts down the processor's
     * {@code proceedLatch} and stores {@code runtimeException} in its shared {@code failure}
     * holder.
     *
     * @param runtimeException value stored in {@code NoOutputSourceP.failure}; may be {@code null}
     * @return the DAG containing the single source vertex
     */
    protected static DAG batchSource(RuntimeException runtimeException) {
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        TestProcessors.NoOutputSourceP.failure.set(runtimeException);

        ProcessorMetaSupplier metaSupplier = ProcessorMetaSupplier.forceTotalParallelismOne(
                new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 1).initBlocks());
        Vertex source = new Vertex("batchSource", metaSupplier);
        return new DAG().vertex(source);
    }

    /** Same as {@link #streamSource(RuntimeException)} with no failure configured. */
    protected static DAG streamSource() {
        return streamSource(null);
    }

    /**
     * Builds a single-vertex DAG named {@code "streamSource"} backed by
     * {@code NoOutputSourceP}, after storing {@code runtimeException} in the processor's
     * shared {@code failure} holder. Unlike {@code batchSource}, the processor's
     * {@code proceedLatch} is left untouched.
     *
     * @param runtimeException value stored in {@code NoOutputSourceP.failure}; may be {@code null}
     * @return the DAG containing the single source vertex
     */
    protected static DAG streamSource(RuntimeException runtimeException) {
        TestProcessors.NoOutputSourceP.failure.set(runtimeException);

        ProcessorMetaSupplier metaSupplier = ProcessorMetaSupplier.forceTotalParallelismOne(
                new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 1).initBlocks());
        Vertex source = new Vertex("streamSource", metaSupplier);
        return new DAG().vertex(source);
    }

    // The decompiled copy of the synthetic $deserializeLambda$(SerializedLambda) method has
    // been removed. javac generates that method itself for every class containing
    // serializable lambdas or method references (here: the ConsumerEx / BiConsumerEx /
    // SupplierEx instances used by the tests), so it must not be declared in source.
    // The decompiler output was also not valid Java: it assigned -1 to a boolean, switched
    // on that boolean, and repeated the label "case true:" several times.
}
