package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.core.test.TestSupport;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.OutputStream;
import java.lang.invoke.SerializedLambda;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamSocketPTest.class */
public class StreamSocketPTest extends JetTestSupport {

    @Parameterized.Parameter
    public String input;

    @Parameterized.Parameter(1)
    public List<String> output;
    private Queue<Object> bucket;
    private TestOutbox outbox;
    private TestProcessorContext context;

    @Parameterized.Parameters(name = "input={0}, output={1}")
    public static Collection<Object[]> parameters() {
        List asList = Arrays.asList("1", "2");
        return Arrays.asList(new Object[]{"1%n2%n", asList}, new Object[]{"1%r%n2%r%n", asList}, new Object[]{"1%n2%r%n", asList}, new Object[]{"1%r%n2%n", asList}, new Object[]{"1%r2%n", asList}, new Object[]{"", Collections.emptyList()}, new Object[]{"%n", Collections.singletonList("")}, new Object[]{"1", Collections.emptyList()}, new Object[]{"1%n2", Collections.singletonList("1")}, new Object[]{"1%n%n2%n", Arrays.asList("1", "", "2")});
    }

    @Before
    public void before() {
        this.outbox = new TestOutbox(new int[]{10});
        this.context = new TestProcessorContext();
        this.bucket = this.outbox.queue(0);
        this.input = this.input.replaceAll("%n", "\n").replaceAll("%r", "\r");
    }

    @Test
    public void smokeTest() throws Exception {
        byte[] bytes = this.input.getBytes(StandardCharsets.UTF_16);
        for (int i = 0; i < bytes.length; i++) {
            this.logger.info("--------- runTest(" + i + ") ---------");
            runTest(bytes, i);
        }
    }

    private void runTest(byte[] bArr, int i) throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            Thread thread = new Thread(() -> {
                Util.uncheckRun(() -> {
                    Socket accept = serverSocket.accept();
                    OutputStream outputStream = accept.getOutputStream();
                    outputStream.write(bArr, 0, i);
                    countDownLatch.countDown();
                    countDownLatch2.await();
                    outputStream.write(bArr, i, bArr.length - i);
                    outputStream.close();
                    accept.close();
                });
            });
            thread.start();
            Processor processor = (Processor) TestSupport.supplierFrom(SourceProcessors.streamSocketP("localhost", serverSocket.getLocalPort(), StandardCharsets.UTF_16)).get();
            processor.init(this.outbox, this.context);
            countDownLatch.await();
            for (int i2 = 0; i2 < 10; i2++) {
                processor.complete();
                sleepMillis(1);
            }
            countDownLatch2.countDown();
            while (!processor.complete()) {
                sleepMillis(1);
            }
            Iterator<String> it = this.output.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(it.next(), this.bucket.poll());
            }
            Assert.assertNull(this.bucket.poll());
            assertTrueEventually(() -> {
                Assert.assertFalse(thread.isAlive());
            });
            serverSocket.close();
        } catch (Throwable th) {
            try {
                serverSocket.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1755015840:
                if (implMethodName.equals("lambda$runTest$4c481382$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamSocketPTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/net/ServerSocket;[BILjava/util/concurrent/CountDownLatch;Ljava/util/concurrent/CountDownLatch;)V")) {
                    ServerSocket serverSocket = (ServerSocket) serializedLambda.getCapturedArg(0);
                    byte[] bArr = (byte[]) serializedLambda.getCapturedArg(1);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(2)).intValue();
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(3);
                    CountDownLatch countDownLatch2 = (CountDownLatch) serializedLambda.getCapturedArg(4);
                    return () -> {
                        Socket accept = serverSocket.accept();
                        OutputStream outputStream = accept.getOutputStream();
                        outputStream.write(bArr, 0, intValue);
                        countDownLatch.countDown();
                        countDownLatch2.await();
                        outputStream.write(bArr, intValue, bArr.length - intValue);
                        outputStream.close();
                        accept.close();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
