/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;

@EventDriven
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@Tags(value={"sql", "select", "jdbc", "query", "database"})
@CapabilityDescription(value="Execute provided SQL select query. Query result will be converted to Avro format. Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
public class ExecuteSQL
extends AbstractProcessor {
    public static final String RESULT_ROW_COUNT = "executesql.row.count";
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successfully created FlowFile from SQL query result set.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship").build();
    private final Set<Relationship> relationships;
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").description("The Controller Service that is used to obtain connection to database").required(true).identifiesControllerService(DBCPService.class).build();
    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder().name("SQL select query").description("SQL select query").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder().name("Max Wait Time").description("The maximum amount of time allowed for a running SQL select query  , zero means there is no limit. Max time less than 1 second will be equal to zero.").defaultValue("0 seconds").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).sensitive(false).build();
    private final List<PropertyDescriptor> propDescriptors;

    public ExecuteSQL() {
        HashSet<Relationship> r = new HashSet<Relationship>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(r);
        ArrayList<PropertyDescriptor> pds = new ArrayList<PropertyDescriptor>();
        pds.add(DBCP_SERVICE);
        pds.add(SQL_SELECT_QUERY);
        pds.add(QUERY_TIMEOUT);
        this.propDescriptors = Collections.unmodifiableList(pds);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propDescriptors;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile incoming = null;
        if (context.hasIncomingConnection() && (incoming = session.get()) == null && context.hasNonLoopConnection()) {
            return;
        }
        final ProcessorLog logger = this.getLogger();
        DBCPService dbcpService = (DBCPService)context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
        Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
        StopWatch stopWatch = new StopWatch(true);
        try (Connection con = dbcpService.getConnection();
             final Statement st = con.createStatement();){
            st.setQueryTimeout(queryTimeout);
            final LongHolder nrOfRows = new LongHolder(0L);
            FlowFile outgoing = incoming == null ? session.create() : incoming;
            outgoing = session.write(outgoing, new OutputStreamCallback(){

                public void process(OutputStream out) throws IOException {
                    try {
                        logger.debug("Executing query {}", new Object[]{selectQuery});
                        ResultSet resultSet = st.executeQuery(selectQuery);
                        nrOfRows.set((Object)JdbcCommon.convertToAvroStream(resultSet, out));
                    }
                    catch (SQLException e) {
                        throw new ProcessException((Throwable)e);
                    }
                }
            });
            outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, ((Long)nrOfRows.get()).toString());
            logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{outgoing, nrOfRows.get()});
            session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(outgoing, REL_SUCCESS);
        }
        catch (SQLException | ProcessException e) {
            if (incoming == null) {
                logger.error("Unable to execute SQL select query {} due to {}. No incoming flow file to route to failure", new Object[]{selectQuery, e});
            }
            logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[]{selectQuery, incoming, e});
            session.transfer(incoming, REL_FAILURE);
        }
    }
}

