/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseBigDecimal;
import org.supercsv.cellprocessor.ParseBool;
import org.supercsv.cellprocessor.ParseChar;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseDouble;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.ParseLong;
import org.supercsv.cellprocessor.constraint.DMinMax;
import org.supercsv.cellprocessor.constraint.Equals;
import org.supercsv.cellprocessor.constraint.ForbidSubStr;
import org.supercsv.cellprocessor.constraint.IsIncludedIn;
import org.supercsv.cellprocessor.constraint.LMinMax;
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.constraint.RequireHashCode;
import org.supercsv.cellprocessor.constraint.RequireSubStr;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
import org.supercsv.cellprocessor.constraint.StrRegEx;
import org.supercsv.cellprocessor.constraint.Strlen;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.constraint.UniqueHashCode;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.exception.SuperCsvException;
import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;

@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"csv", "schema", "validation"})
@CapabilityDescription(value="Validates the contents of FlowFiles against a user-specified CSV schema. Take a look at the additional documentation of this processor for some schema examples.")
@WritesAttributes(value={@WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"), @WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"), @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data")})
public class ValidateCsv
extends AbstractProcessor {
    /** Cell processor names that are advertised in the description of the {@link #SCHEMA} property. */
    private static final List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate", "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null", "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique", "UniqueHashCode", "IsIncludedIn");

    /** Value and display name of the strategy that routes the incoming FlowFile as a whole. */
    private static final String routeWholeFlowFile = "FlowFile validation";

    /** Value and display name of the strategy that splits the input into valid and invalid lines. */
    private static final String routeLinesIndividually = "Line by line validation";

    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(
            routeWholeFlowFile,
            routeWholeFlowFile,
            "As soon as an error is found in the CSV file, the validation will stop and the whole flow file will be routed to the 'invalid' relationship. This option offers best performances.");

    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(
            routeLinesIndividually,
            routeLinesIndividually,
            "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' relationship containing all the correct lines and one routed to the 'invalid' relationship containing all the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition:the first occurrence will be considered valid and the next ones as invalid.");

    /** Comma-delimited list of cell processor definitions; parsed by {@code parseSchema}. */
    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
            .name("validate-csv-schema")
            .displayName("Schema")
            .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell processors to apply. The following cell processors are allowed in the schema definition: " + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Whether the first line of the incoming content is a header that must not be validated. */
    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
            .name("validate-csv-header")
            .displayName("Header")
            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
            .required(true)
            .defaultValue("true")
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** Quote character; only the first character of the configured value is used. */
    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
            .name("validate-csv-quote")
            .displayName("Quote character")
            .description("Character used as 'quote' in the incoming data. Example: \"")
            .required(true)
            .defaultValue("\"")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Delimiter character; only the first character of the configured value is used. */
    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
            .name("validate-csv-delimiter")
            .displayName("Delimiter character")
            .description("Character used as 'delimiter' in the incoming data. Example: ,")
            .required(true)
            .defaultValue(",")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** End-of-line symbols; the two-character escapes \n, \r and \t are expanded when the processor is scheduled. */
    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
            .name("validate-csv-eol")
            .displayName("End of line symbols")
            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
            .required(true)
            .defaultValue("\\n")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Chooses between whole-FlowFile routing and line-by-line splitting. */
    public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
            .name("validate-csv-strategy")
            .displayName("Validation strategy")
            .description("Strategy to apply when routing input files to output relationships.")
            .required(true)
            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_VALID = new Relationship.Builder()
            .name("valid")
            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
            .build();

    public static final Relationship REL_INVALID = new Relationship.Builder()
            .name("invalid")
            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
            .build();

    /** Unmodifiable list of supported properties; assigned in {@code init}. */
    private List<PropertyDescriptor> properties;

    /** Unmodifiable set of relationships; assigned in {@code init}. */
    private Set<Relationship> relationships;

    /** Cell processors built from the schema; replaced each time {@code parseSchema} runs. */
    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<>();

    /** CSV dialect (quote, delimiter, end-of-line symbols); replaced each time {@code setPreference} runs. */
    private final AtomicReference<CsvPreference> preference = new AtomicReference<>();

    /**
     * Builds the unmodifiable property list and relationship set that this
     * processor exposes.
     *
     * @param context initialization context (not used here)
     */
    @Override
    protected void init(ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(Arrays.asList(
                SCHEMA,
                HEADER,
                DELIMITER_CHARACTER,
                QUOTE_CHARACTER,
                END_OF_LINE_CHARACTER,
                VALIDATION_STRATEGY));
        this.properties = Collections.unmodifiableList(descriptors);

        final Set<Relationship> routes = new HashSet<>(Arrays.asList(REL_VALID, REL_INVALID));
        this.relationships = Collections.unmodifiableSet(routes);
    }

    /**
     * @return the unmodifiable relationship set assigned in {@code init}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable property list assigned in {@code init}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Validates the configured schema by parsing it.
     *
     * <p>Parsing has a side effect: on success the resulting cell processors
     * are stored for later use by {@code onTrigger}.
     *
     * @param validationContext context giving access to the configured property values
     * @return a single invalid result describing the parse failure, or whatever
     *         the superclass validation returns when the schema parses cleanly
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final String schema = validationContext.getProperty(SCHEMA).getValue();
        try {
            this.parseSchema(schema);
        } catch (Exception e) {
            // Any failure while building the processors means the schema is unusable.
            final ValidationResult failure = new ValidationResult.Builder()
                    .subject(SCHEMA.getName())
                    .input(schema)
                    .valid(false)
                    .explanation("Error while parsing the schema: " + e.getMessage())
                    .build();
            final List<ValidationResult> problems = new ArrayList<ValidationResult>(1);
            problems.add(failure);
            return problems;
        }
        return super.customValidate(validationContext);
    }

    /**
     * Builds the CSV dialect from the configured quote, delimiter and
     * end-of-line properties and stores it for {@code onTrigger}.
     *
     * <p>The two-character escapes {@code \n}, {@code \r} and {@code \t} in the
     * end-of-line property are expanded to the real control characters. Only the
     * first character of the quote and delimiter values is used.
     *
     * @param context process context giving access to the configured property values
     */
    @OnScheduled
    public void setPreference(ProcessContext context) {
        final String configuredEol = context.getProperty(END_OF_LINE_CHARACTER).getValue();
        final String endOfLine = configuredEol
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
        final char quote = context.getProperty(QUOTE_CHARACTER).getValue().charAt(0);
        final int delimiter = context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0);
        final CsvPreference.Builder dialect = new CsvPreference.Builder(quote, delimiter, endOfLine);
        this.preference.set(dialect.build());
    }

    /**
     * Parses the whole schema string into cell processors and stores the
     * resulting array, one entry per definition.
     *
     * @param schema comma-delimited cell processor definitions
     */
    private void parseSchema(String schema) {
        final List<CellProcessor> parsed = new ArrayList<CellProcessor>();
        String rest = schema;
        // Each call consumes one definition and hands back the unparsed tail.
        while (!rest.isEmpty()) {
            rest = this.setProcessor(rest, parsed);
        }
        final CellProcessor[] asArray = new CellProcessor[parsed.size()];
        this.processors.set(parsed.toArray(asArray));
    }

    /**
     * Consumes the first cell processor definition from {@code remaining},
     * appends the matching processor to {@code processorsList} and returns the
     * part of the string that has not been consumed yet.
     *
     * <p>A definition ends at the first comma found outside parentheses, or right
     * after the parenthesis that balances the first opening one. Parentheses
     * are counted without regard to quoting, so a parenthesis inside a quoted
     * argument takes part in the balance. A single leading comma (the separator
     * left over from the previous definition) is skipped.
     *
     * @param remaining      unparsed tail of the schema
     * @param processorsList list receiving the parsed processor
     * @return the schema text following the consumed definition
     * @throws IllegalArgumentException if the definition has an opening parenthesis
     *         with nothing after it, or if the processor cannot be built
     */
    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
        final StringBuilder buffer = new StringBuilder();
        final int length = remaining.length();
        int i = 0;
        int opening = 0;
        int closing = 0;
        while (i < length) {
            final char c = remaining.charAt(i);
            ++i;
            if (opening == 0 && c == ',') {
                if (i != 1) {
                    // Separator after a definition without (further) arguments: definition complete.
                    break;
                }
                // Leading separator left behind by the previous definition: skip it.
                continue;
            }
            buffer.append(c);
            if (c == '(') {
                ++opening;
            } else if (c == ')') {
                ++closing;
            }
            if (opening > 0 && opening == closing) {
                break;
            }
        }

        final String procString = buffer.toString().trim();
        final int parenIndex = procString.indexOf('(');
        String method = procString;
        String argument = null;
        if (parenIndex != -1) {
            if (parenIndex == procString.length() - 1) {
                // "Name(" with nothing after it would otherwise fail with an opaque index error.
                throw new IllegalArgumentException("Missing closing parenthesis in cell processor definition [" + procString + "]");
            }
            // The argument is everything between the first '(' and the last character.
            argument = procString.substring(parenIndex + 1, procString.length() - 1);
            method = procString.substring(0, parenIndex);
        }
        // Locale.ROOT keeps the lookup independent of the JVM default locale
        // (e.g. a Turkish locale lower-cases 'I' to a dotless i).
        processorsList.add(this.getProcessor(method.toLowerCase(Locale.ROOT), argument));
        return remaining.substring(i);
    }

    /**
     * Creates the Super CSV cell processor identified by {@code method}.
     *
     * @param method   lower-cased processor name (for example {@code "parseint"})
     * @param argument text found between the parentheses of the definition, or
     *                 {@code null} when the definition had no parentheses
     * @return the processor; {@code null} for the {@code "null"} method, which is
     *         stored as-is in the processor array
     * @throws IllegalArgumentException if the method is unknown, if a processor that
     *         takes no argument was given one, if a required argument is missing, or
     *         if a numeric argument cannot be parsed ({@link NumberFormatException})
     */
    private CellProcessor getProcessor(String method, String argument) {
        switch (method) {
            case "optional": {
                // Optional wraps exactly one nested definition of the form Name or Name(args).
                final String nested = requireArgument("Optional", argument).trim();
                final int opening = nested.indexOf('(');
                String subMethod = nested;
                String subArgument = null;
                if (opening != -1) {
                    subArgument = nested.substring(opening + 1, nested.length() - 1);
                    subMethod = nested.substring(0, opening);
                }
                return new Optional(this.getProcessor(subMethod.toLowerCase(Locale.ROOT), subArgument));
            }
            case "parsedate": {
                return new ParseDate(stripEnclosingChars("ParseDate", argument));
            }
            case "parsedouble": {
                rejectArgument("ParseDouble", argument);
                return new ParseDouble();
            }
            case "parsebigdecimal": {
                rejectArgument("ParseBigDecimal", argument);
                return new ParseBigDecimal();
            }
            case "parsebool": {
                rejectArgument("ParseBool", argument);
                return new ParseBool();
            }
            case "parsechar": {
                rejectArgument("ParseChar", argument);
                return new ParseChar();
            }
            case "parseint": {
                rejectArgument("ParseInt", argument);
                return new ParseInt();
            }
            case "parselong": {
                rejectArgument("ParseLong", argument);
                return new ParseLong();
            }
            case "notnull": {
                rejectArgument("NotNull", argument);
                return new NotNull();
            }
            case "strregex": {
                return new StrRegEx(stripEnclosingChars("StrRegEx", argument));
            }
            case "unique": {
                rejectArgument("Unique", argument);
                return new Unique();
            }
            case "uniquehashcode": {
                rejectArgument("UniqueHashCode", argument);
                return new UniqueHashCode();
            }
            case "strlen": {
                return new Strlen(parseInts("Strlen", argument));
            }
            case "strminmax": {
                final String[] bounds = splitArguments("StrMinMax", argument, 2);
                return new StrMinMax(Long.parseLong(bounds[0]), Long.parseLong(bounds[1]));
            }
            case "lminmax": {
                final String[] bounds = splitArguments("LMinMax", argument, 2);
                return new LMinMax(Long.parseLong(bounds[0]), Long.parseLong(bounds[1]));
            }
            case "dminmax": {
                final String[] bounds = splitArguments("DMinMax", argument, 2);
                return new DMinMax(Double.parseDouble(bounds[0]), Double.parseDouble(bounds[1]));
            }
            case "equals": {
                rejectArgument("Equals", argument);
                return new Equals();
            }
            case "forbidsubstr": {
                final String[] forbiddenSubStrings = splitUnquoted("ForbidSubStr", argument);
                return new ForbidSubStr(forbiddenSubStrings);
            }
            case "requiresubstr": {
                final String[] requiredSubStrings = splitUnquoted("RequireSubStr", argument);
                return new RequireSubStr(requiredSubStrings);
            }
            case "strnotnullorempty": {
                rejectArgument("StrNotNullOrEmpty", argument);
                return new StrNotNullOrEmpty();
            }
            case "requirehashcode": {
                return new RequireHashCode(parseInts("RequireHashCode", argument));
            }
            case "null": {
                rejectArgument("Null", argument);
                // Deliberately no processor object for this column.
                return null;
            }
            case "isincludedin": {
                // Typed as Object[] so the Object[] constructor is selected.
                final Object[] elements = splitUnquoted("IsIncludedIn", argument);
                return new IsIncludedIn(elements);
            }
        }
        throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
    }

    /** Fails when a processor that takes no argument was given a non-empty one. */
    private static void rejectArgument(String processorName, String argument) {
        if (argument != null && !argument.isEmpty()) {
            throw new IllegalArgumentException(processorName + " does not expect any argument but has " + argument);
        }
    }

    /** Returns {@code argument}, failing with a descriptive message when the definition had no parentheses. */
    private static String requireArgument(String processorName, String argument) {
        if (argument == null) {
            throw new IllegalArgumentException(processorName + " expects an argument but none was provided");
        }
        return argument;
    }

    /** Drops the first and last character of the argument (the quotes surrounding a pattern). */
    private static String stripEnclosingChars(String processorName, String argument) {
        final String quoted = requireArgument(processorName, argument);
        if (quoted.length() < 2) {
            throw new IllegalArgumentException(processorName + " expects a quoted argument but has " + quoted);
        }
        return quoted.substring(1, quoted.length() - 1);
    }

    /** Splits a comma-separated argument into trimmed tokens, requiring at least {@code minimumCount} of them. */
    private static String[] splitArguments(String processorName, String argument, int minimumCount) {
        final String[] parts = requireArgument(processorName, argument).split(",");
        if (parts.length < minimumCount) {
            throw new IllegalArgumentException(processorName + " expects at least " + minimumCount + " comma-separated argument(s) but has " + argument);
        }
        for (int i = 0; i < parts.length; ++i) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    /** Parses a comma-separated argument into an array of ints. */
    private static int[] parseInts(String processorName, String argument) {
        final String[] parts = splitArguments(processorName, argument, 1);
        final int[] values = new int[parts.length];
        for (int i = 0; i < parts.length; ++i) {
            values[i] = Integer.parseInt(parts[i]);
        }
        return values;
    }

    /** Removes every double quote from the argument, then splits it on commas followed by optional spaces. */
    private static String[] splitUnquoted(String processorName, String argument) {
        return requireArgument(processorName, argument).replaceAll("\"", "").split(",[ ]*");
    }

    /**
     * Validates one incoming FlowFile against the parsed schema.
     *
     * <p>With the whole-FlowFile strategy, reading stops at the first invalid row
     * and the original FlowFile is routed to 'valid' or 'invalid'.
     *
     * <p>With the line-by-line strategy, two child FlowFiles are created. The header
     * (when enabled) is copied to both, every row is appended to the 'valid' or the
     * 'invalid' child, the {@code count.*} attributes are set, a child that only
     * holds the header is removed, and the original FlowFile is removed.
     *
     * @param context process context giving access to the configured property values
     * @param session session used to read, create, append to and route FlowFiles
     */
    @Override
    public void onTrigger(ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final CsvPreference csvPref = this.preference.get();
        final boolean header = context.getProperty(HEADER).asBoolean();
        final ComponentLog logger = this.getLogger();
        final CellProcessor[] cellProcs = this.processors.get();
        final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());

        // Mutable holders, because the state is updated from inside the read callback.
        final AtomicBoolean valid = new AtomicBoolean(true);
        final AtomicBoolean isFirstLineValid = new AtomicBoolean(true);
        final AtomicBoolean isFirstLineInvalid = new AtomicBoolean(true);
        final AtomicInteger okCount = new AtomicInteger(0);
        final AtomicInteger totalCount = new AtomicInteger(0);
        final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
        final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);

        if (!isWholeFFValidation) {
            invalidFF.set(session.create(flowFile));
            validFF.set(session.create(flowFile));
        }

        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                try (NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref)) {
                    if (header) {
                        final List<String> headerList = listReader.read();
                        // headerList is null when the content is empty; there is nothing to copy then.
                        if (!isWholeFFValidation && headerList != null) {
                            ValidateCsv.this.appendRow(session, invalidFF, headerList, csvPref, isFirstLineInvalid.get());
                            ValidateCsv.this.appendRow(session, validFF, headerList, csvPref, isFirstLineValid.get());
                            isFirstLineValid.set(false);
                            isFirstLineInvalid.set(false);
                        }
                    }

                    boolean stop = false;
                    while (!stop) {
                        try {
                            final List<Object> row = listReader.read(cellProcs);
                            stop = row == null;
                            if (isWholeFFValidation || stop) {
                                continue;
                            }
                            ValidateCsv.this.appendRow(session, validFF, row, csvPref, isFirstLineValid.get());
                            isFirstLineValid.set(false);
                            okCount.incrementAndGet();
                            // Only rows that were actually read are counted; the terminating
                            // end-of-input read is not a line.
                            totalCount.incrementAndGet();
                        } catch (final SuperCsvException e) {
                            valid.set(false);
                            if (isWholeFFValidation) {
                                logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, (Throwable) e);
                                break;
                            }
                            ValidateCsv.this.appendRow(session, invalidFF, e.getCsvContext().getRowSource(), csvPref, isFirstLineInvalid.get());
                            isFirstLineInvalid.set(false);
                            totalCount.incrementAndGet();
                        }
                    }
                }
            }
        });

        if (isWholeFFValidation) {
            if (valid.get()) {
                logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
                session.getProvenanceReporter().route(flowFile, REL_VALID);
                session.transfer(flowFile, REL_VALID);
            } else {
                session.getProvenanceReporter().route(flowFile, REL_INVALID);
                session.transfer(flowFile, REL_INVALID);
            }
            return;
        }

        final int total = totalCount.get();
        final int ok = okCount.get();
        final String totalText = Integer.toString(total);

        if (valid.get()) {
            logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()});
            session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + total + " line(s) are valid");
            // putAttribute hands back the updated FlowFile; keep using that reference.
            FlowFile validOut = session.putAttribute(validFF.get(), "count.valid.lines", totalText);
            validOut = session.putAttribute(validOut, "count.total.lines", totalText);
            session.transfer(validOut, REL_VALID);
            session.remove(invalidFF.get());
            session.remove(flowFile);
        } else if (ok != 0) {
            final int invalidCount = total - ok;
            logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'", new Object[]{ok, total, flowFile});
            session.getProvenanceReporter().route(validFF.get(), REL_VALID, ok + " valid line(s)");
            FlowFile validOut = session.putAttribute(validFF.get(), "count.total.lines", totalText);
            validOut = session.putAttribute(validOut, "count.valid.lines", Integer.toString(ok));
            session.transfer(validOut, REL_VALID);
            session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, invalidCount + " invalid line(s)");
            FlowFile invalidOut = session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(invalidCount));
            invalidOut = session.putAttribute(invalidOut, "count.total.lines", totalText);
            session.transfer(invalidOut, REL_INVALID);
            session.remove(flowFile);
        } else {
            logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()});
            session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + total + " line(s) are invalid");
            FlowFile invalidOut = session.putAttribute(invalidFF.get(), "count.invalid.lines", totalText);
            invalidOut = session.putAttribute(invalidOut, "count.total.lines", totalText);
            session.transfer(invalidOut, REL_INVALID);
            session.remove(validFF.get());
            session.remove(flowFile);
        }
    }

    /**
     * Appends one row to the FlowFile held by {@code target} and stores the
     * FlowFile returned by the session back into the holder.
     *
     * @param session     session performing the append
     * @param target      holder of the FlowFile to extend
     * @param row         column values to write
     * @param csvPref     dialect supplying the delimiter and end-of-line symbols
     * @param isFirstLine {@code true} when nothing has been written to the target yet
     */
    private void appendRow(final ProcessSession session, final AtomicReference<FlowFile> target, final List<?> row, final CsvPreference csvPref, final boolean isFirstLine) {
        target.set(session.append(target.get(), new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                out.write(ValidateCsv.this.print(row, csvPref, isFirstLine));
            }
        }));
    }

    /**
     * Serializes one row to bytes.
     *
     * <p>Values are joined with the delimiter of {@code csvPref}; a {@code null}
     * value contributes nothing between its delimiters. Unless this is the first
     * line, the end-of-line symbols are written before the row. Values are
     * written via {@code toString()} without any quoting or escaping, and the
     * text is encoded with the platform default charset.
     *
     * @param list        column values of the row
     * @param csvPref     dialect supplying the delimiter and end-of-line symbols
     * @param isFirstLine {@code true} to omit the leading end-of-line symbols
     * @return the encoded row
     */
    private byte[] print(List<?> list, CsvPreference csvPref, boolean isFirstLine) {
        final StringBuilder line = new StringBuilder();
        if (!isFirstLine) {
            line.append(csvPref.getEndOfLineSymbols());
        }
        final int lastIndex = list.size() - 1;
        int position = 0;
        for (final Object value : list) {
            if (value != null) {
                line.append(value.toString());
            }
            // The delimiter goes between values only, never after the last one.
            if (position < lastIndex) {
                line.append((char) csvPref.getDelimiterChar());
            }
            position++;
        }
        return line.toString().getBytes();
    }

    private class NifiCsvListReader
    extends CsvListReader {
        public NifiCsvListReader(Reader reader, CsvPreference preferences) {
            super(reader, preferences);
        }

        public List<Object> read(CellProcessor ... processors) throws IOException {
            if (processors == null) {
                throw new NullPointerException("Processors should not be null");
            }
            if (this.readRow()) {
                super.executeProcessors(new ArrayList(this.getColumns().size()), processors);
                return new ArrayList<Object>(this.getColumns());
            }
            return null;
        }
    }
}

