package com.hazelcast.jet.core.metrics;

import com.hazelcast.cluster.Address;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.TestInClusterSupport;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/metrics/JobMetrics_MiscTest.class */
public class JobMetrics_MiscTest extends TestInClusterSupport {

    /* loaded from: input_file:com/hazelcast/jet/core/metrics/JobMetrics_MiscTest$BlockingInInitMetaSupplier.class */
    private static class BlockingInInitMetaSupplier implements ProcessorMetaSupplier {
        static CountDownLatch latch;

        private BlockingInInitMetaSupplier() {
        }

        public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
            latch.await();
        }

        @Nonnull
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> list) {
            return address -> {
                return new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 1);
            };
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 1818100338:
                    if (implMethodName.equals("<init>")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                        return TestProcessors.NoOutputSourceP::new;
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Runs before every test and calls {@link TestProcessors#reset} with twice the
     * configured {@code parallelism}.
     */
    @Before
    public void setup() {
        // NOTE(review): the factor 2 presumably corresponds to the cluster member count — confirm.
        TestProcessors.reset(parallelism * 2);
    }

    /**
     * Runs a two-vertex job with metric collection enabled but storing of metrics
     * after completion disabled. Metrics must be visible while the job runs and must
     * be empty (and absent from the results map) once it has completed.
     *
     * <p>NOTE(review): despite the method name, metric collection is enabled here;
     * only storing after completion is switched off.
     */
    @Test
    public void when_jobMetricsDisabled_then_emptyMetrics() throws Throwable {
        // given
        DAG dag = new DAG();
        dag.newVertex("v1", TestProcessors.MockP::new);
        dag.newVertex("v2", TestProcessors.NoOutputSourceP::new);
        JobConfig config = new JobConfig()
                .setMetricsEnabled(true)
                .setStoreMetricsAfterJobCompletion(false);
        Job job = hz().getJet().newJob(dag, config);

        // when the job is running
        TestProcessors.NoOutputSourceP.executionStarted.await();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        // then metrics are reported but nothing is stored
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // when the job is allowed to finish
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        job.join();
        assertJobStatusEventually(job, JobStatus.COMPLETED);
        // then no metrics remain
        assertEmptyJobMetrics(job, false);
    }

    /**
     * Runs a two-vertex job with {@code JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS}.
     * Metrics must be present but not stored while the job runs, and present and
     * stored in the {@code __jet.results.metrics} map once it has completed.
     */
    @Test
    public void when_jobRunning_then_nonEmptyMetrics() throws Throwable {
        // given
        DAG dag = new DAG();
        dag.newVertex("v1", TestProcessors.MockP::new);
        dag.newVertex("v2", TestProcessors.NoOutputSourceP::new);
        Job job = hz().getJet().newJob(dag, JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);

        // when the job is running
        TestProcessors.NoOutputSourceP.executionStarted.await();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        // then
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // when the job is allowed to finish
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        job.join();
        assertJobStatusEventually(job, JobStatus.COMPLETED);
        // then the final metrics are available and stored under the job id
        assertJobHasMetrics(job, true);
        boolean storedUnderJobId = hz().getMap("__jet.results.metrics").containsKey(job.getId());
        Assert.assertTrue(storedUnderJobId);
    }

    /**
     * Submits a job whose meta-supplier blocks in {@code init} and checks that the
     * job reports empty metrics for as long as initialisation is blocked, then
     * reports metrics once the latch has been released.
     *
     * <p>The latch is released in a {@code finally} block so that a failing
     * "all the time" assertion does not leave the {@code init} call blocked on it.
     */
    @Test
    public void when_jobNotYetRunning_then_emptyMetrics() {
        // given: a vertex whose initialisation waits on the latch
        DAG dag = new DAG();
        BlockingInInitMetaSupplier.latch = new CountDownLatch(1);
        dag.newVertex("v1", new BlockingInInitMetaSupplier());
        Job job = hz().getJet().newJob(dag, JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);

        try {
            // then: no metrics while init is blocked
            // (2L is the duration argument — presumably seconds; confirm against assertTrueAllTheTime)
            assertTrueAllTheTime(() -> assertEmptyJobMetrics(job, false), 2L);
        } finally {
            // Always unblock init, even if the assertion above failed.
            BlockingInInitMetaSupplier.latch.countDown();
        }

        // then: metrics show up after init was allowed to finish
        assertTrueEventually(() -> assertJobHasMetrics(job, false));
    }

    /**
     * Runs a job with two no-op vertices connected by a distributed edge to
     * completion and checks that it has metrics and that they were stored.
     */
    @Test
    public void test_duplicateMetricsFromMembers() {
        DAG dag = new DAG();
        Edge distributedEdge = Edge
                .between(dag.newVertex("v1", Processors.noopP()), dag.newVertex("v2", Processors.noopP()))
                .distributed();
        dag.edge(distributedEdge);

        Job job = hz().getJet().newJob(dag, JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        job.join();

        assertJobHasMetrics(job, true);
    }

    /**
     * With metric collection enabled and storing after completion disabled, walks
     * a job through running, suspended, resumed and completed. Metrics must be
     * present while running, empty while suspended, present again after resuming
     * and empty after completion; nothing is ever stored in the results map.
     */
    @Test
    public void when_jobSuspended_andMetricsNotStored_then_onlyPeriodicMetricsReturned() throws Throwable {
        // given
        DAG dag = new DAG();
        Edge v1ToV2 = Edge.between(
                dag.newVertex("v1", TestProcessors.MockP::new),
                dag.newVertex("v2", TestProcessors.NoOutputSourceP::new));
        dag.edge(v1ToV2);
        JobConfig config = new JobConfig()
                .setMetricsEnabled(true)
                .setStoreMetricsAfterJobCompletion(false);
        Job job = hz().getJet().newJob(dag, config);

        // running: metrics are reported
        TestProcessors.NoOutputSourceP.executionStarted.await();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // suspended: metrics are empty
        job.suspend();
        assertJobStatusEventually(job, JobStatus.SUSPENDED);
        assertTrueEventually(() -> assertEmptyJobMetrics(job, false));

        // resumed: metrics are reported again
        job.resume();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // completed: nothing was stored, so metrics are empty
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        job.join();
        assertJobStatusEventually(job, JobStatus.COMPLETED);
        assertEmptyJobMetrics(job, false);
    }

    /**
     * With metric collection and storing after completion both enabled, walks a
     * job through running, suspended, resumed and completed. Metrics must be
     * present while running, empty while suspended, present again after resuming,
     * and present and stored in the results map after completion.
     */
    @Test
    public void when_jobSuspended_andMetricsStored_then_onlyPeriodicAndFinalMetricsReturned() throws Throwable {
        // given
        DAG dag = new DAG();
        Edge v1ToV2 = Edge.between(
                dag.newVertex("v1", TestProcessors.MockP::new),
                dag.newVertex("v2", TestProcessors.NoOutputSourceP::new));
        dag.edge(v1ToV2);
        JobConfig config = new JobConfig()
                .setMetricsEnabled(true)
                .setStoreMetricsAfterJobCompletion(true);
        Job job = hz().getJet().newJob(dag, config);

        // running: metrics are reported, nothing stored yet
        TestProcessors.NoOutputSourceP.executionStarted.await();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // suspended: metrics are empty
        job.suspend();
        assertJobStatusEventually(job, JobStatus.SUSPENDED);
        assertTrueEventually(() -> assertEmptyJobMetrics(job, false));

        // resumed: metrics are reported again
        job.resume();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // completed: final metrics are available and stored
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        job.join();
        assertJobStatusEventually(job, JobStatus.COMPLETED);
        assertTrueEventually(() -> assertJobHasMetrics(job, true));
    }

    /**
     * Restarts a running job and checks that metrics are reported once it is
     * running again, and that they are present and stored after completion.
     */
    @Test
    public void when_jobRestarted_then_metricsRepopulate() throws Throwable {
        // given
        DAG dag = new DAG();
        Edge v1ToV2 = Edge.between(
                dag.newVertex("v1", TestProcessors.MockP::new),
                dag.newVertex("v2", TestProcessors.NoOutputSourceP::new));
        dag.edge(v1ToV2);
        Job job = hz().getJet().newJob(dag, JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        assertJobStatusEventually(job, JobStatus.RUNNING);

        // when the job is restarted and running again
        job.restart();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        // then metrics are reported, nothing stored yet
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // when the job is allowed to finish
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        job.join();
        assertJobStatusEventually(job, JobStatus.COMPLETED);
        // then the final metrics are available and stored
        assertJobHasMetrics(job, true);
    }

    /**
     * Cancels a running single-vertex streaming job that has metric collection
     * and storing after completion enabled. After the cancellation the job must
     * report {@link JobStatus#FAILED} and empty metrics, while an entry for the
     * job id must exist in the {@code __jet.results.metrics} map.
     */
    @Test
    public void when_jobCancelled_then_emptyMetrics() {
        // given
        DAG dag = new DAG();
        dag.newVertex("v1", () -> new TestProcessors.MockP().streaming());
        JobConfig config = new JobConfig()
                .setMetricsEnabled(true)
                .setStoreMetricsAfterJobCompletion(true);
        Job job = hz().getJet().newJob(dag, config);
        assertJobStatusEventually(job, JobStatus.RUNNING);
        assertTrueEventually(() -> assertJobHasMetrics(job, false));

        // when the job is cancelled
        job.cancel();
        // No separate null check on the job is needed: cancel() above already dereferenced it.
        assertThrows(CancellationException.class, job::join);

        // then
        assertJobStatusEventually(job, JobStatus.FAILED);
        assertTrueEventually(() -> assertEmptyJobMetrics(job, true));
    }

    /**
     * Runs a two-vertex job with metric collection disabled and storing after
     * completion enabled. The job must report empty metrics while it runs and
     * after it completes; after completion an entry for the job id must exist in
     * the results map.
     */
    @Test
    public void when_metricsForJobDisabled_then_emptyMetrics() throws Throwable {
        // given
        DAG dag = new DAG();
        dag.newVertex("v1", TestProcessors.MockP::new);
        dag.newVertex("v2", TestProcessors.NoOutputSourceP::new);
        JobConfig config = new JobConfig()
                .setMetricsEnabled(false)
                .setStoreMetricsAfterJobCompletion(true);
        Job job = hz().getJet().newJob(dag, config);

        // when the job is running
        TestProcessors.NoOutputSourceP.executionStarted.await();
        assertJobStatusEventually(job, JobStatus.RUNNING);
        // then metrics stay empty for the whole observation window
        assertTrueAllTheTime(() -> assertEmptyJobMetrics(job, false), 2L);

        // when the job is allowed to finish
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        job.join();
        assertJobStatusEventually(job, JobStatus.COMPLETED);
        // then metrics are still empty, but an entry was stored
        assertEmptyJobMetrics(job, true);
    }

    /**
     * Asserts that {@code job} reports at least one metric and at least one
     * measurement for {@code "queuesSize"}, and that the presence of the job id
     * in the {@code __jet.results.metrics} map equals {@code z}.
     *
     * @param job the job to inspect
     * @param z   whether an entry for the job is expected in the results map
     */
    private void assertJobHasMetrics(Job job, boolean z) {
        boolean noMetricsAtAll = job.getMetrics().metrics().isEmpty();
        Assert.assertFalse(noMetricsAtAll);

        // The metrics are fetched a second time for the named lookup.
        boolean noQueueSizes = job.getMetrics().get("queuesSize").isEmpty();
        Assert.assertFalse(noQueueSizes);

        boolean storedInResults = hz().getMap("__jet.results.metrics").containsKey(job.getId());
        Assert.assertEquals(z, storedInResults);
    }

    /**
     * Asserts that {@code job} reports no metrics and that the presence of the
     * job id in the {@code __jet.results.metrics} map equals {@code z}.
     *
     * @param job the job to inspect
     * @param z   whether an entry for the job is expected in the results map
     */
    private void assertEmptyJobMetrics(Job job, boolean z) {
        // NOTE(review): the failure message and the checked value come from two separate
        // getMetrics() calls, so the message may show a different snapshot than the one tested.
        String failureMessage = "Should have been empty, but contained: " + job.getMetrics().metrics();
        boolean metricsEmpty = job.getMetrics().metrics().isEmpty();
        Assert.assertTrue(failureMessage, metricsEmpty);

        boolean storedInResults = hz().getMap("__jet.results.metrics").containsKey(job.getId());
        Assert.assertEquals(z, storedInResults);
    }

    /**
     * Lambda deserialisation hook: recreates the serializable suppliers used by
     * this test class from their serialised description.
     *
     * <p>Three lambdas are recognised: the streaming {@code MockP} supplier from
     * {@code when_jobCancelled_then_emptyMetrics}, and the constructor references
     * {@code MockP::new} and {@code NoOutputSourceP::new}. Each distinct descriptor
     * is checked once. Comparisons are written constant-first, so a descriptor
     * with a missing field is rejected with {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this method is marked synthetic, i.e. it is normally emitted
     * by the compiler for serializable lambdas; a hand-maintained copy is probably
     * redundant when compiling from source — confirm whether it can be dropped.
     *
     * @param serializedLambda the serialised form of the lambda
     * @return the recreated supplier
     * @throws IllegalArgumentException if the descriptor matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        if ("lambda$when_jobCancelled_then_emptyMetrics$fb1a34a4$1".equals(implMethodName)) {
            // Kind 6 is the JVM method-handle kind REF_invokeStatic (the lambda body is a static method).
            if (matchesSupplierEx(serializedLambda, 6,
                    "com/hazelcast/jet/core/metrics/JobMetrics_MiscTest", "()Lcom/hazelcast/jet/core/Processor;")) {
                return (com.hazelcast.function.SupplierEx<Object>) () -> new TestProcessors.MockP().streaming();
            }
        } else if ("<init>".equals(implMethodName)) {
            // Kind 8 is the JVM method-handle kind REF_newInvokeSpecial (a constructor reference).
            if (matchesSupplierEx(serializedLambda, 8, "com/hazelcast/jet/core/TestProcessors$MockP", "()V")) {
                return (com.hazelcast.function.SupplierEx<Object>) TestProcessors.MockP::new;
            }
            if (matchesSupplierEx(serializedLambda, 8, "com/hazelcast/jet/core/TestProcessors$NoOutputSourceP", "()V")) {
                return (com.hazelcast.function.SupplierEx<Object>) TestProcessors.NoOutputSourceP::new;
            }
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Tells whether {@code lambda} describes a {@code SupplierEx.getEx()}
     * implementation with the given method-handle kind, implementing class and
     * implementation method signature.
     */
    private static boolean matchesSupplierEx(SerializedLambda lambda, int implKind, String implClass,
                                             String implSignature) {
        return lambda.getImplMethodKind() == implKind
                && "com/hazelcast/function/SupplierEx".equals(lambda.getFunctionalInterfaceClass())
                && "getEx".equals(lambda.getFunctionalInterfaceMethodName())
                && "()Ljava/lang/Object;".equals(lambda.getFunctionalInterfaceMethodSignature())
                && implClass.equals(lambda.getImplClass())
                && implSignature.equals(lambda.getImplMethodSignature());
    }
}
