package com.hazelcast.jet.core.metrics;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Checks which {@code executionCompletionTime} measurements a job reports while it
 * is running, suspended, restarted and cancelled.
 *
 * <p>Every test runs twice, once with
 * {@link JobConfig#setStoreMetricsAfterJobCompletion(boolean)} set to {@code true}
 * and once with {@code false}.
 *
 * <p>NOTE(review): the assertions treat a negative {@code executionCompletionTime}
 * as "execution still in progress" and a positive one as "execution finished".
 * That reading is inferred from how the helpers below are used; confirm against
 * the metric's documentation.
 */
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class PartialJobMetricsCollectionTest extends JetTestSupport {

    private static final int MEMBER_COUNT = 2;

    /** Name of the metric every assertion in this class inspects. */
    private static final String EXECUTION_COMPLETION_TIME = "executionCompletionTime";

    /** Metric tag that must be absent from a suspended job's metrics. */
    private static final String EXECUTION_TAG = "exec";

    /** Injected by the parameterized runner; forwarded to the job configuration. */
    @Parameterized.Parameter
    public boolean isStoreMetricsEnabled;

    private HazelcastInstance instance;

    @Parameterized.Parameters(name = "isStoreMetricsEnabled={0}")
    public static Collection<Object> parameters() {
        return Arrays.asList(true, false);
    }

    /**
     * Starts a cluster of {@link #MEMBER_COUNT} members and keeps the first one as
     * the instance used to submit jobs.
     */
    @Before
    public void before() throws Exception {
        TestProcessors.reset(MEMBER_COUNT);
        // Keep the array in a local: it is needed both for the field and for the size check.
        HazelcastInstance[] instances = createHazelcastInstances(smallInstanceConfig(), MEMBER_COUNT);
        this.instance = instances[0];
        Assert.assertEquals(MEMBER_COUNT, instances[0].getCluster().getMembers().size());
    }

    @Test
    public void when_jobRunning_then_metricsEventuallyExist() {
        Job job = startStreamingJob();
        awaitOnlyOneJobIsRunning(job);
        assertExecutionMetrics(job);
        job.cancel();
    }

    @Test
    public void when_suspendAndResume_then_metricsReset() {
        Job job = startStreamingJob();
        awaitOnlyOneJobIsRunning(job);

        job.suspend();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.SUSPENDED);
        assertTrueEventually(() -> Assert.assertFalse(job.getMetrics().containsTag(EXECUTION_TAG)));

        job.resume();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
        assertExecutionMetrics(job);
        job.cancel();
    }

    @Test
    public void when_jobRestarted_then_metricsReset() {
        Job job = startStreamingJob();
        awaitOnlyOneJobIsRunning(job);
        // Value reported before the restart; compared against the post-restart measurements.
        long valueBeforeRestart = job.getMetrics().get(EXECUTION_COMPLETION_TIME).stream()
                .findFirst()
                .orElseThrow()
                .value();

        job.restart();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            List<Measurement> measurements = job.getMetrics().get(EXECUTION_COMPLETION_TIME);
            Assertions.assertThat(measurements).satisfiesOnlyOnce(
                    measurement -> Assertions.assertThat(measurement.value()).isNegative());
            if (this.isStoreMetricsEnabled) {
                Assertions.assertThat(measurements).satisfiesOnlyOnce(
                        measurement -> Assertions.assertThat(measurement.value()).isGreaterThan(valueBeforeRestart));
            }
        });
        job.cancel();
    }

    @Test
    public void when_jobCanceled_then_terminatedJobMetricsReturn() {
        Job job = startStreamingJob();
        job.cancel();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.FAILED);

        JobMetrics metrics = job.getMetrics();
        if (!this.isStoreMetricsEnabled) {
            Assertions.assertThat(metrics.metrics()).isEmpty();
            return;
        }
        List<Measurement> measurements = metrics.get(EXECUTION_COMPLETION_TIME);
        Assertions.assertThat(measurements).hasSize(1);
        Assertions.assertThat(measurements).satisfiesOnlyOnce(
                measurement -> Assertions.assertThat(measurement.value()).isPositive());
    }

    /**
     * Submits an item-stream-to-logger pipeline configured with the current
     * {@link #isStoreMetricsEnabled} value and waits until the job reports
     * {@link JobStatus#RUNNING}.
     *
     * @return the submitted job
     */
    private Job startStreamingJob() {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(TestSources.itemStream(1)).withoutTimestamps().writeTo(Sinks.logger());
        JobConfig config = new JobConfig().setStoreMetricsAfterJobCompletion(this.isStoreMetricsEnabled);
        Job job = this.instance.getJet().newJob(pipeline, config);
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
        return job;
    }

    /**
     * Waits until exactly one {@code executionCompletionTime} measurement of the
     * job is negative.
     *
     * @param job the job whose metrics are polled
     */
    private void awaitOnlyOneJobIsRunning(Job job) {
        assertTrueEventually(() ->
                Assertions.assertThat(job.getMetrics().get(EXECUTION_COMPLETION_TIME)).satisfiesOnlyOnce(
                        measurement -> Assertions.assertThat(measurement.value()).isNegative()));
    }

    /**
     * Waits until the job reports the expected set of {@code executionCompletionTime}
     * measurements: exactly one negative value, and, when metrics storing is enabled,
     * two measurements in total with exactly one of them positive.
     *
     * @param job the job whose metrics are polled
     */
    private void assertExecutionMetrics(Job job) {
        assertTrueEventually(() -> {
            List<Measurement> measurements = job.getMetrics().get(EXECUTION_COMPLETION_TIME);
            Assertions.assertThat(measurements).hasSize(this.isStoreMetricsEnabled ? 2 : 1);
            Assertions.assertThat(measurements).satisfiesOnlyOnce(
                    measurement -> Assertions.assertThat(measurement.value()).isNegative());
            if (this.isStoreMetricsEnabled) {
                Assertions.assertThat(measurements).satisfiesOnlyOnce(
                        measurement -> Assertions.assertThat(measurement.value()).isPositive());
            }
        });
    }
}
