package com.amazonaws.services.kinesis.multilang;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.class */
/**
 * An {@link IRecordProcessor} that delegates its work to a child process.
 *
 * <p>The child process is started from the supplied {@link ProcessBuilder}. Its standard input is
 * handed to a {@code MessageWriter}, its standard output to a {@code MessageReader}, and its
 * standard error to a {@code DrainChildSTDERRTask} that is run on the supplied
 * {@link ExecutorService}. The actual exchange with the child is performed by
 * {@code MultiLangProtocol}.
 *
 * <p>Any failure while initializing, processing records or shutting down is treated as fatal: the
 * child process shutdown sequence is attempted and then {@link #exit()} is invoked.
 */
public class MultiLangRecordProcessor implements IRecordProcessor {
    private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class);

    /** Exit status handed to {@link System#exit(int)} by {@link #exit()}. */
    private static final int EXIT_VALUE = 1;

    // Collaborators supplied at construction time; never reassigned afterwards.
    private final ProcessBuilder processBuilder;
    private final ExecutorService executorService;
    private final ObjectMapper objectMapper;
    private final MessageWriter messageWriter;
    private final MessageReader messageReader;
    private final DrainChildSTDERRTask readSTDERRTask;

    /** Set to {@code true} only after the protocol-level initialize call has succeeded. */
    private volatile boolean initialized;
    private String shardId;
    /** Handle of the submitted STDERR draining task; {@code null} until it has been submitted. */
    private Future<?> stderrReadTask;
    /** The child process; {@code null} until {@link #startProcess()} has succeeded. */
    private Process process;
    private ProcessState state;
    private MultiLangProtocol protocol;

    /** Lifecycle of this record processor with respect to its child process. */
    public enum ProcessState {
        ACTIVE,
        SHUTDOWN
    }

    /**
     * Creates a record processor that uses freshly created message writer, message reader and
     * STDERR draining task.
     *
     * @param processBuilder builder used to start the child process
     * @param executorService executor handed to the reader/writer and used to run the STDERR task
     * @param objectMapper mapper handed to the message reader and writer
     */
    public MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper) {
        this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), new DrainChildSTDERRTask());
    }

    /**
     * Creates a record processor with explicitly supplied collaborators. The processor starts in
     * the {@link ProcessState#ACTIVE} state.
     *
     * @param processBuilder builder used to start the child process
     * @param executorService executor handed to the reader/writer and used to run the STDERR task
     * @param objectMapper mapper handed to the message reader and writer
     * @param messageWriter writer attached to the child's standard input
     * @param messageReader reader attached to the child's standard output
     * @param drainChildSTDERRTask task attached to the child's standard error
     */
    MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask drainChildSTDERRTask) {
        this.executorService = executorService;
        this.processBuilder = processBuilder;
        this.objectMapper = objectMapper;
        this.messageWriter = messageWriter;
        this.messageReader = messageReader;
        this.readSTDERRTask = drainChildSTDERRTask;
        this.state = ProcessState.ACTIVE;
    }

    /**
     * Starts the child process, wires up its three streams and runs the protocol-level
     * initialization. On any failure {@link #stopProcessing(String, Throwable)} is invoked.
     *
     * @param initializationInput supplies the shard id and is passed on to the protocol
     */
    @Override
    public void initialize(InitializationInput initializationInput) {
        try {
            this.shardId = initializationInput.getShardId();
            try {
                this.process = startProcess();
            } catch (IOException e) {
                // Only a failure to launch the executable is reported with this message;
                // later setup failures keep their own exception and message.
                throw new IOException("Failed to start client executable", e);
            }
            this.messageWriter.initialize(this.process.getOutputStream(), this.shardId, this.objectMapper, this.executorService);
            this.messageReader.initialize(this.process.getInputStream(), this.shardId, this.objectMapper, this.executorService);
            this.readSTDERRTask.initialize(this.process.getErrorStream(), this.shardId, "Reading STDERR for " + this.shardId);
            this.stderrReadTask = this.executorService.submit(this.readSTDERRTask);
            this.protocol = new MultiLangProtocol(this.messageReader, this.messageWriter, initializationInput);
            if (!this.protocol.initialize()) {
                throw new RuntimeException("Failed to initialize child process");
            }
            this.initialized = true;
        } catch (Throwable th) {
            stopProcessing("Encountered an error while trying to initialize record processor", th);
        }
    }

    /**
     * Forwards a batch of records to the child process through the protocol. If the protocol
     * reports failure, or anything is thrown, processing is stopped.
     *
     * @param processRecordsInput the batch passed on to the protocol
     */
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        try {
            if (!this.protocol.processRecords(processRecordsInput)) {
                throw new RuntimeException("Child process failed to process records");
            }
        } catch (Throwable th) {
            stopProcessing("Encountered an error while trying to process records", th);
        }
    }

    /**
     * Asks the child process to shut down and then runs the child process shutdown sequence.
     *
     * <p>If this processor was never successfully initialized, only the state is moved to
     * {@link ProcessState#SHUTDOWN}. If the processor is no longer active, a warning is logged and
     * nothing else happens. Any failure leads to {@link #stopProcessing(String, Throwable)}.
     *
     * @param shutdownInput supplies the checkpointer and shutdown reason passed to the protocol
     */
    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (!this.initialized) {
            LOG.info("Record processor was not initialized and will not have a child process, so not invoking child process shutdown.");
            this.state = ProcessState.SHUTDOWN;
            return;
        }
        try {
            if (this.state != ProcessState.ACTIVE) {
                LOG.warn("Shutdown was called but this processor is already shutdown. Not doing anything.");
            } else {
                if (!this.protocol.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason())) {
                    throw new RuntimeException("Child process failed to shutdown");
                }
                childProcessShutdownSequence();
            }
        } catch (Throwable th) {
            // The state is re-read here because the shutdown sequence may already have completed
            // before the failure surfaced.
            String message = this.state == ProcessState.ACTIVE
                    ? "Encountered an error while trying to shutdown child process"
                    : "Encountered an error during shutdown, but it appears the processor has already been shutdown";
            stopProcessing(message, th);
        }
    }

    /**
     * Winds down the child process: closes the writer if it is open, waits for STDOUT and STDERR
     * to be drained, closes both input streams and waits for the process to exit. If the wait for
     * process exit is interrupted, the process is destroyed. On normal completion the state
     * becomes {@link ProcessState#SHUTDOWN}.
     *
     * <p>An interrupt received while waiting does not cut the remaining steps short; instead the
     * thread's interrupt status is restored once the sequence is finished.
     */
    private void childProcessShutdownSequence() {
        if (this.process == null) {
            // The child was never started, so there are no streams to drain and nothing to wait for.
            LOG.info("Child process was never started for shard " + this.shardId + ", nothing to shut down.");
            this.state = ProcessState.SHUTDOWN;
            return;
        }
        boolean interrupted = false;
        try {
            try {
                if (this.messageWriter.isOpen()) {
                    this.messageWriter.close();
                }
            } catch (IOException e) {
                LOG.error("Encountered exception while trying to close output stream.", e);
            }
            interrupted |= safelyWaitOnFuture(this.messageReader.drainSTDOUT(), "draining STDOUT");
            interrupted |= safelyWaitOnFuture(this.stderrReadTask, "draining STDERR");
            safelyCloseInputStream(this.process.getErrorStream(), "STDERR");
            safelyCloseInputStream(this.process.getInputStream(), "STDOUT");
            try {
                LOG.info("Child process exited with value: " + this.process.waitFor());
            } catch (InterruptedException e) {
                LOG.error("Interrupted before process finished exiting. Attempting to kill process.");
                this.process.destroy();
                interrupted = true;
            }
            this.state = ProcessState.SHUTDOWN;
        } finally {
            if (interrupted) {
                // Re-assert the interrupt so code further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes the given stream, logging (rather than propagating) any {@link IOException}.
     *
     * @param inputStream the stream to close
     * @param str name of the stream, used only in the log message
     */
    private void safelyCloseInputStream(InputStream inputStream, String str) {
        try {
            inputStream.close();
        } catch (IOException e) {
            LOG.error("Encountered exception while trying to close " + str + " stream.", e);
        }
    }

    /**
     * Waits for the given future to complete, logging (rather than propagating) failures.
     *
     * @param future the future to wait on
     * @param str description of what the future is doing, used only in the log message
     * @return {@code true} if the wait was interrupted, so the caller can restore the interrupt
     *         status once its own work is done; {@code false} otherwise
     */
    private boolean safelyWaitOnFuture(Future<?> future, String str) {
        try {
            future.get();
            return false;
        } catch (ExecutionException e) {
            LOG.error("Encountered error while " + str + " for shard " + this.shardId, e);
            return false;
        } catch (InterruptedException e) {
            LOG.error("Encountered error while " + str + " for shard " + this.shardId, e);
            return true;
        }
    }

    /**
     * Logs the given failure, runs the child process shutdown sequence unless the processor is
     * already shut down, and finally calls {@link #exit()}. Errors raised by the shutdown sequence
     * itself are logged and do not prevent the call to {@link #exit()}.
     *
     * @param str message describing what was being attempted
     * @param th the failure that triggered the stop
     */
    private void stopProcessing(String str, Throwable th) {
        try {
            LOG.error(str, th);
            if (this.state != ProcessState.SHUTDOWN) {
                childProcessShutdownSequence();
            }
        } catch (Throwable shutdownFailure) {
            LOG.error("Encountered error while trying to shutdown", shutdownFailure);
        }
        exit();
    }

    /**
     * Terminates the JVM with status {@link #EXIT_VALUE}.
     * NOTE(review): package-private, presumably so tests can override it - confirm.
     */
    void exit() {
        System.exit(EXIT_VALUE);
    }

    /**
     * Starts the child process from the configured {@link ProcessBuilder}.
     *
     * @return the started process
     * @throws IOException if the process cannot be started
     */
    Process startProcess() throws IOException {
        return this.processBuilder.start();
    }
}
