package com.hazelcast.jet.core.metrics;

import com.hazelcast.collection.IList;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.TestInClusterSupport;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.map.IMap;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Checks the job metrics reported for a streaming job that reads a map
 * journal, keeps only the entry keys, drops the keys starting with
 * {@value #FILTER_OUT_PREFIX} and writes the remaining keys to a list sink.
 * <p>
 * Each test puts a known number of accepted and rejected keys into the
 * journaled map and then waits until both the sink contents and the
 * {@code emittedCount}/{@code receivedCount} metrics of the source, the fused
 * map/filter vertex and the sink reach the expected values.
 */
@Category({QuickTest.class, ParallelJVMTest.class})
public class JobMetrics_StreamTest extends TestInClusterSupport {
    private static final String NOT_FILTER_OUT_PREFIX = "ok";
    private static final String FILTER_OUT_PREFIX = "nok";
    private static final String FLAT_MAP_AND_FILTER_VERTEX = "fused(map, filter)";
    private static final String RECEIVED_COUNT_METRIC = "receivedCount";
    private static final String EMITTED_COUNT_METRIC = "emittedCount";

    // Re-assigned before every test so that each test works on its own map and list.
    private static String journalMapName;
    private static String sinkListName;

    /** Picks fresh, random names for the journaled source map and the sink list. */
    @Before
    public void before() {
        journalMapName = "journaledMap." + randomString();
        sinkListName = "sinkList" + randomString();
    }

    /** Metrics of a running job must follow the items as they are added to the source map. */
    @Test
    public void when_jobRunning_then_metricsEventuallyExist() {
        IMap<String, String> map = hz().getMap(journalMapName);
        putIntoMap(map, 2, 1);
        IList<String> sink = hz().getList(sinkListName);

        Job job = hz().getJet().newJob(createPipeline());

        assertSinkSizeEventually(sink, 2);
        assertMetricsEventually(job, 3, 1);

        putIntoMap(map, 1, 1);

        assertSinkSizeEventually(sink, 3);
        assertMetricsEventually(job, 5, 2);
    }

    /**
     * A suspended job must report no metrics, and after resuming the counters
     * must only account for the items added since the suspension (2 received,
     * 1 filtered out), not for the 5 items processed before it.
     */
    @Test
    public void when_suspendAndResume_then_metricsReset() {
        IMap<String, String> map = hz().getMap(journalMapName);
        putIntoMap(map, 2, 1);
        IList<String> sink = hz().getList(sinkListName);

        JobConfig config = new JobConfig()
                .setStoreMetricsAfterJobCompletion(true)
                .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
        Job job = hz().getJet().newJob(createPipeline(), config);

        putIntoMap(map, 1, 1);

        assertSinkSizeEventually(sink, 3);
        assertMetricsEventually(job, 5, 2);

        job.suspend();
        assertJobStatusEventually(job, JobStatus.SUSPENDED);
        Assert.assertTrue(job.getMetrics().metrics().isEmpty());

        putIntoMap(map, 1, 1);

        job.resume();
        assertJobStatusEventually(job, JobStatus.RUNNING);

        assertSinkSizeEventually(sink, 4);
        assertMetricsEventually(job, 2, 1);

        putIntoMap(map, 1, 1);

        assertSinkSizeEventually(sink, 5);
        assertMetricsEventually(job, 4, 2);
    }

    /**
     * After a restart the sink is expected to hold the 3 accepted keys twice
     * (6 elements) while the metrics read 5/2 again, i.e. the counters start
     * over instead of continuing from their pre-restart values.
     */
    @Test
    public void when_jobRestarted_then_metricsReset() {
        IMap<String, String> map = hz().getMap(journalMapName);
        putIntoMap(map, 2, 1);
        IList<String> sink = hz().getList(sinkListName);

        Job job = hz().getJet().newJob(createPipeline(), JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);

        assertSinkSizeEventually(sink, 2);
        assertMetricsEventually(job, 3, 1);

        putIntoMap(map, 1, 1);

        assertSinkSizeEventually(sink, 3);
        assertMetricsEventually(job, 5, 2);

        job.restart();
        assertJobStatusEventually(job, JobStatus.RUNNING);

        assertSinkSizeEventually(sink, 6);
        assertMetricsEventually(job, 5, 2);

        putIntoMap(map, 1, 1);

        assertSinkSizeEventually(sink, 7);
        assertMetricsEventually(job, 7, 3);
    }

    /**
     * Same scenario as {@link #when_jobRestarted_then_metricsReset()}, but the
     * job is started on an empty journal (all counters are 0) and the first
     * items are only added once the job is running.
     */
    @Test
    public void when_jobRestarted_then_metricsReset_withJournal() {
        IMap<String, String> map = hz().getMap(journalMapName);
        IList<String> sink = hz().getList(sinkListName);

        Job job = hz().getJet().newJob(createPipeline(), JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        assertJobStatusEventually(job, JobStatus.RUNNING);
        assertMetricsEventually(job, 0, 0);

        putIntoMap(map, 2, 1);

        assertSinkSizeEventually(sink, 2);
        assertMetricsEventually(job, 3, 1);

        job.restart();
        assertJobStatusEventually(job, JobStatus.RUNNING);

        assertSinkSizeEventually(sink, 4);
        assertMetricsEventually(job, 3, 1);

        putIntoMap(map, 1, 1);

        assertSinkSizeEventually(sink, 5);
        assertMetricsEventually(job, 5, 2);
    }

    /**
     * Builds the pipeline under test: map journal (read from the oldest entry)
     * -> entry key -> drop keys starting with {@value #FILTER_OUT_PREFIX} -> list sink.
     *
     * @return the assembled pipeline
     */
    private Pipeline createPipeline() {
        Pipeline pipeline = Pipeline.create();
        // The explicit <String, String> witness is required: without it the key type
        // cannot be inferred and the String-based filter below does not type-check.
        pipeline.readFrom(Sources.<String, String>mapJournal(journalMapName, JournalInitialPosition.START_FROM_OLDEST))
                .withIngestionTimestamps()
                .map(Map.Entry::getKey)
                .filter(key -> !key.startsWith(FILTER_OUT_PREFIX))
                .writeTo(Sinks.list(sinkListName));
        return pipeline;
    }

    /**
     * Puts entries with random keys into {@code map}.
     *
     * @param map           the map to populate
     * @param acceptedCount number of keys prefixed with {@value #NOT_FILTER_OUT_PREFIX}
     * @param rejectedCount number of keys prefixed with {@value #FILTER_OUT_PREFIX}
     */
    private void putIntoMap(Map<String, String> map, int acceptedCount, int rejectedCount) {
        for (int n = 0; n < acceptedCount; n++) {
            map.put(NOT_FILTER_OUT_PREFIX + randomString(), "whateverHere");
        }
        for (int n = 0; n < rejectedCount; n++) {
            map.put(FILTER_OUT_PREFIX + randomString(), "whateverHere");
        }
    }

    /** Waits until {@code sink} contains exactly {@code expectedSize} elements. */
    private void assertSinkSizeEventually(IList<String> sink, int expectedSize) {
        assertTrueEventually(() -> Assert.assertEquals(expectedSize, sink.size()));
    }

    /**
     * Waits until the current metrics of {@code job} satisfy
     * {@link #assertMetrics(JobMetrics, int, int)}; the metrics are re-fetched on every attempt.
     */
    private void assertMetricsEventually(Job job, int expectedTotal, int expectedFilteredOut) {
        assertTrueEventually(() -> assertMetrics(job.getMetrics(), expectedTotal, expectedFilteredOut));
    }

    /**
     * Asserts the item counters of all three vertices of the pipeline.
     *
     * @param jobMetrics          the metrics to inspect, must not be {@code null}
     * @param expectedTotal       expected number of items emitted by the source and
     *                            received by the fused map/filter vertex
     * @param expectedFilteredOut expected number of items dropped by the filter; the
     *                            fused vertex must emit, and the sink must receive,
     *                            {@code expectedTotal - expectedFilteredOut} items
     */
    private void assertMetrics(JobMetrics jobMetrics, int expectedTotal, int expectedFilteredOut) {
        Assert.assertNotNull(jobMetrics);

        int expectedPassed = expectedTotal - expectedFilteredOut;
        String sourceVertex = "mapJournalSource(" + journalMapName + ")";
        String sinkVertex = "listSink(" + sinkListName + ")";

        Assert.assertEquals(expectedTotal, sumValueFor(jobMetrics, sourceVertex, EMITTED_COUNT_METRIC));
        Assert.assertEquals(expectedTotal, sumValueFor(jobMetrics, FLAT_MAP_AND_FILTER_VERTEX, RECEIVED_COUNT_METRIC));
        Assert.assertEquals(expectedPassed, sumValueFor(jobMetrics, FLAT_MAP_AND_FILTER_VERTEX, EMITTED_COUNT_METRIC));
        Assert.assertEquals(expectedPassed, sumValueFor(jobMetrics, sinkVertex, RECEIVED_COUNT_METRIC));
    }

    /**
     * Sums all measurements of {@code metricName} tagged with the given vertex,
     * skipping measurements whose {@code ordinal} tag equals {@code "snapshot"}.
     *
     * @param jobMetrics the metrics to search
     * @param vertex     value of the {@code vertex} tag to match
     * @param metricName name of the metric to sum
     * @return the sum of the matching measurement values
     */
    private long sumValueFor(JobMetrics jobMetrics, String vertex, String metricName) {
        return jobMetrics
                .filter(MeasurementPredicates.tagValueEquals("vertex", vertex)
                        .and(MeasurementPredicates.tagValueEquals("ordinal", "snapshot").negate()))
                .get(metricName)
                .stream()
                .mapToLong(measurement -> measurement.value())
                .sum();
    }

    // NOTE: the decompiled "$deserializeLambda$" method was intentionally dropped. javac
    // generates that synthetic method on its own for every serializable lambda in this
    // class; the decompiled copy was not valid Java source and would clash with it.
}
