package com.hazelcast.jet.impl.connector;

import com.hazelcast.collection.IList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
@Ignore
/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamFilesP_integrationTest.class */
public class StreamFilesP_integrationTest extends JetTestSupport {
    private HazelcastInstance instance;
    private File directory;
    private IList<Map.Entry<String, String>> list;

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamFilesP_integrationTest$RandomWriter.class */
    private static class RandomWriter implements Runnable {
        private final File file;
        private final int numLines;
        private final String prefix;
        private final Random random = new Random();

        RandomWriter(File file, int i, String str) {
            this.file = file;
            this.numLines = i;
            this.prefix = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                FileOutputStream fileOutputStream = new FileOutputStream(this.file);
                try {
                    OutputStreamWriter outputStreamWriter = new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8);
                    try {
                        BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter);
                        for (int i = 0; i < this.numLines; i++) {
                            try {
                                bufferedWriter.write(this.prefix + " " + i + " Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua\n");
                                bufferedWriter.flush();
                                outputStreamWriter.flush();
                                fileOutputStream.flush();
                                if (this.random.nextInt(100) < 5) {
                                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
                                }
                            } catch (Throwable th) {
                                try {
                                    bufferedWriter.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                                throw th;
                            }
                        }
                        bufferedWriter.close();
                        outputStreamWriter.close();
                        fileOutputStream.close();
                    } catch (Throwable th3) {
                        try {
                            outputStreamWriter.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                        throw th3;
                    }
                } finally {
                }
            } catch (IOException e) {
                throw ExceptionUtil.sneakyThrow(e);
            }
        }
    }

    @Before
    public void setup() throws Exception {
        this.instance = createHazelcastInstance();
        this.directory = createTempDirectory();
        this.list = this.instance.getList("writer");
    }

    @Test
    public void when_appendingToPreexisting_then_pickupNewLines() throws Exception {
        DAG buildDag = buildDag();
        File createNewFile = createNewFile();
        appendToFile(createNewFile, "hello", "pre-existing");
        sleepAtLeastMillis(50L);
        CompletableFuture future = this.instance.getJet().newJob(buildDag).getFuture();
        sleepAtLeastSeconds(2L);
        Assert.assertEquals(0L, this.list.size());
        appendToFile(createNewFile, "third line");
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, this.list.size());
        });
        finishDirectory(future, createNewFile);
    }

    @Test
    public void when_appendingToPreexistingIncompleteLine_then_pickupCompleteLines() throws Exception {
        DAG buildDag = buildDag();
        File createNewFile = createNewFile();
        PrintWriter printWriter = new PrintWriter(new FileOutputStream(createNewFile, true));
        try {
            printWriter.write("hello");
            printWriter.close();
            sleepAtLeastMillis(50L);
            CompletableFuture future = this.instance.getJet().newJob(buildDag).getFuture();
            sleepAtLeastSeconds(2L);
            Assert.assertEquals(0L, this.list.size());
            appendToFile(createNewFile, "world", "second line");
            assertTrueEventually(() -> {
                Assert.assertEquals(1L, this.list.size());
            });
            Assert.assertEquals(Util.entry(createNewFile.getName(), "second line"), this.list.get(0));
            finishDirectory(future, createNewFile);
        } catch (Throwable th) {
            try {
                printWriter.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void when_withCrlf_then_pickupCompleteLines() throws Exception {
        DAG buildDag = buildDag();
        File createNewFile = createNewFile();
        PrintWriter printWriter = new PrintWriter(new FileOutputStream(createNewFile, true));
        try {
            printWriter.write("hello world\r");
            printWriter.close();
            sleepAtLeastMillis(50L);
            CompletableFuture future = this.instance.getJet().newJob(buildDag).getFuture();
            sleepAtLeastSeconds(2L);
            Assert.assertEquals(0L, this.list.size());
            printWriter = new PrintWriter(new FileOutputStream(createNewFile, true));
            try {
                printWriter.write("\nsecond line\r\n");
                printWriter.close();
                assertTrueEventually(() -> {
                    Assert.assertEquals(1L, this.list.size());
                });
                Assert.assertEquals(Util.entry(createNewFile.getName(), "second line"), this.list.get(0));
                finishDirectory(future, createNewFile);
            } finally {
            }
        } finally {
        }
    }

    @Test
    @Ignore
    public void when_newAndModified_then_pickupAddition() throws Exception {
        CompletableFuture future = this.instance.getJet().newJob(buildDag()).getFuture();
        sleepAtLeastSeconds(2L);
        Assert.assertEquals(0L, this.list.size());
        File file = new File(this.directory, randomName());
        appendToFile(file, "hello", "world");
        assertTrueEventually(() -> {
            Assert.assertEquals(2L, this.list.size());
        });
        appendToFile(file, "third line");
        assertTrueEventually(() -> {
            Assert.assertEquals(3L, this.list.size());
        });
        finishDirectory(future, file);
    }

    @Test
    public void when_fileWithManyLines_then_emitCooperatively() throws Exception {
        CompletableFuture future = this.instance.getJet().newJob(buildDag()).getFuture();
        sleepAtLeastSeconds(2L);
        Assert.assertEquals(0L, this.list.size());
        File file = new File(this.directory, "subdir");
        Assert.assertTrue(file.mkdir());
        File file2 = new File(file, randomName());
        appendToFile(file2, (String[]) IntStream.range(0, 5000).mapToObj(String::valueOf).toArray(i -> {
            return new String[i];
        }));
        File file3 = new File(this.directory, file2.getName());
        Assert.assertTrue(file2.renameTo(file3));
        assertTrueEventually(() -> {
            Assert.assertEquals(5000L, this.list.size());
        });
        finishDirectory(future, file3, file);
    }

    @Test
    @Ignore
    public void stressTest() throws Exception {
        CompletableFuture future = this.instance.getJet().newJob(buildDag()).getFuture();
        sleepAtLeastSeconds(2L);
        int i = 10000;
        File file = new File(this.directory, "file1.log");
        File file2 = new File(this.directory, "file2.log");
        Thread thread = new Thread(new RandomWriter(file, 10000, "file0"));
        Thread thread2 = new Thread(new RandomWriter(file2, 10000, "file1"));
        thread.start();
        thread2.start();
        thread.join();
        thread2.join();
        assertTrueEventually(() -> {
            Assert.assertEquals(2 * i, this.list.size());
        });
        Set[] setArr = {(Set) IntStream.range(0, 10000).boxed().collect(Collectors.toSet()), (Set) IntStream.range(0, 10000).boxed().collect(Collectors.toSet())};
        Iterator it = this.list.iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            Assert.assertTrue(setArr[((String) entry.getValue()).charAt(4) - '0'].remove(Integer.valueOf(Integer.parseInt(((String) entry.getValue()).split(" ", 3)[1]))));
        }
        Assert.assertEquals(Collections.emptySet(), setArr[0]);
        Assert.assertEquals(Collections.emptySet(), setArr[1]);
        finishDirectory(future, file, file2);
    }

    private DAG buildDag() {
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("reader", SourceProcessors.streamFilesP(this.directory.getPath(), StandardCharsets.UTF_8, "*", false, (v0, v1) -> {
            return Util.entry(v0, v1);
        })).localParallelism(1), dag.newVertex("writer", SinkProcessors.writeListP(this.list.getName())).localParallelism(1)));
        return dag;
    }

    private File createNewFile() {
        File file = new File(this.directory, randomName());
        assertTrueEventually(() -> {
            Assert.assertTrue(file.createNewFile());
        });
        return file;
    }

    private void finishDirectory(Future<Void> future, File... fileArr) throws Exception {
        for (File file : fileArr) {
            this.logger.info("deleting " + file + "...");
            assertTrueEventually(() -> {
                Assert.assertTrue("Failed to delete " + file, file.delete());
            });
            this.logger.info("deleted " + file);
        }
        assertTrueEventually(() -> {
            Assert.assertTrue("Failed to delete " + this.directory, this.directory.delete());
        });
        assertTrueEventually(() -> {
            Assert.assertTrue("job should complete eventually", future.isDone());
        });
        future.get();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 96667762:
                if (implMethodName.equals("entry")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/Util") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/Map$Entry;")) {
                    return (v0, v1) -> {
                        return Util.entry(v0, v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
