/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.queryrecord.FlowFileTable;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags(value={"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Evaluates one or more SQL queries against the contents of a FlowFile. The result of the SQL query then becomes the content of the output FlowFile. This can be used, for example, for field-specific filtering, transformation, and row-level filtering. Columns can be renamed, simple calculations and aggregations performed, etc. The Processor is configured with a Record Reader Controller Service and a Record Writer service so as to allow flexibility in incoming and outgoing data formats. The Processor must be configured with at least one user-defined property. The name of the Property is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated relationship. If the Record Writer chooses to inherit the schema from the Record, it is important to note that the schema that is inherited will be from the ResultSet, rather than the input Record. This allows a single instance of the QueryRecord processor to have multiple queries, each of which returns a different set of columns and aggregations. As a result, though, the schema that is derived will have no schema name, so it is important that the configured Record Writer not attempt to write the Schema Name as an attribute if inheriting the Schema from the Record. See the Processor Usage documentation for more information.")
@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
@DynamicProperty(name="The name of the relationship to route data to", value="A SQL SELECT statement that is used to determine what data should be routed to this relationship.", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data that is selected being routed to the relationship whose name is the property name")
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute="record.count", description="The number of records selected by the query")})
public class QueryRecord
extends AbstractProcessor {
    /** Required property identifying the {@link RecordReaderFactory} controller service used to parse incoming FlowFile content. */
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    /** Required property identifying the {@link RecordSetWriterFactory} controller service used to serialize query results. */
    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing results to a FlowFile").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    /** Boolean property (default "true") consulted by onTrigger to decide whether a result with zero records still produces an output FlowFile. */
    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder().name("include-zero-record-flowfiles").displayName("Include Zero Record FlowFiles").description("When running the SQL statement against an incoming FlowFile, if the result has no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    /** Legacy property: per its own description it is no longer used and is kept only so existing flows stay valid. */
    static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder().name("cache-schema").displayName("Cache Schema").description("This property is no longer used. It remains solely for backward compatibility in order to avoid making existing Processors invalid upon upgrade. This property will be removed in future versions. Now, instead of forcing the user to understand the semantics of schema caching, the Processor caches up to 25 schemas and automatically rolls off the old schemas. This provides the same performance when caching was enabled previously and in some cases very significant performance improvements if caching was previously disabled.").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    /** Relationship that receives the unmodified input FlowFile after all queries succeed. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile is routed to this relationship").build();
    /** Relationship that receives the input FlowFile when schema discovery or any query fails. */
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in input data), the original FlowFile it will be routed to this relationship").build();
    /** Static (non-dynamic) property descriptors; assigned once in init as an unmodifiable list. */
    private List<PropertyDescriptor> properties;
    /** Synchronized set holding the two fixed relationships plus one relationship per dynamic (SQL) property. */
    private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet());
    /**
     * Pool of prepared statements keyed by (SQL text, reader schema). Holds at most 25 keys; when an entry
     * is removed, onCacheEviction closes every statement still queued under it.
     */
    private final Cache<Tuple<String, RecordSchema>, BlockingQueue<CachedStatement>> statementQueues = Caffeine.newBuilder().maximumSize(25L).removalListener(this::onCacheEviction).build();

    /**
     * Registers the Calcite JDBC driver and sets up the processor's fixed
     * property descriptors and its two built-in relationships.
     *
     * @param context initialization context supplied by the framework (not read here)
     * @throws ProcessException if the Calcite driver cannot be registered
     */
    protected void init(ProcessorInitializationContext context) {
        try {
            DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
        }
        catch (SQLException driverFailure) {
            throw new ProcessException("Failed to load Calcite JDBC Driver", driverFailure);
        }

        // Fixed descriptors, in the order they are presented to the user.
        final List<PropertyDescriptor> descriptors = new ArrayList<>(4);
        descriptors.add(RECORD_READER_FACTORY);
        descriptors.add(RECORD_WRITER_FACTORY);
        descriptors.add(INCLUDE_ZERO_RECORD_FLOWFILES);
        descriptors.add(CACHE_SCHEMA);
        this.properties = Collections.unmodifiableList(descriptors);

        // Dynamic relationships are added later by onPropertyModified.
        this.relationships.add(REL_FAILURE);
        this.relationships.add(REL_ORIGINAL);
    }

    /**
     * Returns the relationships currently exposed by this processor.
     *
     * @return the processor's own relationship set (the same instance that
     *         onPropertyModified mutates), not a copy
     */
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * Returns the fixed (non-dynamic) property descriptors.
     *
     * @return the list assigned in init; {@code null} if init has not run yet
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    /**
     * Keeps the relationship set in step with the dynamic (SQL) properties:
     * setting a dynamic property adds a relationship with the same name,
     * clearing it removes that relationship. Non-dynamic properties are ignored.
     *
     * @param descriptor the property that changed
     * @param oldValue   previous value (unused)
     * @param newValue   new value, or {@code null} when the property was removed
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor.isDynamic()) {
            final Relationship dynamicRelationship = new Relationship.Builder()
                .name(descriptor.getName())
                .description("User-defined relationship that specifies where data that matches the specified SQL query should be routed")
                .build();
            if (newValue != null) {
                this.relationships.add(dynamicRelationship);
            } else {
                this.relationships.remove(dynamicRelationship);
            }
        }
    }

    /**
     * Builds the descriptor for a user-defined property. Each such property is
     * optional, dynamic, supports FlowFile-attribute expression language, and is
     * validated by a {@code SqlValidator}.
     *
     * @param propertyDescriptorName name chosen by the user (also the relationship name)
     * @return descriptor for the dynamic SQL property
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder();
        builder.name(propertyDescriptorName);
        builder.description("SQL select statement specifies how data should be filtered/transformed. SQL SELECT should select from the FLOWFILE table");
        builder.required(false);
        builder.dynamic(true);
        builder.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES);
        builder.addValidator((Validator)new SqlValidator());
        return builder.build();
    }

    /**
     * Runs when the processor is stopped: closes every pooled statement and
     * connection, then empties the statement cache.
     */
    @OnStopped
    public synchronized void cleanup() {
        // Drain each queue first so its statements are closed before the entries are dropped.
        this.statementQueues.asMap().values().forEach(this::clearQueue);
        this.statementQueues.invalidateAll();
    }

    /**
     * Removal listener registered on the statement cache: closes all statements
     * still queued under the removed entry.
     *
     * @param key   cache key (SQL text and schema) being removed; unused
     * @param queue statements pooled under that key
     * @param cause why the entry was removed; unused
     */
    private void onCacheEviction(Tuple<String, RecordSchema> key, BlockingQueue<CachedStatement> queue, RemovalCause cause) {
        this.clearQueue(queue);
    }

    /**
     * Drains the given queue, closing the statement and connection of every
     * pooled entry. Close failures are logged by closeQuietly, not thrown.
     *
     * @param statementQueue queue of pooled statements to empty
     */
    private void clearQueue(BlockingQueue<CachedStatement> statementQueue) {
        for (CachedStatement pooled = statementQueue.poll(); pooled != null; pooled = statementQueue.poll()) {
            this.closeQuietly(pooled.getStatement(), pooled.getConnection());
        }
    }

    /**
     * Runs every dynamic-property SQL query against one incoming FlowFile.
     *
     * <p>Flow:
     * <ol>
     *   <li>Open the FlowFile once to discover the reader schema and the writer
     *       schema. Any failure here routes the original to {@code failure}.</li>
     *   <li>For each dynamic property, create a child FlowFile, execute the
     *       (expression-evaluated) SQL, and stream the result set into the child
     *       through the configured record writer.</li>
     *   <li>Children with zero records are removed when
     *       {@code INCLUDE_ZERO_RECORD_FLOWFILES} is false; the rest receive
     *       {@code mime.type} and {@code record.count} attributes.</li>
     *   <li>If every query succeeds, children are transferred to the relationship
     *       named after their property and the original goes to {@code original}.
     *       If any query fails, all created children are removed and the original
     *       goes to {@code failure}.</li>
     * </ol>
     *
     * @param context processor context providing property values
     * @param session session used to read, create, write and route FlowFiles
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }
        final StopWatch stopWatch = new StopWatch(true);
        final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
        final Set<FlowFile> createdFlowFiles = new HashSet<>();
        final Map<String, String> originalAttributes = original.getAttributes();

        final RecordSchema readerSchema;
        final RecordSchema writerSchema;
        // The reader is only needed to discover the schema; declare it as a resource so it is
        // closed together with the raw stream instead of being left open.
        try (InputStream rawIn = session.read(original);
             RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, original.getSize(), this.getLogger())) {
            readerSchema = reader.getSchema();
            writerSchema = recordSetWriterFactory.getSchema(originalAttributes, readerSchema);
        }
        catch (Exception e) {
            this.getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[]{original, e});
            session.transfer(original, REL_FAILURE);
            return;
        }

        int recordsRead = 0;
        try {
            for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
                if (!descriptor.isDynamic()) {
                    continue;
                }
                final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
                FlowFile transformed = session.create(original);
                boolean flowFileRemoved = false;
                try {
                    final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
                    final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
                    final QueryResult queryResult = this.query(session, original, readerSchema, sql, recordReaderFactory);
                    final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
                    try {
                        final ResultSet rs = queryResult.getResultSet();
                        transformed = session.write(transformed, new OutputStreamCallback(){

                            public void process(OutputStream out) throws IOException {
                                RecordSchema writeSchema;
                                ResultSetRecordSet recordSet;
                                try {
                                    recordSet = new ResultSetRecordSet(rs, writerSchema);
                                    // The schema handed to the writer is derived from the result set, not the input.
                                    RecordSchema resultSetSchema = recordSet.getSchema();
                                    writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
                                }
                                catch (SQLException | SchemaNotFoundException e) {
                                    throw new ProcessException(e);
                                }
                                try (RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(QueryRecord.this.getLogger(), writeSchema, out, original);){
                                    writeResultRef.set(resultSetWriter.write(recordSet));
                                    mimeTypeRef.set(resultSetWriter.getMimeType());
                                }
                                catch (Exception e) {
                                    throw new IOException(e);
                                }
                            }
                        });
                    }
                    finally {
                        // Always release the table and hand the statement back to the pool.
                        this.closeQuietly(queryResult);
                    }
                    recordsRead = Math.max(recordsRead, queryResult.getRecordsRead());
                    final WriteResult result = writeResultRef.get();
                    if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()) {
                        session.remove(transformed);
                        flowFileRemoved = true;
                        transformedFlowFiles.remove(transformed);
                        this.getLogger().info("Transformed {} but the result contained no data so will not pass on a FlowFile", new Object[]{original});
                        continue;
                    }
                    final Map<String, String> attributesToAdd = new HashMap<>();
                    if (result.getAttributes() != null) {
                        attributesToAdd.putAll(result.getAttributes());
                    }
                    attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                    attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
                    transformed = session.putAllAttributes(transformed, attributesToAdd);
                    transformedFlowFiles.put(transformed, relationship);
                    session.adjustCounter("Records Written", (long)result.getRecordCount(), false);
                }
                finally {
                    // Track every child that still exists so the failure paths can remove it.
                    // No jump statement here: a 'continue' inside finally would discard a pending exception.
                    if (!flowFileRemoved) {
                        createdFlowFiles.add(transformed);
                    }
                }
            }
            final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
            if (!transformedFlowFiles.isEmpty()) {
                session.getProvenanceReporter().fork(original, transformedFlowFiles.keySet(), elapsedMillis);
                for (final Map.Entry<FlowFile, Relationship> entry : transformedFlowFiles.entrySet()) {
                    final FlowFile transformed = entry.getKey();
                    final Relationship relationship = entry.getValue();
                    session.getProvenanceReporter().route(transformed, relationship);
                    session.transfer(transformed, relationship);
                }
            }
            this.getLogger().info("Successfully queried {} in {} millis", new Object[]{original, elapsedMillis});
            session.transfer(original, REL_ORIGINAL);
        }
        catch (SQLException e) {
            // Log the underlying cause when the SQLException carries one.
            this.getLogger().error("Unable to query {} due to {}", new Object[]{original, e.getCause() == null ? e : e.getCause()});
            session.remove(createdFlowFiles);
            session.transfer(original, REL_FAILURE);
        }
        catch (Exception e) {
            this.getLogger().error("Unable to query {} due to {}", new Object[]{original, e});
            session.remove(createdFlowFiles);
            session.transfer(original, REL_FAILURE);
        }
        session.adjustCounter("Records Read", (long)recordsRead, false);
    }

    /**
     * Checks out a prepared statement for the given SQL and schema. Reuses a
     * pooled statement when one is queued under that key; otherwise asks
     * {@code statementBuilder} for a new one.
     *
     * @param sql              SQL text (first half of the cache key)
     * @param schema           reader schema (second half of the cache key)
     * @param statementBuilder invoked only when no pooled statement is available
     * @return a statement for the caller's use
     */
    private synchronized CachedStatement getStatement(String sql, RecordSchema schema, Supplier<CachedStatement> statementBuilder) {
        final Tuple<String, RecordSchema> cacheKey = new Tuple<>(sql, schema);
        final BlockingQueue<CachedStatement> pool = this.statementQueues.get(cacheKey, key -> new LinkedBlockingQueue<>());
        final CachedStatement pooled = pool.poll();
        return pooled != null ? pooled : statementBuilder.get();
    }

    /**
     * Creates a fresh Calcite connection, registers the FlowFile as table
     * {@code FLOWFILE} on its root schema, and prepares the given SQL against it.
     *
     * <p>If anything fails after the connection has been opened, the connection
     * is closed before the exception propagates, so a bad SQL statement does not
     * leave a connection behind.
     *
     * @param sql                 SQL to prepare
     * @param session             session used by the table to read the FlowFile
     * @param flowFile            FlowFile exposed as the {@code FLOWFILE} table
     * @param schema              record schema of the FlowFile's content
     * @param recordReaderFactory factory the table uses to create record readers
     * @return the prepared statement bundled with its table and connection
     * @throws ProcessException if the statement cannot be prepared
     */
    private CachedStatement buildCachedStatement(String sql, ProcessSession session, FlowFile flowFile, RecordSchema schema, RecordReaderFactory recordReaderFactory) {
        final CalciteConnection connection = this.createConnection();
        try {
            final SchemaPlus rootSchema = this.createRootSchema(connection);
            final FlowFileTable flowFileTable = new FlowFileTable(session, flowFile, schema, recordReaderFactory, this.getLogger());
            rootSchema.add("FLOWFILE", (Table)flowFileTable);
            rootSchema.setCacheEnabled(false);
            final PreparedStatement stmt = connection.prepareStatement(sql);
            return new CachedStatement(stmt, flowFileTable, (Connection)connection);
        }
        catch (SQLException e) {
            try {
                connection.close();
            }
            catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw new ProcessException((Throwable)e);
        }
        catch (RuntimeException e) {
            // Same cleanup for unchecked failures; the exception itself propagates unchanged.
            try {
                connection.close();
            }
            catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Opens a new in-process Calcite connection configured with the
     * {@code MYSQL_ANSI} lexical policy.
     *
     * @return the connection unwrapped as a {@link CalciteConnection}
     * @throws ProcessException if the connection cannot be opened or unwrapped
     */
    private CalciteConnection createConnection() {
        final Properties connectionProperties = new Properties();
        connectionProperties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
        try {
            return DriverManager.getConnection("jdbc:calcite:", connectionProperties).unwrap(CalciteConnection.class);
        }
        catch (Exception connectFailure) {
            throw new ProcessException((Throwable)connectFailure);
        }
    }

    /**
     * Executes {@code sql} against the given FlowFile and returns a handle to the
     * open result set.
     *
     * <p>A prepared statement is checked out of the pool (or built on demand) and
     * pointed at the FlowFile. The caller must close the returned
     * {@code QueryResult}: closing it releases the table and offers the statement
     * back to the pool, or closes its connection if the pool entry no longer
     * exists or refuses the offer.
     *
     * <p>If execution fails, the table is closed and the checked-out statement's
     * connection is closed as well. The statement is never returned to the pool
     * on that path, so leaving its connection open would leak it.
     *
     * @param session             session used to read the FlowFile
     * @param flowFile            FlowFile to query
     * @param schema              record schema of the FlowFile (part of the cache key)
     * @param sql                 SQL to execute (part of the cache key)
     * @param recordReaderFactory factory used when a new statement must be built
     * @return handle exposing the result set and the number of records read
     * @throws SQLException if the query cannot be executed
     */
    protected QueryResult query(ProcessSession session, FlowFile flowFile, final RecordSchema schema, final String sql, RecordReaderFactory recordReaderFactory) throws SQLException {
        final ResultSet rs;
        final Supplier<CachedStatement> statementBuilder = () -> this.buildCachedStatement(sql, session, flowFile, schema, recordReaderFactory);
        final CachedStatement cachedStatement = this.getStatement(sql, schema, statementBuilder);
        final PreparedStatement stmt = cachedStatement.getStatement();
        final FlowFileTable table = cachedStatement.getTable();
        table.setFlowFile(session, flowFile);
        try {
            rs = stmt.executeQuery();
        }
        catch (Throwable t) {
            try {
                table.close();
            }
            finally {
                // The statement is abandoned on this path, so release its connection now.
                try {
                    cachedStatement.getConnection().close();
                }
                catch (SQLException closeFailure) {
                    t.addSuppressed(closeFailure);
                }
            }
            throw t;
        }
        return new QueryResult(){

            @Override
            public void close() throws IOException {
                table.close();
                final BlockingQueue<CachedStatement> statementQueue = QueryRecord.this.statementQueues.getIfPresent(new Tuple<>(sql, schema));
                // No pool entry (evicted or invalidated) or offer refused: nothing will reuse the statement, so close it.
                if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
                    try {
                        cachedStatement.getConnection().close();
                    }
                    catch (SQLException e) {
                        throw new IOException("Failed to close statement", e);
                    }
                }
            }

            @Override
            public ResultSet getResultSet() {
                return rs;
            }

            @Override
            public int getRecordsRead() {
                return table.getRecordsRead();
            }
        };
    }

    /**
     * Registers the RecordPath scalar functions ({@code RPATH}, {@code RPATH_STRING},
     * {@code RPATH_INT}, {@code RPATH_LONG}, {@code RPATH_DATE}, {@code RPATH_DOUBLE},
     * {@code RPATH_FLOAT}) on the connection's root schema. Each is backed by the
     * {@code eval} method of the matching nested class.
     *
     * @param calciteConnection connection whose root schema is populated
     * @return that root schema
     */
    private SchemaPlus createRootSchema(CalciteConnection calciteConnection) {
        final SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // Parallel arrays: functionNames[i] is implemented by implementations[i].
        final String[] functionNames = {"RPATH", "RPATH_STRING", "RPATH_INT", "RPATH_LONG", "RPATH_DATE", "RPATH_DOUBLE", "RPATH_FLOAT"};
        final Class<?>[] implementations = {ObjectRecordPath.class, StringRecordPath.class, IntegerRecordPath.class, LongRecordPath.class, DateRecordPath.class, DoubleRecordPath.class, FloatRecordPath.class};
        for (int i = 0; i < functionNames.length; i++) {
            rootSchema.add(functionNames[i], (Function)ScalarFunctionImpl.create(implementations[i], (String)"eval"));
        }
        return rootSchema;
    }

    /**
     * Closes each given resource in order, skipping {@code null} entries.
     * A failure to close is logged as a warning and does not stop the remaining
     * resources from being closed.
     *
     * @param closeables resources to close; may be {@code null}
     */
    private void closeQuietly(AutoCloseable ... closeables) {
        if (closeables != null) {
            for (AutoCloseable resource : closeables) {
                if (resource != null) {
                    try {
                        resource.close();
                    }
                    catch (Exception closeFailure) {
                        this.getLogger().warn("Failed to close SQL resource", (Throwable)closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Shared base for the RPATH_* SQL functions: evaluates a RecordPath against a
     * column value and converts the single selected value with a caller-supplied
     * transform. Compiled paths are cached (up to 100) in {@link #RECORD_PATH_CACHE}.
     */
    public static class RecordPathFunction {
        private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
        private static final RecordSchema ROOT_RECORD_SCHEMA = new SimpleRecordSchema(Collections.singletonList(ROOT_RECORD_FIELD));
        private static final RecordField PARENT_RECORD_FIELD = new RecordField("root", RecordFieldType.RECORD.getRecordDataType(ROOT_RECORD_SCHEMA));
        protected static final RecordPathCache RECORD_PATH_CACHE = new RecordPathCache(100);

        /**
         * Dispatches on the runtime type of {@code record} (Record, Record[],
         * Iterable, or Map) and evaluates the path against it.
         *
         * @param record     column value to evaluate against; {@code null} yields {@code null}
         * @param recordPath RecordPath expression
         * @param transform  converts the selected raw value to the function's return type
         * @return the transformed value, or {@code null} if nothing non-null was selected
         * @throws RuntimeException if the argument type is unsupported or the path selects more than one value
         */
        protected <T> T eval(Object record, String recordPath, java.util.function.Function<Object, T> transform) {
            if (record == null) {
                return null;
            } else if (record instanceof Record) {
                return this.eval((Record)record, recordPath, transform);
            } else if (record instanceof Record[]) {
                return this.eval((Record[])record, recordPath, transform);
            } else if (record instanceof Iterable) {
                // Element type cannot be checked at runtime; elements are assumed to be Records.
                @SuppressWarnings("unchecked")
                final Iterable<Record> records = (Iterable<Record>)record;
                return this.eval(records, recordPath, transform);
            } else if (record instanceof Map) {
                return this.eval((Map<?, ?>)record, recordPath, transform);
            }
            throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record");
        }

        /** Evaluates the path against a Map by wrapping it as the {@code root} field of a synthetic record. */
        private <T> T eval(Map<?, ?> map, String recordPath, java.util.function.Function<Object, T> transform) {
            final RecordPath path = RECORD_PATH_CACHE.getCompiled(recordPath);
            final MapRecord wrapper = new MapRecord(ROOT_RECORD_SCHEMA, Collections.singletonMap("root", map));
            final StandardFieldValue wrapperValue = new StandardFieldValue((Object)wrapper, PARENT_RECORD_FIELD, null);
            final StandardFieldValue mapValue = new StandardFieldValue(map, ROOT_RECORD_FIELD, (FieldValue)wrapperValue);
            final RecordPathResult evaluation = path.evaluate((Record)wrapper, (FieldValue)mapValue);
            final Supplier<String> tooManyValues = () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.";
            return this.evalResults(evaluation.getSelectedFields(), transform, tooManyValues);
        }

        /** Evaluates the path against a single Record. */
        private <T> T eval(Record record, String recordPath, java.util.function.Function<Object, T> transform) {
            final RecordPathResult evaluation = RECORD_PATH_CACHE.getCompiled(recordPath).evaluate(record);
            // The message is built lazily so the record is only rendered on the error path.
            final Supplier<String> tooManyValues = () -> "RecordPath " + recordPath + " evaluated against " + record + " resulted in more than one return value. The RecordPath must be further constrained.";
            return this.evalResults(evaluation.getSelectedFields(), transform, tooManyValues);
        }

        /** Evaluates the path against every Record in the array and pools the selected fields. */
        private <T> T eval(Record[] records, String recordPath, java.util.function.Function<Object, T> transform) {
            final RecordPath path = RECORD_PATH_CACHE.getCompiled(recordPath);
            final List<FieldValue> pooled = new ArrayList<>();
            for (final Record element : records) {
                path.evaluate(element).getSelectedFields().forEach(pooled::add);
            }
            if (pooled.isEmpty()) {
                return null;
            }
            final Supplier<String> tooManyValues = () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.";
            return this.evalResults(pooled.stream(), transform, tooManyValues);
        }

        /** Evaluates the path against every Record of the Iterable and pools the selected fields. */
        private <T> T eval(Iterable<Record> records, String recordPath, java.util.function.Function<Object, T> transform) {
            final RecordPath path = RECORD_PATH_CACHE.getCompiled(recordPath);
            final List<FieldValue> pooled = new ArrayList<>();
            for (final Record element : records) {
                path.evaluate(element).getSelectedFields().forEach(pooled::add);
            }
            if (pooled.isEmpty()) {
                return null;
            }
            final Supplier<String> tooManyValues = () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.";
            return this.evalResults(pooled.stream(), transform, tooManyValues);
        }

        /**
         * Reduces the selected fields to one transformed value: null values are
         * skipped, no remaining value yields {@code null}, and more than one
         * remaining value raises a RuntimeException carrying the supplied message.
         */
        private <T> T evalResults(Stream<FieldValue> fields, java.util.function.Function<Object, T> transform, Supplier<String> multipleReturnValueErrorSupplier) {
            final Stream<T> converted = fields
                .map(FieldValue::getValue)
                .filter(Objects::nonNull)
                .map(transform);
            // The accumulator only runs when a second value shows up, which is the error case.
            return converted
                .reduce((first, second) -> {
                    throw new RuntimeException((String)multipleReturnValueErrorSupplier.get());
                })
                .orElse(null);
        }
    }

    /** RecordPath function whose selected value is returned as a {@link Record}. */
    public static class RecordRecordPath
    extends RecordPathFunction {
        /**
         * @param record     column value to evaluate against
         * @param recordPath RecordPath expression
         * @return the selected value cast to Record, or {@code null} if nothing was selected
         */
        public Record eval(Object record, String recordPath) {
            final java.util.function.Function<Object, Record> asRecord = value -> Record.class.cast(value);
            return this.eval(record, recordPath, asRecord);
        }
    }

    /** Backs {@code RPATH_DATE}: returns the selected value as epoch milliseconds. */
    public static class DateRecordPath
    extends RecordPathFunction {
        /**
         * @param record     column value to evaluate against
         * @param recordPath RecordPath expression
         * @return a Number's long value or a Date's time in millis; {@code null} if nothing was selected
         * @throws RuntimeException for String values (the message points to {@code toDate}) and any other type
         */
        public Long eval(Object record, String recordPath) {
            final java.util.function.Function<Object, Long> toEpochMillis = value -> {
                if (value instanceof Date) {
                    return ((Date)value).getTime();
                }
                if (value instanceof Number) {
                    return ((Number)value).longValue();
                }
                if (value instanceof String) {
                    // Strings are rejected rather than parsed; the message tells the user how to convert explicitly.
                    throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Date against " + record + " because the value returned is of type String. To parse a String value as a Date, please use the toDate function. For example, SELECT RPATH_DATE( record, 'toDate( /event/timestamp, \"yyyy-MM-dd\" )' ) AS eventDate FROM FLOWFILE");
                }
                throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Date against " + record + " because the value returned is of type " + value.getClass());
            };
            return this.eval(record, recordPath, toEpochMillis);
        }
    }

    /** Backs {@code RPATH_DOUBLE}: returns the selected value as a Double. */
    public static class DoubleRecordPath
    extends RecordPathFunction {
        /**
         * @param record     column value to evaluate against
         * @param recordPath RecordPath expression
         * @return a Number's double value or a parsed String; {@code null} if nothing was selected
         * @throws RuntimeException if the selected value is neither a Number nor a String
         */
        public Double eval(Object record, String recordPath) {
            final java.util.function.Function<Object, Double> toDouble = value -> {
                if (value instanceof String) {
                    return Double.parseDouble((String)value);
                }
                if (value instanceof Number) {
                    return ((Number)value).doubleValue();
                }
                throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Double against " + record + " because the value returned is of type " + value.getClass());
            };
            return this.eval(record, recordPath, toDouble);
        }
    }

    /** Backs {@code RPATH_FLOAT}: returns the selected value as a Float. */
    public static class FloatRecordPath
    extends RecordPathFunction {
        /**
         * @param record     column value to evaluate against
         * @param recordPath RecordPath expression
         * @return a Number's float value or a parsed String; {@code null} if nothing was selected
         * @throws RuntimeException if the selected value is neither a Number nor a String
         */
        public Float eval(Object record, String recordPath) {
            final java.util.function.Function<Object, Float> toFloat = value -> {
                if (value instanceof String) {
                    return Float.parseFloat((String)value);
                }
                if (value instanceof Number) {
                    return ((Number)value).floatValue();
                }
                throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Float against " + record + " because the value returned is of type " + value.getClass());
            };
            return this.eval(record, recordPath, toFloat);
        }
    }

    /** Backs {@code RPATH_LONG}: returns the selected value as a Long. */
    public static class LongRecordPath
    extends RecordPathFunction {
        /**
         * @param record     column value to evaluate against
         * @param recordPath RecordPath expression
         * @return a Number's long value, a parsed String, or a Date's time in millis;
         *         {@code null} if nothing was selected
         * @throws RuntimeException if the selected value is of any other type
         */
        public Long eval(Object record, String recordPath) {
            final java.util.function.Function<Object, Long> toLong = value -> {
                if (value instanceof String) {
                    return Long.parseLong((String)value);
                }
                if (value instanceof Date) {
                    return ((Date)value).getTime();
                }
                if (value instanceof Number) {
                    return ((Number)value).longValue();
                }
                throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Long against " + record + " because the value returned is of type " + value.getClass());
            };
            return this.eval(record, recordPath, toLong);
        }
    }

    /**
     * RecordPath function whose result is coerced to an {@link Integer}.
     */
    public static class IntegerRecordPath extends RecordPathFunction {
        /**
         * Evaluates {@code recordPath} against {@code record} through the inherited
         * three-argument {@code eval}, supplying an Integer conversion for the selected value.
         *
         * @param record the object the RecordPath is evaluated against
         * @param recordPath the RecordPath expression text
         * @return whatever the inherited {@code eval} returns for the Integer conversion
         * @throws RuntimeException if the selected value is not a Number, String or Date
         *         (a String that is not a parseable int raises NumberFormatException)
         */
        public Integer eval(Object record, String recordPath) {
            return this.eval(record, recordPath, val -> {
                final Integer converted;
                if (val instanceof Number) {
                    converted = ((Number) val).intValue();
                } else if (val instanceof String) {
                    converted = Integer.parseInt((String) val);
                } else if (val instanceof Date) {
                    // NOTE(review): narrowing the long returned by getTime() keeps only its low 32 bits,
                    // so the result is not a meaningful timestamp for values outside the int range.
                    converted = (int) ((Date) val).getTime();
                } else {
                    throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Integer against " + record + " because the value returned is of type " + val.getClass());
                }
                return converted;
            });
        }
    }

    /**
     * RecordPath function whose result is rendered as a {@link String}.
     */
    public static class StringRecordPath extends RecordPathFunction {
        /**
         * Evaluates {@code recordPath} against {@code record} through the inherited
         * three-argument {@code eval}, converting the selected value with {@code toString()}.
         *
         * @param record the object the RecordPath is evaluated against
         * @param recordPath the RecordPath expression text
         * @return whatever the inherited {@code eval} returns for the toString conversion
         */
        public String eval(Object record, String recordPath) {
            return this.eval(record, recordPath, val -> val.toString());
        }
    }

    /**
     * RecordPath function that returns the selected value(s) without converting them to a
     * specific type. The argument may be a Record, a Record array, an Iterable or a Map.
     */
    public static class ObjectRecordPath extends RecordPathFunction {
        // Synthetic "root" field/schema used to wrap a bare Map in a MapRecord so a RecordPath can be evaluated against it.
        private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
        private static final RecordSchema ROOT_RECORD_SCHEMA = new SimpleRecordSchema(Collections.singletonList(ROOT_RECORD_FIELD));
        private static final RecordField PARENT_RECORD_FIELD = new RecordField("root", RecordFieldType.RECORD.getRecordDataType(ROOT_RECORD_SCHEMA));

        /**
         * Dispatches on the runtime type of {@code record} and evaluates {@code recordPath} against it.
         *
         * @param record a Record, Record[], Iterable, Map, or {@code null}
         * @param recordPath the RecordPath expression text
         * @return {@code null} when {@code record} is null or nothing is selected, the single selected
         *         value when exactly one field is selected, otherwise an {@code Object[]} of the selected values
         * @throws RuntimeException if {@code record} is of any other type
         */
        @SuppressWarnings("unchecked")
        public Object eval(Object record, String recordPath) {
            final Object selected;
            if (record == null) {
                selected = null;
            } else if (record instanceof Record) {
                selected = this.eval((Record) record, recordPath);
            } else if (record instanceof Record[]) {
                selected = this.eval((Record[]) record, recordPath);
            } else if (record instanceof Iterable) {
                // Element type cannot be checked here; elements are treated as Records when iterated.
                selected = this.eval((Iterable<Record>) record, recordPath);
            } else if (record instanceof Map) {
                selected = this.eval((Map<?, ?>) record, recordPath);
            } else {
                throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record");
            }
            return selected;
        }

        /** Wraps the map in a single-field MapRecord under "root" and evaluates the path with the map as the context field value. */
        private Object eval(Map<?, ?> map, String recordPath) {
            RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
            Record wrapper = new MapRecord(ROOT_RECORD_SCHEMA, Collections.singletonMap("root", map));
            FieldValue wrapperValue = new StandardFieldValue(wrapper, PARENT_RECORD_FIELD, null);
            FieldValue mapValue = new StandardFieldValue(map, ROOT_RECORD_FIELD, wrapperValue);
            RecordPathResult result = compiled.evaluate(wrapper, mapValue);
            return this.evalResults(result.getSelectedFields().collect(Collectors.toList()));
        }

        /** Evaluates the path against a single Record. */
        private Object eval(Record record, String recordPath) {
            RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
            List<FieldValue> selectedFields = new ArrayList<>();
            appendSelected(compiled, record, selectedFields);
            return this.evalResults(selectedFields);
        }

        /** Evaluates the path against every Record of the Iterable, concatenating the selections in iteration order. */
        private Object eval(Iterable<Record> records, String recordPath) {
            RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
            List<FieldValue> selectedFields = new ArrayList<>();
            for (Record each : records) {
                appendSelected(compiled, each, selectedFields);
            }
            return this.evalResults(selectedFields);
        }

        /** Evaluates the path against every Record of the array, concatenating the selections in array order. */
        private Object eval(Record[] records, String recordPath) {
            RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
            List<FieldValue> selectedFields = new ArrayList<>();
            for (Record each : records) {
                appendSelected(compiled, each, selectedFields);
            }
            return this.evalResults(selectedFields);
        }

        /** Evaluates the compiled path against one Record and appends every selected field to {@code sink}. */
        private static void appendSelected(RecordPath compiled, Record record, List<FieldValue> sink) {
            RecordPathResult result = compiled.evaluate(record);
            result.getSelectedFields().forEach(sink::add);
        }

        /** Collapses the selection: none gives null, one gives its value, several give an Object[] of their values. */
        private Object evalResults(List<FieldValue> selectedFields) {
            final int count = selectedFields.size();
            if (count == 0) {
                return null;
            }
            if (count == 1) {
                return selectedFields.get(0).getValue();
            }
            final Object[] values = new Object[count];
            int index = 0;
            for (FieldValue selected : selectedFields) {
                values[index++] = selected.getValue();
            }
            return values;
        }
    }

    /**
     * Immutable holder that groups a prepared statement with the table and the
     * connection it was created alongside. It only stores the three references.
     */
    private static class CachedStatement {
        private final Connection connection;
        private final PreparedStatement statement;
        private final FlowFileTable table;

        /**
         * @param statement the prepared statement to hold
         * @param table the table associated with the statement
         * @param connection the connection associated with the statement
         */
        public CachedStatement(PreparedStatement statement, FlowFileTable table, Connection connection) {
            this.connection = connection;
            this.statement = statement;
            this.table = table;
        }

        /** @return the table passed to the constructor */
        public FlowFileTable getTable() {
            return table;
        }

        /** @return the prepared statement passed to the constructor */
        public PreparedStatement getStatement() {
            return statement;
        }

        /** @return the connection passed to the constructor */
        public Connection getConnection() {
            return connection;
        }
    }

    private static interface QueryResult
    extends Closeable {
        public ResultSet getResultSet();

        public int getRecordsRead();
    }

    /**
     * Validator that accepts a property value when it either contains Expression Language
     * or parses as a SQL statement under Calcite's {@code MYSQL_ANSI} lexical policy.
     */
    private static class SqlValidator implements Validator {
        private SqlValidator() {
        }

        /**
         * Validates {@code input} as SQL.
         *
         * @param subject the name reported in the validation result
         * @param input the raw property value
         * @param context used to detect and evaluate Expression Language
         * @return a valid result if Expression Language is present or the statement parses;
         *         otherwise an invalid result whose explanation carries the parser's message
         */
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            // Statements containing Expression Language cannot be parsed until evaluated, so accept them as-is.
            if (context.isExpressionLanguagePresent(input)) {
                return resultFor(subject, input).valid(true).explanation("Expression Language Present").build();
            }

            final String sql = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
            final SqlParser parser = SqlParser.create(sql, SqlParser.configBuilder().setLex(Lex.MYSQL_ANSI).build());
            try {
                parser.parseStmt();
                return resultFor(subject, input).valid(true).build();
            } catch (Exception e) {
                return resultFor(subject, input).valid(false).explanation("Not a valid SQL Statement: " + e.getMessage()).build();
            }
        }

        /** Starts a result builder pre-populated with the subject and input. */
        private static ValidationResult.Builder resultFor(String subject, String input) {
            return new ValidationResult.Builder().subject(subject).input(input);
        }
    }
}

