package com.hazelcast.jet.core;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.map.MapStore;
import com.hazelcast.test.Accessors;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.SlowTest;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({SlowTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/GracefulShutdownTest.class */
public class GracefulShutdownTest extends JetTestSupport {
    private static final int NODE_COUNT = 2;
    private HazelcastInstance[] instances;
    private HazelcastInstance client;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/core/GracefulShutdownTest$BlockingMapStore.class */
    public static class BlockingMapStore implements MapStore<Object, Object> {
        private static volatile boolean shouldBlock;
        private static volatile boolean wasBlocked;

        private BlockingMapStore() {
        }

        public void store(Object obj, Object obj2) {
            block();
        }

        public void storeAll(Map<Object, Object> map) {
            block();
        }

        public void delete(Object obj) {
            block();
        }

        public void deleteAll(Collection<Object> collection) {
            block();
        }

        public Object load(Object obj) {
            return null;
        }

        public Map<Object, Object> loadAll(Collection<Object> collection) {
            return null;
        }

        public Iterable<Object> loadAllKeys() {
            return null;
        }

        private void block() {
            while (shouldBlock) {
                wasBlocked = true;
                HazelcastTestSupport.sleepMillis(100);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/core/GracefulShutdownTest$EmitIntegersP.class */
    public static final class EmitIntegersP extends AbstractProcessor {
        static final ConcurrentMap<Integer, Integer> savedCounters = new ConcurrentHashMap();
        private int counter;
        private int globalIndex;
        private final int numItems;

        EmitIntegersP(int i) {
            this.numItems = i;
        }

        protected void init(@Nonnull Processor.Context context) {
            this.globalIndex = context.globalProcessorIndex();
        }

        public boolean complete() {
            if (tryEmit(Integer.valueOf(this.counter))) {
                this.counter++;
            }
            return this.counter == this.numItems;
        }

        public boolean saveToSnapshot() {
            savedCounters.put(Integer.valueOf(this.globalIndex), Integer.valueOf(this.counter));
            return tryEmitToSnapshot(BroadcastKey.broadcastKey(Integer.valueOf(this.globalIndex)), Integer.valueOf(this.counter));
        }

        protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
            this.counter = Math.max(this.counter, ((Integer) obj2).intValue());
        }
    }

    @Before
    public void setup() {
        TestProcessors.reset(0);
        this.instances = createHazelcastInstances(2);
        this.client = createHazelcastClient();
        EmitIntegersP.savedCounters.clear();
    }

    @Test
    public void when_snapshottedJob_coordinatorShutDown_then_gracefully() {
        when_shutDown(true, true);
    }

    @Test
    public void when_snapshottedJob_nonCoordinatorShutDown_then_gracefully() {
        when_shutDown(false, true);
    }

    @Test
    public void when_nonSnapshottedJob_coordinatorShutDown_then_restarts() {
        when_shutDown(true, false);
    }

    @Test
    public void when_nonSnapshottedJob_nonCoordinatorShutDown_then_restarts() {
        when_shutDown(false, false);
    }

    private void when_shutDown(boolean z, boolean z2) {
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("source", TestUtil.throttle((SupplierEx<Processor>) () -> {
            return new EmitIntegersP(50000);
        }, 10000L)).localParallelism(1), dag.newVertex("sink", SinkProcessors.writeListP("sink"))));
        Job newJob = this.client.getJet().newJob(dag, new JobConfig().setProcessingGuarantee(z2 ? ProcessingGuarantee.EXACTLY_ONCE : ProcessingGuarantee.NONE).setSnapshotIntervalMillis(TimeUnit.HOURS.toMillis(1L)));
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        this.logger.info("sleeping 1 sec");
        sleepSeconds(1);
        boolean z3 = !z;
        boolean z4 = z;
        this.logger.info("Shutting down instance...");
        this.instances[z3 ? 1 : 0].shutdown();
        this.logger.info("Joining job...");
        newJob.join();
        this.logger.info("Joined");
        Map map = (Map) new ArrayList((Collection) this.instances[z4 ? 1 : 0].getList("sink")).stream().collect(Collectors.toMap(Function.identity(), num -> {
            return 1;
        }, (v0, v1) -> {
            return Integer.sum(v0, v1);
        }));
        if (!z2) {
            Assert.assertEquals(3L, ((Integer) map.get(0)).intValue());
            Assert.assertEquals(3L, ((Integer) map.get(1)).intValue());
            Assert.assertEquals(1L, ((Integer) map.get(49999)).intValue());
        } else {
            this.logger.info("savedCounters=" + EmitIntegersP.savedCounters);
            Assert.assertEquals(EmitIntegersP.savedCounters.toString(), 2L, EmitIntegersP.savedCounters.size());
            int asInt = EmitIntegersP.savedCounters.values().stream().mapToInt((v0) -> {
                return v0.intValue();
            }).min().getAsInt();
            Assert.assertEquals((Map) IntStream.range(0, 50000).boxed().collect(Collectors.toMap(Function.identity(), num2 -> {
                return Integer.valueOf(num2.intValue() < asInt ? 2 : 1);
            })), map);
        }
    }

    @Test
    public void when_liteMemberShutDown_then_jobKeepsRunning() throws Exception {
        Config smallInstanceConfig = smallInstanceConfig();
        smallInstanceConfig.setLiteMember(true);
        HazelcastInstance createHazelcastInstance = createHazelcastInstance(smallInstanceConfig);
        DAG dag = new DAG();
        dag.newVertex("v", TestProcessors.NoOutputSourceP::new);
        Job newJob = this.instances[0].getJet().newJob(dag);
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING, Duration.ofSeconds(10L));
        Objects.requireNonNull(createHazelcastInstance);
        Future<?> spawn = spawn(createHazelcastInstance::shutdown);
        assertTrueAllTheTime(() -> {
            Assert.assertEquals(JobStatus.RUNNING, newJob.getStatus());
        }, 5L);
        spawn.get();
    }

    @Test
    public void when_nonParticipatingMemberShutDown_then_jobKeepsRunning() throws Exception {
        DAG dag = new DAG();
        dag.newVertex("v", TestProcessors.NoOutputSourceP::new);
        Job newJob = this.instances[0].getJet().newJob(dag);
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING, Duration.ofSeconds(10L));
        Future<?> spawn = spawn(() -> {
            HazelcastInstance createHazelcastInstance = createHazelcastInstance();
            sleepSeconds(1);
            createHazelcastInstance.shutdown();
        });
        assertTrueAllTheTime(() -> {
            JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        }, 5L);
        spawn.get();
    }

    @Test
    public void when_shutdownGracefulWhileRestartGraceful_then_restartsFromTerminalSnapshot() throws Exception {
        MapConfig mapConfig = new MapConfig("__jet.snapshot.*");
        mapConfig.getMapStoreConfig().setClassName(BlockingMapStore.class.getName()).setEnabled(true);
        this.instances[0].getConfig().getStaticConfig().addMapConfig(mapConfig);
        BlockingMapStore.shouldBlock = false;
        BlockingMapStore.wasBlocked = false;
        DAG dag = new DAG();
        int i = 5000;
        Vertex newVertex = dag.newVertex("source", TestUtil.throttle((SupplierEx<Processor>) () -> {
            return new EmitIntegersP(i);
        }, 500L));
        dag.edge(Edge.between(newVertex, dag.newVertex("sink", SinkProcessors.writeListP("sink"))));
        newVertex.localParallelism(1);
        Job newJob = this.instances[0].getJet().newJob(dag, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE).setSnapshotIntervalMillis(2000L));
        JobRepository jobRepository = ((JetServiceBackend) Accessors.getNode(this.instances[0]).nodeEngine.getService("hz:impl:jetService")).getJobCoordinationService().jobRepository();
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            Assert.assertTrue(jobRepository.getJobExecutionRecord(newJob.getId()).dataMapIndex() >= 0);
        });
        BlockingMapStore.shouldBlock = true;
        newJob.restart();
        assertTrueEventually(() -> {
            Assert.assertTrue("blocking did not happen", BlockingMapStore.wasBlocked);
        }, 5L);
        Future<?> spawn = spawn(() -> {
            this.instances[1].shutdown();
        });
        this.logger.info("savedCounters=" + EmitIntegersP.savedCounters);
        int asInt = EmitIntegersP.savedCounters.values().stream().mapToInt((v0) -> {
            return v0.intValue();
        }).min().getAsInt();
        BlockingMapStore.shouldBlock = false;
        spawn.get();
        newJob.join();
        Assert.assertEquals((Map) IntStream.range(0, 5000).boxed().collect(Collectors.toMap(Function.identity(), num -> {
            return Integer.valueOf(num.intValue() < asInt ? 2 : 1);
        })), (Map) new ArrayList((Collection) this.instances[0].getList("sink")).stream().collect(Collectors.toMap(Function.identity(), num2 -> {
            return 1;
        }, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1126667878:
                if (implMethodName.equals("lambda$when_shutdownGracefulWhileRestartGraceful_then_restartsFromTerminalSnapshot$66d7db5d$1")) {
                    z = true;
                    break;
                }
                break;
            case -637043481:
                if (implMethodName.equals("lambda$when_shutdownGracefulWhileRestartGraceful_then_restartsFromTerminalSnapshot$70c41d3c$1")) {
                    z = false;
                    break;
                }
                break;
            case -169343402:
                if (implMethodName.equals("shutdown")) {
                    z = 5;
                    break;
                }
                break;
            case -132526395:
                if (implMethodName.equals("lambda$when_nonParticipatingMemberShutDown_then_jobKeepsRunning$77145072$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1805888248:
                if (implMethodName.equals("lambda$when_shutDown$c704679e$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/GracefulShutdownTest") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    GracefulShutdownTest gracefulShutdownTest = (GracefulShutdownTest) serializedLambda.getCapturedArg(0);
                    return () -> {
                        this.instances[1].shutdown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/GracefulShutdownTest") && serializedLambda.getImplMethodSignature().equals("(I)Lcom/hazelcast/jet/core/Processor;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return new EmitIntegersP(intValue);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/GracefulShutdownTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new EmitIntegersP(50000);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/GracefulShutdownTest") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    GracefulShutdownTest gracefulShutdownTest2 = (GracefulShutdownTest) serializedLambda.getCapturedArg(0);
                    return () -> {
                        HazelcastInstance createHazelcastInstance = createHazelcastInstance();
                        sleepSeconds(1);
                        createHazelcastInstance.shutdown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/core/HazelcastInstance") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    HazelcastInstance hazelcastInstance = (HazelcastInstance) serializedLambda.getCapturedArg(0);
                    return hazelcastInstance::shutdown;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
