package com.hazelcast.jet.impl.connector;

import com.hazelcast.collection.IList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Checks that a streaming job writing into an {@link IList} sink keeps
 * producing output after one of the two cluster members is shut down.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class HazelcastConnector_RestartTest extends JetTestSupport {
    private HazelcastInstance instance1;
    private HazelcastInstance instance2;

    /** Starts the two members every test in this class runs against. */
    @Before
    public void setup() {
        instance1 = createHazelcastInstance();
        instance2 = createHazelcastInstance();
    }

    /**
     * Submits a job that streams generated items into a list, shuts down the
     * second member once some output has arrived, and then asserts that the
     * list keeps growing afterwards.
     */
    @Test
    public void when_iListWrittenAndMemberShutdown_then_jobRestarts() {
        // Only the size of the list is inspected, so the element type is left as Object.
        IList<Object> sinkList = instance1.getList("list");

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(TestSources.itemStream(10))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkList));

        Job job = instance1.getJet().newJob(pipeline);
        assertTrueEventually(() -> Assert.assertTrue("no output to sink", sinkList.size() > 0), 10);

        // Look the execution up on the first member and fall back to the second one.
        Long executionId = executionId(instance1, job);
        if (executionId == null) {
            executionId = executionId(instance2, job);
        }
        // Fail with a readable message instead of an NPE on unboxing below.
        Assert.assertNotNull("no execution found for job " + job.getId() + " on any member", executionId);

        instance2.shutdown();

        // Wait for the execution captured above to finish on the surviving member
        // before taking the baseline size, so later growth is not output of that execution.
        waitExecutionDoneOnMember(instance1, executionId);
        int sizeAfterShutdown = sinkList.size();
        assertTrueEventually(
                () -> Assert.assertTrue("no output after migration completed", sinkList.size() > sizeAfterShutdown),
                20);
    }

    /**
     * Looks up the execution ID the given member associates with the job.
     *
     * @param hazelcastInstance member whose job execution service is queried
     * @param job               job to look up
     * @return the execution ID, or {@code null} when the lookup yields none
     */
    private Long executionId(HazelcastInstance hazelcastInstance, Job job) {
        return getJetServiceBackend(hazelcastInstance)
                .getJobExecutionService()
                .getExecutionIdForJobId(job.getId());
    }

    /**
     * Waits until the execution future of the given execution is done on the member.
     * The execution context is fetched once, up front; if the member returns no
     * context for the ID, the wait is satisfied immediately.
     *
     * @param hazelcastInstance member to check
     * @param executionId       ID of the execution to wait for
     */
    private void waitExecutionDoneOnMember(HazelcastInstance hazelcastInstance, long executionId) {
        ExecutionContext executionContext = getJetServiceBackend(hazelcastInstance)
                .getJobExecutionService()
                .getExecutionContext(executionId);
        assertTrueEventually(() -> Assert.assertTrue(
                "execution " + executionId + " is still running on the member",
                executionContext == null || executionContext.getExecutionFuture().isDone()));
    }
}
