/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils;

@EventDriven
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"command execution", "command", "stream", "execute"})
@CapabilityDescription(value="Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
@DynamicProperty(name="An environment variable name", value="An environment variable value", description="These environment variables are passed to the process spawned by this Processor")
@WritesAttributes(value={@WritesAttribute(attribute="execution.command", description="The name of the command executed"), @WritesAttribute(attribute="execution.command.args", description="The semi-colon delimited list of arguments"), @WritesAttribute(attribute="execution.status", description="The exit status code returned from executing the command"), @WritesAttribute(attribute="execution.error", description="Any error messages returned from executing the command")})
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.EXECUTE_CODE, explanation="Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")})
public class ExecuteStreamCommand
extends AbstractProcessor {
    public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder().name("original").description("FlowFiles that were successfully processed.").build();
    public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder().name("output stream").description("The destination path for the flow file created from the command's output, if the returned status code is zero.").build();
    public static final Relationship NONZERO_STATUS_RELATIONSHIP = new Relationship.Builder().name("nonzero status").description("The destination path for the flow file created from the command's output, if the returned status code is non-zero. All flow files routed to this relationship will be penalized.").build();
    private AtomicReference<Set<Relationship>> relationships = new AtomicReference();
    private static final Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
    private static final Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
    private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
    static final PropertyDescriptor EXECUTION_COMMAND;
    static final PropertyDescriptor EXECUTION_ARGUMENTS;
    static final PropertyDescriptor WORKING_DIR;
    static final PropertyDescriptor IGNORE_STDIN;
    static final PropertyDescriptor PUT_OUTPUT_IN_ATTRIBUTE;
    static final PropertyDescriptor PUT_ATTRIBUTE_MAX_LENGTH;
    private static final Validator characterValidator;
    static final PropertyDescriptor ARG_DELIMITER;
    private static final List<PropertyDescriptor> PROPERTIES;
    private ComponentLog logger;

    public Set<Relationship> getRelationships() {
        return this.relationships.get();
    }

    protected void init(ProcessorInitializationContext context) {
        this.logger = this.getLogger();
        this.relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET);
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor.equals((Object)PUT_OUTPUT_IN_ATTRIBUTE)) {
            if (newValue != null) {
                this.relationships.set(ATTRIBUTE_RELATIONSHIP_SET);
            } else {
                this.relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET);
            }
        }
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment").dynamic(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        Process process;
        FlowFile inputFlowFile = session.get();
        if (null == inputFlowFile) {
            return;
        }
        ArrayList<String> args = new ArrayList<String>();
        boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
        Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
        String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue();
        String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
        args.add(executeCommand);
        String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
        boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
        if (!StringUtils.isBlank((CharSequence)commandArguments)) {
            for (String arg : ArgumentUtils.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
                args.add(arg);
            }
        }
        String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();
        ProcessBuilder builder = new ProcessBuilder(new String[0]);
        this.logger.debug("Executing and waiting for command {} with arguments {}", new Object[]{executeCommand, commandArguments});
        File dir = null;
        if (!(StringUtils.isBlank((CharSequence)workingDir) || (dir = new File(workingDir)).exists() || dir.mkdirs())) {
            this.logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")});
        }
        HashMap environment = new HashMap();
        for (Map.Entry entry : context.getProperties().entrySet()) {
            if (!((PropertyDescriptor)entry.getKey()).isDynamic()) continue;
            environment.put(((PropertyDescriptor)entry.getKey()).getName(), entry.getValue());
        }
        builder.environment().putAll(environment);
        builder.command(args);
        builder.directory(dir);
        builder.redirectInput(ProcessBuilder.Redirect.PIPE);
        builder.redirectOutput(ProcessBuilder.Redirect.PIPE);
        try {
            process = builder.start();
        }
        catch (IOException e) {
            this.logger.error("Could not create external process to run command", (Throwable)e);
            throw new ProcessException((Throwable)e);
        }
        try (OutputStream pos = process.getOutputStream();
             InputStream pis = process.getInputStream();
             InputStream pes = process.getErrorStream();
             BufferedInputStream bis = new BufferedInputStream(pis);
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes));){
            Relationship outputFlowFileRelationship;
            int exitCode = -1;
            BufferedOutputStream bos = new BufferedOutputStream(pos);
            FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
            ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, this.logger, attributeName, session, outputFlowFile, process, putToAttribute, attributeSize);
            session.read(inputFlowFile, (InputStreamCallback)callback);
            outputFlowFile = callback.outputFlowFile;
            if (putToAttribute) {
                outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size));
            }
            exitCode = callback.exitCode;
            this.logger.debug("Execution complete for command: {}.  Exited with code: {}", new Object[]{executeCommand, exitCode});
            HashMap<String, String> attributes = new HashMap<String, String>();
            StringBuilder strBldr = new StringBuilder();
            try {
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    strBldr.append(line).append("\n");
                }
            }
            catch (IOException e) {
                strBldr.append("Unknown...could not read Process's Std Error");
            }
            int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
            attributes.put("execution.error", strBldr.substring(0, length));
            Relationship relationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (outputFlowFileRelationship = exitCode != 0 ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP);
            if (exitCode == 0) {
                this.logger.info("Transferring flow file {} to {}", new Object[]{outputFlowFile, outputFlowFileRelationship.getName()});
            } else {
                this.logger.error("Transferring flow file {} to {}. Executable command {} ended in an error: {}", new Object[]{outputFlowFile, outputFlowFileRelationship.getName(), executeCommand, strBldr.toString()});
            }
            attributes.put("execution.status", Integer.toString(exitCode));
            attributes.put("execution.command", executeCommand);
            attributes.put("execution.command.args", commandArguments);
            outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
            if (NONZERO_STATUS_RELATIONSHIP.equals((Object)outputFlowFileRelationship)) {
                outputFlowFile = session.penalize(outputFlowFile);
            }
            session.transfer(outputFlowFile, outputFlowFileRelationship);
            if (!putToAttribute) {
                this.logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
                inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
                session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
            }
        }
        catch (IOException ex) {
            this.logger.warn("Problem terminating Process {}", new Object[]{process}, (Throwable)ex);
        }
        finally {
            process.destroy();
        }
    }

    private static void readStdoutReadable(final boolean ignoreStdin, final OutputStream stdinWritable, final ComponentLog logger, final InputStream incomingFlowFileIS) throws IOException {
        Thread writerThread = new Thread(new Runnable(){

            @Override
            public void run() {
                if (!ignoreStdin) {
                    try {
                        StreamUtils.copy((InputStream)incomingFlowFileIS, (OutputStream)stdinWritable);
                    }
                    catch (IOException e) {
                        logger.error("Failed to write flow file to stdin due to {}", new Object[]{e}, (Throwable)e);
                    }
                }
                IOUtils.closeQuietly((OutputStream)stdinWritable);
            }
        });
        writerThread.setDaemon(true);
        writerThread.start();
    }

    static {
        ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true);
        EXECUTION_COMMAND = new PropertyDescriptor.Builder().name("Command Path").description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.").expressionLanguageSupported(true).addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).required(true).build();
        EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder().name("Command Arguments").description("The arguments to supply to the executable delimited by the ';' character.").expressionLanguageSupported(true).addValidator(new Validator(){

            public ValidationResult validate(String subject, String input, ValidationContext context) {
                ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
                List<String> args = ArgumentUtils.splitArgs(input, context.getProperty(ARG_DELIMITER).getValue().charAt(0));
                for (String arg : args) {
                    ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
                    if (valResult.isValid()) continue;
                    result = valResult;
                    break;
                }
                return result;
            }
        }).build();
        WORKING_DIR = new PropertyDescriptor.Builder().name("Working Directory").description("The directory to use as the current working directory when executing the command").expressionLanguageSupported(true).addValidator(StandardValidators.createDirectoryExistsValidator((boolean)true, (boolean)true)).required(false).build();
        IGNORE_STDIN = new PropertyDescriptor.Builder().name("Ignore STDIN").description("If true, the contents of the incoming flowfile will not be passed to the executing command").addValidator(Validator.VALID).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
        PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder().name("Output Destination Attribute").description("If set, the output of the stream command will be put into an attribute of the original FlowFile instead of a separate FlowFile. There will no longer be a relationship for 'output stream' or 'nonzero status'. The value of this property will be the key for the output attribute.").addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).build();
        PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder().name("Max Attribute Length").description("If routing the output of the stream command to an attribute, the number of characters put to the attribute value will be at most this amount. This is important because attributes are held in memory and large attributes will quickly cause out of memory issues. If the output goes longer than this value, it will truncated to fit. Consider making this smaller if able.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("256").build();
        characterValidator = new StandardValidators.StringLengthValidator(1, 1);
        ARG_DELIMITER = new PropertyDescriptor.Builder().name("Argument Delimiter").description("Delimiter to use to separate arguments for a command [default: ;]. Must be a single character").addValidator(Validator.VALID).addValidator(characterValidator).required(true).defaultValue(";").build();
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(EXECUTION_ARGUMENTS);
        props.add(EXECUTION_COMMAND);
        props.add(IGNORE_STDIN);
        props.add(WORKING_DIR);
        props.add(ARG_DELIMITER);
        props.add(PUT_OUTPUT_IN_ATTRIBUTE);
        props.add(PUT_ATTRIBUTE_MAX_LENGTH);
        PROPERTIES = Collections.unmodifiableList(props);
        HashSet<Relationship> outputStreamRelationships = new HashSet<Relationship>();
        outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP);
        outputStreamRelationships.add(ORIGINAL_RELATIONSHIP);
        outputStreamRelationships.add(NONZERO_STATUS_RELATIONSHIP);
        OUTPUT_STREAM_RELATIONSHIP_SET = Collections.unmodifiableSet(outputStreamRelationships);
        HashSet<Relationship> attributeRelationships = new HashSet<Relationship>();
        attributeRelationships.add(ORIGINAL_RELATIONSHIP);
        ATTRIBUTE_RELATIONSHIP_SET = Collections.unmodifiableSet(attributeRelationships);
    }

    static class ProcessStreamWriterCallback
    implements InputStreamCallback {
        final boolean ignoreStdin;
        final OutputStream stdinWritable;
        final InputStream stdoutReadable;
        final ComponentLog logger;
        final ProcessSession session;
        final Process process;
        FlowFile outputFlowFile;
        int exitCode;
        final boolean putToAttribute;
        final int attributeSize;
        final String attributeName;
        byte[] outputBuffer;
        int size;

        public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdinWritable, InputStream stdoutReadable, ComponentLog logger, String attributeName, ProcessSession session, FlowFile outputFlowFile, Process process, boolean putToAttribute, int attributeSize) {
            this.ignoreStdin = ignoreStdin;
            this.stdinWritable = stdinWritable;
            this.stdoutReadable = stdoutReadable;
            this.logger = logger;
            this.session = session;
            this.outputFlowFile = outputFlowFile;
            this.process = process;
            this.putToAttribute = putToAttribute;
            this.attributeSize = attributeSize;
            this.attributeName = attributeName;
        }

        public void process(final InputStream incomingFlowFileIS) throws IOException {
            if (this.putToAttribute) {
                try (SoftLimitBoundedByteArrayOutputStream softLimitBoundedBAOS = new SoftLimitBoundedByteArrayOutputStream(this.attributeSize);){
                    ExecuteStreamCommand.readStdoutReadable(this.ignoreStdin, this.stdinWritable, this.logger, incomingFlowFileIS);
                    long longSize = StreamUtils.copy((InputStream)this.stdoutReadable, (OutputStream)softLimitBoundedBAOS);
                    this.size = longSize > (long)this.attributeSize ? this.attributeSize : (int)longSize;
                    this.outputBuffer = softLimitBoundedBAOS.getBuffer();
                    this.stdoutReadable.close();
                    try {
                        this.exitCode = this.process.waitFor();
                    }
                    catch (InterruptedException e) {
                        this.logger.warn("Command Execution Process was interrupted", (Throwable)e);
                    }
                }
            } else {
                this.outputFlowFile = this.session.write(this.outputFlowFile, new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        ExecuteStreamCommand.readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
                        StreamUtils.copy((InputStream)stdoutReadable, (OutputStream)out);
                        try {
                            exitCode = process.waitFor();
                        }
                        catch (InterruptedException e) {
                            logger.warn("Command Execution Process was interrupted", (Throwable)e);
                        }
                    }
                });
            }
        }
    }
}

