/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.ListDatabaseTables;
import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;

@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@Tags(value={"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso(value={QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription(value="Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This processor is intended to be run on the Primary Node only.\n\nThis processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n  - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n  - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n  - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes={Scope.CLUSTER}, description="After performing a query on the specified table, the maximum values for the specified column(s) will be retained for use in future executions of the query. This allows the Processor to fetch only those records that have max values greater than the retained values. This can be used for incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor per the State Management documentation")
@WritesAttributes(value={@WritesAttribute(attribute="generatetablefetch.sql.error", description="If the processor has incoming connections, and processing an incoming flow file causes a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."), @WritesAttribute(attribute="generatetablefetch.tableName", description="The name of the database table to be queried."), @WritesAttribute(attribute="generatetablefetch.columnNames", description="The comma-separated list of column names used in the query."), @WritesAttribute(attribute="generatetablefetch.whereClause", description="Where clause used in the query to get the expected rows."), @WritesAttribute(attribute="generatetablefetch.maxColumnNames", description="The comma-separated list of column names used to keep track of data that has been returned since the processor started running."), @WritesAttribute(attribute="generatetablefetch.limit", description="The number of result rows to be fetched by the SQL statement."), @WritesAttribute(attribute="generatetablefetch.offset", description="Offset to be used to retrieve the corresponding partition.")})
@DynamicProperty(name="Initial Max Value", value="Attribute Expression Language", supportsExpressionLanguage=false, description="Specifies an initial max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class GenerateTableFetch
extends AbstractDatabaseFetchProcessor {
    public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder().name("gen-table-fetch-partition-size").displayName("Partition Size").description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows in the table.").defaultValue("10000").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. If no incoming connection(s) are specified, this relationship is unused.").build();

    public GenerateTableFetch() {
        HashSet<Relationship> r = new HashSet<Relationship>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(r);
        ArrayList<PropertyDescriptor> pds = new ArrayList<PropertyDescriptor>();
        pds.add(DBCP_SERVICE);
        pds.add(DB_TYPE);
        pds.add(TABLE_NAME);
        pds.add(COLUMN_NAMES);
        pds.add(MAX_VALUE_COLUMN_NAMES);
        pds.add(QUERY_TIMEOUT);
        pds.add(PARTITION_SIZE);
        pds.add(WHERE_CLAUSE);
        this.propDescriptors = Collections.unmodifiableList(pds);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propDescriptors;
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return super.customValidate(validationContext);
    }

    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        this.maxValueProperties = this.getDefaultMaxValueProperties(context.getProperties());
        if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
            this.getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
        }
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        StateMap stateMap;
        if (!(this.isDynamicTableName || this.isDynamicMaxValues || this.setupComplete.get())) {
            super.setup(context);
        }
        ProcessSession session = sessionFactory.createSession();
        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection() && (fileToProcess = session.get()) == null) {
            return;
        }
        ComponentLog logger = this.getLogger();
        DBCPService dbcpService = (DBCPService)context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        DatabaseAdapter dbAdapter = (DatabaseAdapter)dbAdapters.get(context.getProperty(DB_TYPE).getValue());
        String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
        String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
        StateManager stateManager = context.getStateManager();
        FlowFile finalFileToProcess = fileToProcess;
        try {
            stateMap = stateManager.getState(Scope.CLUSTER);
        }
        catch (IOException ioe) {
            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform query until this is accomplished.", (Throwable)ioe);
            context.yield();
            return;
        }
        try {
            HashMap<String, String> statePropertyMap = new HashMap<String, String>(stateMap.toMap());
            for (Map.Entry maxProp : this.maxValueProperties.entrySet()) {
                String maxPropKey = ((String)maxProp.getKey()).toLowerCase();
                String fullyQualifiedMaxPropKey = GenerateTableFetch.getStateKey(tableName, maxPropKey);
                if (statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) continue;
                String newMaxPropValue = statePropertyMap.containsKey(maxPropKey) ? (String)statePropertyMap.get(maxPropKey) : (String)maxProp.getValue();
                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
            }
            String whereClause = null;
            List<Object> maxValueColumnNameList = StringUtils.isEmpty((CharSequence)maxValueColumnNames) ? new ArrayList(0) : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
            ArrayList<String> maxValueClauses = new ArrayList<String>(maxValueColumnNameList.size());
            String columnsClause = null;
            ArrayList<String> maxValueSelectColumns = new ArrayList<String>(maxValueColumnNameList.size() + 1);
            maxValueSelectColumns.add("COUNT(*)");
            IntStream.range(0, maxValueColumnNameList.size()).forEach(index -> {
                String colName = (String)maxValueColumnNameList.get(index);
                maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                String maxValue = this.getColumnStateMaxValue(tableName, statePropertyMap, colName);
                if (!StringUtils.isEmpty((CharSequence)maxValue)) {
                    if (this.columnTypeMap.isEmpty() || this.getColumnType(tableName, colName) == null) {
                        super.setup(context, false, finalFileToProcess);
                    }
                    Integer type = this.getColumnType(tableName, colName);
                    maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + GenerateTableFetch.getLiteralByType(type, maxValue, dbAdapter.getName()));
                }
            });
            if (customWhereClause != null) {
                maxValueClauses.add("(" + customWhereClause + ")");
            }
            whereClause = StringUtils.join(maxValueClauses, (String)" AND ");
            columnsClause = StringUtils.join(maxValueSelectColumns, (String)", ");
            String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
            long rowCount = 0L;
            try (Connection con = dbcpService.getConnection();
                 Statement st = con.createStatement();){
                Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                st.setQueryTimeout(queryTimeout);
                logger.debug("Executing {}", new Object[]{selectQuery});
                ResultSet resultSet = st.executeQuery(selectQuery);
                if (resultSet.next()) {
                    rowCount = resultSet.getLong(1);
                    ResultSetMetaData rsmd = resultSet.getMetaData();
                    for (int i = 2; i <= rsmd.getColumnCount(); ++i) {
                        String resultColumnName = (StringUtils.isNotEmpty((CharSequence)rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
                        String fullyQualifiedStateKey = GenerateTableFetch.getStateKey(tableName, resultColumnName);
                        String resultColumnCurrentMax = (String)statePropertyMap.get(fullyQualifiedStateKey);
                        if (StringUtils.isEmpty((CharSequence)resultColumnCurrentMax) && !this.isDynamicTableName) {
                            resultColumnCurrentMax = (String)statePropertyMap.get(resultColumnName);
                        }
                        int type = rsmd.getColumnType(i);
                        if (this.isDynamicTableName) {
                            this.columnTypeMap.put(fullyQualifiedStateKey, type);
                        }
                        try {
                            String newMaxValue = GenerateTableFetch.getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName());
                            if (newMaxValue == null) continue;
                            statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                            continue;
                        }
                        catch (IOException | ParseException pie) {
                            throw new ProcessException((Throwable)pie);
                        }
                    }
                } else {
                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
                }
                IntStream.range(0, maxValueColumnNameList.size()).forEach(index -> {
                    String colName = (String)maxValueColumnNameList.get(index);
                    maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                    String maxValue = this.getColumnStateMaxValue(tableName, statePropertyMap, colName);
                    if (!StringUtils.isEmpty((CharSequence)maxValue)) {
                        if (this.columnTypeMap.isEmpty() || this.getColumnType(tableName, colName) == null) {
                            super.setup(context, false, finalFileToProcess);
                        }
                        Integer type = this.getColumnType(tableName, colName);
                        maxValueClauses.add(colName + " <= " + GenerateTableFetch.getLiteralByType(type, maxValue, dbAdapter.getName()));
                    }
                });
                whereClause = StringUtils.join(maxValueClauses, (String)" AND ");
                long numberOfFetches = partitionSize == 0 ? 1L : rowCount / (long)partitionSize + (long)(rowCount % (long)partitionSize == 0L ? 0 : 1);
                for (long i = 0L; i < numberOfFetches; ++i) {
                    Long limit = partitionSize == 0 ? null : Long.valueOf(partitionSize);
                    Long offset = partitionSize == 0 ? null : Long.valueOf(i * (long)partitionSize);
                    String maxColumnNames = StringUtils.join(maxValueColumnNameList, (String)", ");
                    String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
                    FlowFile sqlFlowFile = fileToProcess == null ? session.create() : session.create(fileToProcess);
                    sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
                    if (columnNames != null) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
                    }
                    if (StringUtils.isNotBlank((CharSequence)whereClause)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
                    }
                    if (StringUtils.isNotBlank((CharSequence)maxColumnNames)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
                    }
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
                    if (partitionSize != 0) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
                    }
                    session.transfer(sqlFlowFile, REL_SUCCESS);
                }
                if (fileToProcess != null) {
                    session.remove(fileToProcess);
                }
            }
            catch (SQLException e) {
                if (fileToProcess != null) {
                    logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
                    fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
                    session.transfer(fileToProcess, REL_FAILURE);
                }
                logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
                throw new ProcessException((Throwable)e);
            }
            session.commit();
            try {
                stateManager.setState(statePropertyMap, Scope.CLUSTER);
            }
            catch (IOException ioe) {
                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. Also, any generated SQL statements may be duplicated.", new Object[]{this, ioe});
            }
        }
        catch (ProcessException pe) {
            Throwable t = pe.getCause() == null ? pe : pe.getCause();
            logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
            session.rollback();
            context.yield();
        }
    }

    private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
        String fullyQualifiedStateKey = GenerateTableFetch.getStateKey(tableName, colName);
        String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
        if (StringUtils.isEmpty((CharSequence)maxValue) && !this.isDynamicTableName) {
            maxValue = statePropertyMap.get(GenerateTableFetch.getStateKey(null, colName));
        }
        return maxValue;
    }

    private Integer getColumnType(String tableName, String colName) {
        String fullyQualifiedStateKey = GenerateTableFetch.getStateKey(tableName, colName);
        Integer type = (Integer)this.columnTypeMap.get(fullyQualifiedStateKey);
        if (type == null && !this.isDynamicTableName) {
            type = (Integer)this.columnTypeMap.get(GenerateTableFetch.getStateKey(null, colName));
        }
        return type;
    }
}

