package org.voltdb.importclient.socket;

import com.google_voltpatches.common.base.Optional;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hsqldb_voltpatches.ErrorCode;
import org.voltcore.logging.Level;
import org.voltdb.importer.AbstractImporter;
import org.voltdb.importer.Invocation;
import org.voltdb.importer.formatter.FormatException;
import org.voltdb.importer.formatter.Formatter;

/* loaded from: input_file:org/voltdb/importclient/socket/PullSocketImporter.class */
public class PullSocketImporter extends AbstractImporter {
    private PullSocketImporterConfig m_config;
    private final AtomicBoolean m_eos = new AtomicBoolean(false);
    private volatile Optional<Thread> m_thread = Optional.absent();
    private volatile Socket m_socket = null;
    private final Object m_socketLock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    public PullSocketImporter(PullSocketImporterConfig pullSocketImporterConfig) {
        this.m_config = pullSocketImporterConfig;
    }

    @Override // org.voltdb.importer.AbstractImporter
    public URI getResourceID() {
        return this.m_config.getResourceID();
    }

    @Override // org.voltdb.importer.AbstractImporter
    public void accept() {
        susceptibleRun();
    }

    @Override // org.voltdb.importer.AbstractImporter, org.voltdb.importer.ImporterLifecycle
    public void stop() {
        close();
    }

    @Override // org.voltdb.InternalConnectionContext
    public String getName() {
        return "PullSocketImporter";
    }

    private void closeSocket(Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                error(e, "Unexpected exception closing socket", new Object[0]);
            }
        }
    }

    private void replaceSocket(Socket socket) {
        synchronized (this.m_socketLock) {
            closeSocket(this.m_socket);
            if (this.m_eos.get()) {
                closeSocket(socket);
                this.m_socket = null;
            } else {
                this.m_socket = socket;
            }
        }
    }

    private void susceptibleRun() {
        String readLine;
        if (this.m_eos.get()) {
            return;
        }
        info(null, "Starting socket puller for " + this.m_config.getResourceID(), new Object[0]);
        this.m_thread = Optional.of(Thread.currentThread());
        Formatter create = this.m_config.getFormatterBuilder().create();
        while (!this.m_eos.get()) {
            try {
                Optional<BufferedReader> attemptBufferedReader = attemptBufferedReader();
                if (attemptBufferedReader.isPresent()) {
                    info(null, "Socket puller for " + this.m_config.getResourceID() + " connected.", new Object[0]);
                    BufferedReader bufferedReader = attemptBufferedReader.get();
                    while (true) {
                        readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        }
                        try {
                            if (!callProcedure(new Invocation(this.m_config.getProcedure(), create.transform(ByteBuffer.wrap(readLine.getBytes())))) && isDebugEnabled()) {
                                debug(null, "Failed to process Invocation possibly bad data: " + readLine, new Object[0]);
                            }
                        } catch (FormatException e) {
                            rateLimitedLog(Level.ERROR, e, "Failed to tranform data: %s", readLine);
                        }
                    }
                    if (readLine == null) {
                        warn(null, this.m_config.getResourceID() + " peer terminated stream", new Object[0]);
                    }
                } else {
                    sleep(2000);
                }
            } catch (EOFException e2) {
                rateLimitedLog(Level.WARN, e2, this.m_config.getResourceID() + " peer terminated stream", new Object[0]);
            } catch (InterruptedIOException e3) {
                if (this.m_eos.get()) {
                    return;
                } else {
                    rateLimitedLog(Level.ERROR, e3, "Socket puller for %s was interrupted", this.m_config.getResourceID());
                }
            } catch (IOException e4) {
                if (this.m_eos.get() && e4.getMessage().contains("Socket closed")) {
                    return;
                } else {
                    rateLimitedLog(Level.ERROR, e4, "Read fault for %s", this.m_config.getResourceID());
                }
            } catch (InterruptedException e5) {
                if (this.m_eos.get()) {
                    return;
                } else {
                    rateLimitedLog(Level.ERROR, e5, "Socket puller %s was interrupted", this.m_config.getResourceID());
                }
            }
        }
        info(null, "Stopping socket puller for " + this.m_config.getResourceID(), new Object[0]);
    }

    private Optional<BufferedReader> attemptBufferedReader() {
        Optional<BufferedReader> absent = Optional.absent();
        if (this.m_eos.get()) {
            return absent;
        }
        InetSocketAddress inetSocketAddress = new InetSocketAddress(this.m_config.getResourceID().getHost(), this.m_config.getResourceID().getPort());
        Socket socket = new Socket();
        try {
            try {
                try {
                    socket.connect(inetSocketAddress, ErrorCode.X_28000);
                    absent = Optional.of(new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8), 8192));
                    replaceSocket(socket);
                } catch (InterruptedIOException e) {
                    if (this.m_eos.get()) {
                        Optional<BufferedReader> optional = absent;
                        replaceSocket(socket);
                        return optional;
                    }
                    if (isDebugEnabled()) {
                        rateLimitedLog(Level.DEBUG, e, "Unable to connect to " + this.m_config.getResourceID(), new Object[0]);
                    }
                    replaceSocket(socket);
                }
            } catch (IOException e2) {
                if (isDebugEnabled()) {
                    rateLimitedLog(Level.DEBUG, e2, "Unable to connect to " + this.m_config.getResourceID(), new Object[0]);
                }
                replaceSocket(socket);
            }
            return absent;
        } catch (Throwable th) {
            replaceSocket(socket);
            throw th;
        }
    }

    private boolean sleep(int i) throws InterruptedException {
        if (this.m_eos.get()) {
            return true;
        }
        try {
            Thread.sleep(i);
            return false;
        } catch (InterruptedException e) {
            if (this.m_eos.get()) {
                return true;
            }
            throw e;
        }
    }

    public void close() {
        if (this.m_eos.compareAndSet(false, true) && this.m_thread.isPresent()) {
            this.m_thread.get().interrupt();
        }
        replaceSocket(null);
    }
}
