/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@Tags(value={"command", "process", "source", "external", "invoke", "script"})
@CapabilityDescription(value="Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals.")
@DynamicProperty(name="An environment variable name", value="An environment variable value", description="These environment variables are passed to the process spawned by this Processor")
public class ExecuteProcess
extends AbstractProcessor {
    public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder().name("Command").description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.").required(true).expressionLanguageSupported(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder().name("Command Arguments").description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.").required(false).expressionLanguageSupported(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder().name("Working Directory").description("The directory to use as the current working directory when executing the command").expressionLanguageSupported(false).addValidator(StandardValidators.createDirectoryExistsValidator((boolean)false, (boolean)true)).required(false).build();
    public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder().name("Batch Duration").description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so that the output will be captured for this amount of time and a FlowFile will then be sent out with the results and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results").required(false).expressionLanguageSupported(false).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder().name("Redirect Error Stream").description("If true will redirect any error stream output of the process to the output stream. This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.").required(false).allowableValues(new String[]{"true", "false"}).defaultValue("false").expressionLanguageSupported(false).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All created FlowFiles are routed to this relationship").build();
    private volatile ExecutorService executor;
    private Future<?> longRunningProcess;
    private AtomicBoolean failure = new AtomicBoolean(false);
    private volatile ProxyOutputStream proxyOut;

    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(COMMAND);
        properties.add(COMMAND_ARGUMENTS);
        properties.add(BATCH_DURATION);
        properties.add(REDIRECT_ERROR_STREAM);
        return properties;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment").dynamic(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    }

    static List<String> splitArgs(String input) {
        if (input == null) {
            return Collections.emptyList();
        }
        ArrayList<String> args = new ArrayList<String>();
        String trimmed = input.trim();
        boolean inQuotes = false;
        StringBuilder sb = new StringBuilder();
        block4: for (int i = 0; i < trimmed.length(); ++i) {
            char c = trimmed.charAt(i);
            switch (c) {
                case '\t': 
                case '\n': 
                case '\r': 
                case ' ': {
                    if (inQuotes) {
                        sb.append(c);
                        continue block4;
                    }
                    String arg = sb.toString().trim();
                    if (!arg.isEmpty()) {
                        args.add(arg);
                    }
                    sb.setLength(0);
                    continue block4;
                }
                case '\"': {
                    inQuotes = !inQuotes;
                    continue block4;
                }
                default: {
                    sb.append(c);
                }
            }
        }
        String finalArg = sb.toString().trim();
        if (!finalArg.isEmpty()) {
            args.add(finalArg);
        }
        return args;
    }

    @OnScheduled
    public void setupExecutor(ProcessContext context) {
        this.executor = Executors.newFixedThreadPool(context.getMaxConcurrentTasks() * 2, new ThreadFactory(){
            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

            @Override
            public Thread newThread(Runnable r) {
                Thread t = this.defaultFactory.newThread(r);
                t.setName("ExecuteProcess " + ExecuteProcess.this.getIdentifier() + " Task");
                return t;
            }
        });
    }

    @OnUnscheduled
    public void shutdownExecutor() {
        this.executor.shutdown();
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.proxyOut == null) {
            this.proxyOut = new ProxyOutputStream(this.getLogger());
        }
        final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
        List<String> commandStrings = this.createCommandStrings(context);
        String commandString = StringUtils.join(commandStrings, (String)" ");
        if (this.longRunningProcess == null || this.longRunningProcess.isDone()) {
            try {
                this.longRunningProcess = this.launchProcess(context, commandStrings, batchNanos, this.proxyOut);
            }
            catch (IOException ioe) {
                this.getLogger().error("Failed to create process due to {}", new Object[]{ioe});
                context.yield();
                return;
            }
        } else {
            this.getLogger().info("Read from long running process");
        }
        if (!this.isScheduled()) {
            this.getLogger().info("User stopped processor; will terminate process immediately");
            this.longRunningProcess.cancel(true);
            return;
        }
        FlowFile flowFile = session.create();
        if ((flowFile = session.write(flowFile, new OutputStreamCallback(){

            public void process(OutputStream flowFileOut) throws IOException {
                try (BufferedOutputStream out = new BufferedOutputStream(flowFileOut);){
                    ExecuteProcess.this.proxyOut.setDelegate(out);
                    if (batchNanos == null) {
                        try {
                            ExecuteProcess.this.longRunningProcess.get();
                        }
                        catch (InterruptedException ie) {
                        }
                        catch (ExecutionException ee) {
                            ExecuteProcess.this.getLogger().error("Process execution failed due to {}", new Object[]{ee.getCause()});
                        }
                    } else {
                        try {
                            TimeUnit.NANOSECONDS.sleep(batchNanos);
                        }
                        catch (InterruptedException ie) {
                            // empty catch block
                        }
                    }
                    ExecuteProcess.this.proxyOut.setDelegate(null);
                }
            }
        })).getSize() == 0L) {
            session.remove(flowFile);
        } else if (this.failure.get()) {
            session.remove(flowFile);
            this.getLogger().error("Failed to read data from Process, so will not generate FlowFile");
        } else {
            session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
            this.getLogger().info("Created {} and routed to success", new Object[]{flowFile});
            session.transfer(flowFile, REL_SUCCESS);
        }
        session.commit();
    }

    protected List<String> createCommandStrings(ProcessContext context) {
        String command = context.getProperty(COMMAND).getValue();
        List<String> args = ExecuteProcess.splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
        ArrayList<String> commandStrings = new ArrayList<String>(args.size() + 1);
        commandStrings.add(command);
        commandStrings.addAll(args);
        return commandStrings;
    }

    protected Future<?> launchProcess(ProcessContext context, List<String> commandStrings, final Long batchNanos, final ProxyOutputStream proxyOut) throws IOException {
        Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();
        ProcessBuilder builder = new ProcessBuilder(commandStrings);
        String workingDirName = context.getProperty(WORKING_DIR).getValue();
        if (workingDirName != null) {
            builder.directory(new File(workingDirName));
        }
        HashMap environment = new HashMap();
        for (Map.Entry entry : context.getProperties().entrySet()) {
            if (!((PropertyDescriptor)entry.getKey()).isDynamic()) continue;
            environment.put(((PropertyDescriptor)entry.getKey()).getName(), entry.getValue());
        }
        if (!environment.isEmpty()) {
            builder.environment().putAll(environment);
        }
        this.getLogger().info("Start creating new Process > {} ", new Object[]{commandStrings});
        final Process newProcess = builder.redirectErrorStream(redirectErrorStream).start();
        if (!redirectErrorStream.booleanValue()) {
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getErrorStream()));){
                        while (reader.read() >= 0) {
                        }
                    }
                    catch (IOException iOException) {
                        // empty catch block
                    }
                }
            });
        }
        this.failure = new AtomicBoolean(false);
        Future<Object> future = this.executor.submit(new Callable<Object>(){

            /*
             * Exception decompiling
             */
            @Override
            public Object call() throws IOException {
                /*
                 * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
                 * 
                 * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [1[TRYBLOCK]], but top level block is 36[WHILELOOP]
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
                 *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
                 *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
                 *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
                 *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
                 *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
                 *     at org.benf.cfr.reader.Main.main(Main.java:54)
                 */
                throw new IllegalStateException("Decompilation failed");
            }
        });
        return future;
    }

    static /* synthetic */ boolean access$300(ExecuteProcess x0) {
        return x0.isScheduled();
    }

    static /* synthetic */ boolean access$400(ExecuteProcess x0) {
        return x0.isScheduled();
    }

    static /* synthetic */ AtomicBoolean access$500(ExecuteProcess x0) {
        return x0.failure;
    }

    static /* synthetic */ ProcessorLog access$600(ExecuteProcess x0) {
        return x0.getLogger();
    }

    private static class ProxyOutputStream
    extends OutputStream {
        private final ProcessorLog logger;
        private final Lock lock = new ReentrantLock();
        private OutputStream delegate;

        public ProxyOutputStream(ProcessorLog logger) {
            this.logger = logger;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void setDelegate(OutputStream delegate) {
            this.lock.lock();
            try {
                this.logger.trace("Switching delegate from {} to {}", new Object[]{this.delegate, delegate});
                this.delegate = delegate;
            }
            finally {
                this.lock.unlock();
            }
        }

        private void sleep(long millis) {
            try {
                Thread.sleep(millis);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        @Override
        public void write(int b) throws IOException {
            this.lock.lock();
            try {
                while (true) {
                    if (this.delegate != null) {
                        this.logger.trace("Writing to {}", new Object[]{this.delegate});
                        this.delegate.write(b);
                        return;
                    }
                    this.lock.unlock();
                    this.sleep(1L);
                    this.lock.lock();
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.lock.lock();
            try {
                while (true) {
                    if (this.delegate != null) {
                        this.logger.trace("Writing to {}", new Object[]{this.delegate});
                        this.delegate.write(b, off, len);
                        return;
                    }
                    this.lock.unlock();
                    this.sleep(1L);
                    this.lock.lock();
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.write(b, 0, b.length);
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public void flush() throws IOException {
            this.lock.lock();
            try {
                while (true) {
                    if (this.delegate != null) {
                        this.delegate.flush();
                        return;
                    }
                    this.lock.unlock();
                    this.sleep(1L);
                    this.lock.lock();
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

