/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;

@EventDriven
@SupportsBatching
@Tags(value={"command execution", "command", "stream", "execute"})
@CapabilityDescription(value="Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
@WritesAttributes(value={@WritesAttribute(attribute="execution.command", description="The name of the command executed to create the new FlowFile"), @WritesAttribute(attribute="execution.command.args", description="The semi-colon delimited list of arguments"), @WritesAttribute(attribute="execution.status", description="The exit status code returned from executing the command"), @WritesAttribute(attribute="execution.error", description="Any error messages returned from executing the command")})
public class ExecuteStreamCommand
extends AbstractProcessor {
    public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder().name("original").description("FlowFiles that were successfully processed").build();
    public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder().name("output stream").description("The destination path for the flow file created from the command's output").build();
    private static final Set<Relationship> RELATIONSHIPS;
    private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
    static final PropertyDescriptor EXECUTION_COMMAND;
    static final PropertyDescriptor EXECUTION_ARGUMENTS;
    static final PropertyDescriptor WORKING_DIR;
    private static final List<PropertyDescriptor> PROPERTIES;
    private ProcessorLog logger;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected void init(ProcessorInitializationContext context) {
        this.logger = this.getLogger();
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        Process process;
        FlowFile flowFile = session.get();
        if (null == flowFile) {
            return;
        }
        ArrayList<String> args = new ArrayList<String>();
        String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
        args.add(executeCommand);
        String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
        if (!StringUtils.isBlank((CharSequence)commandArguments)) {
            for (String arg : commandArguments.split(";")) {
                args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
            }
        }
        String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue();
        ProcessBuilder builder = new ProcessBuilder(new String[0]);
        this.logger.debug("Executing and waiting for command {} with arguments {}", new Object[]{executeCommand, commandArguments});
        File dir = null;
        if (!(StringUtils.isBlank((CharSequence)workingDir) || (dir = new File(workingDir)).exists() || dir.mkdirs())) {
            this.logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")});
        }
        builder.command(args);
        builder.directory(dir);
        builder.redirectInput(ProcessBuilder.Redirect.PIPE);
        builder.redirectOutput(ProcessBuilder.Redirect.PIPE);
        try {
            process = builder.start();
        }
        catch (IOException e) {
            this.logger.error("Could not create external process to run command", (Throwable)e);
            throw new ProcessException((Throwable)e);
        }
        try (OutputStream pos = process.getOutputStream();
             InputStream pis = process.getInputStream();
             InputStream pes = process.getErrorStream();
             BufferedInputStream bis = new BufferedInputStream(pis);
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes));){
            int exitCode = -1;
            BufferedOutputStream bos = new BufferedOutputStream(pos);
            FlowFile outputStreamFlowFile = session.create(flowFile);
            StdInWriterCallback callback = new StdInWriterCallback((OutputStream)bos, (InputStream)bis, this.logger, session, outputStreamFlowFile, process);
            session.read(flowFile, (InputStreamCallback)callback);
            outputStreamFlowFile = callback.outputStreamFlowFile;
            exitCode = callback.exitCode;
            this.logger.debug("Execution complete for command: {}.  Exited with code: {}", new Object[]{executeCommand, exitCode});
            HashMap<String, String> attributes = new HashMap<String, String>();
            StringBuilder strBldr = new StringBuilder();
            try {
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    strBldr.append(line).append("\n");
                }
            }
            catch (IOException e) {
                strBldr.append("Unknown...could not read Process's Std Error");
            }
            int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
            attributes.put("execution.error", strBldr.substring(0, length));
            if (exitCode == 0) {
                this.logger.info("Transferring flow file {} to 'output stream'", new Object[]{outputStreamFlowFile});
            } else {
                this.logger.error("Transferring flow file {} to 'output stream'. Executable command {} ended in an error: {}", new Object[]{outputStreamFlowFile, executeCommand, strBldr.toString()});
            }
            attributes.put("execution.status", Integer.toString(exitCode));
            attributes.put("execution.command", executeCommand);
            attributes.put("execution.command.args", commandArguments);
            outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes);
            session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
            this.logger.info("Transferring flow file {} to original", new Object[]{flowFile});
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, ORIGINAL_RELATIONSHIP);
        }
        catch (IOException ex) {
            this.logger.warn("Problem terminating Process {}", new Object[]{process}, (Throwable)ex);
        }
        finally {
            process.destroy();
        }
    }

    static {
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(OUTPUT_STREAM_RELATIONSHIP);
        rels.add(ORIGINAL_RELATIONSHIP);
        RELATIONSHIPS = Collections.unmodifiableSet(rels);
        ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true);
        EXECUTION_COMMAND = new PropertyDescriptor.Builder().name("Command Path").description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.").expressionLanguageSupported(true).addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).required(true).build();
        EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder().name("Command Arguments").description("The arguments to supply to the executable delimited by the ';' character.").expressionLanguageSupported(true).addValidator(new Validator(){

            public ValidationResult validate(String subject, String input, ValidationContext context) {
                String[] args;
                ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
                for (String arg : args = input.split(";")) {
                    ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
                    if (valResult.isValid()) continue;
                    result = valResult;
                    break;
                }
                return result;
            }
        }).build();
        WORKING_DIR = new PropertyDescriptor.Builder().name("Working Directory").description("The directory to use as the current working directory when executing the command").expressionLanguageSupported(true).addValidator(StandardValidators.createDirectoryExistsValidator((boolean)true, (boolean)true)).required(false).build();
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(EXECUTION_ARGUMENTS);
        props.add(EXECUTION_COMMAND);
        props.add(WORKING_DIR);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    static class StdInWriterCallback
    implements InputStreamCallback {
        final OutputStream stdInWritable;
        final InputStream stdOutReadable;
        final ProcessorLog logger;
        final ProcessSession session;
        final Process process;
        FlowFile outputStreamFlowFile;
        int exitCode;

        public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) {
            this.stdInWritable = stdInWritable;
            this.stdOutReadable = stdOutReadable;
            this.logger = logger;
            this.session = session;
            this.outputStreamFlowFile = outputStreamFlowFile;
            this.process = process;
        }

        public void process(final InputStream incomingFlowFileIS) throws IOException {
            this.outputStreamFlowFile = this.session.write(this.outputStreamFlowFile, new OutputStreamCallback(){

                public void process(OutputStream out) throws IOException {
                    Thread writerThread = new Thread(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                StreamUtils.copy((InputStream)incomingFlowFileIS, (OutputStream)StdInWriterCallback.this.stdInWritable);
                            }
                            catch (IOException e) {
                                StdInWriterCallback.this.logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, (Throwable)e);
                            }
                            IOUtils.closeQuietly((OutputStream)StdInWriterCallback.this.stdInWritable);
                        }
                    });
                    writerThread.setDaemon(true);
                    writerThread.start();
                    StreamUtils.copy((InputStream)StdInWriterCallback.this.stdOutReadable, (OutputStream)out);
                    try {
                        StdInWriterCallback.this.exitCode = StdInWriterCallback.this.process.waitFor();
                    }
                    catch (InterruptedException e) {
                        StdInWriterCallback.this.logger.warn("Command Execution Process was interrupted", (Throwable)e);
                    }
                }
            });
        }
    }
}

