/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.RecordSchema;

@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@SideEffectFree
@Tags(value={"record", "generic", "schema", "json", "csv", "avro", "freeform", "text", "xml"})
@WritesAttributes(value={@WritesAttribute(attribute="record.error.message", description="This attribute provides on failure the error message encountered by the Reader."), @WritesAttribute(attribute="avro.schema", description="This attribute provides the schema extracted from the input FlowFile using the provided RecordReader.")})
@CapabilityDescription(value="Extracts the record schema from the FlowFile using the supplied Record Reader and writes it to the `avro.schema` attribute.")
public class ExtractRecordSchema
extends AbstractProcessor {
    public static final String SCHEMA_ATTRIBUTE_NAME = "avro.schema";
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor SCHEMA_CACHE_SIZE = new PropertyDescriptor.Builder().name("cache-size").displayName("Schema Cache Size").description("Specifies the number of schemas to cache. This value should reflect the expected number of different schemas that may be in the incoming FlowFiles. This ensures more efficient retrieval of the schemas and thus the processor performance.").defaultValue("10").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles whose record schemas are successfully extracted will be routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile's record schema cannot be extracted from the configured input format, the FlowFile will be routed to this relationship").build();
    static final List<PropertyDescriptor> properties = Arrays.asList(RECORD_READER, SCHEMA_CACHE_SIZE);
    private LoadingCache<RecordSchema, String> avroSchemaTextCache;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    @OnScheduled
    public void setup(ProcessContext context) {
        int cacheSize = context.getProperty(SCHEMA_CACHE_SIZE).asInteger();
        this.avroSchemaTextCache = Caffeine.newBuilder().maximumSize((long)cacheSize).build(schema -> AvroTypeUtil.extractAvroSchema((RecordSchema)schema).toString());
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        RecordSchema recordSchema;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        Map originalAttributes = flowFile.getAttributes();
        try (InputStream inputStream = session.read(flowFile);
             RecordReader reader = readerFactory.createRecordReader(originalAttributes, inputStream, flowFile.getSize(), this.getLogger());){
            recordSchema = reader.getSchema();
        }
        catch (Exception e) {
            this.getLogger().error("Failed to process {}; will route to failure", new Object[]{flowFile, e});
            Throwable c = e.getCause();
            if (c == null) {
                session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
            } else {
                session.putAttribute(flowFile, "record.error.message", c.getLocalizedMessage() != null ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
            }
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        session.putAttribute(flowFile, SCHEMA_ATTRIBUTE_NAME, (String)this.avroSchemaTextCache.get((Object)recordSchema));
        session.transfer(flowFile, REL_SUCCESS);
    }
}

