package com.hazelcast.jet.impl;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.security.auth.Subject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/JobRepositoryTest.class */
public class JobRepositoryTest extends JetTestSupport {
    private static final long RESOURCES_EXPIRATION_TIME_MILLIS = TimeUnit.SECONDS.toMillis(1);
    private static final int MAX_JOB_RESULTS_COUNT = 2;
    private final JobConfig jobConfig = new JobConfig();
    private HazelcastInstance instance;
    private JobRepository jobRepository;

    /* loaded from: input_file:com/hazelcast/jet/impl/JobRepositoryTest$DummyClass.class */
    private static class DummyClass {
        private DummyClass() {
        }
    }

    @Before
    public void setup() {
        Config config = new Config();
        config.setProperty(ClusterProperty.JOB_RESULTS_MAX_SIZE.getName(), Integer.toString(2));
        config.getJetConfig().setEnabled(true);
        this.instance = createHazelcastInstance(config);
        this.jobRepository = new JobRepository(this.instance);
        this.jobRepository.setResourcesExpirationMillis(RESOURCES_EXPIRATION_TIME_MILLIS);
        TestProcessors.reset(2);
    }

    @Test
    public void when_jobIsRunning_then_expiredJobIsNotCleanedUp() {
        long uploadResourcesForNewJob = uploadResourcesForNewJob();
        this.jobRepository.putNewJobRecord(createJobRecord(uploadResourcesForNewJob, createDagData()));
        sleepUntilJobExpires();
        cleanup();
        Assert.assertNotNull(this.jobRepository.getJobRecord(uploadResourcesForNewJob));
        Assert.assertFalse("job repository should not be empty", this.jobRepository.getJobResources(uploadResourcesForNewJob).isEmpty());
    }

    @Test
    public void when_jobRecordIsPresentForExpiredJob_then_jobIsNotCleanedUp() {
        long uploadResourcesForNewJob = uploadResourcesForNewJob();
        this.jobRepository.putNewJobRecord(createJobRecord(uploadResourcesForNewJob, createDagData()));
        sleepUntilJobExpires();
        cleanup();
        Assert.assertNotNull(this.jobRepository.getJobRecord(uploadResourcesForNewJob));
        Assert.assertFalse(this.jobRepository.getJobResources(uploadResourcesForNewJob).isEmpty());
    }

    @Test
    public void when_onlyJobResourcesExist_then_jobResourcesClearedAfterExpiration() {
        Assert.assertNotNull(this.instance.getMap("__jet.records"));
        long uploadResourcesForNewJob = uploadResourcesForNewJob();
        sleepUntilJobExpires();
        cleanup();
        Assert.assertTrue(this.jobRepository.getJobResources(uploadResourcesForNewJob).isEmpty());
    }

    @Test
    public void when_jobJarUploadFails_then_jobResourcesCleanedUp() throws Exception {
        this.jobConfig.addJar(new URL("http://site/nonexistent"));
        testResourceCleanup();
    }

    @Test
    public void when_jobZipUploadFails_then_jobResourcesCleanedUp() throws Exception {
        this.jobConfig.addJarsInZip(new URL("http://site/nonexistent"));
        testResourceCleanup();
    }

    @Test
    public void when_jobClasspathResourceUploadFails_then_jobResourcesCleanedUp() throws Exception {
        this.jobConfig.addClasspathResource(new URL("http://site/nonexistent"));
        testResourceCleanup();
    }

    @Test
    public void when_jobFileUploadFails_then_jobResourcesCleanedUp() throws Exception {
        this.jobConfig.attachFile(new URL("http://site/nonexistent"));
        testResourceCleanup();
    }

    @Test
    public void when_jobDirectoryUploadFails_then_jobResourcesCleanedUp() throws Exception {
        File createTempDirectory = createTempDirectory();
        try {
            this.jobConfig.attachDirectory(createTempDirectory);
            testResourceCleanup();
        } finally {
            delete(createTempDirectory);
        }
    }

    private void testResourceCleanup() {
        try {
            this.jobRepository.uploadJobResources(this.jobRepository.newJobId(), this.jobConfig);
            Assert.fail();
        } catch (JetException e) {
            Assert.assertTrue(this.instance.getDistributedObjects().stream().noneMatch(distributedObject -> {
                return distributedObject.getName().startsWith("__jet.resources.");
            }));
        }
    }

    private void delete(File file) {
        Assert.assertTrue("Couldn't delete " + file, file.delete());
    }

    @Test
    public void test_getJobRecordFromClient() {
        HazelcastInstance createHazelcastClient = createHazelcastClient();
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.streamFromProcessor("source", ProcessorMetaSupplier.of(() -> {
            return new TestProcessors.NoOutputSourceP();
        }))).withoutTimestamps().writeTo(Sinks.logger());
        Job newJob = this.instance.getJet().newJob(create, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE).setSnapshotIntervalMillis(100L));
        JobRepository jobRepository = new JobRepository(createHazelcastClient);
        assertTrueEventually(() -> {
            Assert.assertNotNull(jobRepository.getJobRecord(newJob.getId()));
        });
        createHazelcastClient.shutdown();
    }

    @Test
    public void test_maxNumberOfJobResults() {
        DAG dag = new DAG();
        dag.newVertex("v", Processors.noopP());
        for (int i = 0; i < 3; i++) {
            this.instance.getJet().newJob(dag).join();
        }
        this.jobRepository.cleanup(getNodeEngineImpl(this.instance));
        assertTrueEventually(() -> {
            Assert.assertEquals(2L, this.jobRepository.getJobResults().size());
        });
    }

    private void cleanup() {
        this.jobRepository.cleanup(getNodeEngineImpl(this.instance));
    }

    private long uploadResourcesForNewJob() {
        this.jobConfig.addClass(new Class[]{DummyClass.class});
        long newJobId = this.jobRepository.newJobId();
        this.jobRepository.uploadJobResources(newJobId, this.jobConfig);
        return newJobId;
    }

    private Data createDagData() {
        DAG dag = new DAG();
        dag.newVertex("v", () -> {
            return new TestProcessors.MockP().streaming();
        });
        return getNodeEngineImpl(this.instance).toData(dag);
    }

    private JobRecord createJobRecord(long j, Data data) {
        return new JobRecord(this.instance.getCluster().getLocalMember().getVersion().asVersion(), j, data, "", this.jobConfig, Collections.emptySet(), (Subject) null);
    }

    private void sleepUntilJobExpires() {
        sleepAtLeastMillis(2 * RESOURCES_EXPIRATION_TIME_MILLIS);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1009743027:
                if (implMethodName.equals("lambda$createDagData$84c4358$1")) {
                    z = true;
                    break;
                }
                break;
            case 1217409748:
                if (implMethodName.equals("lambda$test_getJobRecordFromClient$fb1a34a4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/JobRepositoryTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.NoOutputSourceP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/JobRepositoryTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.MockP().streaming();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
