package com.hazelcast.jet.impl.deployment;

import childfirstclassloader.TestProcessor;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.impl.util.ReflectionUtils;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.JarUtil;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.starter.HazelcastAPIDelegatingClassloader;
import com.hazelcast.test.starter.HazelcastStarter;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.example.jet.impl.deployment.ResourceCollector;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import uk.org.webcompere.systemstubs.rules.SystemPropertiesRule;

/**
 * Tests that Jet jobs whose vertices declare a custom classpath can run processors
 * and read resources packaged in jars located in the processor custom lib directory.
 *
 * <p>The class-level {@link #tempDir} serves as that directory: two jars (one with the
 * test processor classes, one with resources) are created in it once per class, and the
 * {@link ClusterProperty#PROCESSOR_CUSTOM_LIB_DIR} system property is pointed at it for
 * every test. Each test then registers both jars by file name for a vertex and checks
 * the items recorded in {@link ResourceCollector}.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category(QuickTest.class)
public class ProcessorClassLoaderTest extends JetTestSupport {

    /** Directory holding the generated jars; created before any test instance exists. */
    @ClassRule
    public static final TemporaryFolder tempDir = new TemporaryFolder();

    private static final ILogger LOGGER = Logger.getLogger(ProcessorClassLoaderTest.class);
    private static final String SOURCE_NAME = "test-source";
    private static final String SINK_LIST_NAME = "test";

    private static File jarFile;
    private static File resourcesJarFile;

    /**
     * Points the processor custom lib directory at {@link #tempDir} for the duration of each test.
     * Reading {@code tempDir.getRoot()} here is safe because the class rule has already
     * created the folder by the time JUnit instantiates the test class.
     */
    @Rule
    public final SystemPropertiesRule systemProperties = new SystemPropertiesRule(
            ClusterProperty.PROCESSOR_CUSTOM_LIB_DIR.getName(),
            tempDir.getRoot().getAbsolutePath());

    private HazelcastInstance member;
    private HazelcastInstance client;
    private JetService jet;

    /**
     * Builds the two jars used by every test inside {@link #tempDir}: one containing the
     * test processor classes and one containing resources.
     *
     * @throws Exception if a temp file or jar cannot be created
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        jarFile = tempDir.newFile("source_" + System.currentTimeMillis() + ".jar");
        // Keep the element type as Class so the method reference resolves and the
        // collected list is a List<String> without any unchecked cast.
        List<String> classResourceIds = Stream.of(
                        TestProcessor.ResourceReader.class,
                        TestProcessor.TestProcessorMetaSupplier.class,
                        TestProcessor.TestProcessorSupplier.class,
                        TestProcessor.class,
                        SourceWithClassLoader.class)
                .map(ReflectionUtils::toClassResourceId)
                .collect(Collectors.toList());
        JarUtil.createJarFile("target/test-classes/", classResourceIds, jarFile.getAbsolutePath());
        LOGGER.fine("Jar file path: %s", jarFile);

        // Same naming scheme as the source jar (no embedded whitespace in the file name).
        resourcesJarFile = tempDir.newFile("resources_" + System.currentTimeMillis() + ".jar");
        JarUtil.createResourcesJarFile(resourcesJarFile);

        Assertions.assertThat(tempDir.getRoot()).exists().isReadable();
    }

    /**
     * Clears previously collected items, starts a member and a client, and verifies
     * that both jars are still present and readable.
     *
     * @throws Exception if the member cannot be started
     */
    @Before
    public void setUp() throws Exception {
        ResourceCollector.items().clear();
        member = createHazelcastMember();
        client = HazelcastClient.newHazelcastClient();
        jet = client.getJet();
        Assertions.assertThat(resourcesJarFile).exists().isReadable();
        Assertions.assertThat(jarFile).exists().isReadable();
    }

    /**
     * Shuts down the client and then the member. The member is shut down in a
     * {@code finally} block so that a failing client shutdown cannot leave a running
     * member behind for subsequent tests.
     *
     * @throws Exception if shutting down either instance fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (client != null) {
                client.shutdown();
            }
        } finally {
            if (member != null) {
                member.shutdown();
            }
        }
    }

    /**
     * Starts a member with resource upload enabled through {@link HazelcastStarter}, using a
     * {@link HazelcastAPIDelegatingClassloader} over the compiled main classes of this module
     * and of {@code hazelcast-tpc-engine}.
     *
     * <p>NOTE(review): presumably this keeps the test classes off the member's own class
     * path so they can only be resolved through the custom classpath jars - confirm.
     *
     * @return the started member
     * @throws MalformedURLException if a class directory cannot be converted to a URL
     */
    private HazelcastInstance createHazelcastMember() throws MalformedURLException {
        Config config = smallInstanceConfig();
        config.getJetConfig().setResourceUploadEnabled(true);

        URL[] memberClasspath = {
                new File("target/classes/").toURI().toURL(),
                new File("../hazelcast-tpc-engine/target/classes/").toURI().toURL()
        };
        ClassLoader memberClassLoader =
                new HazelcastAPIDelegatingClassloader(memberClasspath, getClass().getClassLoader());
        return HazelcastStarter.newHazelcastInstance(config, memberClassLoader);
    }

    /**
     * Creates a job configuration that registers the resources jar and then the source
     * jar (in that order) as the custom classpath of the given vertex.
     *
     * @param vertexName name of the vertex the custom classpath applies to
     * @return a new job configuration with both jars registered
     */
    private static JobConfig customClasspathConfig(String vertexName) {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addCustomClasspath(vertexName, resourcesJarFile.getName());
        jobConfig.addCustomClasspath(vertexName, jarFile.getName());
        return jobConfig;
    }

    /** Runs a batch pipeline whose source has a custom classpath and checks the emitted and collected items. */
    @Test
    public void testClassLoaderForBatchSource() {
        BatchSource<String> source = SourceWithClassLoader.batchSource(SOURCE_NAME);
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(source)
                .setLocalParallelism(1)
                .writeTo(Sinks.list(SINK_LIST_NAME));

        jet.newJob(pipeline, customClasspathConfig(source.name())).join();

        List<Object> sinkItems = member.getList(SINK_LIST_NAME);
        Assertions.assertThat(sinkItems).contains("resource in jar");
        Assertions.assertThat(ResourceCollector.items()).containsExactly(
                "Processor init resource in jar",
                "Processor complete resource in jar");
    }

    /** Runs a streaming pipeline whose source has a custom classpath and checks the emitted and collected items. */
    @Test
    public void testClassLoaderForStreamSource() {
        StreamSource<String> source = SourceWithClassLoader.streamSource(SOURCE_NAME);
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(source)
                .withoutTimestamps()
                .setLocalParallelism(1)
                .writeTo(Sinks.list(SINK_LIST_NAME));

        jet.newJob(pipeline, customClasspathConfig(source.name())).join();

        List<Object> sinkItems = member.getList(SINK_LIST_NAME);
        Assertions.assertThat(sinkItems).contains("resource in jar");
        Assertions.assertThat(ResourceCollector.items()).containsExactly(
                "Processor init resource in jar",
                "Processor complete resource in jar");
    }

    /**
     * Runs a single-vertex DAG built from the test meta-supplier and checks that every
     * lifecycle step of the meta-supplier, supplier and processor recorded the resource,
     * in the expected order.
     */
    @Test
    public void testClassLoaderSetForSupplierDAG() {
        DAG dag = new DAG();
        dag.newVertex(SOURCE_NAME, TestProcessor.TestProcessorMetaSupplier.create()).localParallelism(1);

        jet.newJob(dag, customClasspathConfig(SOURCE_NAME)).join();

        Assertions.assertThat(ResourceCollector.items()).containsExactly(
                "ProcessorMetaSupplier init resource in jar",
                "ProcessorMetaSupplier get resource in jar",
                "ProcessorMetaSupplier create resource in jar",
                "ProcessorSupplier init resource in jar",
                "ProcessorSupplier get resource in jar",
                "Processor init resource in jar",
                "Processor complete resource in jar",
                "ProcessorSupplier close resource in jar",
                "ProcessorMetaSupplier close resource in jar");
    }
}
