/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;

/**
 * Base class for processors that fetch a file from a Hadoop file system, read it as a stream of
 * {@link Record}s and re-serialize those records into the content of a new child FlowFile using
 * the configured {@link RecordSetWriterFactory}.
 *
 * <p>Subclasses supply the format-specific reader via
 * {@link #createHDFSRecordReader(ProcessContext, FlowFile, Configuration, Path)} and may contribute
 * extra properties through {@link #getAdditionalProperties()} or adjust the resulting FlowFile in
 * {@link #postProcess(ProcessContext, ProcessSession, FlowFile, Path)}.
 *
 * <p>Outcome routing performed by {@link #onTrigger(ProcessContext, ProcessSession)}:
 * <ul>
 *   <li>success: the child FlowFile holding the records goes to {@link #REL_SUCCESS} and the
 *       incoming FlowFile is removed;</li>
 *   <li>{@link FileNotFoundException}, {@link AccessControlException} or any otherwise unhandled
 *       {@link Throwable}: the incoming FlowFile goes to {@link #REL_FAILURE} with
 *       {@link #FETCH_FAILURE_REASON_ATTR} set;</li>
 *   <li>other {@link IOException}s and {@link FlowFileAccessException}: the incoming FlowFile is
 *       penalized and goes to {@link #REL_RETRY}.</li>
 * </ul>
 */
@TriggerWhenEmpty
@DefaultSettings(yieldDuration="100 ms")
public abstract class AbstractFetchHDFSRecord
extends AbstractHadoopProcessor {
    /** Path of the file to fetch; evaluated against the incoming FlowFile's attributes. */
    public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
            .name("filename")
            .displayName("Filename")
            .description("The name of the file to retrieve")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("${path}/${filename}")
            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
            .build();

    /** Controller service used to create the writer that serializes fetched records. */
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .description("The service for writing records to the FlowFile content")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    /** Destination of the child FlowFile once it contains the fetched records. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles will be routed to this relationship once they have been updated with the content of the file")
            .build();

    /** Destination of the incoming FlowFile when the fetch failed and a retry is not expected to help. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved and trying again will likely not be helpful. This would occur, for instance, if the file is not found or if there is a permissions issue")
            .build();

    /** Destination of the (penalized) incoming FlowFile when the fetch failed but may succeed later. */
    public static final Relationship REL_RETRY = new Relationship.Builder()
            .name("retry")
            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved, but might be able to be in the future if tried again. This generally indicates that the Fetch should be tried again.")
            .build();

    /** Attribute set on FlowFiles routed to {@link #REL_FAILURE}, holding the failure message. */
    public static final String FETCH_FAILURE_REASON_ATTR = "fetch.failure.reason";

    /** Attribute set on successful FlowFiles, holding the number of records written. */
    public static final String RECORD_COUNT_ATTR = "record.count";

    // Both collections are built once in init() and published as unmodifiable views.
    private volatile Set<Relationship> fetchHdfsRecordRelationships;
    private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;

    /**
     * Builds the immutable relationship set and property list exposed by this processor.
     *
     * <p>The property list is the parent's {@code properties}, followed by {@link #FILENAME},
     * {@link #RECORD_WRITER} and finally whatever {@link #getAdditionalProperties()} returns.
     *
     * @param context initialization context, forwarded to the parent class
     */
    @Override
    protected final void init(ProcessorInitializationContext context) {
        super.init(context);

        Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_RETRY);
        rels.add(REL_FAILURE);
        this.fetchHdfsRecordRelationships = Collections.unmodifiableSet(rels);

        List<PropertyDescriptor> props = new ArrayList<>(this.properties);
        props.add(FILENAME);
        props.add(RECORD_WRITER);
        props.addAll(this.getAdditionalProperties());
        this.fetchHdfsRecordProperties = Collections.unmodifiableList(props);
    }

    /**
     * Extension hook for subclasses to contribute extra property descriptors. It is invoked
     * from {@link #init(ProcessorInitializationContext)}.
     *
     * @return the additional descriptors; this default implementation returns an empty list
     */
    public List<PropertyDescriptor> getAdditionalProperties() {
        return Collections.emptyList();
    }

    /**
     * @return the unmodifiable set of relationships assembled in
     *         {@link #init(ProcessorInitializationContext)}
     */
    @Override
    public final Set<Relationship> getRelationships() {
        return this.fetchHdfsRecordRelationships;
    }

    /**
     * @return the unmodifiable list of property descriptors assembled in
     *         {@link #init(ProcessorInitializationContext)}
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.fetchHdfsRecordProperties;
    }

    /**
     * Creates the reader that produces records from the file at {@code path}. The reader is
     * closed by {@link #onTrigger(ProcessContext, ProcessSession)} once all records are consumed.
     *
     * @param context  the current process context
     * @param flowFile the incoming FlowFile that triggered the fetch
     * @param conf     the Hadoop configuration of this processor
     * @param path     the path of the file to read
     * @return a reader whose {@code nextRecord()} yields {@code null} when exhausted
     * @throws IOException if the reader cannot be created
     */
    public abstract HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path) throws IOException;

    /**
     * Fetches the file named by {@link #FILENAME}, writes its records into a child FlowFile and
     * routes the result as described in the class documentation.
     *
     * <p>The processor yields and returns without touching the session when its Hadoop
     * resources are unavailable or when there is no incoming FlowFile.
     *
     * @param context the current process context
     * @param session the session used to read, create and route FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FileSystem fileSystem = this.getFileSystem();
        Configuration configuration = this.getConfiguration();
        UserGroupInformation ugi = this.getUserGroupInformation();
        if (configuration == null || fileSystem == null || ugi == null) {
            this.getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
            context.yield();
            return;
        }

        FlowFile originalFlowFile = session.get();
        if (originalFlowFile == null) {
            context.yield();
            return;
        }

        // The explicit PrivilegedAction cast is required: a zero-argument lambda is otherwise
        // ambiguous between doAs(PrivilegedAction) and doAs(PrivilegedExceptionAction).
        ugi.doAs((PrivilegedAction<Object>) () -> {
            FlowFile child = null;
            String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
            try {
                Path path = new Path(filenameValue);
                AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
                AtomicReference<WriteResult> writeResult = new AtomicReference<>();
                AtomicReference<String> mimeTypeRef = new AtomicReference<>();
                RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
                StopWatch stopWatch = new StopWatch(true);

                // Write into a child FlowFile so the original stays untouched and can still be
                // routed to retry/failure if anything goes wrong.
                child = session.create(originalFlowFile);
                child = session.write(child, (OutputStream rawOut) -> {
                    try (OutputStream out = new BufferedOutputStream(rawOut);
                         HDFSRecordReader recordReader = this.createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
                        // The first record is read up front so its schema can be offered to the writer factory.
                        Record record = recordReader.nextRecord();
                        RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile.getAttributes(), record == null ? null : record.getSchema());
                        try (RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(this.getLogger(), schema, out)) {
                            recordSetWriter.beginRecordSet();
                            if (record != null) {
                                recordSetWriter.write(record);
                            }
                            while ((record = recordReader.nextRecord()) != null) {
                                recordSetWriter.write(record);
                            }
                            writeResult.set(recordSetWriter.finishRecordSet());
                            mimeTypeRef.set(recordSetWriter.getMimeType());
                        }
                    }
                    catch (Exception e) {
                        // Deliberately captured rather than thrown: it is rethrown below so that
                        // it reaches the catch blocks that decide between failure and retry.
                        exceptionHolder.set(e);
                    }
                });
                stopWatch.stop();

                Throwable writeFailure = exceptionHolder.get();
                if (writeFailure != null) {
                    throw writeFailure;
                }

                FlowFile successFlowFile = this.postProcess(context, session, child, path);
                WriteResult result = writeResult.get();
                Map<String, String> attributes = new HashMap<>(result.getAttributes());
                attributes.put(RECORD_COUNT_ATTR, String.valueOf(result.getRecordCount()));
                attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                successFlowFile = session.putAllAttributes(successFlowFile, attributes);

                Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
                // Use the same millisecond value for the log line and the provenance event so
                // the logged number matches the unit stated in the message.
                long fetchMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                this.getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[]{qualifiedPath, successFlowFile, fetchMillis});
                session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), fetchMillis);
                session.transfer(successFlowFile, REL_SUCCESS);
                session.remove(originalFlowFile);
                return null;
            }
            catch (FileNotFoundException | AccessControlException e) {
                // Must precede the IOException handler so these cases go to failure, not retry.
                this.routeToFailure(session, originalFlowFile, filenameValue, e);
            }
            catch (IOException | FlowFileAccessException e) {
                this.getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[]{filenameValue, originalFlowFile, e});
                session.transfer(session.penalize(originalFlowFile), REL_RETRY);
                context.yield();
            }
            catch (Throwable t) {
                this.routeToFailure(session, originalFlowFile, filenameValue, t);
            }

            // Only reached on an unsuccessful fetch: discard the child if it was created.
            if (child != null) {
                session.remove(child);
            }
            return null;
        });
    }

    /**
     * Logs the failure, records its reason in {@link #FETCH_FAILURE_REASON_ATTR} and transfers
     * the FlowFile to {@link #REL_FAILURE}.
     *
     * @param session       the session owning {@code flowFile}
     * @param flowFile      the incoming FlowFile to route
     * @param filenameValue the evaluated {@link #FILENAME} value, used for logging
     * @param cause         the error that aborted the fetch
     */
    private void routeToFailure(ProcessSession session, FlowFile flowFile, String filenameValue, Throwable cause) {
        this.getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, flowFile, cause});
        // Fall back to toString() for exceptions that carry no message.
        String reason = cause.getMessage() == null ? cause.toString() : cause.getMessage();
        FlowFile failureFlowFile = session.putAttribute(flowFile, FETCH_FAILURE_REASON_ATTR, reason);
        session.transfer(failureFlowFile, REL_FAILURE);
    }

    /**
     * Extension hook invoked after the records were written successfully and before the result
     * attributes are applied and the FlowFile is transferred to {@link #REL_SUCCESS}.
     *
     * @param context   the current process context
     * @param session   the current session
     * @param flowFile  the child FlowFile containing the fetched records
     * @param fetchPath the path the records were read from
     * @return the FlowFile to continue with; this default implementation returns
     *         {@code flowFile} unchanged
     */
    protected FlowFile postProcess(ProcessContext context, ProcessSession session, FlowFile flowFile, Path fetchPath) {
        return flowFile;
    }
}

