/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;

@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso(value={QueryDatabaseTable.class, ExecuteSQL.class})
@CapabilityDescription(value="Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This processor is intended to be run on the Primary Node only.")
@Stateful(scopes={Scope.CLUSTER}, description="After performing a query on the specified table, the maximum values for the specified column(s) will be retained for use in future executions of the query. This allows the Processor to fetch only those records that have max values greater than the retained values. This can be used for incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor per the State Management documentation")
public class GenerateTableFetch
extends AbstractDatabaseFetchProcessor {
    public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder().name("gen-table-fetch-partition-size").displayName("Partition Size").description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows in the table.").defaultValue("10000").required(true).expressionLanguageSupported(false).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).build();

    public GenerateTableFetch() {
        HashSet<Relationship> r = new HashSet<Relationship>();
        r.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(r);
        ArrayList<PropertyDescriptor> pds = new ArrayList<PropertyDescriptor>();
        pds.add(DBCP_SERVICE);
        pds.add(DB_TYPE);
        pds.add(TABLE_NAME);
        pds.add(COLUMN_NAMES);
        pds.add(MAX_VALUE_COLUMN_NAMES);
        pds.add(QUERY_TIMEOUT);
        pds.add(PARTITION_SIZE);
        this.propDescriptors = Collections.unmodifiableList(pds);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propDescriptors;
    }

    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        StateMap stateMap;
        ProcessSession session = sessionFactory.createSession();
        ComponentLog logger = this.getLogger();
        DBCPService dbcpService = (DBCPService)context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        DatabaseAdapter dbAdapter = (DatabaseAdapter)dbAdapters.get(context.getProperty(DB_TYPE).getValue());
        String tableName = context.getProperty(TABLE_NAME).getValue();
        String columnNames = context.getProperty(COLUMN_NAMES).getValue();
        String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
        int partitionSize = context.getProperty(PARTITION_SIZE).asInteger();
        StateManager stateManager = context.getStateManager();
        try {
            stateMap = stateManager.getState(Scope.CLUSTER);
        }
        catch (IOException ioe) {
            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform query until this is accomplished.", (Throwable)ioe);
            context.yield();
            return;
        }
        try {
            int rowCount;
            List<Object> maxValueColumnNameList;
            String whereClause;
            HashMap<String, String> statePropertyMap;
            block37: {
                statePropertyMap = new HashMap<String, String>(stateMap.toMap());
                whereClause = null;
                maxValueColumnNameList = StringUtils.isEmpty((CharSequence)maxValueColumnNames) ? new ArrayList(0) : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
                ArrayList maxValueClauses = new ArrayList(maxValueColumnNameList.size());
                String columnsClause = null;
                ArrayList<String> maxValueSelectColumns = new ArrayList<String>(maxValueColumnNameList.size() + 1);
                maxValueSelectColumns.add("COUNT(*)");
                IntStream.range(0, maxValueColumnNameList.size()).forEach(index -> {
                    String colName = (String)maxValueColumnNameList.get(index);
                    maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                    String maxValue = (String)statePropertyMap.get(colName.toLowerCase());
                    if (!StringUtils.isEmpty((CharSequence)maxValue)) {
                        Integer type = (Integer)this.columnTypeMap.get(colName.toLowerCase());
                        if (type == null) {
                            throw new IllegalArgumentException("No column type found for: " + colName);
                        }
                        maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + GenerateTableFetch.getLiteralByType(type, maxValue, dbAdapter.getName()));
                    }
                });
                whereClause = StringUtils.join(maxValueClauses, (String)" AND ");
                columnsClause = StringUtils.join(maxValueSelectColumns, (String)", ");
                String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
                rowCount = 0;
                try (Connection con = dbcpService.getConnection();
                     Statement st = con.createStatement();){
                    Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
                    st.setQueryTimeout(queryTimeout);
                    logger.debug("Executing {}", new Object[]{selectQuery});
                    ResultSet resultSet = st.executeQuery(selectQuery);
                    if (resultSet.next()) {
                        rowCount = resultSet.getInt(1);
                        ResultSetMetaData rsmd = resultSet.getMetaData();
                        for (int i = 2; i <= rsmd.getColumnCount(); ++i) {
                            String resultColumnName = rsmd.getColumnName(i).toLowerCase();
                            int type = rsmd.getColumnType(i);
                            try {
                                String newMaxValue = GenerateTableFetch.getMaxValueFromRow(resultSet, i, type, (String)statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName());
                                if (newMaxValue == null) continue;
                                statePropertyMap.put(resultColumnName, newMaxValue);
                                continue;
                            }
                            catch (IOException | ParseException pie) {
                                throw new ProcessException((Throwable)pie);
                            }
                        }
                        break block37;
                    }
                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
                }
                catch (SQLException e) {
                    logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
                    throw new ProcessException((Throwable)e);
                }
            }
            int numberOfFetches = partitionSize == 0 ? rowCount : rowCount / partitionSize + (rowCount % partitionSize == 0 ? 0 : 1);
            for (int i = 0; i < numberOfFetches; ++i) {
                Integer limit = partitionSize == 0 ? null : Integer.valueOf(partitionSize);
                Integer offset = partitionSize == 0 ? null : Integer.valueOf(i * partitionSize);
                String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, (String)", "), limit, offset);
                FlowFile sqlFlowFile = session.create();
                sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
                session.transfer(sqlFlowFile, REL_SUCCESS);
            }
            session.commit();
            try {
                stateManager.setState(statePropertyMap, Scope.CLUSTER);
            }
            catch (IOException ioe) {
                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. Also, any generated SQL statements may be duplicated.", new Object[]{this, ioe});
            }
        }
        catch (ProcessException pe) {
            Throwable t = pe.getCause() == null ? pe : pe.getCause();
            logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
            session.rollback();
            context.yield();
        }
    }
}

