/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor;
import org.apache.nifi.processors.kite.AbstractKiteProcessor;
import org.apache.nifi.processors.kite.AvroUtil;
import org.apache.nifi.processors.kite.FailureTracker;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.SchemaNotFoundException;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.spi.filesystem.JSONFileReader;

@Tags(value={"kite", "json", "avro"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Converts JSON files to Avro according to an Avro Schema")
public class ConvertJSONToAvro
extends AbstractKiteConvertProcessor {
    private static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Avro content that was converted successfully from JSON").build();
    private static final Relationship FAILURE = new Relationship.Builder().name("failure").description("JSON content that could not be processed").build();
    private static final Relationship INCOMPATIBLE = new Relationship.Builder().name("incompatible").description("JSON content that could not be converted").build();
    @VisibleForTesting
    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder().name("Record schema").description("Outgoing Avro schema for each record created from a JSON object").addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.builder().addAll(AbstractKiteProcessor.getProperties()).add((Object)SCHEMA).add((Object)COMPRESSION_TYPE).build();
    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.builder().add((Object)SUCCESS).add((Object)FAILURE).add((Object)INCOMPATIBLE).build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        Schema schema;
        FlowFile incomingJSON = session.get();
        if (incomingJSON == null) {
            return;
        }
        String schemaProperty = context.getProperty(SCHEMA).evaluateAttributeExpressions(incomingJSON).getValue();
        try {
            schema = ConvertJSONToAvro.getSchema(schemaProperty, DefaultConfiguration.get());
        }
        catch (SchemaNotFoundException e) {
            this.getLogger().error("Cannot find schema: " + schemaProperty);
            session.transfer(incomingJSON, FAILURE);
            return;
        }
        final DataFileWriter writer = new DataFileWriter(AvroUtil.newDatumWriter(schema, GenericData.Record.class));
        writer.setCodec(this.getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
        try {
            final AtomicLong written = new AtomicLong(0L);
            final FailureTracker failures = new FailureTracker();
            FlowFile badRecords = session.clone(incomingJSON);
            FlowFile outgoingAvro = session.write(incomingJSON, new StreamCallback(){

                public void process(InputStream in, OutputStream out) throws IOException {
                    try (JSONFileReader reader = new JSONFileReader(in, schema, GenericData.Record.class);){
                        reader.initialize();
                        try (DataFileWriter w = writer.create(schema, out);){
                            while (reader.hasNext()) {
                                try {
                                    GenericData.Record record = (GenericData.Record)reader.next();
                                    w.append((Object)record);
                                    written.incrementAndGet();
                                }
                                catch (DatasetRecordException e) {
                                    failures.add(e);
                                }
                            }
                        }
                    }
                }
            });
            long errors = failures.count();
            session.adjustCounter("Converted records", written.get(), false);
            session.adjustCounter("Conversion errors", errors, false);
            if (written.get() > 0L) {
                outgoingAvro = session.putAttribute(outgoingAvro, CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
                session.transfer(outgoingAvro, SUCCESS);
                if (errors > 0L) {
                    this.getLogger().warn("Failed to convert {}/{} records from JSON to Avro", new Object[]{errors, errors + written.get()});
                    badRecords = session.putAttribute(badRecords, "errors", failures.summary());
                    session.transfer(badRecords, INCOMPATIBLE);
                } else {
                    session.remove(badRecords);
                }
            } else {
                session.remove(outgoingAvro);
                if (errors > 0L) {
                    this.getLogger().warn("Failed to convert {}/{} records from JSON to Avro", new Object[]{errors, errors});
                    badRecords = session.putAttribute(badRecords, "errors", failures.summary());
                } else {
                    badRecords = session.putAttribute(badRecords, "errors", "No incoming records");
                }
                session.transfer(badRecords, FAILURE);
            }
        }
        catch (ProcessException | DatasetIOException e) {
            this.getLogger().error("Failed reading or writing", e);
            session.transfer(incomingJSON, FAILURE);
        }
        catch (DatasetException e) {
            this.getLogger().error("Failed to read FlowFile", (Throwable)e);
            session.transfer(incomingJSON, FAILURE);
        }
        finally {
            try {
                writer.close();
            }
            catch (IOException e) {
                this.getLogger().warn("Unable to close writer ressource", (Throwable)e);
            }
        }
    }
}

