package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.impl.util.Util;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/ReadSocketTextStreamP.class */
public class ReadSocketTextStreamP extends AbstractProcessor implements Closeable {
    private final String host;
    private final int port;
    private BufferedReader bufferedReader;

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/ReadSocketTextStreamP$Supplier.class */
    private static class Supplier implements ProcessorSupplier {
        static final long serialVersionUID = 1;
        private final String host;
        private final int port;
        private transient ReadSocketTextStreamP reader;

        Supplier(String str, int i) {
            this.host = str;
            this.port = i;
        }

        @Override // com.hazelcast.jet.ProcessorSupplier
        @Nonnull
        public List<Processor> get(int i) {
            ReadSocketTextStreamP.assertCountIsOne(i);
            this.reader = new ReadSocketTextStreamP(this.host, this.port);
            return Collections.singletonList(this.reader);
        }

        @Override // com.hazelcast.jet.ProcessorSupplier
        public void complete(Throwable th) {
            ReadSocketTextStreamP readSocketTextStreamP = this.reader;
            readSocketTextStreamP.getClass();
            Util.uncheckRun(readSocketTextStreamP::close);
        }
    }

    ReadSocketTextStreamP(String str, int i) {
        this.host = str;
        this.port = i;
    }

    @Override // com.hazelcast.jet.AbstractProcessor
    protected void init(@Nonnull Processor.Context context) throws Exception {
        getLogger().info("Connecting to socket " + hostAndPort());
        Socket socket = new Socket(this.host, this.port);
        getLogger().info("Connected to socket " + hostAndPort());
        this.bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
    }

    @Override // com.hazelcast.jet.Processor
    public boolean complete() {
        return ((Boolean) Util.uncheckCall(this::tryComplete)).booleanValue();
    }

    private boolean tryComplete() throws IOException {
        String readLine = this.bufferedReader.readLine();
        if (readLine == null) {
            return true;
        }
        emit(readLine);
        return false;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.bufferedReader != null) {
            getLogger().info("Closing socket " + hostAndPort());
            this.bufferedReader.close();
        }
    }

    @Override // com.hazelcast.jet.AbstractProcessor, com.hazelcast.jet.Processor
    public boolean isCooperative() {
        return false;
    }

    public static ProcessorSupplier supplier(String str, int i) {
        return new Supplier(str, i);
    }

    private String hostAndPort() {
        return this.host + ':' + this.port;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void assertCountIsOne(int i) {
        if (i != 1) {
            throw new IllegalArgumentException("count != 1");
        }
    }
}
