package com.hazelcast.jet.core;

import com.hazelcast.config.Config;
import com.hazelcast.config.ConfigAccessor;
import com.hazelcast.config.ServiceConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.services.GracefulShutdownAwareService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.processor.DiagnosticProcessors;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.test.Accessors;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.starter.ReflectionUtils;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.NotThrownAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests job submission and shutting-down-member bookkeeping while one member's
 * graceful shutdown is held open by a deliberately slow service.
 *
 * <p>Every member is configured with {@link #slowShutdownService}, whose shutdown
 * callback signals {@link #shutdownInitiated} and then blocks until
 * {@link #shutdownProceed} is released. This lets a test observe the cluster
 * while a shutdown is still in progress.
 */
@Category({QuickTest.class, ParallelJVMTest.class})
public class GracefulShutdownTest_SlowShutdown extends JetTestSupport {
    private static final int NODE_COUNT = 3;

    /** Index of the member that the tests shut down (or mark as shutting down). */
    private static final int SHUTTING_DOWN_MEMBER = 1;

    private HazelcastInstance[] instances;

    /** Released by the slow service as soon as its shutdown callback is entered. */
    private final CountDownLatch shutdownInitiated = new CountDownLatch(1);

    /** Released by the test (or by {@link #unblockShutdown()}) to let the blocked shutdown continue. */
    private final CountDownLatch shutdownProceed = new CountDownLatch(1);

    /**
     * Runs the blocking {@code shutdown()} call off the test thread. Kept as a field so
     * that it can be shut down after each test instead of leaking its worker thread.
     */
    private final ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor();

    /**
     * Shutdown callback that announces it was entered, then blocks until the test
     * releases {@link #shutdownProceed}; returns {@code true} once released.
     */
    private final GracefulShutdownAwareService slowShutdownService = (timeout, unit) -> {
        shutdownInitiated.countDown();
        try {
            shutdownProceed.await();
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    };

    /** Starts {@link #NODE_COUNT} members, each with the slow shutdown service registered. */
    @Before
    public void setup() {
        instances = createHazelcastInstances(newConfigWithSlowShutdownService(), NODE_COUNT);
    }

    /** Builds a small-instance config with {@link #slowShutdownService} added as an enabled service. */
    private Config newConfigWithSlowShutdownService() {
        Config config = smallInstanceConfig();
        ServiceConfig slowService = new ServiceConfig()
                .setEnabled(true)
                .setName("slow-shutdown-service")
                .setImplementation(slowShutdownService);
        ConfigAccessor.getServicesConfig(config).addServiceConfig(slowService);
        return config;
    }

    /**
     * Releases any shutdown that is still blocked and stops accepting work on the helper
     * executor, so that no test leaves a blocked shutdown or a live worker thread behind.
     */
    @After
    public void unblockShutdown() {
        shutdownProceed.countDown();
        // Graceful shutdown (not shutdownNow): an already submitted task may finish normally.
        shutdownExecutor.shutdown();
    }

    @Test
    public void when_shutdownWithRaceCondition_then_failJobSubmittedToMaster() throws IllegalAccessException {
        when_shutdownWithRaceCondition_then_failJob(instances[0]);
    }

    @Test
    public void when_shutdownWithRaceCondition_then_failJobSubmittedToNonMaster() throws IllegalAccessException {
        when_shutdownWithRaceCondition_then_failJob(instances[2]);
    }

    /**
     * Sets the node-level {@code shuttingDown} flag of member 1 via reflection, submits a job
     * through {@code hazelcastInstance} and expects {@code join()} to fail with a
     * {@link TopologyChangedException} as the root cause. The flag is always reset afterwards.
     *
     * @param hazelcastInstance the member the job is submitted to
     * @throws IllegalAccessException if the reflective field access fails
     */
    private void when_shutdownWithRaceCondition_then_failJob(HazelcastInstance hazelcastInstance)
            throws IllegalAccessException {
        DAG dag = getDag();
        AtomicBoolean shuttingDown = (AtomicBoolean) ReflectionUtils.getFieldValueReflectively(
                Accessors.getNode(instances[SHUTTING_DOWN_MEMBER]), "shuttingDown");
        shuttingDown.set(true);
        try {
            Job job = submit(dag, hazelcastInstance);
            Assertions.assertThatThrownBy(job::join).hasRootCauseInstanceOf(TopologyChangedException.class);
        } finally {
            // Reset on success and on failure alike.
            shuttingDown.set(false);
        }
    }

    @Test
    public void when_slowShutdown_thenNoException() throws InterruptedException {
        DAG dag = getDag();
        startSlowShutdownAndAwaitPropagation();

        Job job = submit(dag);
        Assertions.assertThatNoException().isThrownBy(job::join);
    }

    @Test
    public void when_shutDown_allMembersAddMemberShuttingDownListAndThenRemove() throws InterruptedException {
        startSlowShutdownAndAwaitPropagation();

        // Let the blocked shutdown finish; the remaining members must then drop the entry again.
        shutdownProceed.countDown();
        assertTrueEventually(() -> assertShuttingDownMembers(Collections.emptySet(), instances[0]));
        assertTrueEventually(() -> assertShuttingDownMembers(Collections.emptySet(), instances[2]));
    }

    /**
     * Starts shutting down member 1 on the helper executor, waits until the slow service has
     * been entered, then asserts that member 1 itself records no shutting-down members while
     * members 0 and 2 eventually record exactly member 1's UUID.
     *
     * @return the UUID of the member being shut down
     * @throws InterruptedException if interrupted while waiting for the shutdown to begin
     */
    private UUID startSlowShutdownAndAwaitPropagation() throws InterruptedException {
        HazelcastInstance leaving = instances[SHUTTING_DOWN_MEMBER];
        shutdownExecutor.submit(leaving::shutdown);
        shutdownInitiated.await();

        UUID leavingUuid = leaving.getCluster().getLocalMember().getUuid();
        assertShuttingDownMembers(Collections.emptySet(), leaving);
        assertTrueEventually(() -> assertShuttingDownMembers(Collections.singleton(leavingUuid), instances[0]));
        assertTrueEventually(() -> assertShuttingDownMembers(Collections.singleton(leavingUuid), instances[2]));
        return leavingUuid;
    }

    /** Builds a two-vertex DAG: a single-item list source connected to a logger sink. */
    @Nonnull
    private static DAG getDag() {
        DAG dag = new DAG();
        Vertex source = dag.newVertex("source", TestProcessors.ListSource.supplier(Collections.singletonList(1)));
        Vertex sink = dag.newVertex("faulty", DiagnosticProcessors.writeLoggerP());
        dag.edge(Edge.between(source, sink));
        return dag;
    }

    /** Submits {@code dag} as a light job to member 2. */
    private Job submit(DAG dag) {
        return submit(dag, instances[2]);
    }

    /** Submits {@code dag} as a light job to the given member. */
    private Job submit(DAG dag, HazelcastInstance hazelcastInstance) {
        return hazelcastInstance.getJet().newLightJob(dag);
    }

    /**
     * Asserts that the key set of the {@code membersShuttingDown} map, read reflectively from
     * the job coordination service of {@code hazelcastInstance}, equals {@code set}.
     *
     * @param set               the expected member UUIDs
     * @param hazelcastInstance the member whose view is inspected
     */
    private void assertShuttingDownMembers(Set<UUID> set, HazelcastInstance hazelcastInstance) {
        try {
            JetServiceBackend jetService = Accessors.getService(hazelcastInstance, "hz:impl:jetService");
            Map<?, ?> membersShuttingDown = (Map<?, ?>) ReflectionUtils.getFieldValueReflectively(
                    jetService.getJobCoordinationService(), "membersShuttingDown");
            Assertions.assertThat(membersShuttingDown.keySet()).isEqualTo(set);
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }
}
