package com.hazelcast.jet.core;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IndeterminateOperationStateException;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.internal.partition.InternalPartition;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.partition.impl.PartitionDataSerializerHook;
import com.hazelcast.internal.server.OperationPacketFilter;
import com.hazelcast.internal.server.PacketFilter;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.JobProxy;
import com.hazelcast.jet.impl.SnapshotValidationRecord;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.spi.impl.SpiDataSerializerHook;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.Accessors;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.PacketFiltersUtil;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.Repeat;
import com.hazelcast.test.annotation.SlowTest;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.ThrowsException;

@RunWith(Enclosed.class)
/* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest.class */
public class IndeterminateSnapshotTest {
    // Cluster size used by these tests. The literal 5 passed to createHazelcastInstances()
    // in the base fixture appears to be this constant inlined by the compiler.
    public static final int NODE_COUNT = 5;
    // Subtracted from a replica's port to obtain an index into the instances array.
    // NOTE(review): this assumes members bind consecutive ports starting here, in creation order - confirm.
    private static final int BASE_PORT = 5701;
    // Matches the local parallelism (10) given to the "source" vertex in createJob(); the
    // expected number of per-snapshot callbacks is computed as active members times this value.
    private static final int LOCAL_PARALLELISM = 10;

    /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$IndeterminateSnapshotTestBase.class */
    /**
     * Shared fixture for the indeterminate-snapshot tests: creates the cluster, submits the
     * instrumented job and offers helpers for hooking into snapshot callbacks and asserting
     * on what {@code SnapshotInstrumentationP} recorded.
     */
    public static abstract class IndeterminateSnapshotTestBase extends JetTestSupport {
        /** Cluster members created in {@link #setup()}. */
        protected HazelcastInstance[] instances;
        /** Latch counted down by the snapshot callbacks that subclasses install. */
        protected CountDownLatch snapshotDone;

        @Override // com.hazelcast.test.HazelcastTestSupport
        public Config getConfig() {
            return smallInstanceConfig();
        }

        /** Resets the instrumentation state and starts a fresh cluster before every test. */
        @Before
        public void setup() {
            // NOTE(review): the latch is sized before `instances` is assigned, so
            // getActiveNodesCount() takes its "first member not running" branch here - confirm intended.
            initSnapshotDoneCounter();
            SnapshotInstrumentationP.reset();
            this.instances = createHazelcastInstances(getConfig(), IndeterminateSnapshotTest.NODE_COUNT);
        }

        /**
         * Returns the full member count while the first member is known to be running,
         * and one less otherwise (including before the cluster has been created).
         */
        private int getActiveNodesCount() {
            boolean firstMemberRunning = this.instances != null
                    && this.instances[0].getLifecycleService().isRunning();
            return firstMemberRunning
                    ? IndeterminateSnapshotTest.NODE_COUNT
                    : IndeterminateSnapshotTest.NODE_COUNT - 1;
        }

        /** Number of callbacks expected per snapshot: active members times source parallelism. */
        private int expectedCallbackCount() {
            return getActiveNodesCount() * IndeterminateSnapshotTest.LOCAL_PARALLELISM;
        }

        /** Replaces {@link #snapshotDone} with a latch sized for the currently expected callbacks. */
        protected void initSnapshotDoneCounter() {
            this.snapshotDone = new CountDownLatch(expectedCallbackCount());
        }

        /**
         * Wraps {@code consumer} so that only the first invocation is forwarded.
         * <p>
         * The delegate runs while the monitor is held, so concurrent callers block until the
         * first invocation has completed; this is deliberate and must not be relaxed to a
         * lock-free check.
         *
         * @param consumer delegate to invoke at most once
         * @return a consumer forwarding only its first invocation
         */
        protected static <T> Consumer<T> firstExecutionConsumer(Consumer<T> consumer) {
            // Single-element array: a mutable flag the lambda can capture, doubling as the monitor.
            boolean[] executed = {false};
            return value -> {
                synchronized (executed) {
                    if (executed[0]) {
                        return;
                    }
                    executed[0] = true;
                    consumer.accept(value);
                }
            };
        }

        /**
         * Wraps {@code consumer} so that it is forwarded only on the invocation whose ordinal
         * equals the expected callback count at that moment.
         *
         * @param consumer delegate to invoke on the last expected invocation
         * @return a counting consumer
         */
        protected <T> Consumer<T> lastExecutionConsumer(Consumer<T> consumer) {
            AtomicInteger invocations = new AtomicInteger();
            return value -> {
                int current = invocations.incrementAndGet();
                int expected = expectedCallbackCount();
                assert current <= expected
                        : "Invoked " + current + " times but at most " + expected + " invocations were expected";
                if (current == expected) {
                    consumer.accept(value);
                }
            };
        }

        /**
         * Creates a consumer that runs {@code runnable} on its first invocation only, logging
         * before and after.
         *
         * @param str      name of the snapshot phase, used in log messages
         * @param runnable action that breaks replication
         * @return a consumer that triggers the action once
         */
        protected <T> Consumer<T> breakSnapshotConsumer(String str, Runnable runnable) {
            return firstExecutionConsumer(value -> {
                this.logger.info("Breaking replication in " + str + " for " + value);
                runnable.run();
                this.logger.finest("Proceeding with " + str + " " + value + " after breaking replication");
            });
        }

        /** Blocks for up to 30 seconds until {@link #snapshotDone} reaches zero; fails otherwise. */
        protected void waitForSnapshot() {
            this.logger.info("Waiting for selected snapshot");
            try {
                Assert.assertTrue(this.snapshotDone.await(30L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the test runner before failing the test.
                Thread.currentThread().interrupt();
                Assertions.fail("Interrupted", e);
            }
            this.logger.info("Got selected snapshot");
        }

        /** Asserts that {@link #snapshotDone} still holds the full expected callback count. */
        protected void assertSnapshotNotCommitted() {
            Assertions.assertThat(this.snapshotDone.getCount())
                    .as("Snapshot must not be committed when indeterminate")
                    .isEqualTo(expectedCallbackCount());
        }

        /**
         * Asserts how the job was restored.
         *
         * @param i expected restored counter value; a negative value means the job must not
         *          have been restored from any snapshot
         */
        protected static void assertRestoredFromSnapshot(int i) {
            if (i < 0) {
                Assert.assertFalse("Should not be restored from snapshot", SnapshotInstrumentationP.restoredFromSnapshot);
                return;
            }
            Assert.assertTrue("Should be restored from snapshot", SnapshotInstrumentationP.restoredFromSnapshot);
            Assertions.assertThat(SnapshotInstrumentationP.restoredCounters.values())
                    .as("Should restore from last known good snapshot")
                    .containsOnly(i);
        }

        /**
         * Skips the test (assumption) unless the saved counters match the expectation.
         *
         * @param i expected saved counter value; a negative value means no counter must be saved
         */
        protected static void assumeLastSnapshotPresent(int i) {
            if (i >= 0) {
                Assumptions.assumeThat(SnapshotInstrumentationP.savedCounters.values())
                        .as("Current snapshot is different than expected")
                        .containsOnly(i);
            } else {
                Assumptions.assumeThat(SnapshotInstrumentationP.savedCounters)
                        .as("Unexpected snapshot")
                        .isEmpty();
            }
        }

        /** Asserts that exactly one execution was recorded. */
        protected static void assertJobNotRestarted() {
            Assertions.assertThat(SnapshotInstrumentationP.executions)
                    .as("Job should not be restarted during test")
                    .hasSize(1);
        }

        /** Asserts that more than one execution was recorded. */
        protected static void assertJobRestarted() {
            Assertions.assertThat(SnapshotInstrumentationP.executions)
                    .as("Job should be restarted during test")
                    .hasSizeGreaterThan(1);
        }

        /**
         * Submits, on the first member, an exactly-once job with a one second snapshot interval
         * whose throttled "source" vertex feeds a no-op "sink".
         *
         * @param i value handed to the {@code SnapshotInstrumentationP} constructor
         * @return the submitted job
         */
        @Nonnull
        protected Job createJob(int i) {
            DAG dag = new DAG();
            Vertex source = dag.newVertex("source",
                            TestUtil.throttle((SupplierEx<Processor>) () -> new SnapshotInstrumentationP(i), 1L))
                    .localParallelism(IndeterminateSnapshotTest.LOCAL_PARALLELISM);
            Vertex sink = dag.newVertex("sink", Processors.noopP()).localParallelism(1);
            dag.edge(Edge.between(source, sink));
            return this.instances[0].getJet().newJob(dag, customizeJobConfig(new JobConfig()
                    .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
                    .setSnapshotIntervalMillis(1000L)));
        }

        /**
         * Extension point for subclasses to adjust the job configuration; returns it unchanged here.
         *
         * @param jobConfig configuration prepared by {@link #createJob(int)}
         * @return the configuration to submit the job with
         */
        protected JobConfig customizeJobConfig(JobConfig jobConfig) {
            return jobConfig;
        }
    }

    /**
     * Tests that drop backup packets from a chosen member, terminate that member, and then
     * check what survived: plain IMap contents, or the snapshot a Jet job restarts from.
     */
    @RunWith(HazelcastSerialClassRunner.class)
    @Category({SlowTest.class, ParallelJVMTest.class})
    public static class ReplicationBreakingTests extends IndeterminateSnapshotTestBase {
        private static final String TEST_MAP_NAME = "testMapPutAndGet";
        /** Number of additional "Hello&lt;n&gt;" entries written by the map tests. */
        private static final int EXTRA_ENTRIES = 60;
        /** "Hello" plus the additional entries. */
        private static final long TOTAL_ENTRIES = EXTRA_ENTRIES + 1L;

        private int masterKeyPartitionInstanceIdx;
        private int backupKeyPartitionInstanceIdx;
        /** Null until {@link #createJob(int)} has run. */
        private Integer masterJobPartitionInstanceIdx;
        /** Null until {@link #createJob(int)} has run. */
        private Integer backupJobPartitionInstanceIdx;
        /** Completed with the index of the member whose backups are to be dropped. */
        private CompletableFuture<Integer> failingInstanceFuture;
        private boolean test1Succeeded;
        private boolean test2Succeeded;

        /**
         * Packet filter installed on every member: always drops one partition operation type
         * and drops SPI backup packets with the configured probability.
         */
        public static class MapBackupPacketDropFilter extends OperationPacketFilter {
            // NOTE(review): numeric operation type ids as recovered from bytecode; map them to
            // the named constants of the respective serializer hooks once confirmed.
            private static final int DROPPED_PARTITION_OPERATION_TYPE = 25;
            private static final int SPI_BACKUP_OPERATION_TYPE = 1;

            private final ILogger logger;
            private final Address sourceAddress;
            private final float blockRatio;

            /**
             * @param hazelcastInstance member the filter is installed on
             * @param f                 probability in [0, 1] of dropping a backup packet
             */
            MapBackupPacketDropFilter(HazelcastInstance hazelcastInstance, float f) {
                super(Accessors.getNode(hazelcastInstance).getSerializationService());
                this.logger = Logger.getLogger(getClass());
                this.sourceAddress = hazelcastInstance.getCluster().getLocalMember().getAddress();
                this.blockRatio = f;
            }

            @Override // com.hazelcast.internal.server.OperationPacketFilter
            protected PacketFilter.Action filterOperation(Address address, int i, int i2) {
                if (i == PartitionDataSerializerHook.F_ID && i2 == DROPPED_PARTITION_OPERATION_TYPE) {
                    return PacketFilter.Action.DROP;
                }
                boolean backupPacket = i == SpiDataSerializerHook.F_ID && i2 == SPI_BACKUP_OPERATION_TYPE;
                if (!backupPacket) {
                    return PacketFilter.Action.ALLOW;
                }
                PacketFilter.Action action = Math.random() < this.blockRatio
                        ? PacketFilter.Action.DROP
                        : PacketFilter.Action.ALLOW;
                this.logger.info(this.sourceAddress + " sending backup packet to " + address + ", action: " + action);
                return action;
            }
        }

        /** Base configuration with the partition backup sync interval property set to "120". */
        @Override // com.hazelcast.jet.core.IndeterminateSnapshotTest.IndeterminateSnapshotTestBase, com.hazelcast.test.HazelcastTestSupport
        public Config getConfig() {
            Config config = super.getConfig();
            config.setProperty(ClusterProperty.PARTITION_BACKUP_SYNC_INTERVAL.getName(), "120");
            return config;
        }

        @Override // com.hazelcast.jet.core.IndeterminateSnapshotTest.IndeterminateSnapshotTestBase
        @Before
        public void setup() {
            super.setup();
            this.failingInstanceFuture = new CompletableFuture<>();
        }

        /**
         * Prepares the instrumentation for a Jet test: sets the snapshot budget, installs a
         * commit-finish callback that clears the other two hooks and counts the latch down,
         * and records which members hold the snapshot validation record.
         *
         * @param i value assigned to {@code SnapshotInstrumentationP.allowedSnapshotsCount}
         */
        protected void setupJetTests(int i) {
            SnapshotInstrumentationP.allowedSnapshotsCount = i;
            SnapshotInstrumentationP.snapshotCommitFinishConsumer = ignored -> {
                SnapshotInstrumentationP.saveSnapshotConsumer = null;
                SnapshotInstrumentationP.snapshotCommitPrepareConsumer = null;
                this.snapshotDone.countDown();
            };
            setupPartitions();
        }

        @Test
        public void mapShouldBeCorrupted_whenReplicationNetworkFailure()
                throws ExecutionException, InterruptedException, TimeoutException {
            this.instances[0].getMap("dummy").put("1", "2");
            IMap<String, String> map = this.instances[0].getMap(TEST_MAP_NAME);
            InternalPartition keyPartition = getPartitionForKey("Hello");
            logKeyPartition(keyPartition);
            int ownerIdx = instanceIndexOfReplica(keyPartition, 0);
            int firstBackupIdx = instanceIndexOfReplica(keyPartition, 1);
            int secondBackupIdx = instanceIndexOfReplica(keyPartition, 2);

            disableBackupsFrom(ownerIdx);
            String previous = map.putAsync("Hello", "World").toCompletableFuture().get();
            Assert.assertEquals("World", map.get("Hello"));
            Assert.assertEquals(1L, map.size());
            Assert.assertNull(previous);

            CompletableFuture<?>[] pendingPuts = IntStream.range(0, EXTRA_ENTRIES)
                    .mapToObj(n -> map.putAsync("Hello" + n, "World").toCompletableFuture())
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(pendingPuts).get(10L, TimeUnit.SECONDS);

            this.logger.info("Killing master: " + ownerIdx);
            this.instances[ownerIdx].getLifecycleService().terminate();
            assertMembersEventuallySafe(firstBackupIdx, secondBackupIdx);

            assertWritesPartiallyLost(secondBackupIdx);
            assertWritesPartiallyLost(firstBackupIdx);
        }

        @Test
        public void mapShouldNotBeFullyCleared_whenReplicationNetworkFailure() {
            this.instances[0].getMap("dummy").put("1", "2");
            IMap<String, String> map = this.instances[0].getMap(TEST_MAP_NAME);
            InternalPartition keyPartition = getPartitionForKey("Hello");
            logKeyPartition(keyPartition);
            int ownerIdx = instanceIndexOfReplica(keyPartition, 0);
            int firstBackupIdx = instanceIndexOfReplica(keyPartition, 1);
            int secondBackupIdx = instanceIndexOfReplica(keyPartition, 2);

            String previous = map.put("Hello", "World");
            Assert.assertEquals("World", map.get("Hello"));
            Assert.assertEquals(1L, map.size());
            Assert.assertNull(previous);
            IntStream.range(0, EXTRA_ENTRIES).forEach(n -> map.put("Hello" + n, "World"));
            Assert.assertEquals(TOTAL_ENTRIES, map.size());

            disableBackupsFrom(ownerIdx);
            map.clear();
            this.logger.info("Killing master: " + ownerIdx);
            this.instances[ownerIdx].getLifecycleService().terminate();
            assertMembersEventuallySafe(firstBackupIdx, secondBackupIdx);

            IMap<String, String> mapOnSecondBackup = this.instances[secondBackupIdx].getMap(TEST_MAP_NAME);
            this.logger.fine("Size after clear: " + mapOnSecondBackup.size());
            assertGreaterOrEquals("Should lose some clear executions", mapOnSecondBackup.size(), 1L);
            Assert.assertEquals("Should lose clear on master", "World", mapOnSecondBackup.get("Hello"));
            Assert.assertNotEquals("Should clear some entries", TOTAL_ENTRIES, mapOnSecondBackup.size());

            IMap<String, String> mapOnFirstBackup = this.instances[firstBackupIdx].getMap(TEST_MAP_NAME);
            Assert.assertEquals("Should lose clear on master", "World", mapOnFirstBackup.get("Hello"));
            Assert.assertNotEquals("Should clear some entries", TOTAL_ENTRIES, mapOnFirstBackup.size());
        }

        @Test
        public void whenFirstSnapshotPossiblyCorrupted_thenRestartWithoutSnapshot() {
            when_shutDown(0);
        }

        @Test
        public void whenNextSnapshotPossiblyCorrupted_thenRestartFromLastGoodSnapshot() {
            when_shutDown(3);
        }

        /** Records which members hold the primary and first backup replica of the snapshot validation record key. */
        private void setupPartitions() {
            InternalPartition svrPartition = getPartitionForKey(SnapshotValidationRecord.KEY);
            this.logger.info("SVR KEY partition id: " + svrPartition.getPartitionId());
            for (int replica = 0; replica < 3; replica++) {
                this.logger.info("SVR KEY partition addr: " + svrPartition.getReplica(replica));
            }
            this.masterKeyPartitionInstanceIdx = instanceIndexOfReplica(svrPartition, 0);
            this.backupKeyPartitionInstanceIdx = instanceIndexOfReplica(svrPartition, 1);
        }

        /** Looks up, through the first member's partition service, the partition owning {@code k}. */
        private <K> InternalPartition getPartitionForKey(K k) {
            InternalPartitionService partitionService = Accessors.getNode(this.instances[0]).getPartitionService();
            return partitionService.getPartition(partitionService.getPartitionId(k));
        }

        /** Derives the index into {@link #instances} from the port of the given replica. */
        private static int instanceIndexOfReplica(InternalPartition partition, int replicaIndex) {
            return partition.getReplica(replicaIndex).address().getPort() - IndeterminateSnapshotTest.BASE_PORT;
        }

        /** Logs the id, owner and first three replicas of the partition used by the map tests. */
        private void logKeyPartition(InternalPartition partition) {
            this.logger.fine("KEY partition id:" + partition.getPartitionId() + " owner: " + partition.getOwnerOrNull());
            for (int replica = 0; replica < 3; replica++) {
                this.logger.fine("KEY partition addr:" + partition.getReplica(replica));
            }
        }

        /** Waits until both given members report their local partition state as safe. */
        private void assertMembersEventuallySafe(int firstIdx, int secondIdx) {
            assertTrueEventually(() -> {
                boolean safe = this.instances[firstIdx].getPartitionService().isLocalMemberSafe()
                        && this.instances[secondIdx].getPartitionService().isLocalMemberSafe();
                if (!safe) {
                    this.logger.fine("Partition replication is not in safe state");
                }
                Assert.assertTrue(safe);
            });
        }

        /** Asserts, as seen from the given member, that "Hello" is gone and only part of the other writes survived. */
        private void assertWritesPartiallyLost(int instanceIdx) {
            IMap<String, String> survivingMap = this.instances[instanceIdx].getMap(TEST_MAP_NAME);
            Assert.assertNull("Should lose write to master", survivingMap.get("Hello"));
            assertGreaterOrEquals("Should keep some writes", survivingMap.size(), 2L);
            Assert.assertNotEquals("Should lose some writes", TOTAL_ENTRIES, survivingMap.size());
        }

        /**
         * Breaks backups during snapshot save on a member that holds neither the validation
         * record nor the job record, terminates it, and checks the snapshot the job restores from.
         *
         * @param i snapshot budget; the job is expected to restore from counter {@code i - 1}
         */
        private void when_shutDown(int i) {
            // chooseFailingInstance() excludes up to four members, so a fifth must exist.
            Assert.assertTrue("At least 5 members are required", IndeterminateSnapshotTest.NODE_COUNT >= 5);
            setupJetTests(i);
            SnapshotInstrumentationP.saveSnapshotConsumer = breakSnapshotConsumer("snapshot", this::breakFailingInstance);
            Job job = createJob(i + 5);
            int failingIdx = chooseFailingInstance(this.masterKeyPartitionInstanceIdx, this.backupKeyPartitionInstanceIdx,
                    this.masterJobPartitionInstanceIdx, this.backupJobPartitionInstanceIdx);
            terminateInstanceAndJoinJob(job, failingIdx, this.masterKeyPartitionInstanceIdx, i - 1);
        }

        @Test
        @Repeat(5)
        public void whenFirstSnapshotPossiblyCorruptedAfter1stPhase_thenRestartWithoutSnapshot() {
            Assumptions.assumeThat(this.test1Succeeded).as("Test already succeeded").isFalse();
            when_shutDownAfter1stPhase(0);
            this.test1Succeeded = true;
        }

        @Test
        @Repeat(5)
        public void whenNextSnapshotPossiblyCorruptedAfter1stPhase_thenRestartFromLastGoodSnapshot() {
            Assumptions.assumeThat(this.test2Succeeded).as("Test already succeeded").isFalse();
            when_shutDownAfter1stPhase(3);
            this.test2Succeeded = true;
        }

        /**
         * Breaks backups during commit-prepare on the member owning the validation record,
         * terminates it, and checks the snapshot the job restores from. Skipped (assumption)
         * when that member also owns the job record or is the first member.
         *
         * @param i snapshot budget; the job is expected to restore from counter {@code i - 1}
         */
        private void when_shutDownAfter1stPhase(int i) {
            setupJetTests(i);
            SnapshotInstrumentationP.snapshotCommitPrepareConsumer =
                    breakSnapshotConsumer("commit prepare", this::breakFailingInstance);
            Job job = createJob(i + 5);
            Assumptions.assumeThat(this.masterKeyPartitionInstanceIdx)
                    .as("Should not damage job data").isNotEqualTo(this.masterJobPartitionInstanceIdx)
                    .as("Should not restart Jet master").isNotEqualTo(0);
            int failingIdx = chooseConstantFailingInstance(this.masterKeyPartitionInstanceIdx);
            terminateInstanceAndJoinJob(job, failingIdx, this.backupKeyPartitionInstanceIdx, i - 1);
        }

        /**
         * Waits for the job to run and for the selected snapshot, terminates the failing member,
         * removes the packet filters, joins the job through a surviving member and verifies the
         * restored snapshot.
         */
        private void terminateInstanceAndJoinJob(Job job, int failingIdx, int survivingIdx, int expectedSnapshot) {
            JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
            waitForSnapshot();
            sleepMillis(500);
            this.logger.info("Shutting down instance... " + failingIdx);
            this.instances[failingIdx].getLifecycleService().terminate();
            this.logger.info("Removed instance " + failingIdx + " from cluster");
            restoreNetwork();
            this.logger.info("Joining job...");
            this.instances[survivingIdx].getJet().getJob(job.getId()).join();
            this.logger.info("Joined");
            assertRestoredFromSnapshot(expectedSnapshot);
        }

        /** Submits the job and additionally records which members hold the partition of the job id. */
        @Override // com.hazelcast.jet.core.IndeterminateSnapshotTest.IndeterminateSnapshotTestBase
        @Nonnull
        protected Job createJob(int i) {
            Job job = super.createJob(i);
            InternalPartition jobPartition = getPartitionForKey(job.getId());
            this.logger.info("Job partition id:" + jobPartition.getPartitionId());
            for (int replica = 0; replica < 3; replica++) {
                this.logger.info("Job partition addr:" + jobPartition.getReplica(replica));
            }
            this.masterJobPartitionInstanceIdx = instanceIndexOfReplica(jobPartition, 0);
            this.backupJobPartitionInstanceIdx = instanceIndexOfReplica(jobPartition, 1);
            return job;
        }

        /**
         * Picks any member index that is not listed in {@code numArr} and publishes it as the
         * failing instance.
         *
         * @param numArr member indexes that must stay alive
         * @return the chosen member index
         */
        private int chooseFailingInstance(Integer... numArr) {
            Set<Integer> candidates = IntStream.range(0, IndeterminateSnapshotTest.NODE_COUNT)
                    .boxed()
                    .collect(Collectors.toSet());
            Arrays.asList(numArr).forEach(candidates::remove);
            this.logger.info("Replicas that can be broken: " + candidates);
            Assertions.assertThat(candidates).isNotEmpty();
            return chooseConstantFailingInstance(candidates.stream().findAny().get());
        }

        /** Publishes {@code i} as the failing instance by completing {@link #failingInstanceFuture}. */
        private int chooseConstantFailingInstance(int i) {
            this.logger.info("Failing instance selected: " + i);
            this.failingInstanceFuture.complete(i);
            return i;
        }

        /** Blocks until the failing instance is chosen, then starts dropping its backup packets. */
        private void breakFailingInstance() {
            try {
                disableBackupsFrom(this.failingInstanceFuture.get());
            } catch (InterruptedException e) {
                // Preserve the interrupt for the calling thread before propagating.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        /** Removes the custom packet filters from every member that is still running. */
        private void restoreNetwork() {
            for (HazelcastInstance instance : this.instances) {
                if (instance.getLifecycleService().isRunning()) {
                    PacketFiltersUtil.resetPacketFiltersFrom(instance);
                }
            }
        }

        /**
         * Installs a drop filter on every member; only the member at index {@code i} drops
         * all of its backup packets, the others let them through.
         */
        private void disableBackupsFrom(int i) {
            for (int idx = 0; idx < this.instances.length; idx++) {
                float blockRatio = idx == i ? 1.0f : 0.0f;
                PacketFiltersUtil.setCustomFilter(this.instances[idx],
                        new MapBackupPacketDropFilter(this.instances[idx], blockRatio));
            }
        }
    }

    @Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
    @RunWith(HazelcastParametrizedRunner.class)
    @Category({SlowTest.class, ParallelJVMTest.class})
    /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$SnapshotFailureTests.class */
    public static class SnapshotFailureTests extends IndeterminateSnapshotTestBase {

        // Test parameter; assigned by the parameterized runner through the annotation below.
        @Parameterized.Parameter
        public boolean suspendOnFailure;
        // Creates a serializable Mockito mock of MapInterceptor when the test instance is built.
        // The field is not read anywhere in the visible part of this class.
        // NOTE(review): judging by the name, it exists only so the generated mock class is
        // loaded up front - confirm before removing.
        private final MapInterceptor registerMockInClassloader = (MapInterceptor) Mockito.mock(MapInterceptor.class, Mockito.withSettings().serializable());

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$SnapshotFailureTests$AbstractScenarioStep.class */
        /**
         * One step of a scripted snapshot scenario. Steps are chained with {@link #then} and
         * executed in order; each step installs its own instrumentation callbacks in
         * {@link #doApply()} and the chain advances through {@link #goToNextStep()}.
         */
        public abstract class AbstractScenarioStep {

            @Nullable
            private AbstractScenarioStep next;

            @Nullable
            protected AbstractScenarioStep prev;

            /**
             * How much this step adds to the allowed snapshot count; 0 skips the step and
             * {@code null} leaves the allowed snapshot count untouched.
             */
            @Nullable
            private Integer repetitions = 1;

            private AbstractScenarioStep() {
            }

            /**
             * Sets the repetition count of this step.
             *
             * @param num non-negative count, or {@code null} to leave the snapshot budget untouched
             * @return this step
             * @throws IllegalArgumentException if {@code num} is negative
             */
            public AbstractScenarioStep repeat(@Nullable Integer num) {
                if (num != null && num < 0) {
                    throw new IllegalArgumentException("must be >=0, but is " + num);
                }
                this.repetitions = num;
                return this;
            }

            /**
             * Appends {@code t} after this step.
             *
             * @param t the step to run next
             * @return {@code t}, so that further steps can be chained onto it
             */
            public <T extends AbstractScenarioStep> T then(T t) {
                this.next = t;
                t.prev = this;
                return t;
            }

            /**
             * Activates this step: extends the snapshot budget, resets the save and
             * commit-prepare hooks, installs a commit-finish hook that counts the latch down
             * and advances the scenario on the last expected invocation, then delegates to
             * {@link #doApply()}. A step with zero repetitions is skipped.
             */
            public final void apply() {
                if (this.repetitions != null && this.repetitions <= 0) {
                    // repeat() rejects negative values, so only zero can get here.
                    assert this.repetitions == 0;
                    SnapshotFailureTests.this.logger.info("Skipping scenario step " + this);
                    goToNextStep();
                    return;
                }
                SnapshotFailureTests.this.logger.info("Applying scenario step " + this);
                if (this.repetitions != null) {
                    boolean budgetWasExhausted = SnapshotInstrumentationP.allowedSnapshotsCount == 0;
                    SnapshotInstrumentationP.allowedSnapshotsCount += this.repetitions;
                    // NOTE(review): one less is granted when the budget was at zero - the
                    // reason is not visible in this class; confirm against SnapshotInstrumentationP.
                    if (budgetWasExhausted) {
                        SnapshotInstrumentationP.allowedSnapshotsCount--;
                    }
                }
                SnapshotInstrumentationP.saveSnapshotConsumer = null;
                SnapshotInstrumentationP.snapshotCommitPrepareConsumer = null;
                Consumer<Boolean> countDownLatch = ignored -> SnapshotFailureTests.this.snapshotDone.countDown();
                SnapshotInstrumentationP.snapshotCommitFinishConsumer = countDownLatch.andThen(
                        SnapshotFailureTests.this.lastExecutionConsumer(ignored -> goToNextStep()));
                doApply();
            }

            /** Installs the step-specific hooks; called at the end of {@link #apply()}. */
            protected abstract void doApply();

            /** Applies the following step, or logs the end of the scenario if there is none. */
            protected void goToNextStep() {
                if (this.next != null) {
                    this.next.apply();
                } else {
                    SnapshotFailureTests.this.logger.info("End of scenario");
                }
            }

            /** Starts the scenario from its first step, whichever step this is called on. */
            public final void start() {
                getFirst().apply();
            }

            /** Walks the {@code prev} links back to the head of the chain. */
            @Nonnull
            private AbstractScenarioStep getFirst() {
                AbstractScenarioStep step = this;
                while (step.prev != null) {
                    step = step.prev;
                }
                return step;
            }

            /** @return the configured repetition count, possibly {@code null} */
            @Nullable
            public Integer getRepetitions() {
                return this.repetitions;
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$SnapshotFailureTests$IndeterminateLostPut.class */
        /**
         * Scenario step that, on the first snapshot save, runs
         * {@code singleIndeterminatePutLost()} and moves on to the next step once the last
         * expected save callback has been seen. It does not change the snapshot budget.
         */
        public class IndeterminateLostPut extends AbstractScenarioStep {
            public IndeterminateLostPut() {
                super();
                // null repetitions: apply() leaves allowedSnapshotsCount as it is.
                repeat(null);
            }

            @Override // com.hazelcast.jet.core.IndeterminateSnapshotTest.SnapshotFailureTests.AbstractScenarioStep
            protected void doApply() {
                SnapshotFailureTests.this.initSnapshotDoneCounter();
                // The typed local pins the generic parameter; chaining andThen() directly on the
                // generic call would infer Consumer<Object>.
                Consumer<Integer> breakReplicationOnce = SnapshotFailureTests.this.breakSnapshotConsumer(
                        "snapshot save", SnapshotFailureTests.this::singleIndeterminatePutLost);
                SnapshotInstrumentationP.saveSnapshotConsumer = breakReplicationOnce.andThen(
                        SnapshotFailureTests.this.lastExecutionConsumer(ignored -> goToNextStep()));
            }

            @Override
            public String toString() {
                return "IndeterminateLostPut";
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$SnapshotFailureTests$IndeterminateLostPutsUntil.class */
        /**
         * Scenario step that, on the first commit-prepare, calls {@code allIndeterminatePutsLost()}
         * and keeps that state until the future given to the constructor completes; the
         * interceptor is then removed again and the scenario moves on.
         */
        public class IndeterminateLostPutsUntil extends AbstractScenarioStep {
            /** Completed in {@link #doApply()} with the value returned by {@code allIndeterminatePutsLost()}. */
            private final CompletableFuture<String> registration;

            /**
             * @param completableFuture signal after which replication is repaired; its value is ignored
             */
            public <T> IndeterminateLostPutsUntil(CompletableFuture<T> completableFuture) {
                super();
                this.registration = new CompletableFuture<>();
                // null repetitions: apply() leaves allowedSnapshotsCount as it is.
                repeat(null);
                // Runs once both the external signal and the interceptor registration are available.
                completableFuture
                        .thenCombineAsync(this.registration, (ignored, registrationId) -> {
                            SnapshotFailureTests.this.logger.info("Fixing IMap replication");
                            SnapshotFailureTests.this.getJobExecutionRecordIMap().removeInterceptor(registrationId);
                            goToNextStep();
                            return null;
                        })
                        .whenComplete((ignored, failure) -> {
                            // Nobody joins this future, so surface failures instead of losing them.
                            if (failure != null) {
                                SnapshotFailureTests.this.logger.severe("Failed to fix IMap replication", failure);
                            }
                        });
            }

            @Override // com.hazelcast.jet.core.IndeterminateSnapshotTest.SnapshotFailureTests.AbstractScenarioStep
            protected void doApply() {
                SnapshotFailureTests.this.initSnapshotDoneCounter();
                SnapshotInstrumentationP.snapshotCommitPrepareConsumer = SnapshotFailureTests.this.breakSnapshotConsumer(
                        "snapshot commit prepare",
                        () -> this.registration.complete(SnapshotFailureTests.this.allIndeterminatePutsLost()));
            }

            @Override
            public String toString() {
                return "IndeterminateLostPutsUntil";
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$SnapshotFailureTests$SuccessfulIgnoredSnapshots.class */
        public class SuccessfulIgnoredSnapshots extends AbstractScenarioStep {
            /** Creates the step without setting a repetition count. */
            SuccessfulIgnoredSnapshots() {
            }

            /**
             * Creates the step and passes {@code i} to {@code repeat}.
             *
             * @param snapshotFailureTests not read by this constructor
             * @param i repetition count
             */
            public SuccessfulIgnoredSnapshots(SnapshotFailureTests snapshotFailureTests, int i) {
                this();
                final Integer repetitions = Integer.valueOf(i);
                repeat(repetitions);
            }

            /** Applying this step does nothing. */
            @Override // AbstractScenarioStep#doApply
            protected void doApply() {
                // intentionally empty
            }

            public String toString() {
                final String template = "Successful %d Snapshots - do not count";
                return String.format(template, getRepetitions());
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$SnapshotFailureTests$SuccessfulSnapshots.class */
        public class SuccessfulSnapshots extends AbstractScenarioStep {
            /** Creates the step without setting a repetition count. */
            SuccessfulSnapshots() {
                super();
            }

            /**
             * Creates the step and passes {@code i} to {@code repeat}.
             *
             * @param snapshotFailureTests not read by this constructor
             * @param i repetition count
             */
            public SuccessfulSnapshots(SnapshotFailureTests snapshotFailureTests, int i) {
                this();
                repeat(Integer.valueOf(i));
            }

            /**
             * Lowers the repetition count by one, or sets it to {@code null} when the
             * current count is 1 or less.
             *
             * @return this step, for chaining
             * @throws NullPointerException if no repetition count has been set
             */
            public SuccessfulSnapshots reusedCounter() {
                final int current = getRepetitions().intValue();
                repeat(current > 1 ? Integer.valueOf(current - 1) : null);
                return this;
            }

            /**
             * Re-initialises the snapshot-done counter and installs the commit-finish hook.
             * The hook counts down {@code snapshotDone} and then runs a
             * {@code lastExecutionConsumer} that calls {@code goToNextStep()}.
             */
            @Override // com.hazelcast.jet.core.IndeterminateSnapshotTest.SnapshotFailureTests.AbstractScenarioStep
            protected void doApply() {
                SnapshotFailureTests.this.initSnapshotDoneCounter();
                // Typed as Consumer<Boolean> so andThen() does not degrade to a raw Consumer.
                // The latch field is read inside the lambda, i.e. on every invocation.
                Consumer<Boolean> countDown = committed -> {
                    SnapshotFailureTests.this.snapshotDone.countDown();
                };
                Consumer<Boolean> advance = SnapshotFailureTests.this.lastExecutionConsumer(committed -> {
                    goToNextStep();
                });
                SnapshotInstrumentationP.snapshotCommitFinishConsumer = countDown.andThen(advance);
            }

            public String toString() {
                return String.format("Successful %d Snapshots", getRepetitions());
            }
        }

        /** Supplies the two values of the {@code suspendOnFailure} test parameter. */
        @Parameterized.Parameters(name = "suspendOnFailure:{0}")
        public static Object[] parameters() {
            final Object[] suspendOnFailureValues = {Boolean.FALSE, Boolean.TRUE};
            return suspendOnFailureValues;
        }

        /**
         * Applies this test's {@code suspendOnFailure} parameter to the job config.
         *
         * @param jobConfig config to modify
         * @return the value returned by {@code setSuspendOnFailure}
         */
        @Override // IndeterminateSnapshotTestBase#customizeJobConfig
        protected JobConfig customizeJobConfig(JobConfig jobConfig) {
            final JobConfig customized = jobConfig.setSuspendOnFailure(this.suspendOnFailure);
            return customized;
        }

        /** Same-coordinator lost-update scenario with no successful snapshots before the lost put. */
        @Test
        public void whenNextSnapshotUpdateLostSameCoordinator_thenRestartFromFirstSnapshot() {
            final int initialSnapshots = 0;
            whenSnapshotUpdateLostSameCoordinator(initialSnapshots);
        }

        /** Same-coordinator lost-update scenario with three successful snapshots before the lost put. */
        @Test
        public void whenNextSnapshotUpdateLostSameCoordinator_thenRestartFromLastGoodSnapshot() {
            final int initialSnapshots = 3;
            whenSnapshotUpdateLostSameCoordinator(initialSnapshots);
        }

        /**
         * Starts the scenario {@code SuccessfulSnapshots(i)} then {@code IndeterminateLostPut}
         * then {@code SuccessfulIgnoredSnapshots(100)}, runs a job created with {@code i + 3},
         * joins it through {@code instances[0]} and asserts the restore from snapshot {@code i}.
         *
         * @param i repetition count of the leading successful-snapshots step
         */
        private void whenSnapshotUpdateLostSameCoordinator(int i) {
            IndeterminateLostPut lostPut =
                    (IndeterminateLostPut) new SuccessfulSnapshots(this, i).then(new IndeterminateLostPut());
            SuccessfulIgnoredSnapshots ignoredTail =
                    (SuccessfulIgnoredSnapshots) lostPut.then(new SuccessfulIgnoredSnapshots(this, 100));
            ignoredTail.start();

            Job job = createJob(i + 3);

            this.logger.info("Joining job...");
            Job jobOnFirstMember = this.instances[0].getJet().getJob(job.getId());
            jobOnFirstMember.join();
            this.logger.info("Joined");

            assertRestoredFromSnapshot(i);
        }

        /** Changed-coordinator lost-update scenario with no successful snapshots before the lost puts. */
        @Test
        public void whenSnapshotUpdateLostChangedCoordinatorNoOtherSnapshot_thenRestartWithoutSnapshot() {
            final int initialSnapshots = 0;
            whenSnapshotUpdateLostChangedCoordinator(initialSnapshots);
        }

        /** Changed-coordinator lost-update scenario with three successful snapshots before the lost puts. */
        @Test
        public void whenSnapshotUpdateLostChangedCoordinator_thenRestartFromLastGoodSnapshot() {
            final int initialSnapshots = 3;
            whenSnapshotUpdateLostChangedCoordinator(initialSnapshots);
        }

        /**
         * Lost-update scenario in which {@code instances[0]} is terminated while puts keep failing.
         * <p>
         * Scenario steps: {@code SuccessfulSnapshots(i)}, {@code IndeterminateLostPutsUntil(signal)},
         * {@code SuccessfulSnapshots(1).reusedCounter()}, {@code SuccessfulIgnoredSnapshots(100)}.
         * The test waits for the job to reach RUNNING and then STARTING, checks that the
         * snapshot is not committed, terminates {@code instances[0]} and completes the signal.
         * It then joins the job through {@code instances[1]}, expects the
         * {@code snapshotDone} count to be zero and the restore to come from snapshot {@code i - 1}.
         *
         * @param i repetition count of the leading successful-snapshots step
         */
        private void whenSnapshotUpdateLostChangedCoordinator(int i) {
            // Only completion matters, never the value, hence Void rather than a raw future.
            CompletableFuture<Void> coordinatorTerminated = new CompletableFuture<>();

            IndeterminateLostPutsUntil lostPuts = (IndeterminateLostPutsUntil) new SuccessfulSnapshots(this, i)
                    .then(new IndeterminateLostPutsUntil(coordinatorTerminated));
            SuccessfulSnapshots recovered =
                    (SuccessfulSnapshots) lostPuts.then(new SuccessfulSnapshots(this, 1).reusedCounter());
            SuccessfulIgnoredSnapshots ignoredTail =
                    (SuccessfulIgnoredSnapshots) recovered.then(new SuccessfulIgnoredSnapshots(this, 100));
            ignoredTail.start();

            Job createJob = createJob(i + 3);
            JobAssertions.assertThat(createJob).eventuallyHasStatus(JobStatus.RUNNING);
            if (i > 0) {
                waitForSnapshot();
            }
            this.logger.info("Waiting for job restart...");
            JobAssertions.assertThat(createJob).eventuallyHasStatus(JobStatus.STARTING);
            assertSnapshotNotCommitted();

            this.logger.info("Terminating coordinator...");
            this.instances[0].getLifecycleService().terminate();
            coordinatorTerminated.complete(null);

            this.logger.info("Joining job...");
            this.instances[1].getJet().getJob(createJob.getId()).join();
            this.logger.info("Joined");

            Assertions.assertThat(this.snapshotDone.getCount()).as("Snapshot should be ultimately committed").isZero();
            assertRestoredFromSnapshot(i - 1);
        }

        /** Suspend variant of the same-coordinator scenario with no successful snapshots beforehand. */
        @Test
        public void whenSuspendSnapshotUpdateLostSameCoordinatorAndNoOtherSnapshots_thenRestartFromSuspendSnapshot() {
            final int initialSnapshots = 0;
            whenSuspendSnapshotUpdateLostSameCoordinator(initialSnapshots);
        }

        /** Suspend variant of the same-coordinator scenario with three successful snapshots beforehand. */
        @Test
        public void whenSuspendSnapshotUpdateLostSameCoordinator_thenRestartFromSuspendSnapshot() {
            final int initialSnapshots = 3;
            whenSuspendSnapshotUpdateLostSameCoordinator(initialSnapshots);
        }

        /**
         * Starts the scenario {@code SuccessfulSnapshots(i)} then {@code IndeterminateLostPut}
         * then {@code SuccessfulIgnoredSnapshots(100)} and suspends the job once it is running.
         * When {@code i > 0} it first waits for a snapshot. It assumes snapshot {@code i - 1}
         * is present before suspending. After joining through {@code instances[0]} it asserts
         * the restore from snapshot {@code i} and that the job restarted.
         *
         * @param i repetition count of the leading successful-snapshots step
         */
        private void whenSuspendSnapshotUpdateLostSameCoordinator(int i) {
            IndeterminateLostPut lostPut =
                    (IndeterminateLostPut) new SuccessfulSnapshots(this, i).then(new IndeterminateLostPut());
            SuccessfulIgnoredSnapshots ignoredTail =
                    (SuccessfulIgnoredSnapshots) lostPut.then(new SuccessfulIgnoredSnapshots(this, 100));
            ignoredTail.start();

            Job job = createJob(i + 3);
            JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
            final boolean hasInitialSnapshots = i > 0;
            if (hasInitialSnapshots) {
                waitForSnapshot();
            }
            assumeLastSnapshotPresent(i - 1);

            job.suspend();

            this.logger.info("Joining job...");
            Job jobOnFirstMember = this.instances[0].getJet().getJob(job.getId());
            jobOnFirstMember.join();
            this.logger.info("Joined");

            assertRestoredFromSnapshot(i);
            assertJobRestarted();
        }

        /** Suspend variant of the changed-coordinator scenario with no successful snapshots beforehand. */
        @Test
        public void whenSuspendSnapshotUpdateLostChangedCoordinatorAndNoOtherSnapshots_thenRestartWithoutSnapshot() {
            final int initialSnapshots = 0;
            whenSuspendSnapshotUpdateLostChangedCoordinator(initialSnapshots);
        }

        /** Suspend variant of the changed-coordinator scenario with three successful snapshots beforehand. */
        @Test
        public void whenSuspendSnapshotUpdateLostChangedCoordinator_thenRestartFromRegularSnapshot() {
            final int initialSnapshots = 3;
            whenSuspendSnapshotUpdateLostChangedCoordinator(initialSnapshots);
        }

        /**
         * Suspend scenario in which {@code instances[0]} is terminated while puts keep failing.
         * <p>
         * Scenario steps: {@code SuccessfulSnapshots(i)}, {@code IndeterminateLostPutsUntil(signal)},
         * {@code SuccessfulIgnoredSnapshots(100)}. The job is suspended once running and is
         * then expected to reach STARTING. After checking that the snapshot is not committed,
         * the test terminates {@code instances[0]} and completes the signal. It waits for
         * RUNNING on the job obtained from {@code instances[1]}, joins it and asserts the
         * restore from snapshot {@code i - 1}.
         *
         * @param i repetition count of the leading successful-snapshots step
         */
        private void whenSuspendSnapshotUpdateLostChangedCoordinator(int i) {
            // Only completion matters, never the value, hence Void rather than a raw future.
            CompletableFuture<Void> coordinatorTerminated = new CompletableFuture<>();

            IndeterminateLostPutsUntil lostPuts = (IndeterminateLostPutsUntil) new SuccessfulSnapshots(this, i)
                    .then(new IndeterminateLostPutsUntil(coordinatorTerminated));
            SuccessfulIgnoredSnapshots ignoredTail =
                    (SuccessfulIgnoredSnapshots) lostPuts.then(new SuccessfulIgnoredSnapshots(this, 100));
            ignoredTail.start();

            Job createJob = createJob(i + 3);
            JobAssertions.assertThat(createJob).eventuallyHasStatus(JobStatus.RUNNING);
            if (i > 0) {
                waitForSnapshot();
            }

            createJob.suspend();
            JobAssertions.assertThat(createJob).eventuallyHasStatus(JobStatus.STARTING);
            this.logger.info("Suspend failed and job restarted");
            assertSnapshotNotCommitted();

            this.instances[0].getLifecycleService().terminate();
            coordinatorTerminated.complete(null);

            Job job = this.instances[1].getJet().getJob(createJob.getId());
            JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
            this.logger.info("Joining job...");
            job.join();
            this.logger.info("Joined");

            assertRestoredFromSnapshot(i - 1);
        }

        /** Graceful-restart variant of the same-coordinator scenario with no successful snapshots beforehand. */
        @Test
        public void whenRestartGracefulSnapshotUpdateLostSameCoordinatorAndNoOtherSnapshots_thenRestartFromRestartSnapshot() throws InterruptedException {
            final int initialSnapshots = 0;
            whenRestartGracefulSnapshotUpdateLostSameCoordinator(initialSnapshots);
        }

        /** Graceful-restart variant of the same-coordinator scenario with three successful snapshots beforehand. */
        @Test
        public void whenRestartGracefulSnapshotUpdateLostSameCoordinator_thenRestartFromRestartSnapshot() throws InterruptedException {
            final int initialSnapshots = 3;
            whenRestartGracefulSnapshotUpdateLostSameCoordinator(initialSnapshots);
        }

        /**
         * Starts the scenario {@code SuccessfulSnapshots(i)} then {@code IndeterminateLostPut}
         * then {@code SuccessfulIgnoredSnapshots(100)} and calls {@code restart(true)} on the
         * running job. When {@code i > 0} it first awaits the {@code snapshotDone} latch.
         * It assumes snapshot {@code i - 1} is present before restarting. After joining
         * through {@code instances[0]} it asserts the restore from snapshot {@code i}.
         *
         * @param i repetition count of the leading successful-snapshots step
         * @throws InterruptedException if interrupted while awaiting {@code snapshotDone}
         */
        private void whenRestartGracefulSnapshotUpdateLostSameCoordinator(int i) throws InterruptedException {
            IndeterminateLostPut lostPut =
                    (IndeterminateLostPut) new SuccessfulSnapshots(this, i).then(new IndeterminateLostPut());
            SuccessfulIgnoredSnapshots ignoredTail =
                    (SuccessfulIgnoredSnapshots) lostPut.then(new SuccessfulIgnoredSnapshots(this, 100));
            ignoredTail.start();

            JobProxy job = createJob(i + 3);
            JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
            final boolean hasInitialSnapshots = i > 0;
            if (hasInitialSnapshots) {
                this.snapshotDone.await();
            }
            assumeLastSnapshotPresent(i - 1);

            job.restart(true);

            this.logger.info("Joining job...");
            Job jobOnFirstMember = this.instances[0].getJet().getJob(job.getId());
            jobOnFirstMember.join();
            this.logger.info("Joined");

            assertRestoredFromSnapshot(i);
        }

        /**
         * Looks up the {@code "__jet.executionRecords"} map through {@code instances[4]}.
         *
         * @return the map returned by {@code getMap}
         */
        @Nonnull
        private IMap<Object, Object> getJobExecutionRecordIMap() {
            final String executionRecordsMapName = "__jet.executionRecords";
            return this.instances[4].getMap(executionRecordsMapName);
        }

        /**
         * Adds a mocked, serializable {@code MapInterceptor} to the job-execution-record map.
         * Its first {@code interceptPut} throws {@code IndeterminateOperationStateException};
         * every later call answers with its second argument. The id returned by
         * {@code addInterceptor} is discarded.
         */
        private void singleIndeterminatePutLost() {
            final String reason = "Simulated lost IMap update";
            MapInterceptor failOnce = Mockito.mock(MapInterceptor.class, Mockito.withSettings().serializable());
            Mockito.when(failOnce.interceptPut(ArgumentMatchers.any(), ArgumentMatchers.any()))
                    .thenThrow(new IndeterminateOperationStateException(reason))
                    .thenAnswer(AdditionalAnswers.returnsSecondArg());
            getJobExecutionRecordIMap().addInterceptor(failOnce);
        }

        /**
         * Adds a mocked, serializable {@code MapInterceptor} to the job-execution-record map.
         * Every {@code interceptPut} answers, via {@code answersWithDelay(1000L, ...)}, by
         * throwing {@code IndeterminateOperationStateException}.
         *
         * @return the id returned by {@code addInterceptor}
         */
        private String allIndeterminatePutsLost() {
            final long delayMillis = 1000L;
            final String reason = "Simulated lost IMap update";
            MapInterceptor alwaysFail = Mockito.mock(MapInterceptor.class, Mockito.withSettings().serializable());
            Mockito.when(alwaysFail.interceptPut(ArgumentMatchers.any(), ArgumentMatchers.any()))
                    .thenAnswer(AdditionalAnswers.answersWithDelay(
                            delayMillis, new ThrowsException(new IndeterminateOperationStateException(reason))));
            final String interceptorId = getJobExecutionRecordIMap().addInterceptor(alwaysFail);
            return interceptorId;
        }
    }

    /**
     * Processor that exposes its snapshot phases to tests through static hooks.
     * <p>
     * The nullable hooks are volatile and {@link #reset()} sets them back to
     * {@code null}, so each call site reads a hook once into a local before
     * using it. A hook is only invoked once this processor's snapshot counter
     * has reached {@link #allowedSnapshotsCount}.
     */
    @Ignore("test processor used by EE tests")
    /* loaded from: input_file:com/hazelcast/jet/core/IndeterminateSnapshotTest$SnapshotInstrumentationP.class */
    public static final class SnapshotInstrumentationP extends AbstractProcessor {
        /** Global processor index, captured in {@link #init}. */
        private int globalIndex;
        /** Snapshot-counter value at which {@link #complete()} may return {@code true}. */
        private final int numItems;
        /** Incremented on every {@link #snapshotCommitFinish}; overwritten on restore. */
        private int snapshotCounter;

        /** Called from {@link #saveToSnapshot()} with the global index; {@code null} disables it. */
        @Nullable
        public static volatile Consumer<Integer> saveSnapshotConsumer;

        /** Called from {@link #snapshotCommitPrepare()} with the global index; {@code null} disables it. */
        @Nullable
        public static volatile Consumer<Integer> snapshotCommitPrepareConsumer;
        /** Global index mapped to the snapshot counter written by the last {@link #saveToSnapshot()}. */
        static final ConcurrentMap<Integer, Integer> savedCounters = new ConcurrentHashMap<>();
        /** Global index mapped to the snapshot counter read back in {@link #restoreFromSnapshot}. */
        static final ConcurrentMap<Integer, Integer> restoredCounters = new ConcurrentHashMap<>();
        /** Execution ids recorded by {@link #init}. */
        static final Set<Long> executions = ConcurrentHashMap.newKeySet();
        /** Hooks are skipped while the snapshot counter is below this value. */
        public static volatile int allowedSnapshotsCount = 0;

        /** Called from {@link #snapshotCommitFinish} with its argument; defaults to a no-op. */
        @Nonnull
        public static volatile Consumer<Boolean> snapshotCommitFinishConsumer = bool -> {
        };
        /** Set to {@code true} by every {@link #restoreFromSnapshot} call. */
        public static volatile boolean restoredFromSnapshot = false;

        /**
         * @param i snapshot-counter value at which {@link #complete()} may return {@code true}
         */
        SnapshotInstrumentationP(int i) {
            this.numItems = i;
        }

        /** Clears all recorded state and restores every hook and flag to its initial value. */
        public static void reset() {
            savedCounters.clear();
            restoredCounters.clear();
            executions.clear();
            allowedSnapshotsCount = 0;
            saveSnapshotConsumer = null;
            snapshotCommitPrepareConsumer = null;
            snapshotCommitFinishConsumer = bool -> {
            };
            restoredFromSnapshot = false;
        }

        /**
         * Stores the global processor index and records the execution id.
         *
         * @param context processor context supplied by the engine
         */
        protected void init(@Nonnull Processor.Context context) {
            this.globalIndex = context.globalProcessorIndex();
            executions.add(context.executionId());
        }

        /**
         * Tries to emit {@code 10000 * (globalIndex + 1) + snapshotCounter}.
         *
         * @return {@code true} only if the emit succeeded and the snapshot counter equals {@code numItems}
         */
        public boolean complete() {
            final Integer item = Integer.valueOf((10000 * (this.globalIndex + 1)) + this.snapshotCounter);
            return tryEmit(item) && this.snapshotCounter == this.numItems;
        }

        /**
         * Emits the snapshot counter under a broadcast key of the global index, runs
         * the save hook if one is installed and enabled, then records the counter in
         * {@link #savedCounters}.
         *
         * @return {@code false} if the snapshot emit was rejected, {@code true} otherwise
         */
        public boolean saveToSnapshot() {
            if (!tryEmitToSnapshot(BroadcastKey.broadcastKey(Integer.valueOf(this.globalIndex)), Integer.valueOf(this.snapshotCounter))) {
                return false;
            }
            // Single volatile read: a concurrent reset() must not null the hook between check and call.
            final Consumer<Integer> hook = saveSnapshotConsumer;
            if (hook != null && this.snapshotCounter >= allowedSnapshotsCount) {
                hook.accept(this.globalIndex);
            }
            savedCounters.put(this.globalIndex, this.snapshotCounter);
            return true;
        }

        /**
         * Runs the commit-prepare hook if one is installed and enabled.
         *
         * @return always {@code true}
         */
        public boolean snapshotCommitPrepare() {
            // Single volatile read, for the same reason as in saveToSnapshot().
            final Consumer<Integer> hook = snapshotCommitPrepareConsumer;
            if (hook != null && this.snapshotCounter >= allowedSnapshotsCount) {
                hook.accept(this.globalIndex);
            }
            return true;
        }

        /**
         * Increments the snapshot counter and, if the previous value had reached
         * {@link #allowedSnapshotsCount}, passes {@code z} to the commit-finish hook.
         *
         * @param z value forwarded to the hook
         * @return always {@code true}
         */
        public boolean snapshotCommitFinish(boolean z) {
            final int finishedSnapshot = this.snapshotCounter++;
            if (finishedSnapshot >= allowedSnapshotsCount) {
                snapshotCommitFinishConsumer.accept(z);
            }
            return true;
        }

        /**
         * Restores the snapshot counter when {@code obj} is a broadcast key whose key
         * equals this processor's global index, and records it in
         * {@link #restoredCounters}. The {@link #restoredFromSnapshot} flag is set for
         * every call, whether or not the key matched.
         *
         * @param obj snapshot key
         * @param obj2 snapshot value; cast to {@code Integer} when the key matches
         */
        protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
            if ((obj instanceof BroadcastKey) && ((BroadcastKey<?>) obj).key().equals(Integer.valueOf(this.globalIndex))) {
                final Integer restored = (Integer) obj2;
                restoredCounters.put(this.globalIndex, restored);
                this.snapshotCounter = restored.intValue();
            }
            restoredFromSnapshot = true;
        }
    }
}
