/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.ConvertJSONToSQL;
import org.apache.nifi.stream.io.StreamUtils;

@SupportsBatching
@SeeAlso(value={ConvertJSONToSQL.class})
@Tags(value={"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription(value="Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes(value={@ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or not two FlowFiles belong to the same transaction."), @ReadsAttribute(attribute="fragment.count", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles are needed to complete the transaction."), @ReadsAttribute(attribute="fragment.index", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles in a transaction should be evaluated."), @ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer that represents the JDBC Type of the parameter."), @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")})
@WritesAttributes(value={@WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")})
public class PutSQL
extends AbstractProcessor {
    static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder().name("JDBC Connection Pool").description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. The Connection Pool is necessary in order to determine the appropriate database column types.").identifiesControllerService(DBCPService.class).required(true).build();
    static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder().name("Support Fragmented Transactions").description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.").allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder().name("Transaction Timeout").description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship").required(false).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The preferred number of FlowFiles to put to the database in a single transaction").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("100").build();
    static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder().name("Obtain Generated Keys").description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. This may result in slightly slower performance and is not supported by all databases.").allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after the database is successfully updated").build();
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, such as an invalid query or an integrity constraint violation").build();
    private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
    private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
    private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
    private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
    private static final String FRAGMENT_COUNT_ATTR = "fragment.count";

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(CONNECTION_POOL);
        properties.add(SUPPORT_TRANSACTIONS);
        properties.add(TRANSACTION_TIMEOUT);
        properties.add(BATCH_SIZE);
        properties.add(OBTAIN_GENERATED_KEYS);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        rels.add(REL_RETRY);
        rels.add(REL_FAILURE);
        return rels;
    }

    /*
     * Exception decompiling
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
         *     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:100)
         *     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106)
         *     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302)
         *     at java.base/java.util.Objects.checkIndex(Objects.java:385)
         *     at java.base/java.util.ArrayList.get(ArrayList.java:427)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.TryRewriter.extendTryBlock(TryRewriter.java:34)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.TryRewriter.extendTryBlocks(TryRewriter.java:147)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.Op03Rewriters.extendTryBlocks(Op03Rewriters.java:48)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:557)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private FlowFilePoll pollFlowFiles(ProcessContext context, ProcessSession session) {
        List flowFiles;
        boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
        boolean fragmentedTransaction = false;
        int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        if (useTransactions) {
            TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter();
            flowFiles = session.get((FlowFileFilter)filter);
            fragmentedTransaction = filter.isFragmentedTransaction();
        } else {
            flowFiles = session.get(batchSize);
        }
        if (flowFiles.isEmpty()) {
            return null;
        }
        if (fragmentedTransaction) {
            Relationship relationship = this.determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
            if (relationship != null) {
                if (relationship == Relationship.SELF) {
                    ListIterator<FlowFile> itr = flowFiles.listIterator();
                    while (itr.hasNext()) {
                        FlowFile flowFile = (FlowFile)itr.next();
                        FlowFile penalized = session.penalize(flowFile);
                        itr.remove();
                        itr.add(penalized);
                    }
                }
                session.transfer((Collection)flowFiles, relationship);
                return null;
            }
            Collections.sort(flowFiles, new Comparator<FlowFile>(){

                @Override
                public int compare(FlowFile o1, FlowFile o2) {
                    return Integer.compare(Integer.parseInt(o1.getAttribute(PutSQL.FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(PutSQL.FRAGMENT_INDEX_ATTR)));
                }
            });
        }
        return new FlowFilePoll(flowFiles, fragmentedTransaction);
    }

    private String determineGeneratedKey(PreparedStatement stmt) {
        try {
            ResultSet generatedKeys = stmt.getGeneratedKeys();
            if (generatedKeys != null && generatedKeys.next()) {
                return generatedKeys.getString(1);
            }
        }
        catch (SQLException sQLException) {
            // empty catch block
        }
        return null;
    }

    private StatementFlowFileEnclosure getEnclosure(String sql, Connection conn, Map<String, StatementFlowFileEnclosure> stmtMap, boolean obtainKeys, boolean fragmentedTransaction) throws SQLException {
        StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
        if (enclosure != null) {
            return enclosure;
        }
        if (obtainKeys) {
            PreparedStatement stmt = conn.prepareStatement(sql, 1);
            if (stmt == null) {
                stmt = conn.prepareStatement(sql);
            }
            return new StatementFlowFileEnclosure(stmt);
        }
        if (fragmentedTransaction) {
            PreparedStatement stmt = conn.prepareStatement(sql);
            return new StatementFlowFileEnclosure(stmt);
        }
        PreparedStatement stmt = conn.prepareStatement(sql);
        enclosure = new StatementFlowFileEnclosure(stmt);
        stmtMap.put(sql, enclosure);
        return enclosure;
    }

    private String getSQL(ProcessSession session, FlowFile flowFile) {
        final byte[] buffer = new byte[(int)flowFile.getSize()];
        session.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.fillBuffer((InputStream)in, (byte[])buffer);
            }
        });
        String sql = new String(buffer, StandardCharsets.UTF_8);
        return sql;
    }

    private void setParameters(PreparedStatement stmt, Map<String, String> attributes) throws SQLException {
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            String key = entry.getKey();
            Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
            if (!matcher.matches()) continue;
            int parameterIndex = Integer.parseInt(matcher.group(1));
            boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
            if (!isNumeric) {
                throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type");
            }
            int jdbcType = Integer.parseInt(entry.getValue());
            String valueAttrName = "sql.args." + parameterIndex + ".value";
            String parameterValue = attributes.get(valueAttrName);
            try {
                this.setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
            }
            catch (NumberFormatException nfe) {
                throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", (Throwable)nfe);
            }
        }
    }

    Relationship determineRelationship(List<FlowFile> flowFiles, Long transactionTimeoutMillis) {
        int selectedNumFragments = 0;
        BitSet bitSet = new BitSet();
        for (FlowFile flowFile : flowFiles) {
            int idx;
            int numFragments;
            String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
            if (fragmentCount == null && flowFiles.size() == 1) {
                return null;
            }
            if (fragmentCount == null) {
                this.getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[]{flowFile, flowFiles.size()});
                return REL_FAILURE;
            }
            try {
                numFragments = Integer.parseInt(fragmentCount);
            }
            catch (NumberFormatException nfe) {
                this.getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; routing all FlowFiles with this fragment.identifier to failure", new Object[]{flowFile, fragmentCount});
                return REL_FAILURE;
            }
            if (numFragments < 1) {
                this.getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; routing all FlowFiles with this fragment.identifier to failure", new Object[]{flowFile, fragmentCount});
                return REL_FAILURE;
            }
            if (selectedNumFragments == 0) {
                selectedNumFragments = numFragments;
            } else if (numFragments != selectedNumFragments) {
                this.getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; routing all FlowFiles with this fragment.identifier to failure", new Object[]{flowFile});
                return REL_FAILURE;
            }
            String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
            if (fragmentIndex == null) {
                this.getLogger().error("Cannot process {} because the fragment.index attribute is missing; routing all FlowFiles with this fragment.identifier to failure", new Object[]{flowFile});
                return REL_FAILURE;
            }
            try {
                idx = Integer.parseInt(fragmentIndex);
            }
            catch (NumberFormatException nfe) {
                this.getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; routing all FlowFiles with this fragment.identifier to failure", new Object[]{flowFile, fragmentIndex});
                return REL_FAILURE;
            }
            if (idx < 0) {
                this.getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; routing all FlowFiles with this fragment.identifier to failure", new Object[]{flowFile, fragmentIndex});
                return REL_FAILURE;
            }
            if (bitSet.get(idx)) {
                this.getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; routing all FlowFiles with this fragment.identifier to failure", new Object[]{flowFile});
                return REL_FAILURE;
            }
            bitSet.set(idx);
        }
        if (selectedNumFragments == flowFiles.size()) {
            return null;
        }
        long latestQueueTime = 0L;
        for (FlowFile flowFile : flowFiles) {
            if (flowFile.getLastQueueDate() == null || flowFile.getLastQueueDate() <= latestQueueTime) continue;
            latestQueueTime = flowFile.getLastQueueDate();
        }
        if (transactionTimeoutMillis != null && latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) {
            this.getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[]{flowFiles});
            return REL_FAILURE;
        }
        this.getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
        return Relationship.SELF;
    }

    private void setParameter(PreparedStatement stmt, String attrName, int parameterIndex, String parameterValue, int jdbcType) throws SQLException {
        if (parameterValue == null) {
            stmt.setNull(parameterIndex, jdbcType);
        } else {
            switch (jdbcType) {
                case -7: {
                    stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
                    break;
                }
                case -6: {
                    stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
                    break;
                }
                case 5: {
                    stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
                    break;
                }
                case 4: {
                    stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
                    break;
                }
                case -5: {
                    stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
                    break;
                }
                case 7: {
                    stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
                    break;
                }
                case 6: 
                case 8: {
                    stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
                    break;
                }
                case 91: {
                    stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
                    break;
                }
                case 92: {
                    stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
                    break;
                }
                case 93: {
                    stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
                    break;
                }
                case -16: 
                case -1: 
                case 1: 
                case 12: {
                    stmt.setString(parameterIndex, parameterValue);
                    break;
                }
                default: {
                    throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue + "' and a type of '" + jdbcType + "' but this is not a known data type");
                }
            }
        }
    }

    private static class StatementFlowFileEnclosure {
        private final PreparedStatement statement;
        private final List<FlowFile> flowFiles = new ArrayList<FlowFile>();

        public StatementFlowFileEnclosure(PreparedStatement statement) {
            this.statement = statement;
        }

        public PreparedStatement getStatement() {
            return this.statement;
        }

        public List<FlowFile> getFlowFiles() {
            return this.flowFiles;
        }

        public void addFlowFile(FlowFile flowFile) {
            this.flowFiles.add(flowFile);
        }

        public int hashCode() {
            return this.statement.hashCode();
        }

        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (obj == this) {
                return false;
            }
            if (!(obj instanceof StatementFlowFileEnclosure)) {
                return false;
            }
            StatementFlowFileEnclosure other = (StatementFlowFileEnclosure)obj;
            return this.statement.equals(other.getStatement());
        }
    }

    private static class FlowFilePoll {
        private final List<FlowFile> flowFiles;
        private final boolean fragmentedTransaction;

        public FlowFilePoll(List<FlowFile> flowFiles, boolean fragmentedTransaction) {
            this.flowFiles = flowFiles;
            this.fragmentedTransaction = fragmentedTransaction;
        }

        public List<FlowFile> getFlowFiles() {
            return this.flowFiles;
        }

        public boolean isFragmentedTransaction() {
            return this.fragmentedTransaction;
        }
    }

    static class TransactionalFlowFileFilter
    implements FlowFileFilter {
        private String selectedId = null;
        private int numSelected = 0;
        private boolean ignoreFragmentIdentifiers = false;

        TransactionalFlowFileFilter() {
        }

        public boolean isFragmentedTransaction() {
            return !this.ignoreFragmentIdentifiers;
        }

        public FlowFileFilter.FlowFileFilterResult filter(FlowFile flowFile) {
            String fragmentId = flowFile.getAttribute(PutSQL.FRAGMENT_ID_ATTR);
            String fragCount = flowFile.getAttribute(PutSQL.FRAGMENT_COUNT_ATTR);
            if (this.ignoreFragmentIdentifiers) {
                if (fragmentId == null || "1".equals(fragCount)) {
                    return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                }
                return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
            }
            if (fragmentId == null || "1".equals(fragCount)) {
                if (this.selectedId == null) {
                    this.ignoreFragmentIdentifiers = true;
                    return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                }
                return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
            }
            if (this.selectedId == null) {
                this.selectedId = fragmentId;
                ++this.numSelected;
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
            }
            if (this.selectedId.equals(fragmentId)) {
                int numFragments = NUMBER_PATTERN.matcher(fragCount).matches() ? Integer.parseInt(fragCount) : Integer.MAX_VALUE;
                if (this.numSelected >= numFragments - 1) {
                    return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
                }
                ++this.numSelected;
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
            }
            return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
        }
    }
}

