package com.hazelcast.jet.core;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.DeltaJobConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobCoordinationService;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.NightlyTest;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/SplitBrainTest.class */
public class SplitBrainTest extends JetSplitBrainTestSupport {
    @Override // com.hazelcast.jet.core.JetSplitBrainTestSupport
    protected void onBeforeSetup() {
        TestProcessors.reset(1);
    }

    @Override // com.hazelcast.jet.core.JetSplitBrainTestSupport
    protected void onConfigCreated(Config config) {
        config.getJetConfig().setBackupCount(6);
        config.getJetConfig().setScaleUpDelayMillis(3000L);
    }

    @Test
    public void when_quorumIsLostOnMinority_then_jobDoesNotRestartOnMinorityAndCancelledAfterMerge() {
        int i = 3;
        int i2 = 3 + 2;
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(i2 * 4);
        Job[] jobArr = new Job[1];
        Consumer<HazelcastInstance[]> consumer = hazelcastInstanceArr -> {
            jobArr[0] = startJob(hazelcastInstanceArr[0], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, i2))), new JobConfig().setSplitBrainProtection(true));
            assertOpenEventually(TestProcessors.NoOutputSourceP.executionStarted);
        };
        AtomicReference atomicReference = new AtomicReference();
        testSplitBrain(3, 2, consumer, (hazelcastInstanceArr2, hazelcastInstanceArr3) -> {
            assertTrueEventually(() -> {
                Assert.assertEquals(i2, TestProcessors.MockPS.closeCount.get());
            });
            TestProcessors.NoOutputSourceP.proceedLatch.countDown();
            assertTrueEventually(() -> {
                Assert.assertEquals(i2 + i, TestProcessors.MockPS.initCount.get());
            });
            long id = jobArr[0].getId();
            assertTrueEventually(() -> {
                JobCoordinationService jobCoordinationService = getJetServiceBackend(hazelcastInstanceArr2[0]).getJobCoordinationService();
                Assert.assertEquals(JobStatus.COMPLETED, jobCoordinationService.getJobStatus(id).get());
                Assertions.assertThat((Boolean) jobCoordinationService.isJobUserCancelled(id).get()).isFalse();
            });
            JetServiceBackend jetServiceBackend = getJetServiceBackend(hazelcastInstanceArr3[0]);
            assertTrueEventually(() -> {
                MasterContext masterContext = jetServiceBackend.getJobCoordinationService().getMasterContext(id);
                Assert.assertNotNull(masterContext);
                atomicReference.set(masterContext.jobContext().jobCompletionFuture());
            });
            assertTrueAllTheTime(() -> {
                assertStatusNotRunningOrStarting((JobStatus) jetServiceBackend.getJobCoordinationService().getJobStatus(id).get());
            }, 20L);
        }, hazelcastInstanceArr4 -> {
            assertTrueEventually(() -> {
                Assert.assertEquals(i2 + i, TestProcessors.MockPS.initCount.get());
                Assert.assertEquals(i2 + i, TestProcessors.MockPS.closeCount.get());
            });
            Assert.assertEquals(i2, TestProcessors.MockPS.receivedCloseErrors.size());
            TestProcessors.MockPS.receivedCloseErrors.forEach(th -> {
                Assert.assertTrue("received " + th, th instanceof CancellationException);
            });
            Future future = (Future) atomicReference.get();
            Objects.requireNonNull(future);
            Assertions.assertThatThrownBy(future::get).isInstanceOf(CancellationException.class);
        });
    }

    @Test
    public void when_quorumIsLostOnBothSides_then_jobRestartsAfterMerge() {
        int i = 2 + 2;
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(i * 4);
        Job[] jobArr = new Job[1];
        testSplitBrain(2, 2, hazelcastInstanceArr -> {
            jobArr[0] = startJob(hazelcastInstanceArr[0], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, i))), new JobConfig().setSplitBrainProtection(true));
            assertOpenEventually(TestProcessors.NoOutputSourceP.executionStarted);
        }, (hazelcastInstanceArr2, hazelcastInstanceArr3) -> {
            TestProcessors.NoOutputSourceP.proceedLatch.countDown();
            long id = jobArr[0].getId();
            assertTrueEventually(() -> {
                JetServiceBackend jetServiceBackend = getJetServiceBackend(hazelcastInstanceArr2[0]);
                JetServiceBackend jetServiceBackend2 = getJetServiceBackend(hazelcastInstanceArr3[0]);
                Assert.assertNotNull(jetServiceBackend.getJobCoordinationService().getMasterContext(id));
                Assert.assertNotNull(jetServiceBackend2.getJobCoordinationService().getMasterContext(id));
            });
            assertTrueAllTheTime(() -> {
                JetServiceBackend jetServiceBackend = getJetServiceBackend(hazelcastInstanceArr2[0]);
                JetServiceBackend jetServiceBackend2 = getJetServiceBackend(hazelcastInstanceArr3[0]);
                JobStatus jobStatus = (JobStatus) jetServiceBackend.getJobCoordinationService().getJobStatus(id).get();
                JobStatus jobStatus2 = (JobStatus) jetServiceBackend2.getJobCoordinationService().getJobStatus(id).get();
                assertStatusNotRunningOrStarting(jobStatus);
                assertStatusNotRunningOrStarting(jobStatus2);
            }, 20L);
        }, hazelcastInstanceArr4 -> {
            assertTrueEventually(() -> {
                Assertions.assertThat(TestProcessors.MockPS.initCount.get()).isBetween(Integer.valueOf(i + (i / 2) + 1), Integer.valueOf(i * 2));
                Assertions.assertThat(TestProcessors.MockPS.closeCount.get()).isEqualTo(TestProcessors.MockPS.initCount.get());
            });
            Assert.assertEquals(i, TestProcessors.MockPS.receivedCloseErrors.size());
            TestProcessors.MockPS.receivedCloseErrors.forEach(th -> {
                Assert.assertTrue("received " + th, th instanceof CancellationException);
            });
        });
    }

    @Test
    public void when_splitBrainProtectionIsDisabled_then_jobCompletesOnBothSides() {
        int i = 2 + 2;
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(i * 4);
        Job[] jobArr = new Job[1];
        testSplitBrain(2, 2, hazelcastInstanceArr -> {
            jobArr[0] = startJob(hazelcastInstanceArr[0], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, i))), new JobConfig());
            assertOpenEventually(TestProcessors.NoOutputSourceP.executionStarted);
        }, (hazelcastInstanceArr2, hazelcastInstanceArr3) -> {
            TestProcessors.NoOutputSourceP.proceedLatch.countDown();
            long id = jobArr[0].getId();
            assertTrueEventually(() -> {
                JetServiceBackend jetServiceBackend = getJetServiceBackend(hazelcastInstanceArr2[0]);
                JetServiceBackend jetServiceBackend2 = getJetServiceBackend(hazelcastInstanceArr3[0]);
                Assert.assertEquals(JobStatus.COMPLETED, jetServiceBackend.getJobCoordinationService().getJobStatus(id).get());
                Assert.assertEquals(JobStatus.COMPLETED, jetServiceBackend2.getJobCoordinationService().getJobStatus(id).get());
            });
        }, hazelcastInstanceArr4 -> {
            assertTrueEventually(() -> {
                Assert.assertEquals("init count", i * 2, TestProcessors.MockPS.initCount.get());
                Assert.assertEquals("close count", i * 2, TestProcessors.MockPS.closeCount.get());
            });
            Assert.assertEquals(i, TestProcessors.MockPS.receivedCloseErrors.size());
            TestProcessors.MockPS.receivedCloseErrors.forEach(th -> {
                Assert.assertTrue("received " + th, th instanceof CancellationException);
            });
        });
    }

    @Test
    public void when_jobIsSubmittedToMinoritySide_then_jobIsCancelledDuringMerge() {
        int i = 2;
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(2 * 4);
        Job[] jobArr = new Job[1];
        testSplitBrain(3, 2, null, (hazelcastInstanceArr, hazelcastInstanceArr2) -> {
            jobArr[0] = startJob(hazelcastInstanceArr2[0], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, i))), new JobConfig().setSplitBrainProtection(true));
            assertOpenEventually(TestProcessors.NoOutputSourceP.executionStarted);
        }, hazelcastInstanceArr3 -> {
            assertTrueEventually(() -> {
                Assert.assertEquals(i, TestProcessors.MockPS.receivedCloseErrors.size());
            }, 20L);
            TestProcessors.MockPS.receivedCloseErrors.forEach(th -> {
                Assert.assertTrue("received: " + th, th instanceof CancellationException);
            });
            Assertions.assertThatThrownBy(() -> {
                jobArr[0].getFuture().get(30L, TimeUnit.SECONDS);
            }).isInstanceOf(CancellationException.class);
        });
    }

    @Test
    public void when_newMemberJoinsToCluster_then_jobQuorumSizeIsUpdated() {
        HazelcastInstance[] hazelcastInstanceArr = new HazelcastInstance[3];
        for (int i = 0; i < 3; i++) {
            hazelcastInstanceArr[i] = createHazelcastInstance(createConfig());
        }
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(3 * 4);
        Job startJob = startJob(hazelcastInstanceArr[0], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 3)).localParallelism(4)), new JobConfig().setSplitBrainProtection(true));
        assertOpenEventually(TestProcessors.NoOutputSourceP.executionStarted);
        createHazelcastInstance(createConfig());
        assertTrueEventually(() -> {
            JetServiceBackend jetServiceBackend = getJetServiceBackend(hazelcastInstanceArr[0]);
            Assert.assertEquals(3L, jetServiceBackend.getJobRepository().getJobExecutionRecord(startJob.getId()).getQuorumSize());
            Assert.assertEquals(3L, jetServiceBackend.getJobCoordinationService().getMasterContext(startJob.getId()).jobExecutionRecord().getQuorumSize());
        });
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
    }

    @Test
    public void when_newMemberIsAddedAfterClusterSizeFallsBelowQuorumSize_then_jobRestartDoesNotSucceed() {
        HazelcastInstance[] hazelcastInstanceArr = new HazelcastInstance[5];
        for (int i = 0; i < 5; i++) {
            hazelcastInstanceArr[i] = createHazelcastInstance(createConfig());
        }
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(5 * 4);
        Job startJob = startJob(hazelcastInstanceArr[0], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 5)).localParallelism(4)), new JobConfig().setSplitBrainProtection(true));
        assertOpenEventually(TestProcessors.NoOutputSourceP.executionStarted);
        for (int i2 = 1; i2 < 5; i2++) {
            hazelcastInstanceArr[i2].shutdown();
        }
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        JobAssertions.assertThat(startJob).eventuallyHasStatus(JobStatus.NOT_RUNNING, Duration.ofSeconds(10L));
        HazelcastInstance createHazelcastInstance = createHazelcastInstance(createConfig());
        assertTrueAllTheTime(() -> {
            assertStatusNotRunningOrStarting(startJob.getStatus());
        }, 5L);
        waitAllForSafeState(Lists.newArrayList(new HazelcastInstance[]{hazelcastInstanceArr[0], createHazelcastInstance, createHazelcastInstance(createConfig())}));
        assertTrueEventually(() -> {
            assertStatusRunningOrCompleted(startJob.getStatus());
        }, 5L);
    }

    private void assertStatusNotRunningOrStarting(JobStatus jobStatus) {
        Assert.assertTrue("status=" + jobStatus, jobStatus == JobStatus.NOT_RUNNING || jobStatus == JobStatus.STARTING);
    }

    private void assertStatusRunningOrCompleted(JobStatus jobStatus) {
        Assert.assertTrue("status=" + jobStatus, jobStatus == JobStatus.RUNNING || jobStatus == JobStatus.COMPLETED);
    }

    @Test
    public void when_minorityMasterBecomesMajorityMaster_then_jobKeepsRunning() {
        int i = 2 + 1;
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(1 * 4);
        Job[] jobArr = new Job[1];
        testSplitBrain(2, 1, hazelcastInstanceArr -> {
            jobArr[0] = startJob(hazelcastInstanceArr[2], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, i))), new JobConfig());
            assertOpenEventually(TestProcessors.NoOutputSourceP.executionStarted);
        }, null, hazelcastInstanceArr2 -> {
            Assert.assertEquals(i, hazelcastInstanceArr2.length);
            this.logger.info("Shutting down 1st instance");
            hazelcastInstanceArr2[0].shutdown();
            this.logger.info("1st instance down, starting another instance");
            createHazelcastInstance(createConfig());
            this.logger.info("Shutting down 2nd instance");
            hazelcastInstanceArr2[1].shutdown();
            Assert.assertTrue(hazelcastInstanceArr2[2].getCluster().isMaster());
            JobAssertions.assertThat(jobArr[0]).eventuallyHasStatus(JobStatus.RUNNING, Duration.ofSeconds(10L));
            assertTrueAllTheTime(() -> {
                Assert.assertEquals(JobStatus.RUNNING, jobArr[0].getStatus());
            }, 5L);
        });
    }

    @Test
    public void when_splitBrainProtectionDisabled_then_jobRunsTwiceAndAgainOnceAfterHeal() {
        int i = 3 + 2;
        TestProcessors.NoOutputSourceP.executionStarted = new CountDownLatch(2 * 4);
        Job[] jobArr = new Job[1];
        testSplitBrain(3, 2, hazelcastInstanceArr -> {
            jobArr[0] = startJob(hazelcastInstanceArr[0], new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, i))), new JobConfig().setSplitBrainProtection(false));
            assertTrueEventually(() -> {
                Assert.assertEquals("initCount", i, TestProcessors.MockPS.initCount.get());
            }, 10L);
            assertOpenEventually("executionStarted", TestProcessors.NoOutputSourceP.executionStarted);
        }, (hazelcastInstanceArr2, hazelcastInstanceArr3) -> {
            Job job = hazelcastInstanceArr2[0].getJet().getJob(jobArr[0].getId());
            Job job2 = hazelcastInstanceArr3[0].getJet().getJob(jobArr[0].getId());
            Assert.assertNotNull("jobRef1", job);
            Assert.assertNotNull("jobRef2", job2);
            assertTrueEventually(() -> {
                Assert.assertEquals("job not running on subcluster 1", JobStatus.RUNNING, job.getStatus());
            });
            assertTrueEventually(() -> {
                Assert.assertEquals("job not running on subcluster 2", JobStatus.RUNNING, job2.getStatus());
            });
            assertTrueEventually(() -> {
                Assert.assertEquals("initCount", i * 2, TestProcessors.MockPS.initCount.get());
            });
        }, hazelcastInstanceArr4 -> {
            assertTrueEventually(() -> {
                Assert.assertEquals(i * 3, TestProcessors.MockPS.initCount.get());
            }, 20L);
        });
    }

    @Test
    public void when_splitBrainProtectionDisabledLater_then_jobRestarts() {
        HazelcastInstance[] startInitialCluster = startInitialCluster(createConfig(), createConfig().setLiteMember(true), 2);
        Job startJob = startJob(startInitialCluster[0], TestProcessors.streamingDag(), new JobConfig().setSplitBrainProtection(true));
        JobAssertions.assertThat(startJob).eventuallyHasStatus(JobStatus.RUNNING);
        startJob.suspend();
        JobAssertions.assertThat(startJob).eventuallyHasStatus(JobStatus.SUSPENDED);
        startJob.updateConfig(new DeltaJobConfig().setSplitBrainProtection(false));
        startJob.resume();
        long eventuallyJobRunning = JobAssertions.assertThat(startJob).eventuallyJobRunning(startInitialCluster[0], null);
        startInitialCluster[1].getLifecycleService().terminate();
        JobAssertions.assertThat(startJob).eventuallyJobRunning(startInitialCluster[0], Long.valueOf(eventuallyJobRunning));
    }

    @Test
    public void when_splitBrainProtectionEnabledLater_then_jobDoesNotRestartOnMinority() {
        HazelcastInstance[] startInitialCluster = startInitialCluster(createConfig(), createConfig().setLiteMember(true), 2);
        Job startJob = startJob(startInitialCluster[0], TestProcessors.streamingDag(), new JobConfig().setSplitBrainProtection(false));
        JobAssertions.assertThat(startJob).eventuallyHasStatus(JobStatus.RUNNING);
        startJob.suspend();
        JobAssertions.assertThat(startJob).eventuallyHasStatus(JobStatus.SUSPENDED);
        startJob.updateConfig(new DeltaJobConfig().setSplitBrainProtection(true));
        startJob.resume();
        JobAssertions.assertThat(startJob).eventuallyHasStatus(JobStatus.RUNNING);
        startInitialCluster[1].getLifecycleService().terminate();
        JobAssertions.assertThat(startJob).eventuallyHasStatus(JobStatus.NOT_RUNNING);
    }

    protected Job startJob(HazelcastInstance hazelcastInstance, DAG dag, JobConfig jobConfig) {
        return hazelcastInstance.getJet().newJob(dag, jobConfig);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
