package com.hazelcast.jet.core;

import com.hazelcast.client.map.helpers.AMapStore;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.JobRestartWithSnapshotTest;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.test.Accessors;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.PacketFiltersUtil;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* Tests for manually restarting a Jet job via Job.restart(). Decompiled from com/hazelcast/jet/core/ManualRestartTest.class. */
public class ManualRestartTest extends JetTestSupport {
    private static final int NODE_COUNT = 2;
    private static final int LOCAL_PARALLELISM = 1;
    private DAG dag;
    private HazelcastInstance[] instances;

    /* loaded from: input_file:com/hazelcast/jet/core/ManualRestartTest$FailingMapStore.class */
    public static class FailingMapStore extends AMapStore implements Serializable {
        private static volatile boolean fail;
        private static volatile boolean failed;

        @Override // com.hazelcast.client.map.helpers.AMapStore
        public void store(Object obj, Object obj2) {
            System.err.println("o : " + obj + ", o2: " + obj2);
            if (fail) {
                failed = true;
                throw new RuntimeException();
            }
        }
    }

    @Before
    public void setup() {
        TestProcessors.reset(2);
        this.dag = new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 2)));
        this.instances = createHazelcastInstances(2);
    }

    @Test
    public void when_jobIsRunning_then_itRestarts() {
        testJobRestartWhenJobIsRunning(true);
    }

    @Test
    public void when_autoScalingDisabled_then_jobRestarts() {
        testJobRestartWhenJobIsRunning(false);
    }

    private void testJobRestartWhenJobIsRunning(boolean z) {
        Job newJob = createHazelcastClient().getJet().newJob(this.dag, new JobConfig().setAutoScaling(z));
        assertTrueEventually(() -> {
            Assert.assertEquals(2L, TestProcessors.MockPS.initCount.get());
        });
        for (int i = 0; i < 2; i++) {
            createHazelcastInstance();
        }
        assertTrueAllTheTime(() -> {
            Assert.assertEquals(2L, TestProcessors.MockPS.initCount.get());
        }, 3L);
        newJob.restart();
        int i2 = 4 + 2;
        assertTrueEventually(() -> {
            Assert.assertEquals(i2, TestProcessors.MockPS.initCount.get());
        });
    }

    @Test
    public void when_jobIsNotBeingExecuted_then_itCannotBeRestarted() {
        PacketFiltersUtil.rejectOperationsBetween(this.instances[0], this.instances[1], JetInitDataSerializerHook.FACTORY_ID, (List<Integer>) Collections.singletonList(5));
        Job newJob = createHazelcastClient().getJet().newJob(this.dag);
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.STARTING);
        Objects.requireNonNull(newJob);
        Assertions.assertThatThrownBy(newJob::restart).isInstanceOf(IllegalStateException.class).hasMessageContainingAll(new CharSequence[]{"Cannot RESTART"});
        PacketFiltersUtil.resetPacketFiltersFrom(this.instances[0]);
    }

    @Test
    public void when_jobIsCompleted_then_itCannotBeRestarted() {
        Job newJob = createHazelcastClient().getJet().newJob(this.dag);
        newJob.cancel();
        cancelAndJoin(newJob);
        Objects.requireNonNull(newJob);
        Assertions.assertThatThrownBy(newJob::restart).isInstanceOf(IllegalStateException.class).hasMessageContaining("Cannot RESTART");
    }

    @Test
    public void when_terminalSnapshotFails_then_previousSnapshotUsed() {
        MapConfig mapConfig = new MapConfig("__jet.snapshot.*");
        mapConfig.getMapStoreConfig().setClassName(FailingMapStore.class.getName()).setEnabled(true);
        this.instances[0].getConfig().getStaticConfig().addMapConfig(mapConfig);
        FailingMapStore.fail = false;
        FailingMapStore.failed = false;
        DAG dag = new DAG();
        Vertex newVertex = dag.newVertex("source", TestUtil.throttle((SupplierEx<Processor>) () -> {
            return new JobRestartWithSnapshotTest.SequencesInPartitionsGeneratorP(2, 10000, true);
        }, 1000L));
        dag.edge(Edge.between(newVertex, dag.newVertex("sink", SinkProcessors.writeListP("sink"))));
        newVertex.localParallelism(1);
        Job newJob = this.instances[0].getJet().newJob(dag, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE).setSnapshotIntervalMillis(2000L));
        JobRepository jobRepository = ((JetServiceBackend) Accessors.getNode(this.instances[0]).nodeEngine.getService("hz:impl:jetService")).getJobCoordinationService().jobRepository();
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            Assert.assertTrue(jobRepository.getJobExecutionRecord(newJob.getId()).dataMapIndex() >= 0);
        });
        sleepMillis(100);
        FailingMapStore.fail = true;
        newJob.restart();
        assertTrueEventually(() -> {
            Assert.assertTrue(FailingMapStore.failed);
        });
        FailingMapStore.fail = false;
        newJob.join();
        Map map = (Map) new ArrayList((Collection) this.instances[0].getList("sink")).stream().filter(entry -> {
            return ((Integer) entry.getKey()).intValue() == 0;
        }).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return 1;
        }, (num3, num4) -> {
            return Integer.valueOf(num3.intValue() + num4.intValue());
        }, TreeMap::new));
        Assert.assertEquals("first item != 1, " + map, 1, map.get(0));
        Assert.assertEquals("last item != 1, " + map, 1, map.get(9999));
        boolean z = false;
        boolean z2 = false;
        for (Integer num5 : map.values()) {
            if (num5.intValue() == 1) {
                if (z) {
                    z2 = true;
                }
            } else if (num5.intValue() == 2) {
                Assert.assertFalse("got a 2 in another group", z2);
                z = true;
            } else {
                Assert.fail("v=" + num5);
            }
        }
        Assert.assertTrue("didn't see any 2s", z);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1229702142:
                if (implMethodName.equals("lambda$when_terminalSnapshotFails_then_previousSnapshotUsed$480a65e$1")) {
                    z = false;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/ManualRestartTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new JobRestartWithSnapshotTest.SequencesInPartitionsGeneratorP(2, 10000, true);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
