package com.hazelcast.jet.impl;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.exception.CancellationByUserException;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/JobSummaryTest.class */
public class JobSummaryTest extends JetTestSupport {
    private static final String SOURCE_NAME = "source";
    private HazelcastInstance[] instances;
    private HazelcastInstance instance;
    private HazelcastInstance client;

    @Parameterized.Parameter
    public boolean useOldJobSummary;

    @Parameterized.Parameters(name = "useOldJobSummary={0}")
    public static Object[] parameters() {
        return new Object[]{true, false};
    }

    @Before
    public void setup() {
        Config smallInstanceConfig = smallInstanceConfig();
        MapConfig mapConfig = new MapConfig(SOURCE_NAME);
        mapConfig.getEventJournalConfig().setEnabled(true);
        smallInstanceConfig.addMapConfig(mapConfig);
        this.instances = createHazelcastInstances(smallInstanceConfig, 2);
        this.instance = this.instances[0];
        this.client = createHazelcastClient();
    }

    @Test
    public void when_noJobsRunning() {
        Assert.assertEquals(0L, getJobSummaryList().size());
    }

    @Test
    public void when_batchJob() {
        Job newJob = this.instance.getJet().newJob(newBatchPipeline(), new JobConfig().setName("jobA"));
        newJob.join();
        List<JobAndSqlSummary> jobSummaryList = getJobSummaryList();
        Assert.assertEquals(1L, jobSummaryList.size());
        JobAndSqlSummary jobAndSqlSummary = jobSummaryList.get(0);
        Assert.assertFalse(jobAndSqlSummary.isLightJob());
        Assert.assertEquals("jobA", jobAndSqlSummary.getNameOrId());
        Assert.assertEquals(newJob.getId(), jobAndSqlSummary.getJobId());
        Assert.assertEquals(JobStatus.COMPLETED, jobAndSqlSummary.getStatus());
        Assert.assertNull(jobAndSqlSummary.getFailureText());
        Assert.assertTrue(this.useOldJobSummary || !jobAndSqlSummary.isUserCancelled());
    }

    @Test
    public void when_batchJob_cancelled() {
        Job newJob = this.instance.getJet().newJob(newStreamPipeline(), new JobConfig().setName("jobA"));
        String str = "";
        newJob.cancel();
        try {
            newJob.join();
            Assert.fail("Join should throw CancellationByUserException");
        } catch (CancellationByUserException e) {
            str = e.toString();
        }
        JobAndSqlSummary jobAndSqlSummary = getJobSummaryList().get(0);
        Assert.assertEquals(JobStatus.FAILED, jobAndSqlSummary.getStatus());
        Assert.assertEquals(0L, jobAndSqlSummary.getExecutionId());
        Assert.assertTrue(this.useOldJobSummary || jobAndSqlSummary.isUserCancelled());
        assertContains(jobAndSqlSummary.getFailureText(), str);
    }

    @Test
    public void when_streamingJobLifecycle() {
        Job newJob = this.instance.getJet().newJob(newStreamPipeline(), new JobConfig().setName("jobA"));
        List<JobAndSqlSummary> jobSummaryList = getJobSummaryList();
        Assert.assertEquals(1L, jobSummaryList.size());
        JobAndSqlSummary jobAndSqlSummary = jobSummaryList.get(0);
        Assert.assertFalse(jobAndSqlSummary.isLightJob());
        Assert.assertEquals("jobA", jobAndSqlSummary.getNameOrId());
        Assert.assertEquals(newJob.getId(), jobAndSqlSummary.getJobId());
        assertTrueEventually(() -> {
            JobAndSqlSummary jobAndSqlSummary2 = getJobSummaryList().get(0);
            Assert.assertEquals(JobStatus.RUNNING, jobAndSqlSummary2.getStatus());
            Assert.assertTrue(this.useOldJobSummary || !jobAndSqlSummary2.isUserCancelled());
        }, 20L);
        newJob.suspend();
        assertTrueEventually(() -> {
            JobAndSqlSummary jobAndSqlSummary2 = getJobSummaryList().get(0);
            Assert.assertEquals(JobStatus.SUSPENDED, jobAndSqlSummary2.getStatus());
            Assert.assertTrue(this.useOldJobSummary || !jobAndSqlSummary2.isUserCancelled());
        }, 20L);
        newJob.resume();
        assertTrueEventually(() -> {
            JobAndSqlSummary jobAndSqlSummary2 = getJobSummaryList().get(0);
            Assert.assertEquals(JobStatus.RUNNING, jobAndSqlSummary2.getStatus());
            Assert.assertTrue(this.useOldJobSummary || !jobAndSqlSummary2.isUserCancelled());
        }, 20L);
        newJob.cancel();
        assertTrueEventually(() -> {
            JobAndSqlSummary jobAndSqlSummary2 = getJobSummaryList().get(0);
            Assert.assertEquals(JobStatus.FAILED, jobAndSqlSummary2.getStatus());
            Assert.assertEquals(0L, jobAndSqlSummary2.getExecutionId());
            Assert.assertTrue(this.useOldJobSummary || jobAndSqlSummary2.isUserCancelled());
            assertContains(jobAndSqlSummary2.getFailureText(), new CancellationByUserException().toString());
        }, 20L);
    }

    @Test
    public void when_lightJob() throws InterruptedException {
        Job newLightJob = this.instance.getJet().newLightJob(newStreamPipeline());
        Thread.sleep(200L);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, getJobSummaryList().size());
        });
        JobAndSqlSummary jobAndSqlSummary = getJobSummaryList().get(0);
        Assert.assertTrue(jobAndSqlSummary.isLightJob());
        Assert.assertEquals(Util.idToString(newLightJob.getId()), jobAndSqlSummary.getNameOrId());
        Assert.assertEquals(newLightJob.getId(), jobAndSqlSummary.getJobId());
        Assert.assertEquals(JobStatus.RUNNING, jobAndSqlSummary.getStatus());
        Assert.assertTrue(this.useOldJobSummary || !jobAndSqlSummary.isUserCancelled());
    }

    @Test
    @Ignore("Flaky due to race condition described in JobCoordinationService.getJobAndSqlSummary(LightMasterContext)")
    public void when_lightJobIsCancelled_then_itIsNotReportedOnList() throws InterruptedException {
        Job newLightJob = this.instance.getJet().newLightJob(newStreamPipeline());
        Thread.sleep(200L);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, getJobSummaryList().size());
        });
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        Objects.requireNonNull(newLightJob);
        newSingleThreadScheduledExecutor.schedule(newLightJob::cancel, 100L, TimeUnit.MILLISECONDS);
        try {
            Assertions.assertThatThrownBy(() -> {
                getJetClientInstanceImpl(this.client).getJob(newLightJob.getId()).join();
            }).isInstanceOf(CancellationException.class);
            Assertions.assertThat(getJobSummaryList()).isEmpty();
            newSingleThreadScheduledExecutor.shutdownNow();
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdownNow();
            throw th;
        }
    }

    @Test
    public void when_manyJobs_then_sortedBySubmissionTime() {
        Job newJob;
        int i = 10;
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 10; i2++) {
            boolean z = i2 % 2 == 0;
            Pipeline newStreamPipeline = newStreamPipeline();
            if (z) {
                newJob = this.instances[(i2 % 4) / 2].getJet().newLightJob(newStreamPipeline);
                JobAssertions.assertThat(newJob).eventuallyJobRunning(this.instance, null);
            } else {
                newJob = this.instance.getJet().newJob(newStreamPipeline);
            }
            arrayList.add(newJob);
        }
        assertTrueEventually(() -> {
            ArrayList arrayList2 = new ArrayList(getJobSummaryList());
            Assert.assertEquals(i, arrayList2.size());
            Collections.reverse(arrayList2);
            for (int i3 = 0; i3 < i; i3++) {
                JobAndSqlSummary jobAndSqlSummary = (JobAndSqlSummary) arrayList2.get(i3);
                Assert.assertEquals(Util.idToString(jobAndSqlSummary.getJobId()), jobAndSqlSummary.getNameOrId());
                Assert.assertEquals(JobStatus.RUNNING, jobAndSqlSummary.getStatus());
            }
        }, 20L);
        arrayList.forEach((v0) -> {
            v0.cancel();
        });
        assertTrueEventually(() -> {
            ArrayList arrayList2 = new ArrayList(getJobSummaryList());
            Assert.assertEquals(i / 2, arrayList2.size());
            Collections.reverse(arrayList2);
            for (int i3 = 0; i3 < i; i3++) {
                if (!(i3 % 2 == 0)) {
                    JobAndSqlSummary jobAndSqlSummary = (JobAndSqlSummary) arrayList2.get(i3 / 2);
                    Assert.assertEquals(Util.idToString(jobAndSqlSummary.getJobId()), jobAndSqlSummary.getNameOrId());
                    Assert.assertEquals(JobStatus.FAILED, jobAndSqlSummary.getStatus());
                    Assert.assertNotEquals(0L, jobAndSqlSummary.getCompletionTime());
                }
            }
        }, 20L);
    }

    @Test
    public void when_job_failed() {
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.mapJournal("invalid", JournalInitialPosition.START_FROM_OLDEST)).withoutTimestamps().writeTo(Sinks.noop());
        String str = "";
        try {
            this.instance.getJet().newJob(create, new JobConfig().setName("jobA")).join();
        } catch (CancellationException e) {
            Assert.fail("Job should not be cancelled in this test");
        } catch (Exception e2) {
            str = e2.getMessage();
        }
        List<JobAndSqlSummary> jobSummaryList = getJobSummaryList();
        Assert.assertEquals(1L, jobSummaryList.size());
        JobAndSqlSummary jobAndSqlSummary = jobSummaryList.get(0);
        assertContains(new JetException(jobAndSqlSummary.getFailureText()).toString(), str);
        Assert.assertNotEquals(0L, jobAndSqlSummary.getCompletionTime());
        Assert.assertEquals(JobStatus.FAILED, jobAndSqlSummary.getStatus());
        Assert.assertTrue(this.useOldJobSummary || !jobAndSqlSummary.isUserCancelled());
    }

    private Pipeline newStreamPipeline() {
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.mapJournal(SOURCE_NAME, JournalInitialPosition.START_FROM_OLDEST)).withoutTimestamps().writeTo(Sinks.noop());
        return create;
    }

    private Pipeline newBatchPipeline() {
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.map(SOURCE_NAME)).writeTo(Sinks.noop());
        return create;
    }

    private JetClientInstanceImpl getJetClientInstanceImpl(HazelcastInstance hazelcastInstance) {
        return hazelcastInstance.getJet();
    }

    @Nonnull
    private List<JobAndSqlSummary> getJobSummaryList() {
        return this.useOldJobSummary ? (List) getJetClientInstanceImpl(this.client).getJobSummaryList().stream().map(jobSummary -> {
            return new JobAndSqlSummary(jobSummary.isLightJob(), jobSummary.getJobId(), jobSummary.getExecutionId(), jobSummary.getNameOrId(), jobSummary.getStatus(), jobSummary.getSubmissionTime(), jobSummary.getCompletionTime(), jobSummary.getFailureText(), (SqlSummary) null, (String) null, false);
        }).collect(Collectors.toList()) : getJetClientInstanceImpl(this.client).getJobAndSqlSummaryList();
    }
}
