package com.hazelcast.jet.core.metrics;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ExecutionLifecycleTest;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/metrics/JobLifecycleMetricsTest.class */
public class JobLifecycleMetricsTest extends JetTestSupport {
    private static final int MEMBER_COUNT = 2;
    private HazelcastInstance[] hzInstances;

    @Before
    public void before() throws Exception {
        TestProcessors.reset(2);
        Config smallInstanceConfig = smallInstanceConfig();
        smallInstanceConfig.setProperty("hazelcast.jmx", "true");
        smallInstanceConfig.getMetricsConfig().setCollectionFrequencySeconds(1);
        this.hzInstances = createHazelcastInstances(smallInstanceConfig, 2);
        Assert.assertEquals(2L, this.hzInstances[0].getCluster().getMembers().size());
    }

    @After
    public void after() {
        TestProcessors.assertNoErrorsInProcessors();
    }

    @Test
    public void multipleJobsSubmittedAndCompleted() {
        Job newJob = this.hzInstances[0].getJet().newJob(batchPipeline());
        newJob.join();
        newJob.cancel();
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 1, 1, 0);
        });
        DAG dag = new DAG();
        AssertionError assertionError = new AssertionError(ExecutionLifecycleTest.MOCK_ERROR_MESSAGE);
        dag.edge(Edge.between(dag.newVertex("source", TestProcessors.ListSource.supplier(List.of(1))), dag.newVertex("faulty", new TestProcessors.MockPMS(() -> {
            return new TestProcessors.MockPS(() -> {
                return new TestProcessors.MockP().initBlocks().setProcessError(() -> {
                    return assertionError;
                });
            }, 2);
        })).localParallelism(1)));
        Job newJob2 = this.hzInstances[0].getJet().newJob(dag);
        for (int i = 0; i < 2; i++) {
            TestProcessors.MockP.unblock();
        }
        try {
            newJob2.join();
            Assert.fail("Expected exception not thrown!");
        } catch (Exception e) {
        }
        assertTrueEventually(() -> {
            assertJobStats(2, 2, 2, 1, 1);
        });
    }

    @Test
    public void jobSuspendedThenResumed() {
        Job newJob = this.hzInstances[0].getJet().newJob(streamingPipeline());
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.RUNNING);
        });
        newJob.suspend();
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.SUSPENDED);
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.SUSPENDED);
        });
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 1, 0, 0);
        });
        newJob.resume();
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.RUNNING);
        });
        assertTrueEventually(() -> {
            assertJobStats(1, 2, 1, 0, 0);
        });
    }

    @Test
    public void jobRestarted() {
        Job newJob = this.hzInstances[0].getJet().newJob(streamingPipeline());
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 0, 0, 0);
        });
        assertTrueAllTheTime(() -> {
            assertJobStats(1, 1, 0, 0, 0);
        }, 1L);
        newJob.restart();
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            assertJobStats(1, 2, 1, 0, 0);
        });
    }

    @Test
    public void jobCancelled() {
        Job newJob = this.hzInstances[0].getJet().newJob(streamingPipeline(), JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 0, 0, 0);
        });
        assertTrueAllTheTime(() -> {
            assertJobStats(1, 1, 0, 0, 0);
        }, 1L);
        newJob.cancel();
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.FAILED, true);
        });
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 1, 0, 1);
        });
    }

    @Test
    public void jobSuspendedThenCancelled() {
        Job newJob = this.hzInstances[0].getJet().newJob(streamingPipeline(), JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        newJob.suspend();
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.SUSPENDED);
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.SUSPENDED);
        });
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 1, 0, 0);
        });
        newJob.cancel();
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.FAILED);
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.FAILED, true);
        });
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 1, 0, 1);
        });
    }

    @Test
    public void jobFailed() {
        Job newJob = this.hzInstances[0].getJet().newJob(failingPipeline(), JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        Objects.requireNonNull(newJob);
        Assertions.assertThatThrownBy(newJob::join).hasRootCauseInstanceOf(ArithmeticException.class);
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.FAILED);
        });
        assertTrueEventually(() -> {
            assertJobStats(1, 1, 1, 0, 1);
        });
    }

    @Test
    public void executionRelatedMetrics() {
        Job newJob = this.hzInstances[0].getJet().newJob(batchPipeline(), JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        newJob.join();
        assertTrueEventually(() -> {
            assertJobStatusMetric(newJob, JobStatus.COMPLETED);
        });
        JobMetricsChecker jobMetricsChecker = new JobMetricsChecker(newJob);
        jobMetricsChecker.assertRandomMetricValueAtLeast("executionStartTime", 1L);
        jobMetricsChecker.assertRandomMetricValueAtLeast("executionCompletionTime", 1L);
    }

    @Test
    public void jobFinishedAllMetricsPresents() {
        Pipeline create = Pipeline.create();
        create.readFrom(TestSources.items(new Integer[]{0, 1, 2, 3, 4, 5})).groupingKey(num -> {
            return num.intValue() % 2 == 0 ? "odds" : "evens";
        }).rollingAggregate(AggregateOperations.counting()).writeTo(Sinks.logger());
        Job newJob = this.hzInstances[0].getJet().newJob(create, createJobConfigWithEnabledMetrics("targetJob"));
        newJob.join();
        JobMetricsChecker jobMetricsChecker = new JobMetricsChecker(newJob);
        List of = List.of((Object[]) new String[]{"coalescedWm", "emittedCount", "executionCompletionTime", "executionStartTime", "lastForwardedWm", "lastForwardedWmLatency", "queuesCapacity", "queuesSize", "receivedBatches", "receivedCount", "snapshotBytes", "snapshotKeys", "status", "userCancelled", "topObservedWm", "distributedBytesIn", "distributedBytesOut", "distributedItemsIn", "distributedItemsOut", "lateEventsDropped"});
        Assert.assertEquals(newJob.getMetrics().metrics().size(), of.size());
        Objects.requireNonNull(jobMetricsChecker);
        of.forEach(jobMetricsChecker::assertMetricExists);
    }

    @Test
    public void jobFinishedReceiveMetricsFromTargetJob() {
        this.hzInstances[0].getJet().newJob(streamingPipeline(), createJobConfigWithEnabledMetrics("otherJob"));
        Job newJob = this.hzInstances[0].getJet().newJob(batchPipeline(), createJobConfigWithEnabledMetrics("targetJob"));
        newJob.join();
        String name = newJob.getName();
        JobMetrics metrics = newJob.getMetrics();
        metrics.metrics().stream().flatMap(str -> {
            return metrics.get(str).stream();
        }).forEach(measurement -> {
            Assert.assertEquals(name, measurement.tag("jobName"));
        });
    }

    private JobConfig createJobConfigWithEnabledMetrics(String str) {
        JobConfig jobConfig = new JobConfig();
        jobConfig.setMetricsEnabled(true);
        jobConfig.setStoreMetricsAfterJobCompletion(true);
        jobConfig.setName(str);
        return jobConfig;
    }

    private Pipeline batchPipeline() {
        Pipeline create = Pipeline.create();
        create.readFrom(TestSources.items(new Integer[]{1, 2, 3})).writeTo(Sinks.logger());
        return create;
    }

    private Pipeline failingPipeline() {
        Pipeline create = Pipeline.create();
        create.readFrom(TestSources.items(new Integer[]{1, 2, 3})).map(num -> {
            return Integer.valueOf(5 / (num.intValue() - 2));
        }).writeTo(Sinks.logger());
        return create;
    }

    private DAG streamingPipeline() {
        DAG dag = new DAG();
        dag.newVertex("v", () -> {
            return new TestProcessors.MockP().streaming();
        });
        return dag;
    }

    private void assertJobStats(int i, int i2, int i3, int i4, int i5) {
        assertJobStatsOnMember(this.hzInstances[0], i, i2, i3, i4, i5);
        for (int i6 = 1; i6 < this.hzInstances.length; i6++) {
            assertJobStatsOnMember(this.hzInstances[i6], 0, i2, i3, 0, 0);
        }
    }

    private void assertJobStatsOnMember(HazelcastInstance hazelcastInstance, int i, int i2, int i3, int i4, int i5) {
        try {
            JmxMetricsChecker forInstance = JmxMetricsChecker.forInstance(hazelcastInstance);
            forInstance.assertMetricValue("jobs.submitted", i);
            forInstance.assertMetricValue("jobs.executionStarted", i2);
            forInstance.assertMetricValue("jobs.executionCompleted", i3);
            forInstance.assertMetricValue("jobs.completedSuccessfully", i4);
            forInstance.assertMetricValue("jobs.completedWithFailure", i5);
        } catch (Exception e) {
            throw new AssertionError(e.getMessage(), e);
        }
    }

    private void assertJobStatusMetric(Job job, JobStatus jobStatus) {
        assertJobStatusMetric(job, jobStatus, false);
    }

    private void assertJobStatusMetric(Job job, JobStatus jobStatus, boolean z) {
        try {
            JobMetrics metrics = job.getMetrics();
            List list = metrics.get("status");
            Assert.assertEquals(jobStatus, JobStatus.getById((int) ((Measurement) list.get(list.size() - 1)).value()));
            List list2 = metrics.get("userCancelled");
            Assert.assertEquals(z ? 1L : 0L, ((Measurement) list2.get(list2.size() - 1)).value());
            if (!jobStatus.isTerminal()) {
                Assert.assertEquals(jobStatus, JobStatus.getById((int) JmxMetricsChecker.forJob(this.hzInstances[0], job).getMetricValue("status")));
            }
        } catch (Exception e) {
            throw new AssertionError(e.getMessage(), e);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1580359637:
                if (implMethodName.equals("lambda$multipleJobsSubmittedAndCompleted$f1f5110$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1580359636:
                if (implMethodName.equals("lambda$multipleJobsSubmittedAndCompleted$f1f5110$2")) {
                    z = false;
                    break;
                }
                break;
            case 292847053:
                if (implMethodName.equals("lambda$multipleJobsSubmittedAndCompleted$867c72be$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1614324144:
                if (implMethodName.equals("lambda$failingPipeline$536308a$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1666509525:
                if (implMethodName.equals("lambda$streamingPipeline$d0879c0f$1")) {
                    z = true;
                    break;
                }
                break;
            case 1838226365:
                if (implMethodName.equals("lambda$jobFinishedAllMetricsPresents$a441ef18$1")) {
                    z = 5;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/metrics/JobLifecycleMetricsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Throwable;)Lcom/hazelcast/jet/core/Processor;")) {
                    Throwable th = (Throwable) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return new TestProcessors.MockP().initBlocks().setProcessError(() -> {
                            return th;
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/metrics/JobLifecycleMetricsTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.MockP().streaming();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/metrics/JobLifecycleMetricsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Throwable;)Ljava/lang/Throwable;")) {
                    Throwable th2 = (Throwable) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return th2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/metrics/JobLifecycleMetricsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return Integer.valueOf(5 / (num.intValue() - 2));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/metrics/JobLifecycleMetricsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Throwable;)Lcom/hazelcast/jet/core/ProcessorSupplier;")) {
                    Throwable th3 = (Throwable) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return new TestProcessors.MockPS(() -> {
                            return new TestProcessors.MockP().initBlocks().setProcessError(() -> {
                                return th3;
                            });
                        }, 2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/metrics/JobLifecycleMetricsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num2 -> {
                        return num2.intValue() % 2 == 0 ? "odds" : "evens";
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
