/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.db.JdbcCommon;

public abstract class AbstractExecuteSQL
extends AbstractProcessor {
    public static final String RESULT_ROW_COUNT = "executesql.row.count";
    public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
    public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
    public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
    public static final String RESULTSET_INDEX = "executesql.resultset.index";
    public static final String RESULT_ERROR_MESSAGE = "executesql.error.message";
    public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successfully created FlowFile from SQL query result set.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship").build();
    protected Set<Relationship> relationships;
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").description("The Controller Service that is used to obtain connection to database").required(true).identifiesControllerService(DBCPService.class).build();
    public static final PropertyDescriptor SQL_PRE_QUERY = new PropertyDescriptor.Builder().name("sql-pre-query").displayName("SQL Pre-Query").description("A semicolon-delimited list of queries executed before the main SQL query is executed. For example, set session properties before main query. It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). Results/outputs from these queries will be suppressed if there are no errors.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder().name("SQL select query").description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming flowfiles. If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression Language is not evaluated for flow file contents.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor SQL_POST_QUERY = new PropertyDescriptor.Builder().name("sql-post-query").displayName("SQL Post-Query").description("A semicolon-delimited list of queries executed after the main SQL query is executed. Example like setting session properties after main query. It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). Results/outputs from these queries will be suppressed if there are no errors.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder().name("Max Wait Time").description("The maximum amount of time allowed for a running SQL select query  , zero means there is no limit. Max time less than 1 second will be equal to zero.").defaultValue("0 seconds").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).sensitive(false).build();
    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder().name("esql-max-rows").displayName("Max Rows Per Flow File").description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.").defaultValue("0").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder().name("esql-output-batch-size").displayName("Output Batch Size").description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this property is set.").defaultValue("0").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder().name("esql-fetch-size").displayName("Fetch Size").description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be honored and/or exact. If the value specified is zero, then the hint is ignored.").defaultValue("0").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    protected List<PropertyDescriptor> propDescriptors;
    protected DBCPService dbcpService;

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propDescriptors;
    }

    @OnScheduled
    public void setup(ProcessContext context) {
        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
            String errorString = "Either the Select Query must be specified or there must be an incoming connection providing flowfile(s) containing a SQL select query";
            this.getLogger().error("Either the Select Query must be specified or there must be an incoming connection providing flowfile(s) containing a SQL select query");
            throw new ProcessException("Either the Select Query must be specified or there must be an incoming connection providing flowfile(s) containing a SQL select query");
        }
        this.dbcpService = (DBCPService)context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        String selectQuery;
        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection() && (fileToProcess = session.get()) == null && context.hasNonLoopConnection()) {
            return;
        }
        ArrayList<FlowFile> resultSetFlowFiles = new ArrayList<FlowFile>();
        ComponentLog logger = this.getLogger();
        int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
        Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger();
        Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
        Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        List<String> preQueries = this.getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
        List<String> postQueries = this.getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
        SqlWriter sqlWriter = this.configureSqlWriter(session, context, fileToProcess);
        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
            selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
        } else {
            StringBuilder queryContents = new StringBuilder();
            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString((InputStream)in, (Charset)Charset.defaultCharset())));
            selectQuery = queryContents.toString();
        }
        int resultCount = 0;
        try (Connection con = this.dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
             PreparedStatement st = con.prepareStatement(selectQuery);){
            String inputFileUUID;
            if (fetchSize != null && fetchSize > 0) {
                try {
                    st.setFetchSize(fetchSize);
                }
                catch (SQLException se) {
                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, (Throwable)se);
                }
            }
            st.setQueryTimeout(queryTimeout);
            Pair<String, SQLException> failure = this.executeConfigStatements(con, preQueries);
            if (failure != null) {
                selectQuery = (String)failure.getLeft();
                throw (SQLException)failure.getRight();
            }
            if (fileToProcess != null) {
                JdbcCommon.setParameters((PreparedStatement)st, (Map)fileToProcess.getAttributes());
            }
            logger.debug("Executing query {}", new Object[]{selectQuery});
            int fragmentIndex = 0;
            String fragmentId = UUID.randomUUID().toString();
            StopWatch executionTime = new StopWatch(true);
            boolean hasResults = st.execute();
            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
            boolean hasUpdateCount = st.getUpdateCount() != -1;
            Map inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
            String string = inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
            while (hasResults || hasUpdateCount) {
                if (hasResults) {
                    AtomicLong nrOfRows = new AtomicLong(0L);
                    try {
                        ResultSet resultSet = st.getResultSet();
                        do {
                            StopWatch fetchTime = new StopWatch(true);
                            FlowFile resultSetFF = fileToProcess == null ? session.create() : session.create(fileToProcess);
                            if (inputFileAttrMap != null) {
                                resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
                            }
                            try {
                                resultSetFF = session.write(resultSetFF, out -> {
                                    try {
                                        nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, this.getLogger(), null));
                                    }
                                    catch (Exception e) {
                                        throw e instanceof ProcessException ? (ProcessException)((Object)e) : new ProcessException((Throwable)e);
                                    }
                                });
                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
                                HashMap<String, String> attributesToAdd = new HashMap<String, String>();
                                attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
                                attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
                                attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
                                attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
                                attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
                                if (inputFileUUID != null) {
                                    attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
                                }
                                attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
                                resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
                                sqlWriter.updateCounters(session);
                                if (maxRowsPerFlowFile > 0) {
                                    if (nrOfRows.get() == 0L && fragmentIndex > 0) {
                                        session.remove(resultSetFF);
                                        break;
                                    }
                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                                }
                                logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()});
                                if (context.hasIncomingConnection()) {
                                    session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
                                } else {
                                    session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
                                }
                                resultSetFlowFiles.add(resultSetFF);
                                if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
                                    session.transfer(resultSetFlowFiles, REL_SUCCESS);
                                    if (fileToProcess != null) {
                                        session.remove(fileToProcess);
                                        fileToProcess = null;
                                    }
                                    session.commit();
                                    resultSetFlowFiles.clear();
                                }
                                ++fragmentIndex;
                            }
                            catch (Exception e) {
                                session.remove(resultSetFF);
                                session.remove(resultSetFlowFiles);
                                if (e instanceof ProcessException) {
                                    throw (ProcessException)((Object)e);
                                }
                                throw new ProcessException((Throwable)e);
                            }
                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == (long)maxRowsPerFlowFile.intValue());
                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
                            for (int i = 0; i < resultSetFlowFiles.size(); ++i) {
                                resultSetFlowFiles.set(i, session.putAttribute((FlowFile)resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
                            }
                        }
                    }
                    catch (SQLException e) {
                        throw new ProcessException((Throwable)e);
                    }
                    ++resultCount;
                }
                try {
                    hasResults = st.getMoreResults(1);
                    hasUpdateCount = st.getUpdateCount() != -1;
                }
                catch (SQLException ex) {
                    hasResults = false;
                    hasUpdateCount = false;
                }
            }
            failure = this.executeConfigStatements(con, postQueries);
            if (failure != null) {
                selectQuery = (String)failure.getLeft();
                resultSetFlowFiles.forEach(ff -> session.remove(ff));
                throw (SQLException)failure.getRight();
            }
            session.transfer(resultSetFlowFiles, REL_SUCCESS);
            resultSetFlowFiles.clear();
            if (fileToProcess != null) {
                if (resultCount > 0) {
                    session.remove(fileToProcess);
                } else {
                    fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, this.getLogger()));
                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
                    session.transfer(fileToProcess, REL_SUCCESS);
                }
            } else if (resultCount == 0) {
                FlowFile resultSetFF = session.create();
                resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, this.getLogger()));
                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
                session.transfer(resultSetFF, REL_SUCCESS);
            }
        }
        catch (SQLException | ProcessException e) {
            if (fileToProcess == null) {
                logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure", new Object[]{selectQuery, e});
                context.yield();
            }
            if (context.hasIncomingConnection()) {
                logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[]{selectQuery, fileToProcess, e});
                fileToProcess = session.penalize(fileToProcess);
            } else {
                logger.error("Unable to execute SQL select query {} due to {}; routing to failure", new Object[]{selectQuery, e});
                context.yield();
            }
            session.putAttribute(fileToProcess, RESULT_ERROR_MESSAGE, e.getMessage());
            session.transfer(fileToProcess, REL_FAILURE);
        }
    }

    protected Pair<String, SQLException> executeConfigStatements(Connection con, List<String> configQueries) {
        if (configQueries == null || configQueries.isEmpty()) {
            return null;
        }
        for (String confSQL : configQueries) {
            try {
                Statement st = con.createStatement();
                Throwable throwable = null;
                try {
                    st.execute(confSQL);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (st == null) continue;
                    if (throwable != null) {
                        try {
                            st.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    st.close();
                }
            }
            catch (SQLException e) {
                return Pair.of((Object)confSQL, (Object)e);
            }
        }
        return null;
    }

    protected List<String> getQueries(String value) {
        if (value == null || value.length() == 0 || value.trim().length() == 0) {
            return null;
        }
        LinkedList<String> queries = new LinkedList<String>();
        for (String query : value.split("(?<!\\\\);")) {
            if ((query = query.replaceAll("\\\\;", ";")).trim().length() <= 0) continue;
            queries.add(query.trim());
        }
        return queries;
    }

    protected abstract SqlWriter configureSqlWriter(ProcessSession var1, ProcessContext var2, FlowFile var3);
}

