package com.hazelcast.jet.core.metrics;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/metrics/JobMetrics_StressTest.class */
public class JobMetrics_StressTest extends JetTestSupport {
    /** Number of restart (or suspend/resume) cycles a worker performs during one test. */
    private static final int RESTART_COUNT = 10;
    /** CPU count reported by the JVM; the metrics poller compares the number of measurements per metric against it. */
    private static final int TOTAL_PROCESSORS = Runtime.getRuntime().availableProcessors();
    /**
     * Pause handed to {@code sleepMillis} by the worker threads between job state changes.
     * NOTE(review): presumably chosen to be longer than the 1-second metrics collection
     * frequency configured in {@code setup()} - confirm.
     */
    private static final int WAIT_FOR_METRICS_COLLECTION_TIME = 1200;
    /** Failure caught in the restart or suspend/resume worker thread; rethrown by {@code stressTest}. */
    private static volatile Throwable restartThreadException;
    /** Failure caught in the metrics polling thread; rethrown by {@code stressTest}. */
    private static volatile Throwable obtainMetricsThreadException;
    /** Hazelcast member created in {@code setup()} and used to submit the job under test. */
    private HazelcastInstance instance;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/core/metrics/JobMetrics_StressTest$IncrementingProcessor.class */
    public static final class IncrementingProcessor extends AbstractProcessor {

        /**
         * Number of {@code init} calls. The counter is static, so it is shared by every
         * processor instance created in this JVM. Its probe name matches the
         * {@code "initCount"} metric queried by the polling thread.
         */
        @Probe(name = "initCount")
        static final AtomicInteger initCount = new AtomicInteger();

        /**
         * Number of {@code complete} calls, shared by every processor instance in this JVM.
         * Its probe name matches the {@code "completeCount"} metric queried by the polling thread.
         */
        @Probe(name = "completeCount")
        static final AtomicInteger completeCount = new AtomicInteger();

        private IncrementingProcessor() {
        }

        /** Records one more initialization; the supplied context is not used. */
        protected void init(@Nonnull Processor.Context context) throws Exception {
            initCount.getAndIncrement();
        }

        /**
         * Records one more invocation and always answers {@code false}.
         * NOTE(review): presumably this keeps the job running until it is cancelled - confirm
         * against the {@code Processor#complete} contract.
         */
        public boolean complete() {
            completeCount.getAndIncrement();
            return false;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/core/metrics/JobMetrics_StressTest$JobRestartThread.class */
    /**
     * Worker that restarts the job {@code RESTART_COUNT} times, pausing after each restart
     * and then waiting for the job to report {@link JobStatus#RUNNING} again.
     *
     * <p>Any failure is stored in {@code restartThreadException} before being rethrown, so the
     * test thread can surface it after joining this worker.
     */
    private static class JobRestartThread implements Runnable {
        private final Job job;
        private int restartCount;

        JobRestartThread(Job job) {
            this.job = job;
        }

        @Override
        public void run() {
            try {
                // Bound the loop with the shared constant instead of a repeated literal.
                while (this.restartCount < JobMetrics_StressTest.RESTART_COUNT) {
                    this.job.restart();
                    HazelcastTestSupport.sleepMillis(JobMetrics_StressTest.WAIT_FOR_METRICS_COLLECTION_TIME);
                    this.restartCount++;
                    HazelcastTestSupport.assertTrueEventually(() -> {
                        Assert.assertEquals(JobStatus.RUNNING, this.job.getStatus());
                    });
                }
            } catch (Throwable th) {
                // Publish the failure for the test thread; rethrowing ends this worker.
                JobMetrics_StressTest.restartThreadException = th;
                throw th;
            }
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/core/metrics/JobMetrics_StressTest$JobSuspendResumeThread.class */
    /**
     * Worker that suspends and resumes the job {@code RESTART_COUNT} times. Each cycle waits
     * for {@link JobStatus#SUSPENDED}, pauses, resumes the job and waits for
     * {@link JobStatus#RUNNING}.
     *
     * <p>Any failure is stored in {@code restartThreadException} before being rethrown, so the
     * test thread can surface it after joining this worker.
     */
    private static class JobSuspendResumeThread implements Runnable {
        private final Job job;
        private int restartCount;

        JobSuspendResumeThread(Job job) {
            this.job = job;
        }

        @Override
        public void run() {
            try {
                // Bound the loop with the shared constant instead of a repeated literal.
                while (this.restartCount < JobMetrics_StressTest.RESTART_COUNT) {
                    this.job.suspend();
                    HazelcastTestSupport.assertTrueEventually(() -> {
                        Assert.assertEquals(JobStatus.SUSPENDED, this.job.getStatus());
                    });
                    HazelcastTestSupport.sleepMillis(JobMetrics_StressTest.WAIT_FOR_METRICS_COLLECTION_TIME);
                    this.job.resume();
                    this.restartCount++;
                    HazelcastTestSupport.assertTrueEventually(() -> {
                        Assert.assertEquals(JobStatus.RUNNING, this.job.getStatus());
                    });
                }
            } catch (Throwable th) {
                // Publish the failure for the test thread; rethrowing ends this worker.
                JobMetrics_StressTest.restartThreadException = th;
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/core/metrics/JobMetrics_StressTest$ObtainMetricsThread.class */
    /**
     * Poller that repeatedly fetches the job's metrics while another thread restarts or
     * suspends/resumes the job, asserting that the summed {@code initCount} and
     * {@code completeCount} values never decrease.
     *
     * <p>Once {@link #stop} is set, it additionally waits (up to the timeout passed to
     * {@code assertTrueEventually}) for the final {@code initCount} sum to reach the expected
     * total. Every failure - including a failure of that final check - is stored in
     * {@code obtainMetricsThreadException} before being rethrown, so the test thread can
     * surface it after joining this poller.
     */
    public static class ObtainMetricsThread implements Runnable {
        /** Set to {@code true} by the test thread to end the polling loop. */
        volatile boolean stop;
        private final Job job;

        ObtainMetricsThread(Job job) {
            this.job = job;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                long previousInitSum = 0;
                long previousCompleteSum = 0;
                while (!this.stop) {
                    Assert.assertNotNull(this.job.getMetrics());
                    // Measurement is assumed to live in this same package, so no import is needed.
                    List<Measurement> initCounts = this.job.getMetrics().get("initCount");
                    if (initCounts.size() != JobMetrics_StressTest.TOTAL_PROCESSORS) {
                        // Incomplete snapshot: skip the comparison and poll again.
                        continue;
                    }
                    long initSum = sumOf(initCounts);
                    Assert.assertTrue("Metrics value should be increasing, current: " + initSum
                            + ", previous: " + previousInitSum, initSum >= previousInitSum);
                    previousInitSum = initSum;

                    List<Measurement> completeCounts = this.job.getMetrics().get("completeCount");
                    if (completeCounts.size() != JobMetrics_StressTest.TOTAL_PROCESSORS) {
                        continue;
                    }
                    long completeSum = sumOf(completeCounts);
                    Assert.assertTrue("Metrics value should be increasing, current: " + completeSum
                            + ", previous: " + previousCompleteSum, completeSum >= previousCompleteSum);
                    previousCompleteSum = completeSum;
                }
                // The final check lives inside the try block so that its failure is recorded
                // too; otherwise the test would pass even though this assertion failed.
                HazelcastTestSupport.assertTrueEventually(() -> {
                    Assert.assertNotNull(this.job.getMetrics());
                    List<Measurement> initCounts = this.job.getMetrics().get("initCount");
                    Assert.assertEquals(JobMetrics_StressTest.TOTAL_PROCESSORS, initCounts.size());
                    // One initial start plus RESTART_COUNT restarts, per processor, reported once
                    // per processor because the counter is static and shared.
                    Assert.assertEquals((JobMetrics_StressTest.RESTART_COUNT + 1)
                            * JobMetrics_StressTest.TOTAL_PROCESSORS
                            * JobMetrics_StressTest.TOTAL_PROCESSORS, sumOf(initCounts));
                }, 3L);
            } catch (Throwable th) {
                JobMetrics_StressTest.obtainMetricsThreadException = th;
                throw th;
            }
        }

        /** Adds up the values of the given measurements. */
        private static long sumOf(List<Measurement> measurements) {
            return measurements.stream().mapToLong(Measurement::value).sum();
        }
    }

    /**
     * Resets the shared static state left over from a previous test and starts a fresh
     * Jet-enabled member whose metrics collection frequency is set to one second.
     */
    @Before
    public void setup() {
        IncrementingProcessor.initCount.set(0);
        IncrementingProcessor.completeCount.set(0);
        restartThreadException = null;
        obtainMetricsThreadException = null;

        Config config = defaultInstanceConfigWithJetEnabled();
        config.getMetricsConfig().setCollectionFrequencySeconds(1);
        this.instance = createHazelcastInstance(config);
    }

    /** Runs the stress scenario with a worker that repeatedly restarts the job. */
    @Test
    public void restart_stressTest() throws Throwable {
        stressTest(job -> new JobRestartThread(job));
    }

    /** Runs the stress scenario with a worker that repeatedly suspends and resumes the job. */
    @Test
    public void suspend_resume_stressTest() throws Throwable {
        stressTest(job -> new JobSuspendResumeThread(job));
    }

    /**
     * Submits the test job, then runs a job-disrupting worker and a metrics poller
     * concurrently and rethrows whatever failure either of them recorded.
     *
     * @param function creates the disrupting worker (restart or suspend/resume) for the job
     * @throws Throwable the failure recorded by the worker or the poller, or any failure of
     *                   this method itself
     */
    private void stressTest(Function<Job, Runnable> function) throws Throwable {
        Job job = this.instance.getJet().newJob(buildDag(), JobMetrics_BatchTest.JOB_CONFIG_WITH_METRICS);
        try {
            assertTrueEventually(() -> {
                Assert.assertEquals(JobStatus.RUNNING, job.getStatus());
            });
            ObtainMetricsThread poller = new ObtainMetricsThread(job);
            Thread workerThread = new Thread(function.apply(job));
            Thread pollerThread = new Thread(poller);
            workerThread.start();
            pollerThread.start();
            try {
                workerThread.join();
            } finally {
                // Always release the poller, even if waiting for the worker was interrupted;
                // otherwise it would keep spinning after the test has ended.
                poller.stop = true;
            }
            pollerThread.join();
            if (restartThreadException != null) {
                throw restartThreadException;
            }
            if (obtainMetricsThreadException != null) {
                throw obtainMetricsThreadException;
            }
        } finally {
            // Single cleanup path for both success and failure.
            ditchJob(job, this.instance);
        }
    }

    /**
     * Builds a DAG consisting of a single vertex named {@code "p"} whose processors are
     * {@link IncrementingProcessor} instances.
     */
    private DAG buildDag() {
        DAG graph = new DAG();
        graph.newVertex("p", () -> new IncrementingProcessor());
        return graph;
    }

    /*
     * Lambda deserialization hook as recovered by the decompiler.
     *
     * It looks up the implementation method name carried by the serialized form and, when it
     * names the lambda declared in buildDag() and all recorded signatures match, returns a
     * new lambda that creates an IncrementingProcessor. Any other input is rejected with an
     * IllegalArgumentException.
     *
     * NOTE(review): the method is marked synthetic, i.e. it appears to be compiler-generated
     * rather than hand-written; it probably should not be kept in source form - confirm.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): `boolean z = -1` with `case false` is not valid Java; this looks like an
        // int selector (-1 = no match, 0 = first lambda) mis-typed by the decompiler - confirm.
        boolean z = -1;
        // First stage: narrow down by hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case 2141561507:
                if (implMethodName.equals("lambda$buildDag$d0879c0f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the recorded method kind, functional interface and signatures
        // before handing back the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/metrics/JobMetrics_StressTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    // NOTE(review): a lambda returned as Object has no target type; presumably the
                    // intended type is the SupplierEx interface named in the check above - confirm.
                    return () -> {
                        return new IncrementingProcessor();
                    };
                }
                break;
        }
        // Nothing matched: the serialized form does not belong to a lambda of this class.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
