/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.commons.lang.LocaleUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor;
import org.apache.nifi.processors.kite.AbstractKiteProcessor;
import org.apache.nifi.processors.kite.AvroRecordConverter;
import org.apache.nifi.processors.kite.AvroUtil;
import org.apache.nifi.processors.kite.FailureTracker;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.SchemaNotFoundException;
import org.kitesdk.data.spi.DefaultConfiguration;

@Tags(value={"avro", "convert", "kite"})
@CapabilityDescription(value="Convert records from one Avro schema to another, including support for flattening and simple type conversions")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name="Field name from input schema", value="Field name for output schema", description="Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
public class ConvertAvroSchema
extends AbstractKiteConvertProcessor {
    private static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Avro content that converted successfully").build();
    private static final Relationship FAILURE = new Relationship.Builder().name("failure").description("Avro content that failed to convert").build();
    protected static final Validator MAPPED_SCHEMA_VALIDATOR = new Validator(){

        public ValidationResult validate(String subject, String uri, ValidationContext context) {
            boolean elPresent;
            Configuration conf = AbstractKiteProcessor.getConfiguration(context.getProperty(AbstractKiteProcessor.CONF_XML_FILES).getValue());
            String inputUri = context.getProperty(INPUT_SCHEMA).getValue();
            String error = null;
            boolean bl = elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
            if (!elPresent) {
                try {
                    Schema outputSchema = AbstractKiteProcessor.getSchema(uri, conf);
                    Schema inputSchema = AbstractKiteProcessor.getSchema(inputUri, conf);
                    HashMap<String, String> fieldMapping = new HashMap<String, String>();
                    for (Map.Entry entry : context.getProperties().entrySet()) {
                        if (!((PropertyDescriptor)entry.getKey()).isDynamic()) continue;
                        fieldMapping.put(((PropertyDescriptor)entry.getKey()).getName(), (String)entry.getValue());
                    }
                    AvroRecordConverter converter = new AvroRecordConverter(inputSchema, outputSchema, fieldMapping);
                    Collection<String> unmappedFields = converter.getUnmappedFields();
                    if (unmappedFields.size() > 0) {
                        error = "The following fields are unmapped: " + unmappedFields;
                    }
                }
                catch (SchemaNotFoundException e) {
                    error = e.getMessage();
                }
            }
            return new ValidationResult.Builder().subject(subject).input(uri).explanation(error).valid(error == null).build();
        }
    };
    public static final String DEFAULT_LOCALE_VALUE = "default";
    public static final Validator LOCALE_VALIDATOR = new Validator(){

        public ValidationResult validate(String subject, String value, ValidationContext context) {
            String reason = null;
            if (!value.equals(ConvertAvroSchema.DEFAULT_LOCALE_VALUE)) {
                try {
                    Locale locale = LocaleUtils.toLocale((String)value);
                    if (locale == null) {
                        reason = "null locale returned";
                    } else if (!LocaleUtils.isAvailableLocale((Locale)locale)) {
                        reason = "locale not available";
                    }
                }
                catch (IllegalArgumentException e) {
                    reason = "invalid format for locale";
                }
            }
            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
        }
    };
    @VisibleForTesting
    static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder().name("Input Schema").description("Avro Schema of Input Flowfiles.  This can be a URI (dataset, view, or resource) or literal JSON schema.").addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    @VisibleForTesting
    static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder().name("Output Schema").description("Avro Schema of Output Flowfiles.  This can be a URI (dataset, view, or resource) or literal JSON schema.").addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    @VisibleForTesting
    static final PropertyDescriptor LOCALE = new PropertyDescriptor.Builder().name("Locale").description("Locale to use for scanning data (see https://docs.oracle.com/javase/7/docs/api/java/util/Locale.html)or \" default\" for JVM default").addValidator(LOCALE_VALIDATOR).defaultValue("default").build();
    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.builder().add((Object)INPUT_SCHEMA).add((Object)OUTPUT_SCHEMA).add((Object)LOCALE).add((Object)COMPRESSION_TYPE).build();
    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.builder().add((Object)SUCCESS).add((Object)FAILURE).build();
    private static final Pattern AVRO_FIELDNAME_PATTERN = Pattern.compile("[A-Za-z_][A-Za-z0-9_\\.]*");
    protected static final Validator AVRO_FIELDNAME_VALIDATOR = new Validator(){

        public ValidationResult validate(String subject, String value, ValidationContext context) {
            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
            }
            String reason = "";
            if (!AVRO_FIELDNAME_PATTERN.matcher(subject).matches()) {
                reason = subject + " is not a valid Avro fieldname";
            }
            if (!AVRO_FIELDNAME_PATTERN.matcher(value).matches()) {
                reason = reason + value + " is not a valid Avro fieldname";
            }
            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason.equals("")).build();
        }
    };

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).description("Field mapping between schemas. The property name is the field name for the input schema, and the property value is the field name for the output schema. For fields not listed, the processor tries to match names from the input to the output record.").dynamic(true).addValidator(AVRO_FIELDNAME_VALIDATOR).build();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        Schema outputSchema;
        Schema inputSchema;
        FlowFile incomingAvro = session.get();
        if (incomingAvro == null) {
            return;
        }
        String inputSchemaProperty = context.getProperty(INPUT_SCHEMA).evaluateAttributeExpressions(incomingAvro).getValue();
        try {
            inputSchema = ConvertAvroSchema.getSchema(inputSchemaProperty, DefaultConfiguration.get());
        }
        catch (SchemaNotFoundException e) {
            this.getLogger().error("Cannot find schema: " + inputSchemaProperty);
            session.transfer(incomingAvro, FAILURE);
            return;
        }
        String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA).evaluateAttributeExpressions(incomingAvro).getValue();
        try {
            outputSchema = ConvertAvroSchema.getSchema(outputSchemaProperty, DefaultConfiguration.get());
        }
        catch (SchemaNotFoundException e) {
            this.getLogger().error("Cannot find schema: " + outputSchemaProperty);
            session.transfer(incomingAvro, FAILURE);
            return;
        }
        HashMap<String, String> fieldMapping = new HashMap<String, String>();
        for (Map.Entry entry : context.getProperties().entrySet()) {
            if (!((PropertyDescriptor)entry.getKey()).isDynamic()) continue;
            fieldMapping.put(((PropertyDescriptor)entry.getKey()).getName(), (String)entry.getValue());
        }
        String localeProperty = context.getProperty(LOCALE).getValue();
        Locale locale = localeProperty.equals(DEFAULT_LOCALE_VALUE) ? Locale.getDefault() : LocaleUtils.toLocale((String)localeProperty);
        final AvroRecordConverter converter = new AvroRecordConverter(inputSchema, outputSchema, fieldMapping, locale);
        final DataFileWriter writer = new DataFileWriter(AvroUtil.newDatumWriter(outputSchema, GenericData.Record.class));
        writer.setCodec(this.getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
        final DataFileWriter failureWriter = new DataFileWriter(AvroUtil.newDatumWriter(outputSchema, GenericData.Record.class));
        failureWriter.setCodec(this.getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
        try {
            final AtomicLong written = new AtomicLong(0L);
            final FailureTracker failures = new FailureTracker();
            final LinkedList badRecords = Lists.newLinkedList();
            FlowFile incomingAvroCopy = session.clone(incomingAvro);
            FlowFile outgoingAvro = session.write(incomingAvro, new StreamCallback(){

                public void process(InputStream in, OutputStream out) throws IOException {
                    try (DataFileStream stream = new DataFileStream(in, (DatumReader)new GenericDatumReader(converter.getInputSchema()));
                         DataFileWriter w = writer.create(outputSchema, out);){
                        for (GenericData.Record record : stream) {
                            try {
                                GenericData.Record converted = converter.convert(record);
                                w.append((Object)converted);
                                written.incrementAndGet();
                            }
                            catch (AvroRecordConverter.AvroConversionException e) {
                                failures.add(e);
                                ConvertAvroSchema.this.getLogger().error("Error converting data: " + e.getMessage());
                                badRecords.add(record);
                            }
                        }
                    }
                }
            });
            FlowFile badOutput = session.write(incomingAvroCopy, new StreamCallback(){

                public void process(InputStream in, OutputStream out) throws IOException {
                    try (DataFileWriter w = failureWriter.create(inputSchema, out);){
                        for (GenericData.Record record : badRecords) {
                            w.append((Object)record);
                        }
                    }
                }
            });
            long errors = failures.count();
            session.adjustCounter("Converted records", written.get(), false);
            session.adjustCounter("Conversion errors", errors, false);
            if (written.get() > 0L) {
                outgoingAvro = session.putAttribute(outgoingAvro, CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
                session.transfer(outgoingAvro, SUCCESS);
            } else {
                session.remove(outgoingAvro);
                if (errors == 0L) {
                    badOutput = session.putAttribute(badOutput, "errors", "No incoming records");
                    session.transfer(badOutput, FAILURE);
                }
            }
            if (errors > 0L) {
                this.getLogger().warn("Failed to convert {}/{} records between Avro Schemas", new Object[]{errors, errors + written.get()});
                badOutput = session.putAttribute(badOutput, "errors", failures.summary());
                session.transfer(badOutput, FAILURE);
            } else {
                session.remove(badOutput);
            }
        }
        catch (ProcessException | DatasetIOException e) {
            this.getLogger().error("Failed reading or writing", e);
            session.transfer(incomingAvro, FAILURE);
        }
        catch (DatasetException e) {
            this.getLogger().error("Failed to read FlowFile", (Throwable)e);
            session.transfer(incomingAvro, FAILURE);
        }
        finally {
            try {
                writer.close();
            }
            catch (IOException e) {
                this.getLogger().warn("Unable to close writer ressource", (Throwable)e);
            }
            try {
                failureWriter.close();
            }
            catch (IOException e) {
                this.getLogger().warn("Unable to close writer ressource", (Throwable)e);
            }
        }
    }
}

