/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.queryrecord.FlowFileTable;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags(value={"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Evaluates one or more SQL queries against the contents of a FlowFile. The result of the SQL query then becomes the content of the output FlowFile. This can be used, for example, for field-specific filtering, transformation, and row-level filtering. Columns can be renamed, simple calculations and aggregations performed, etc. The Processor is configured with a Record Reader Controller Service and a Record Writer service so as to allow flexibility in incoming and outgoing data formats. The Processor must be configured with at least one user-defined property. The name of the Property is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated relationship. If the Record Writer chooses to inherit the schema from the Record, it is important to note that the schema that is inherited will be from the ResultSet, rather than the input Record. This allows a single instance of the QueryRecord processor to have multiple queries, each of which returns a different set of columns and aggregations. As a result, though, the schema that is derived will have no schema name, so it is important that the configured Record Writer not attempt to write the Schema Name as an attribute if inheriting the Schema from the Record. See the Processor Usage documentation for more information.")
@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
@DynamicProperty(name="The name of the relationship to route data to", value="A SQL SELECT statement that is used to determine what data should be routed to this relationship.", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data that is selected being routed to the relationship whose name is the property name")
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute="record.count", description="The number of records selected by the query")})
public class QueryRecord
extends AbstractProcessor {
    /** Controller Service that parses incoming FlowFile content and determines its schema. */
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    /** Controller Service that writes query results to the outgoing FlowFiles. */
    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing results to a FlowFile").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    /** Whether a FlowFile is still emitted when a query selects zero records (defaults to "true"). */
    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder().name("include-zero-record-flowfiles").displayName("Include Zero Record FlowFiles").description("When running the SQL statement against an incoming FlowFile, if the result has no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    /** Whether prepared statements are cached and reused across FlowFiles (defaults to "true"). */
    static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder().name("cache-schema").displayName("Cache Schema").description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact same schema, or if the SQL SELECT statement uses the Expression Language, this value should be set to false.").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    /** Relationship that receives the unmodified input FlowFile after all queries succeeded. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile is routed to this relationship").build();
    /** Relationship that receives the input FlowFile when schema detection or any query fails. */
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in input data), the original FlowFile it will be routed to this relationship").build();
    /** Fixed (non-dynamic) property descriptors; assigned once in init(). */
    private List<PropertyDescriptor> properties;
    /** Built-in relationships plus one per dynamic property; mutated by onPropertyModified(). */
    private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet<Relationship>());
    /**
     * Idle prepared statements keyed by SQL text. Populated in setupQueues(), drained in
     * cleanup(), and read by getStatement() and by the QueryResult returned from queryWithCache().
     */
    private final Map<String, BlockingQueue<CachedStatement>> statementQueues = new HashMap<String, BlockingQueue<CachedStatement>>();

    /**
     * Registers the Calcite JDBC driver with the {@link DriverManager}, then publishes the
     * fixed property descriptors and the two built-in relationships.
     *
     * @param context initialization context (not read by this method)
     * @throws ProcessException if registering the Calcite driver raises a SQLException
     */
    protected void init(ProcessorInitializationContext context) {
        try {
            Driver calciteDriver = new org.apache.calcite.jdbc.Driver();
            DriverManager.registerDriver(calciteDriver);
        }
        catch (SQLException registrationFailure) {
            throw new ProcessException("Failed to load Calcite JDBC Driver", (Throwable)registrationFailure);
        }

        // Fixed descriptors, in the order they are exposed.
        List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        Collections.addAll(descriptors, RECORD_READER_FACTORY, RECORD_WRITER_FACTORY, INCLUDE_ZERO_RECORD_FLOWFILES, CACHE_SCHEMA);
        this.properties = Collections.unmodifiableList(descriptors);

        // Built-in relationships; dynamic ones are added later by onPropertyModified().
        Collections.addAll(this.relationships, REL_FAILURE, REL_ORIGINAL);
    }

    /**
     * @return the processor's relationship set itself (not a copy): the built-in relationships
     *         plus whatever onPropertyModified() has added for dynamic properties
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of fixed property descriptors assembled in init()
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Keeps the relationship set in step with the dynamic properties: setting a dynamic
     * property adds a relationship named after it, clearing it removes that relationship.
     * Non-dynamic properties are ignored.
     *
     * @param descriptor the property that changed
     * @param oldValue   previous value (not used)
     * @param newValue   new value, or {@code null} when the property was removed
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor.isDynamic()) {
            Relationship dynamicRelationship = new Relationship.Builder()
                .name(descriptor.getName())
                .description("User-defined relationship that specifies where data that matches the specified SQL query should be routed")
                .build();
            if (newValue != null) {
                this.relationships.add(dynamicRelationship);
            } else {
                this.relationships.remove(dynamicRelationship);
            }
        }
    }

    /**
     * Rejects the configuration when 'Cache Schema' is true and at least one dynamic (SQL)
     * property contains Expression Language.
     *
     * @param validationContext the configuration being validated
     * @return a single invalid result for the first offending property found, otherwise an empty list
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        // Nothing to cross-check unless caching is switched on.
        if (!validationContext.getProperty(CACHE_SCHEMA).asBoolean()) {
            return Collections.emptyList();
        }
        for (PropertyDescriptor candidate : validationContext.getProperties().keySet()) {
            if (!candidate.isDynamic()) {
                continue;
            }
            String configuredSql = validationContext.getProperty(candidate).getValue();
            if (validationContext.isExpressionLanguagePresent(configuredSql)) {
                ValidationResult conflict = new ValidationResult.Builder()
                    .subject("Cache Schema")
                    .input("true")
                    .valid(false)
                    .explanation("Cannot have 'Cache Schema' property set to true if any SQL statement makes use of the Expression Language")
                    .build();
                return Collections.singleton(conflict);
            }
        }
        return Collections.emptyList();
    }

    /**
     * Builds the descriptor for a user-defined property: optional, dynamic, Expression Language
     * evaluated against FlowFile attributes, and validated by {@code SqlValidator}.
     *
     * @param propertyDescriptorName name of the dynamic property (also the relationship name)
     * @return the descriptor for that property
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        Validator sqlValidator = new SqlValidator();
        return new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .description("SQL select statement specifies how data should be filtered/transformed. SQL SELECT should select from the FLOWFILE table")
            .required(false)
            .dynamic(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(sqlValidator)
            .build();
    }

    /**
     * Takes one FlowFile from the session and runs the SQL statement of every dynamic property
     * against it.
     *
     * <p>For each statement a child FlowFile is created from the original, the query's
     * ResultSet is written into it with the configured Record Writer, and the child is recorded
     * for transfer to the relationship named after the property. A child whose result holds zero
     * records is removed instead when 'Include Zero Record FlowFiles' is false. Once every
     * statement has succeeded, the children are forked/routed in provenance and transferred,
     * and the original goes to {@code original}.</p>
     *
     * <p>If the record schema cannot be determined, the original goes to {@code failure}
     * immediately. If any statement throws an {@link Exception}, every child created so far is
     * removed and the original goes to {@code failure}.</p>
     *
     * @param context provides the configured properties and controller services
     * @param session the session used to read, create, write and transfer FlowFiles
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile original = session.get();
        if (original == null) {
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        final RecordSetWriterFactory recordSetWriterFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        RecordReaderFactory recordReaderFactory = (RecordReaderFactory)context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        // Children that completed successfully, mapped to the relationship they will be sent to.
        Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<FlowFile, Relationship>();
        // Every child that still exists in the session; removed wholesale if anything fails.
        Set<FlowFile> createdFlowFiles = new HashSet<FlowFile>();

        // Derive the schema handed to ResultSetRecordSet before running any query.
        final RecordSchema readerSchema;
        try (InputStream rawIn = session.read(original)) {
            Map<String, String> attributes = original.getAttributes();
            RecordReader reader = recordReaderFactory.createRecordReader(attributes, rawIn, this.getLogger());
            RecordSchema inputSchema = reader.getSchema();
            readerSchema = recordSetWriterFactory.getSchema(attributes, inputSchema);
        }
        catch (Exception e) {
            this.getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[]{original, e});
            session.transfer(original, REL_FAILURE);
            return;
        }

        final Map<String, String> originalAttributes = original.getAttributes();
        int recordsRead = 0;
        try {
            for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
                if (!descriptor.isDynamic()) {
                    continue;
                }
                Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
                FlowFile transformed = session.create(original);
                boolean flowFileRemoved = false;
                try {
                    String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
                    final AtomicReference<WriteResult> writeResultRef = new AtomicReference<WriteResult>();
                    final AtomicReference<String> mimeTypeRef = new AtomicReference<String>();
                    QueryResult queryResult = context.getProperty(CACHE_SCHEMA).asBoolean()
                        ? this.queryWithCache(session, original, sql, context, recordReaderFactory)
                        : this.query(session, original, sql, context, recordReaderFactory);
                    try {
                        final ResultSet rs = queryResult.getResultSet();
                        transformed = session.write(transformed, new OutputStreamCallback(){

                            public void process(OutputStream out) throws IOException {
                                RecordSchema writeSchema;
                                ResultSetRecordSet recordSet;
                                try {
                                    recordSet = new ResultSetRecordSet(rs, readerSchema);
                                    RecordSchema resultSetSchema = recordSet.getSchema();
                                    writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
                                }
                                catch (SQLException | SchemaNotFoundException e) {
                                    throw new ProcessException(e);
                                }
                                try (RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(QueryRecord.this.getLogger(), writeSchema, out);){
                                    writeResultRef.set(resultSetWriter.write((RecordSet)recordSet));
                                    mimeTypeRef.set(resultSetWriter.getMimeType());
                                }
                                catch (Exception e) {
                                    throw new IOException(e);
                                }
                            }
                        });
                    }
                    finally {
                        // Release the query's resources whether or not the write succeeded.
                        this.closeQuietly(queryResult);
                    }
                    recordsRead = Math.max(recordsRead, queryResult.getRecordsRead());
                    WriteResult result = writeResultRef.get();
                    if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean().booleanValue()) {
                        session.remove(transformed);
                        flowFileRemoved = true;
                        transformedFlowFiles.remove(transformed);
                        this.getLogger().info("Transformed {} but the result contained no data so will not pass on a FlowFile", new Object[]{original});
                        continue;
                    }
                    Map<String, String> attributesToAdd = new HashMap<String, String>();
                    if (result.getAttributes() != null) {
                        attributesToAdd.putAll(result.getAttributes());
                    }
                    attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                    attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
                    transformed = session.putAllAttributes(transformed, attributesToAdd);
                    transformedFlowFiles.put(transformed, relationship);
                    session.adjustCounter("Records Written", (long)result.getRecordCount(), false);
                }
                finally {
                    // Track the child so the catch blocks below can remove it on failure.
                    // Deliberately no `continue` here: a jump out of a finally block would
                    // discard any exception that is still propagating.
                    if (!flowFileRemoved) {
                        createdFlowFiles.add(transformed);
                    }
                }
            }
            long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
            if (transformedFlowFiles.size() > 0) {
                session.getProvenanceReporter().fork(original, transformedFlowFiles.keySet(), elapsedMillis);
                for (Map.Entry<FlowFile, Relationship> entry : transformedFlowFiles.entrySet()) {
                    FlowFile transformed = entry.getKey();
                    Relationship relationship = entry.getValue();
                    session.getProvenanceReporter().route(transformed, relationship);
                    session.transfer(transformed, relationship);
                }
            }
            this.getLogger().info("Successfully queried {} in {} millis", new Object[]{original, elapsedMillis});
            session.transfer(original, REL_ORIGINAL);
        }
        catch (SQLException e) {
            // Log the underlying cause when the SQLException carries one.
            this.getLogger().error("Unable to query {} due to {}", new Object[]{original, e.getCause() == null ? e : e.getCause()});
            session.remove(createdFlowFiles);
            session.transfer(original, REL_FAILURE);
        }
        catch (Exception e) {
            this.getLogger().error("Unable to query {} due to {}", new Object[]{original, e});
            session.remove(createdFlowFiles);
            session.transfer(original, REL_FAILURE);
        }
        session.adjustCounter("Records Read", (long)recordsRead, false);
    }

    /**
     * Checks out a prepared statement for the given SQL: an idle one from the queue registered
     * for that SQL when available, otherwise a newly built one.
     *
     * @param sql                 SQL text, used as the key into the statement queues
     * @param connectionSupplier  creates the connection when a new statement has to be built
     * @param session             session handed to a newly built FlowFileTable
     * @param flowFile            FlowFile handed to a newly built FlowFileTable
     * @param recordReaderFactory reader factory handed to a newly built FlowFileTable
     * @return a pooled or freshly built statement
     * @throws SQLException if building a new statement fails
     */
    private synchronized CachedStatement getStatement(String sql, Supplier<CalciteConnection> connectionSupplier, ProcessSession session, FlowFile flowFile, RecordReaderFactory recordReaderFactory) throws SQLException {
        BlockingQueue<CachedStatement> pool = this.statementQueues.get(sql);
        // poll() returns null when the queue is empty; a missing queue is treated the same way.
        CachedStatement pooled = pool == null ? null : pool.poll();
        if (pooled == null) {
            return this.buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
        }
        return pooled;
    }

    /**
     * Opens a new connection, registers a FlowFileTable under the name {@code FLOWFILE} in its
     * root schema (with schema caching disabled), and prepares the given SQL against it.
     *
     * <p>If anything fails after the connection has been obtained, the connection is closed
     * before the failure is rethrown, so a bad statement does not leak a connection.</p>
     *
     * @param sql                 SQL text to prepare
     * @param connectionSupplier  supplies the connection the statement is prepared on
     * @param session             session passed to the FlowFileTable
     * @param flowFile            FlowFile passed to the FlowFileTable
     * @param recordReaderFactory reader factory passed to the FlowFileTable
     * @return the prepared statement bundled with its table and connection
     * @throws SQLException if preparing the statement fails
     */
    private CachedStatement buildCachedStatement(String sql, Supplier<CalciteConnection> connectionSupplier, ProcessSession session, FlowFile flowFile, RecordReaderFactory recordReaderFactory) throws SQLException {
        CalciteConnection connection = connectionSupplier.get();
        try {
            SchemaPlus rootSchema = connection.getRootSchema();
            FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordReaderFactory, this.getLogger());
            rootSchema.add("FLOWFILE", flowFileTable);
            rootSchema.setCacheEnabled(false);
            PreparedStatement stmt = connection.prepareStatement(sql);
            return new CachedStatement(stmt, flowFileTable, connection);
        }
        catch (Throwable t) {
            // Nobody else holds a reference to this connection yet; close it or it is lost.
            this.closeQuietly(connection);
            throw t;
        }
    }

    /**
     * Drains every statement queue, closing each idle statement together with its connection,
     * and then empties the queue map. Invoked when the processor is stopped.
     */
    @OnStopped
    public synchronized void cleanup() {
        for (BlockingQueue<CachedStatement> pool : this.statementQueues.values()) {
            for (CachedStatement idle = pool.poll(); idle != null; idle = pool.poll()) {
                this.closeQuietly(idle.getStatement(), idle.getConnection());
            }
        }
        this.statementQueues.clear();
    }

    /**
     * Creates one bounded statement queue per dynamic property, keyed by the property's SQL
     * text and sized to the processor's maximum number of concurrent tasks. Invoked when the
     * processor is scheduled.
     *
     * @param context provides the configured properties and the concurrent-task limit
     */
    @OnScheduled
    public synchronized void setupQueues(ProcessContext context) {
        for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (!descriptor.isDynamic()) {
                continue;
            }
            String sql = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
            BlockingQueue<CachedStatement> queue = new LinkedBlockingQueue<CachedStatement>(context.getMaxConcurrentTasks());
            this.statementQueues.put(sql, queue);
        }
    }

    /**
     * Executes the SQL through a pooled prepared statement: a statement is checked out via
     * getStatement(), its table is pointed at the given FlowFile, and the query is executed.
     *
     * <p>Closing the returned QueryResult closes the table and offers the statement back to
     * the queue for this SQL; if there is no such queue or it is full, the statement's
     * connection is closed instead.</p>
     *
     * <p>If pointing the table at the FlowFile or executing the query fails, the checked-out
     * statement and its connection are closed before the failure is rethrown. Without that they
     * would be neither re-queued nor closed.</p>
     *
     * @param session             session used by the table to read the FlowFile
     * @param flowFile            FlowFile to query
     * @param sql                 SQL text to execute
     * @param context             process context (not read by this method)
     * @param recordParserFactory reader factory used when a new statement has to be built
     * @return handle on the open ResultSet; must be closed by the caller
     * @throws SQLException if the statement cannot be built or executed
     */
    protected QueryResult queryWithCache(ProcessSession session, FlowFile flowFile, final String sql, ProcessContext context, RecordReaderFactory recordParserFactory) throws SQLException {
        ResultSet rs;
        Supplier<CalciteConnection> connectionSupplier = () -> {
            Properties properties = new Properties();
            properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
            try {
                Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
                CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
                return calciteConnection;
            }
            catch (Exception e) {
                throw new ProcessException((Throwable)e);
            }
        };
        final CachedStatement cachedStatement = this.getStatement(sql, connectionSupplier, session, flowFile, recordParserFactory);
        PreparedStatement stmt = cachedStatement.getStatement();
        final FlowFileTable<?, ?> table = cachedStatement.getTable();
        try {
            table.setFlowFile(session, flowFile);
            try {
                rs = stmt.executeQuery();
            }
            catch (Throwable t) {
                table.close();
                throw t;
            }
        }
        catch (Throwable t) {
            // The statement is out of the pool and will not be handed to a QueryResult,
            // so this is the only place left where it can be released.
            this.closeQuietly(stmt, cachedStatement.getConnection());
            throw t;
        }
        return new QueryResult(){

            @Override
            public void close() throws IOException {
                try {
                    table.close();
                }
                finally {
                    // Run even if closing the table failed, so the statement is always
                    // either returned to the pool or has its connection closed.
                    BlockingQueue<CachedStatement> statementQueue = QueryRecord.this.statementQueues.get(sql);
                    if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
                        try {
                            cachedStatement.getConnection().close();
                        }
                        catch (SQLException e) {
                            throw new IOException("Failed to close statement", e);
                        }
                    }
                }
            }

            @Override
            public ResultSet getResultSet() {
                return rs;
            }

            @Override
            public int getRecordsRead() {
                return table.getRecordsRead();
            }
        };
    }

    /**
     * Executes the SQL on a dedicated, non-pooled connection: a new connection is opened, a
     * FlowFileTable is registered as {@code FLOWFILE} in its root schema (schema caching
     * disabled), and the SQL is run through a plain Statement.
     *
     * <p>Closing the returned QueryResult closes the ResultSet, the Statement and the
     * Connection. If anything fails before the QueryResult is returned, whichever of those
     * resources were already opened are closed and the failure is rethrown. This includes
     * {@link Error}s, which the inner handler rethrows after closing the table.</p>
     *
     * @param session             session used by the table to read the FlowFile
     * @param flowFile            FlowFile to query
     * @param sql                 SQL text to execute
     * @param context             process context (not read by this method)
     * @param recordParserFactory reader factory passed to the FlowFileTable
     * @return handle on the open ResultSet; must be closed by the caller
     * @throws SQLException if the connection cannot be opened or the query fails
     */
    protected QueryResult query(ProcessSession session, FlowFile flowFile, String sql, ProcessContext context, RecordReaderFactory recordParserFactory) throws SQLException {
        Properties properties = new Properties();
        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
        Connection connection = null;
        ResultSet resultSet = null;
        Statement statement = null;
        try {
            connection = DriverManager.getConnection("jdbc:calcite:", properties);
            CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
            SchemaPlus rootSchema = calciteConnection.getRootSchema();
            final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordParserFactory, this.getLogger());
            rootSchema.add("FLOWFILE", flowFileTable);
            rootSchema.setCacheEnabled(false);
            statement = connection.createStatement();
            try {
                resultSet = statement.executeQuery(sql);
            }
            catch (Throwable t) {
                flowFileTable.close();
                throw t;
            }
            // Final copies for capture by the anonymous QueryResult below.
            final ResultSet rs = resultSet;
            final Statement stmt = statement;
            final Connection conn = connection;
            return new QueryResult(){

                @Override
                public void close() throws IOException {
                    QueryRecord.this.closeQuietly(new AutoCloseable[]{rs, stmt, conn});
                }

                @Override
                public ResultSet getResultSet() {
                    return rs;
                }

                @Override
                public int getRecordsRead() {
                    return flowFileTable.getRecordsRead();
                }
            };
        }
        catch (Throwable t) {
            // Throwable rather than Exception: the inner handler rethrows Errors as well,
            // and those must not leave the statement and connection open.
            this.closeQuietly(resultSet, statement, connection);
            throw t;
        }
    }

    /**
     * Closes each given resource in order, skipping {@code null} entries. A failure to close
     * one resource is logged as a warning and does not prevent the remaining ones from being
     * closed.
     *
     * @param closeables resources to close; the array itself may be {@code null}
     */
    private void closeQuietly(AutoCloseable ... closeables) {
        if (closeables != null) {
            for (int i = 0; i < closeables.length; i++) {
                AutoCloseable resource = closeables[i];
                if (resource != null) {
                    try {
                        resource.close();
                    }
                    catch (Exception closeFailure) {
                        this.getLogger().warn("Failed to close SQL resource", (Throwable)closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Immutable bundle of a prepared statement, the FlowFileTable it reads from, and the
     * connection it was prepared on, so the three can be pooled and released together.
     */
    private static class CachedStatement {
        private final PreparedStatement statement;
        private final FlowFileTable<?, ?> table;
        private final Connection connection;

        /**
         * @param statement  the prepared statement
         * @param table      the table registered on the statement's connection
         * @param connection the connection the statement was prepared on
         */
        public CachedStatement(PreparedStatement statement, FlowFileTable<?, ?> table, Connection connection) {
            this.connection = connection;
            this.table = table;
            this.statement = statement;
        }

        /** @return the table this statement reads from */
        public FlowFileTable<?, ?> getTable() {
            return table;
        }

        /** @return the prepared statement */
        public PreparedStatement getStatement() {
            return statement;
        }

        /** @return the connection the statement was prepared on */
        public Connection getConnection() {
            return connection;
        }
    }

    /**
     * Handle on one executed query. Callers read the ResultSet and then close the handle to
     * release whatever the producing method opened.
     */
    private static interface QueryResult
    extends Closeable {
        /** @return the ResultSet produced by the query */
        ResultSet getResultSet();

        /** @return the records-read count reported by the table backing the query */
        int getRecordsRead();
    }

    /**
     * Validator for dynamic SQL properties. A value containing Expression Language is accepted
     * without parsing; any other value must parse as a statement under the MYSQL_ANSI lexical
     * rules.
     */
    private static class SqlValidator
    implements Validator {
        private SqlValidator() {
        }

        /**
         * @param subject name of the property being validated
         * @param input   configured property value
         * @param context validation context used to detect and evaluate Expression Language
         * @return a valid result when Expression Language is present or the SQL parses,
         *         otherwise an invalid result carrying the parser's message
         */
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            if (!context.isExpressionLanguagePresent(input)) {
                String sqlText = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
                SqlParser.Config parserConfig = SqlParser.configBuilder().setLex(Lex.MYSQL_ANSI).build();
                SqlParser sqlParser = SqlParser.create((String)sqlText, (SqlParser.Config)parserConfig);
                try {
                    sqlParser.parseStmt();
                }
                catch (Exception parseFailure) {
                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Not a valid SQL Statement: " + parseFailure.getMessage()).build();
                }
                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
            }
            // The statement cannot be parsed until the expressions are evaluated per FlowFile.
            return new ValidationResult.Builder().input(input).subject(subject).valid(true).explanation("Expression Language Present").build();
        }
    }
}

