/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that executes a SQL statement through a pooled JDBC connection and writes
 * every returned result set to its own FlowFile as an Avro stream.
 *
 * <p>The statement text comes from the {@link #SQL_SELECT_QUERY} property when it is set,
 * otherwise from the content of the incoming FlowFile (decoded as UTF-8). Each result
 * FlowFile is stamped with the row count, the elapsed time, the Avro MIME type and the
 * zero-based index of the result set, then routed to {@link #REL_SUCCESS}. When the
 * statement yields no result set at all, a single FlowFile holding an empty Avro stream
 * is routed to {@link #REL_SUCCESS} instead.
 */
@EventDriven
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@Tags(value={"sql", "select", "jdbc", "query", "database"})
@CapabilityDescription(value="Executes provided SQL select query. Query result will be converted to Avro format. Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
@ReadsAttributes(value={@ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer that represents the JDBC Type of the parameter."), @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute="sql.args.N.format", description="This attribute is always optional, but default options may not always work for your data. Incoming FlowFiles are expected to be parametrized SQL statements. In some cases a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. base64: the string is a Base64 encoded string that can be decoded to bytes. hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. Dates/Times/Timestamps - Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') as specified according to java.time.format.DateTimeFormatter. If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in 'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), 'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")})
@WritesAttributes(value={@WritesAttribute(attribute="executesql.row.count", description="Contains the number of rows returned in the select query"), @WritesAttribute(attribute="executesql.query.duration", description="Duration of the query in milliseconds"), @WritesAttribute(attribute="executesql.resultset.index", description="Assuming multiple result sets are returned, the zero based index of this result set.")})
public class ExecuteSQL
extends AbstractProcessor {
    /** Attribute holding the number of rows written to the result FlowFile. */
    public static final String RESULT_ROW_COUNT = "executesql.row.count";
    /** Attribute holding the elapsed time, in milliseconds, measured when the result was written. */
    public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
    /** Attribute holding the zero-based index of the result set a FlowFile was produced from. */
    public static final String RESULTSET_INDEX = "executesql.resultset.index";
    /** MIME type stamped on every FlowFile this processor routes to success. */
    private static final String AVRO_MIME_TYPE = "application/avro-binary";

    /** Destination for FlowFiles that carry a (possibly empty) Avro result. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successfully created FlowFile from SQL query result set.").build();
    /** Destination for the incoming FlowFile when executing the statement failed. */
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship").build();
    private final Set<Relationship> relationships;

    /** Controller service that supplies the JDBC connection. */
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").description("The Controller Service that is used to obtain connection to database").required(true).identifiesControllerService(DBCPService.class).build();
    /** Optional statement text; when unset the incoming FlowFile content is used instead. */
    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder().name("SQL select query").description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming flowfiles. If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression Language is not evaluated for flow file contents.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Statement timeout; converted to whole seconds before being handed to JDBC. */
    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder().name("Max Wait Time").description("The maximum amount of time allowed for a running SQL select query  , zero means there is no limit. Max time less than 1 second will be equal to zero.").defaultValue("0 seconds").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).sensitive(false).build();
    private final List<PropertyDescriptor> propDescriptors;

    /**
     * Builds the immutable relationship set and the ordered, immutable property list
     * exposed by this processor.
     */
    public ExecuteSQL() {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(rels);

        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(DBCP_SERVICE);
        descriptors.add(SQL_SELECT_QUERY);
        descriptors.add(QUERY_TIMEOUT);
        descriptors.add(JdbcCommon.NORMALIZE_NAMES_FOR_AVRO);
        descriptors.add(JdbcCommon.USE_AVRO_LOGICAL_TYPES);
        descriptors.add(JdbcCommon.DEFAULT_PRECISION);
        descriptors.add(JdbcCommon.DEFAULT_SCALE);
        this.propDescriptors = Collections.unmodifiableList(descriptors);
    }

    /**
     * @return the unmodifiable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * @return the unmodifiable list of properties assembled in the constructor
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propDescriptors;
    }

    /**
     * Rejects a configuration that has no way of obtaining a statement: neither the
     * {@link #SQL_SELECT_QUERY} property nor an incoming connection.
     *
     * @param context the context the processor is being scheduled with
     * @throws ProcessException if the query property is unset and there is no incoming connection
     */
    @OnScheduled
    public void setup(ProcessContext context) {
        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
            final String errorString = "Either the Select Query must be specified or there must be an incoming connection providing flowfile(s) containing a SQL select query";
            this.getLogger().error(errorString);
            throw new ProcessException(errorString);
        }
    }

    /**
     * Executes the configured or FlowFile-supplied statement and emits one Avro FlowFile
     * per result set.
     *
     * <p>On success the incoming FlowFile (if any) is removed when at least one result set
     * was produced; otherwise it is overwritten with an empty Avro stream and routed to
     * success. On {@link SQLException} or {@link ProcessException} the incoming FlowFile is
     * routed to {@link #REL_FAILURE}; when there is no incoming FlowFile the error is only
     * logged and the processor yields.
     *
     * @param context provides property values and connection information
     * @param session the session used to read, create and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection()) {
            fileToProcess = session.get();
            // With a non-loop upstream connection and nothing queued there is no work to do.
            // With only loop connections we carry on so the property-configured query can run.
            if (fileToProcess == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        final ComponentLog logger = this.getLogger();
        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        final int queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
        final boolean convertNamesForAvro = context.getProperty(JdbcCommon.NORMALIZE_NAMES_FOR_AVRO).asBoolean();
        final Boolean useAvroLogicalTypes = context.getProperty(JdbcCommon.USE_AVRO_LOGICAL_TYPES).asBoolean();
        final Integer defaultPrecision = context.getProperty(JdbcCommon.DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
        final Integer defaultScale = context.getProperty(JdbcCommon.DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
        final StopWatch stopWatch = new StopWatch(true);

        final String selectQuery;
        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
            selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
        } else if (fileToProcess == null) {
            // The statement would have to come from FlowFile content, but no FlowFile is
            // available (only loop connections, empty queue). There is nothing to read.
            context.yield();
            return;
        } else {
            // The processor documents FlowFile content as UTF-8, so decode it as such
            // instead of relying on the platform default charset.
            final StringBuilder queryContents = new StringBuilder();
            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, StandardCharsets.UTF_8)));
            selectQuery = queryContents.toString();
        }

        int resultCount = 0;
        try (Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
             PreparedStatement st = con.prepareStatement(selectQuery)) {
            st.setQueryTimeout(queryTimeout);
            if (fileToProcess != null) {
                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
            }
            logger.debug("Executing query {}", new Object[]{selectQuery});

            boolean hasResults = st.execute();
            boolean hasUpdateCount = st.getUpdateCount() != -1;
            // A statement may produce a mix of result sets and update counts; walk all of
            // them and emit a FlowFile only for the result sets.
            while (hasResults || hasUpdateCount) {
                if (hasResults) {
                    FlowFile resultSetFF;
                    if (fileToProcess == null) {
                        resultSetFF = session.create();
                    } else {
                        resultSetFF = session.create(fileToProcess);
                        resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
                    }

                    final AtomicLong nrOfRows = new AtomicLong(0L);
                    resultSetFF = session.write(resultSetFF, out -> {
                        try {
                            final ResultSet resultSet = st.getResultSet();
                            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                                    .convertNames(convertNamesForAvro)
                                    .useLogicalTypes(useAvroLogicalTypes)
                                    .defaultPrecision(defaultPrecision)
                                    .defaultScale(defaultScale)
                                    .build();
                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
                        } catch (SQLException e) {
                            // The callback cannot throw SQLException; surface it to the outer handler.
                            throw new ProcessException((Throwable) e);
                        }
                    });

                    final long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
                    resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
                    resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
                    resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
                    logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()});
                    session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
                    session.transfer(resultSetFF, REL_SUCCESS);
                    ++resultCount;
                }

                try {
                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
                    hasUpdateCount = st.getUpdateCount() != -1;
                } catch (SQLException ex) {
                    // Best effort: a failure to advance is treated as "no more results",
                    // but leave a trace so it is not completely invisible.
                    logger.debug("Unable to advance to the next result of query {}; treating results as exhausted", new Object[]{selectQuery, ex});
                    hasResults = false;
                    hasUpdateCount = false;
                }
            }

            if (fileToProcess != null) {
                if (resultCount > 0) {
                    // The results were emitted as children; the trigger FlowFile is no longer needed.
                    session.remove(fileToProcess);
                } else {
                    // No result set at all: reuse the incoming FlowFile as an empty Avro result.
                    fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
                    session.transfer(fileToProcess, REL_SUCCESS);
                }
            } else if (resultCount == 0) {
                FlowFile emptyFF = session.create();
                emptyFF = session.write(emptyFF, JdbcCommon::createEmptyAvroStream);
                emptyFF = session.putAttribute(emptyFF, RESULT_ROW_COUNT, "0");
                emptyFF = session.putAttribute(emptyFF, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
                session.transfer(emptyFF, REL_SUCCESS);
            }
        } catch (SQLException | ProcessException e) {
            if (fileToProcess == null) {
                // Nothing can be routed to failure; log, back off, and stop here rather than
                // handing a null FlowFile to the session.
                logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure", new Object[]{selectQuery, e});
                context.yield();
            } else {
                if (context.hasIncomingConnection()) {
                    logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[]{selectQuery, fileToProcess, e});
                    fileToProcess = session.penalize(fileToProcess);
                } else {
                    logger.error("Unable to execute SQL select query {} due to {}; routing to failure", new Object[]{selectQuery, e});
                    context.yield();
                }
                session.transfer(fileToProcess, REL_FAILURE);
            }
        }
    }
}

