/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.ErrorTypes;
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
import org.apache.nifi.processor.util.pattern.PartialFunctions;
import org.apache.nifi.processor.util.pattern.Put;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;

@EventDriven
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"sql", "record", "jdbc", "put", "database", "update", "insert", "delete"})
@CapabilityDescription(value="The PutDatabaseRecord processor uses a specified RecordReader to input (possibly multiple) records from an incoming flow file. These records are translated to SQL statements and executed as a single batch. If any errors occur, the flow file is routed to failure or retry, and if the records are transmitted successfully, the incoming flow file is routed to success.  The type of statement executed by the processor is specified via the Statement Type property, which accepts some hard-coded values such as INSERT, UPDATE, and DELETE, as well as 'Use statement.type Attribute', which causes the processor to get the statement type from a flow file attribute.  IMPORTANT: If the Statement Type is UPDATE, then the incoming records must not alter the value(s) of the primary keys (or user-specified Update Keys). If such records are encountered, the UPDATE statement issued to the database may do nothing (if no existing records with the new primary key values are found), or could inadvertently corrupt the existing data (by changing records for which the new values of the primary keys exist).")
@ReadsAttribute(attribute="statement.type", description="If 'Use statement.type Attribute' is selected for the Statement Type property, the value of this attribute will be used to determine the type of statement (INSERT, UPDATE, DELETE, SQL, etc.) to generate and execute.")
@WritesAttribute(attribute="putdatabaserecord.error", description="If an error occurs during processing, the flow file will be routed to failure or retry, and this attribute will be populated with the cause of the error.")
public class PutDatabaseRecord
extends AbstractSessionFactoryProcessor {
    static final String UPDATE_TYPE = "UPDATE";
    static final String INSERT_TYPE = "INSERT";
    static final String DELETE_TYPE = "DELETE";
    static final String SQL_TYPE = "SQL";
    static final String USE_ATTR_TYPE = "Use statement.type Attribute";
    static final String STATEMENT_TYPE_ATTRIBUTE = "statement.type";
    static final String PUT_DATABASE_RECORD_ERROR = "putdatabaserecord.error";
    static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields", "Any field in the document that cannot be mapped to a column in the database is ignored");
    static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail on Unmatched Fields", "Fail on Unmatched Fields", "If the document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns", "Any column in the database that does not have a field in the document will be assumed to not be required.  No notification will be logged");
    static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns", "Warn on Unmatched Columns", "Any column in the database that does not have a field in the document will be assumed to not be required.  A warning will be logged");
    static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns", "Fail on Unmatched Columns", "A flow will fail if any column in the database that does not have a field in the document.  An error will be logged");
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successfully created FlowFile from SQL query result set.").build();
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, such as an invalid query or an integrity constraint violation").build();
    protected static Set<Relationship> relationships;
    static final PropertyDescriptor RECORD_READER_FACTORY;
    static final PropertyDescriptor STATEMENT_TYPE;
    static final PropertyDescriptor DBCP_SERVICE;
    static final PropertyDescriptor CATALOG_NAME;
    static final PropertyDescriptor SCHEMA_NAME;
    static final PropertyDescriptor TABLE_NAME;
    static final PropertyDescriptor TRANSLATE_FIELD_NAMES;
    static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR;
    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR;
    static final PropertyDescriptor UPDATE_KEYS;
    static final PropertyDescriptor FIELD_CONTAINING_SQL;
    static final PropertyDescriptor QUOTED_IDENTIFIERS;
    static final PropertyDescriptor QUOTED_TABLE_IDENTIFIER;
    static final PropertyDescriptor QUERY_TIMEOUT;
    protected static List<PropertyDescriptor> propDescriptors;
    private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100){
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean removeEldestEntry(Map.Entry<SchemaKey, TableSchema> eldest) {
            return this.size() >= 100;
        }
    };
    private Put<FunctionContext, Connection> process;
    private ExceptionHandler<FunctionContext> exceptionHandler;
    private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> {
        Connection connection = ((DBCPService)c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class)).getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
        try {
            ((FunctionContext)fc).originalAutoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
            String jdbcUrl = "DBCPService";
            try {
                DatabaseMetaData databaseMetaData = connection.getMetaData();
                if (databaseMetaData != null) {
                    jdbcUrl = databaseMetaData.getURL();
                }
            }
            catch (SQLException sQLException) {
            }
            finally {
                ((FunctionContext)fc).jdbcUrl = jdbcUrl;
            }
        }
        catch (SQLException e) {
            throw new ProcessException("Failed to disable auto commit due to " + e, (Throwable)e);
        }
        return connection;
    };
    private final Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, functionContext, conn, flowFile, result) -> this.exceptionHandler.execute((Object)functionContext, (Object)flowFile, inputFlowFile -> {
        String statementTypeProperty;
        String statementType = statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
        if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
            statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
        }
        if (StringUtils.isEmpty((CharSequence)statementType)) {
            String msg = String.format("Statement Type is not specified, FlowFile %s", inputFlowFile);
            throw new IllegalArgumentException(msg);
        }
        try (InputStream in = session.read(inputFlowFile);){
            RecordReaderFactory recordParserFactory = (RecordReaderFactory)context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
            RecordReader recordParser = recordParserFactory.createRecordReader(inputFlowFile, in, this.getLogger());
            if (SQL_TYPE.equalsIgnoreCase(statementType)) {
                this.executeSQL(context, session, (FlowFile)inputFlowFile, (FunctionContext)((Object)functionContext), result, (Connection)conn, recordParser);
            } else {
                DMLSettings settings = new DMLSettings(context);
                this.executeDML(context, session, (FlowFile)inputFlowFile, (FunctionContext)((Object)functionContext), result, (Connection)conn, recordParser, statementType, settings);
            }
        }
    }, (fc, inputFlowFile, r, e) -> {
        this.getLogger().warn("Failed to process {} due to {}", new Object[]{inputFlowFile, e}, (Throwable)e);
        if (e instanceof BatchUpdateException) {
            try {
                conn.rollback();
            }
            catch (SQLException re) {
                this.getLogger().error("Failed to rollback database due to {}, transaction may be incomplete.", new Object[]{re}, (Throwable)re);
            }
        }
        FlowFile flowFileWithAttributes = session.putAttribute(inputFlowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
        ExceptionHandler.OnError defaultOnError = ExceptionHandler.createOnError((ProcessContext)context, (ProcessSession)session, (RoutingResult)result, (Relationship)REL_FAILURE, (Relationship)REL_RETRY);
        ExceptionHandler.OnError rollbackOnFailure = RollbackOnFailure.createOnError((ExceptionHandler.OnError)defaultOnError);
        rollbackOnFailure.apply((Object)fc, (Object)flowFileWithAttributes, r, e);
    });

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propDescriptors;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).required(false).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dynamic(true).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        PutDatabaseRecord putDatabaseRecord = this;
        synchronized (putDatabaseRecord) {
            this.schemaCache.clear();
        }
        this.process = new Put();
        this.process.setLogger(this.getLogger());
        this.process.initConnection(this.initConnection);
        this.process.putFlowFile(this.putFlowFile);
        this.process.adjustRoute(RollbackOnFailure.createAdjustRoute((Relationship[])new Relationship[]{REL_FAILURE, REL_RETRY}));
        this.process.onCompleted((c, s, fc, conn) -> {
            try {
                conn.commit();
            }
            catch (SQLException e) {
                throw new ProcessException("Failed to commit database connection due to " + e, (Throwable)e);
            }
        });
        this.process.onFailed((c, s, fc, conn, e) -> {
            try {
                conn.rollback();
            }
            catch (SQLException re) {
                this.getLogger().warn("Failed to rollback database connection due to %s", new Object[]{re}, (Throwable)re);
            }
        });
        this.process.cleanup((c, s, fc, conn) -> {
            if (((FunctionContext)fc).originalAutoCommit) {
                try {
                    conn.setAutoCommit(true);
                }
                catch (SQLException se) {
                    this.getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
                }
            }
        });
        this.exceptionHandler = new ExceptionHandler();
        this.exceptionHandler.mapException(s -> {
            try {
                if (s == null) {
                    return ErrorTypes.PersistentFailure;
                }
                throw s;
            }
            catch (IllegalArgumentException | SQLNonTransientException | MalformedRecordException e) {
                return ErrorTypes.InvalidInput;
            }
            catch (IOException | SQLException e) {
                return ErrorTypes.TemporalFailure;
            }
            catch (Exception e) {
                return ErrorTypes.UnknownFailure;
            }
        });
        this.exceptionHandler.adjustError(RollbackOnFailure.createAdjustError((ComponentLog)this.getLogger()));
    }

    private void executeSQL(ProcessContext context, ProcessSession session, FlowFile flowFile, FunctionContext functionContext, RoutingResult result, Connection con, RecordReader recordParser) throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
        RecordSchema recordSchema = recordParser.getSchema();
        String sqlField = context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
        if (StringUtils.isEmpty((CharSequence)sqlField)) {
            throw new IllegalArgumentException(String.format("SQL specified as Statement Type but no Field Containing SQL was found, FlowFile %s", flowFile));
        }
        boolean schemaHasSqlField = recordSchema.getFields().stream().anyMatch(field -> sqlField.equals(field.getFieldName()));
        if (!schemaHasSqlField) {
            throw new IllegalArgumentException(String.format("Record schema does not contain Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
        }
        try (Statement s = con.createStatement();){
            Record currentRecord;
            block18: {
                try {
                    s.setQueryTimeout(functionContext.queryTimeout);
                }
                catch (SQLException se) {
                    if (functionContext.queryTimeout <= 0) break block18;
                    throw se;
                }
            }
            while ((currentRecord = recordParser.nextRecord()) != null) {
                Object sql = currentRecord.getValue(sqlField);
                if (sql == null || StringUtils.isEmpty((CharSequence)((String)sql))) {
                    throw new MalformedRecordException(String.format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
                }
                s.execute((String)sql);
            }
            result.routeTo(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeDML(ProcessContext context, ProcessSession session, FlowFile flowFile, FunctionContext functionContext, RoutingResult result, Connection con, RecordReader recordParser, String statementType, DMLSettings settings) throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
        SqlAndIncludedColumns sqlHolder;
        TableSchema tableSchema;
        RecordSchema recordSchema = recordParser.getSchema();
        ComponentLog log = this.getLogger();
        String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
        SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName);
        if (StringUtils.isEmpty((CharSequence)tableName)) {
            throw new IllegalArgumentException(String.format("Cannot process %s because Table Name is null or empty", flowFile));
        }
        boolean includePrimaryKeys = updateKeys == null;
        PutDatabaseRecord putDatabaseRecord = this;
        synchronized (putDatabaseRecord) {
            tableSchema = this.schemaCache.get(schemaKey);
            if (tableSchema == null) {
                tableSchema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
                this.schemaCache.put(schemaKey, tableSchema);
            }
        }
        if (tableSchema == null) {
            throw new IllegalArgumentException("No table schema specified!");
        }
        StringBuilder tableNameBuilder = new StringBuilder();
        if (catalog != null) {
            tableNameBuilder.append(catalog).append(".");
        }
        if (schemaName != null) {
            tableNameBuilder.append(schemaName).append(".");
        }
        tableNameBuilder.append(tableName);
        String fqTableName = tableNameBuilder.toString();
        if (recordSchema == null) {
            throw new IllegalArgumentException("No record schema specified!");
        }
        if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
            sqlHolder = this.generateInsert(recordSchema, fqTableName, tableSchema, settings);
        } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
            sqlHolder = this.generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
        } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
            sqlHolder = this.generateDelete(recordSchema, fqTableName, tableSchema, settings);
        } else {
            throw new IllegalArgumentException(String.format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
        }
        try (PreparedStatement ps = con.prepareStatement(sqlHolder.getSql());){
            Record currentRecord;
            block36: {
                int queryTimeout = functionContext.queryTimeout;
                try {
                    ps.setQueryTimeout(queryTimeout);
                }
                catch (SQLException se) {
                    if (queryTimeout <= 0) break block36;
                    throw se;
                }
            }
            List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
            while ((currentRecord = recordParser.nextRecord()) != null) {
                int i;
                Object[] values = currentRecord.getValues();
                if (values == null) continue;
                if (fieldIndexes != null) {
                    for (i = 0; i < fieldIndexes.size(); ++i) {
                        if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
                            ps.setObject(i * 2 + 1, values[fieldIndexes.get(i)]);
                            ps.setObject(i * 2 + 2, values[fieldIndexes.get(i)]);
                            continue;
                        }
                        ps.setObject(i + 1, values[fieldIndexes.get(i)]);
                    }
                } else {
                    for (i = 0; i < values.length; ++i) {
                        if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
                            ps.setObject(i * 2 + 1, values[i]);
                            ps.setObject(i * 2 + 2, values[i]);
                            continue;
                        }
                        ps.setObject(i + 1, values[i]);
                    }
                }
                ps.addBatch();
            }
            log.debug("Executing query {}", new Object[]{sqlHolder});
            ps.executeBatch();
            result.routeTo(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
        }
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
        Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
        FunctionContext functionContext = new FunctionContext(rollbackOnFailure, queryTimeout);
        RollbackOnFailure.onTrigger((ProcessContext)context, (ProcessSessionFactory)sessionFactory, (RollbackOnFailure)functionContext, (ComponentLog)this.getLogger(), session -> this.process.onTrigger(context, session, (Object)functionContext));
    }

    private Set<String> getNormalizedColumnNames(RecordSchema schema, boolean translateFieldNames) {
        HashSet<String> normalizedFieldNames = new HashSet<String>();
        if (schema != null) {
            schema.getFieldNames().forEach(fieldName -> normalizedFieldNames.add(PutDatabaseRecord.normalizeColumnName(fieldName, translateFieldNames)));
        }
        return normalizedFieldNames;
    }

    SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, SQLException {
        Set<String> normalizedFieldNames = this.getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
        for (String requiredColName : tableSchema.getRequiredColumnNames()) {
            String normalizedColName = PutDatabaseRecord.normalizeColumnName(requiredColName, settings.translateFieldNames);
            if (normalizedFieldNames.contains(normalizedColName)) continue;
            String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
            if (settings.failUnmappedColumns) {
                this.getLogger().error(missingColMessage);
                throw new IllegalArgumentException(missingColMessage);
            }
            if (!settings.warningUnmappedColumns) continue;
            this.getLogger().warn(missingColMessage);
        }
        StringBuilder sqlBuilder = new StringBuilder();
        sqlBuilder.append("INSERT INTO ");
        if (settings.quoteTableName) {
            sqlBuilder.append(tableSchema.getQuotedIdentifierString()).append(tableName).append(tableSchema.getQuotedIdentifierString());
        } else {
            sqlBuilder.append(tableName);
        }
        sqlBuilder.append(" (");
        List fieldNames = recordSchema.getFieldNames();
        ArrayList<Integer> includedColumns = new ArrayList<Integer>();
        if (fieldNames != null) {
            int fieldCount = fieldNames.size();
            AtomicInteger fieldsFound = new AtomicInteger(0);
            for (int i = 0; i < fieldCount; ++i) {
                RecordField field = recordSchema.getField(i);
                String fieldName = field.getFieldName();
                ColumnDescription desc = tableSchema.getColumns().get(PutDatabaseRecord.normalizeColumnName(fieldName, settings.translateFieldNames));
                if (desc == null && !settings.ignoreUnmappedFields) {
                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
                }
                if (desc == null) continue;
                if (fieldsFound.getAndIncrement() > 0) {
                    sqlBuilder.append(", ");
                }
                if (settings.escapeColumnNames) {
                    sqlBuilder.append(tableSchema.getQuotedIdentifierString()).append(desc.getColumnName()).append(tableSchema.getQuotedIdentifierString());
                } else {
                    sqlBuilder.append(desc.getColumnName());
                }
                includedColumns.add(i);
            }
            sqlBuilder.append(") VALUES (");
            sqlBuilder.append(StringUtils.repeat((String)"?", (String)",", (int)includedColumns.size()));
            sqlBuilder.append(")");
            if (fieldsFound.get() == 0) {
                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
            }
        }
        return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
    }

    SqlAndIncludedColumns generateUpdate(RecordSchema recordSchema, String tableName, String updateKeys, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, MalformedRecordException, SQLException {
        Set<String> updateKeyNames;
        if (updateKeys == null) {
            updateKeyNames = tableSchema.getPrimaryKeyColumnNames();
        } else {
            updateKeyNames = new HashSet<String>();
            for (String updateKey : updateKeys.split(",")) {
                updateKeyNames.add(updateKey.trim());
            }
        }
        if (updateKeyNames.isEmpty()) {
            throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
        }
        StringBuilder sqlBuilder = new StringBuilder();
        sqlBuilder.append("UPDATE ");
        if (settings.quoteTableName) {
            sqlBuilder.append(tableSchema.getQuotedIdentifierString()).append(tableName).append(tableSchema.getQuotedIdentifierString());
        } else {
            sqlBuilder.append(tableName);
        }
        Set<String> normalizedFieldNames = this.getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
        HashSet<String> normalizedUpdateNames = new HashSet<String>();
        for (String uk : updateKeyNames) {
            String normalizedUK = PutDatabaseRecord.normalizeColumnName(uk, settings.translateFieldNames);
            normalizedUpdateNames.add(normalizedUK);
            if (normalizedFieldNames.contains(normalizedUK)) continue;
            String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
            if (settings.failUnmappedColumns) {
                this.getLogger().error(missingColMessage);
                throw new MalformedRecordException(missingColMessage);
            }
            if (!settings.warningUnmappedColumns) continue;
            this.getLogger().warn(missingColMessage);
        }
        List fieldNames = recordSchema.getFieldNames();
        ArrayList<Integer> includedColumns = new ArrayList<Integer>();
        if (fieldNames != null) {
            sqlBuilder.append(" SET ");
            int fieldCount = fieldNames.size();
            AtomicInteger fieldsFound = new AtomicInteger(0);
            for (int i = 0; i < fieldCount; ++i) {
                RecordField field = recordSchema.getField(i);
                String fieldName = field.getFieldName();
                String normalizedColName = PutDatabaseRecord.normalizeColumnName(fieldName, settings.translateFieldNames);
                ColumnDescription desc = tableSchema.getColumns().get(PutDatabaseRecord.normalizeColumnName(fieldName, settings.translateFieldNames));
                if (desc == null) {
                    if (settings.ignoreUnmappedFields) continue;
                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
                }
                if (normalizedUpdateNames.contains(normalizedColName)) continue;
                if (fieldsFound.getAndIncrement() > 0) {
                    sqlBuilder.append(", ");
                }
                if (settings.escapeColumnNames) {
                    sqlBuilder.append(tableSchema.getQuotedIdentifierString()).append(desc.getColumnName()).append(tableSchema.getQuotedIdentifierString());
                } else {
                    sqlBuilder.append(desc.getColumnName());
                }
                sqlBuilder.append(" = ?");
                includedColumns.add(i);
            }
            sqlBuilder.append(" WHERE ");
            AtomicInteger whereFieldCount = new AtomicInteger(0);
            for (int i = 0; i < fieldCount; ++i) {
                RecordField field = recordSchema.getField(i);
                String fieldName = field.getFieldName();
                String normalizedColName = PutDatabaseRecord.normalizeColumnName(fieldName, settings.translateFieldNames);
                ColumnDescription desc = tableSchema.getColumns().get(PutDatabaseRecord.normalizeColumnName(fieldName, settings.translateFieldNames));
                if (desc == null || !normalizedUpdateNames.contains(normalizedColName)) continue;
                if (whereFieldCount.getAndIncrement() > 0) {
                    sqlBuilder.append(" AND ");
                }
                if (settings.escapeColumnNames) {
                    sqlBuilder.append(tableSchema.getQuotedIdentifierString()).append(normalizedColName).append(tableSchema.getQuotedIdentifierString());
                } else {
                    sqlBuilder.append(normalizedColName);
                }
                sqlBuilder.append(" = ?");
                includedColumns.add(i);
            }
        }
        return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
    }

    SqlAndIncludedColumns generateDelete(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, MalformedRecordException, SQLDataException {
        Set<String> normalizedFieldNames = this.getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
        for (String requiredColName : tableSchema.getRequiredColumnNames()) {
            String normalizedColName = PutDatabaseRecord.normalizeColumnName(requiredColName, settings.translateFieldNames);
            if (normalizedFieldNames.contains(normalizedColName)) continue;
            String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
            if (settings.failUnmappedColumns) {
                this.getLogger().error(missingColMessage);
                throw new MalformedRecordException(missingColMessage);
            }
            if (!settings.warningUnmappedColumns) continue;
            this.getLogger().warn(missingColMessage);
        }
        StringBuilder sqlBuilder = new StringBuilder();
        sqlBuilder.append("DELETE FROM ");
        if (settings.quoteTableName) {
            sqlBuilder.append(tableSchema.getQuotedIdentifierString()).append(tableName).append(tableSchema.getQuotedIdentifierString());
        } else {
            sqlBuilder.append(tableName);
        }
        List fieldNames = recordSchema.getFieldNames();
        ArrayList<Integer> includedColumns = new ArrayList<Integer>();
        if (fieldNames != null) {
            sqlBuilder.append(" WHERE ");
            int fieldCount = fieldNames.size();
            AtomicInteger fieldsFound = new AtomicInteger(0);
            for (int i = 0; i < fieldCount; ++i) {
                RecordField field = recordSchema.getField(i);
                String fieldName = field.getFieldName();
                ColumnDescription desc = tableSchema.getColumns().get(PutDatabaseRecord.normalizeColumnName(fieldName, settings.translateFieldNames));
                if (desc == null && !settings.ignoreUnmappedFields) {
                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
                }
                if (desc == null) continue;
                if (fieldsFound.getAndIncrement() > 0) {
                    sqlBuilder.append(" AND ");
                }
                String columnName = settings.escapeColumnNames ? tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString() : desc.getColumnName();
                sqlBuilder.append("(");
                sqlBuilder.append(columnName);
                sqlBuilder.append(" = ? OR (");
                sqlBuilder.append(columnName);
                sqlBuilder.append(" is null AND ? is null))");
                includedColumns.add(i);
            }
            if (fieldsFound.get() == 0) {
                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
            }
        }
        return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
    }

    private static String normalizeColumnName(String colName, boolean translateColumnNames) {
        return colName == null ? null : (translateColumnNames ? colName.toUpperCase().replace("_", "") : colName);
    }

    static {
        RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("put-db-record-record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
        STATEMENT_TYPE = new PropertyDescriptor.Builder().name("put-db-record-statement-type").displayName("Statement Type").description("Specifies the type of SQL Statement to generate. If 'Use statement.type Attribute' is chosen, then the value is taken from the statement.type attribute in the FlowFile. The 'Use statement.type Attribute' option is the only one that allows the 'SQL' statement type. If 'SQL' is specified, the value of the field specified by the 'Field Containing SQL' property is expected to be a valid SQL statement on the target database, and will be executed as-is.").required(true).allowableValues(new String[]{UPDATE_TYPE, INSERT_TYPE, DELETE_TYPE, USE_ATTR_TYPE}).build();
        DBCP_SERVICE = new PropertyDescriptor.Builder().name("put-db-record-dcbp-service").displayName("Database Connection Pooling Service").description("The Controller Service that is used to obtain a connection to the database for sending records.").required(true).identifiesControllerService(DBCPService.class).build();
        CATALOG_NAME = new PropertyDescriptor.Builder().name("put-db-record-catalog-name").displayName("Catalog Name").description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        SCHEMA_NAME = new PropertyDescriptor.Builder().name("put-db-record-schema-name").displayName("Schema Name").description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        TABLE_NAME = new PropertyDescriptor.Builder().name("put-db-record-table-name").displayName("Table Name").description("The name of the table that the statement should affect.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder().name("put-db-record-translate-field-names").displayName("Translate Field Names").description("If true, the Processor will attempt to translate field names into the appropriate column names for the table specified. If false, the field names must match the column names exactly, or the column will not be updated").allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
        UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder().name("put-db-record-unmatched-field-behavior").displayName("Unmatched Field Behavior").description("If an incoming record has a field that does not map to any of the database table's columns, this property specifies how to handle the situation").allowableValues(new AllowableValue[]{IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD}).defaultValue(IGNORE_UNMATCHED_FIELD.getValue()).build();
        UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder().name("put-db-record-unmatched-column-behavior").displayName("Unmatched Column Behavior").description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation").allowableValues(new AllowableValue[]{IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN, FAIL_UNMATCHED_COLUMN}).defaultValue(FAIL_UNMATCHED_COLUMN.getValue()).build();
        UPDATE_KEYS = new PropertyDescriptor.Builder().name("put-db-record-update-keys").displayName("Update Keys").description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. In this case, if no Primary Key exists, the conversion to SQL will fail if Unmatched Column Behaviour is set to FAIL. This property is ignored if the Statement Type is INSERT").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
        FIELD_CONTAINING_SQL = new PropertyDescriptor.Builder().name("put-db-record-field-containing-sql").displayName("Field Containing SQL").description("If the Statement Type is 'SQL' (as set in the statement.type attribute), this field indicates which field in the record(s) contains the SQL statement to execute. The value of the field must be a single SQL statement. If the Statement Type is not 'SQL', this field is ignored.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
        QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder().name("put-db-record-quoted-identifiers").displayName("Quote Column Identifiers").description("Enabling this option will cause all column names to be quoted, allowing you to use reserved words as column names in your tables.").allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
        QUOTED_TABLE_IDENTIFIER = new PropertyDescriptor.Builder().name("put-db-record-quoted-table-identifiers").displayName("Quote Table Identifiers").description("Enabling this option will cause the table name to be quoted to support the use of special characters in the table name.").allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
        QUERY_TIMEOUT = new PropertyDescriptor.Builder().name("put-db-record-query-timeout").displayName("Max Wait Time").description("The maximum amount of time allowed for a running SQL statement , zero means there is no limit. Max time less than 1 second will be equal to zero.").defaultValue("0 seconds").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
        HashSet<Relationship> r = new HashSet<Relationship>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        r.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(r);
        ArrayList<PropertyDescriptor> pds = new ArrayList<PropertyDescriptor>();
        pds.add(RECORD_READER_FACTORY);
        pds.add(STATEMENT_TYPE);
        pds.add(DBCP_SERVICE);
        pds.add(CATALOG_NAME);
        pds.add(SCHEMA_NAME);
        pds.add(TABLE_NAME);
        pds.add(TRANSLATE_FIELD_NAMES);
        pds.add(UNMATCHED_FIELD_BEHAVIOR);
        pds.add(UNMATCHED_COLUMN_BEHAVIOR);
        pds.add(UPDATE_KEYS);
        pds.add(FIELD_CONTAINING_SQL);
        pds.add(QUOTED_IDENTIFIERS);
        pds.add(QUOTED_TABLE_IDENTIFIER);
        pds.add(QUERY_TIMEOUT);
        pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
        propDescriptors = Collections.unmodifiableList(pds);
    }

    static class SqlAndIncludedColumns {
        String sql;
        List<Integer> fieldIndexes;

        public SqlAndIncludedColumns(String sql, List<Integer> fieldIndexes) {
            this.sql = sql;
            this.fieldIndexes = fieldIndexes;
        }

        public String getSql() {
            return this.sql;
        }

        public List<Integer> getFieldIndexes() {
            return this.fieldIndexes;
        }
    }

    static class SchemaKey {
        private final String catalog;
        private final String schemaName;
        private final String tableName;

        public SchemaKey(String catalog, String schemaName, String tableName) {
            this.catalog = catalog;
            this.schemaName = schemaName;
            this.tableName = tableName;
        }

        public int hashCode() {
            int result = this.catalog != null ? this.catalog.hashCode() : 0;
            result = 31 * result + (this.schemaName != null ? this.schemaName.hashCode() : 0);
            result = 31 * result + this.tableName.hashCode();
            return result;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SchemaKey schemaKey = (SchemaKey)o;
            if (this.catalog != null ? !this.catalog.equals(schemaKey.catalog) : schemaKey.catalog != null) {
                return false;
            }
            if (this.schemaName != null ? !this.schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) {
                return false;
            }
            return this.tableName.equals(schemaKey.tableName);
        }
    }

    protected static class ColumnDescription {
        private final String columnName;
        private final int dataType;
        private final boolean required;
        private final Integer columnSize;

        public ColumnDescription(String columnName, int dataType, boolean required, Integer columnSize) {
            this.columnName = columnName;
            this.dataType = dataType;
            this.required = required;
            this.columnSize = columnSize;
        }

        public int getDataType() {
            return this.dataType;
        }

        public Integer getColumnSize() {
            return this.columnSize;
        }

        public String getColumnName() {
            return this.columnName;
        }

        public boolean isRequired() {
            return this.required;
        }

        public static ColumnDescription from(ResultSet resultSet) throws SQLException {
            ResultSetMetaData md = resultSet.getMetaData();
            ArrayList<String> columns = new ArrayList<String>();
            for (int i = 1; i < md.getColumnCount() + 1; ++i) {
                columns.add(md.getColumnName(i));
            }
            String columnName = resultSet.getString("COLUMN_NAME");
            int dataType = resultSet.getInt("DATA_TYPE");
            int colSize = resultSet.getInt("COLUMN_SIZE");
            String nullableValue = resultSet.getString("IS_NULLABLE");
            boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
            String defaultValue = resultSet.getString("COLUMN_DEF");
            String autoIncrementValue = "NO";
            if (columns.contains("IS_AUTOINCREMENT")) {
                autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
            }
            boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
            boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
            return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : Integer.valueOf(colSize));
        }
    }

    static class TableSchema {
        private List<String> requiredColumnNames;
        private Set<String> primaryKeyColumnNames;
        private Map<String, ColumnDescription> columns = new HashMap<String, ColumnDescription>();
        private String quotedIdentifierString;

        private TableSchema(List<ColumnDescription> columnDescriptions, boolean translateColumnNames, Set<String> primaryKeyColumnNames, String quotedIdentifierString) {
            this.primaryKeyColumnNames = primaryKeyColumnNames;
            this.quotedIdentifierString = quotedIdentifierString;
            this.requiredColumnNames = new ArrayList<String>();
            for (ColumnDescription desc : columnDescriptions) {
                this.columns.put(PutDatabaseRecord.normalizeColumnName(desc.columnName, translateColumnNames), desc);
                if (!desc.isRequired()) continue;
                this.requiredColumnNames.add(desc.columnName);
            }
        }

        public Map<String, ColumnDescription> getColumns() {
            return this.columns;
        }

        public List<String> getRequiredColumnNames() {
            return this.requiredColumnNames;
        }

        public Set<String> getPrimaryKeyColumnNames() {
            return this.primaryKeyColumnNames;
        }

        public String getQuotedIdentifierString() {
            return this.quotedIdentifierString;
        }

        public static TableSchema from(Connection conn, String catalog, String schema, String tableName, boolean translateColumnNames, boolean includePrimaryKeys) throws SQLException {
            DatabaseMetaData dmd = conn.getMetaData();
            try (ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%");){
                ArrayList<ColumnDescription> cols = new ArrayList<ColumnDescription>();
                while (colrs.next()) {
                    ColumnDescription col = ColumnDescription.from(colrs);
                    cols.add(col);
                }
                HashSet<String> primaryKeyColumns = new HashSet<String>();
                if (includePrimaryKeys) {
                    try (ResultSet pkrs = dmd.getPrimaryKeys(catalog, null, tableName);){
                        while (pkrs.next()) {
                            String colName = pkrs.getString("COLUMN_NAME");
                            primaryKeyColumns.add(PutDatabaseRecord.normalizeColumnName(colName, translateColumnNames));
                        }
                    }
                }
                TableSchema tableSchema = new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
                return tableSchema;
            }
        }
    }

    static class DMLSettings {
        private final boolean translateFieldNames;
        private final boolean ignoreUnmappedFields;
        private final boolean failUnmappedColumns;
        private final boolean warningUnmappedColumns;
        private final boolean escapeColumnNames;
        private final boolean quoteTableName;

        private DMLSettings(ProcessContext context) {
            this.translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
            this.ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
            this.failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
            this.warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
            this.escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
            this.quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
        }
    }

    private static class FunctionContext
    extends RollbackOnFailure {
        private final int queryTimeout;
        private boolean originalAutoCommit = false;
        private String jdbcUrl;

        public FunctionContext(boolean rollbackOnFailure, int queryTimeout) {
            super(rollbackOnFailure, true);
            this.queryTimeout = queryTimeout;
        }
    }
}

