package com.hazelcast.jet.impl.connector;

import com.hazelcast.collection.IList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.lang.invoke.SerializedLambda;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamSocketP_integrationTest.class */
public class StreamSocketP_integrationTest extends JetTestSupport {
    private static final String HOST = "localhost";
    private static final int PORT = 8888;
    private HazelcastInstance instance;

    @Before
    public void setupEngine() {
        this.instance = createHazelcastInstance();
    }

    @After
    public void teardownEngine() {
        this.instance.shutdown();
    }

    @Test
    public void when_dataWrittenToSocket_then_dataImmediatelyEmitted() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ServerSocket serverSocket = new ServerSocket(PORT);
        Throwable th = null;
        try {
            try {
                spawn(() -> {
                    Util.uncheckRun(() -> {
                        Socket accept = serverSocket.accept();
                        Socket accept2 = serverSocket.accept();
                        PrintWriter printWriter = new PrintWriter(accept.getOutputStream());
                        printWriter.write("hello1 \n");
                        printWriter.flush();
                        PrintWriter printWriter2 = new PrintWriter(accept2.getOutputStream());
                        printWriter2.write("hello2 \n");
                        printWriter2.flush();
                        assertOpenEventually(countDownLatch);
                        printWriter.write("world1 \n");
                        printWriter.write("jet1 \n");
                        printWriter.flush();
                        printWriter2.write("world2 \n");
                        printWriter2.write("jet2 \n");
                        printWriter2.flush();
                        accept.close();
                        accept2.close();
                    });
                });
                DAG dag = new DAG();
                dag.edge(Edge.between(dag.newVertex("producer", SourceProcessors.streamSocketP(HOST, PORT, StandardCharsets.UTF_8)).localParallelism(2), dag.newVertex("consumer", SinkProcessors.writeListP("consumer")).localParallelism(1)));
                Job newJob = this.instance.getJet().newJob(dag);
                IList list = this.instance.getList("consumer");
                assertTrueEventually(() -> {
                    Assert.assertEquals(2L, list.size());
                });
                countDownLatch.countDown();
                newJob.join();
                Assert.assertEquals(6L, list.size());
                if (serverSocket != null) {
                    if (0 == 0) {
                        serverSocket.close();
                        return;
                    }
                    try {
                        serverSocket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (serverSocket != null) {
                if (th != null) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    serverSocket.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void when_jobCancelled_then_readerClosed() throws Exception {
        ServerSocket serverSocket = new ServerSocket(PORT);
        Throwable th = null;
        try {
            AtomicReference atomicReference = new AtomicReference();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            spawn(() -> {
                Util.uncheckRun(() -> {
                    atomicReference.set(serverSocket.accept());
                    countDownLatch.countDown();
                    byte[] bytes = "jet\n".getBytes();
                    OutputStream outputStream = ((Socket) atomicReference.get()).getOutputStream();
                    Throwable th2 = null;
                    while (true) {
                        try {
                            try {
                                outputStream.write(bytes);
                                outputStream.flush();
                                Thread.sleep(1000L);
                            } finally {
                            }
                        } catch (Throwable th3) {
                            if (outputStream != null) {
                                if (th2 != null) {
                                    try {
                                        outputStream.close();
                                    } catch (Throwable th4) {
                                        th2.addSuppressed(th4);
                                    }
                                } else {
                                    outputStream.close();
                                }
                            }
                            throw th3;
                        }
                    }
                });
            });
            Vertex localParallelism = new Vertex("producer", SourceProcessors.streamSocketP(HOST, PORT, StandardCharsets.UTF_8)).localParallelism(1);
            Vertex localParallelism2 = new Vertex("sink", Processors.noopP()).localParallelism(1);
            Job newJob = this.instance.getJet().newJob(new DAG().vertex(localParallelism).vertex(localParallelism2).edge(Edge.between(localParallelism, localParallelism2)));
            countDownLatch.await();
            newJob.cancel();
            assertTrueEventually(() -> {
                Assert.assertTrue("Socket not closed", ((Socket) atomicReference.get()).isClosed());
            });
            if (serverSocket != null) {
                if (0 == 0) {
                    serverSocket.close();
                    return;
                }
                try {
                    serverSocket.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serverSocket.close();
                }
            }
            throw th3;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1507824971:
                if (implMethodName.equals("lambda$null$8aa4006b$1")) {
                    z = true;
                    break;
                }
                break;
            case -1267178774:
                if (implMethodName.equals("lambda$null$f66b1794$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamSocketP_integrationTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/net/ServerSocket;Ljava/util/concurrent/CountDownLatch;)V")) {
                    ServerSocket serverSocket = (ServerSocket) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return () -> {
                        Socket accept = serverSocket.accept();
                        Socket accept2 = serverSocket.accept();
                        PrintWriter printWriter = new PrintWriter(accept.getOutputStream());
                        printWriter.write("hello1 \n");
                        printWriter.flush();
                        PrintWriter printWriter2 = new PrintWriter(accept2.getOutputStream());
                        printWriter2.write("hello2 \n");
                        printWriter2.flush();
                        assertOpenEventually(countDownLatch);
                        printWriter.write("world1 \n");
                        printWriter.write("jet1 \n");
                        printWriter.flush();
                        printWriter2.write("world2 \n");
                        printWriter2.write("jet2 \n");
                        printWriter2.flush();
                        accept.close();
                        accept2.close();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamSocketP_integrationTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicReference;Ljava/net/ServerSocket;Ljava/util/concurrent/CountDownLatch;)V")) {
                    AtomicReference atomicReference = (AtomicReference) serializedLambda.getCapturedArg(0);
                    ServerSocket serverSocket2 = (ServerSocket) serializedLambda.getCapturedArg(1);
                    CountDownLatch countDownLatch2 = (CountDownLatch) serializedLambda.getCapturedArg(2);
                    return () -> {
                        atomicReference.set(serverSocket2.accept());
                        countDownLatch2.countDown();
                        byte[] bytes = "jet\n".getBytes();
                        OutputStream outputStream = ((Socket) atomicReference.get()).getOutputStream();
                        Throwable th2 = null;
                        while (true) {
                            try {
                                try {
                                    outputStream.write(bytes);
                                    outputStream.flush();
                                    Thread.sleep(1000L);
                                } finally {
                                }
                            } catch (Throwable th3) {
                                if (outputStream != null) {
                                    if (th2 != null) {
                                        try {
                                            outputStream.close();
                                        } catch (Throwable th4) {
                                            th2.addSuppressed(th4);
                                        }
                                    } else {
                                        outputStream.close();
                                    }
                                }
                                throw th3;
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
