/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.HadoopValidators;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;

/**
 * Base class for processors that read the records of an incoming flow file with a
 * {@link RecordReaderFactory} and write them to a file on a Hadoop file system through an
 * {@link HDFSRecordWriter} supplied by the subclass.
 *
 * <p>The records are first written to a hidden temporary file ({@code "." + filename}) in the
 * target directory; only after the write succeeded is that file renamed to its final name.
 * Flow files are routed to {@link #REL_SUCCESS}, {@link #REL_RETRY} or {@link #REL_FAILURE}
 * depending on the outcome.
 */
@TriggerWhenEmpty
@DefaultSettings(yieldDuration="100 ms")
public abstract class AbstractPutHDFSRecord
extends AbstractHadoopProcessor {
    /** Compression type; allowable values and default are filled in per subclass during {@link #init}. */
    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
            .name("compression-type")
            .displayName("Compression Type")
            .description("The type of compression for the file being written.")
            .required(true)
            .build();

    /** Whether an already existing destination file may be replaced. */
    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
            .name("overwrite")
            .displayName("Overwrite Files")
            .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, flow files will be routed to failure when a file exists in the same directory with the same name.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();

    /** Optional octal umask applied to the Hadoop configuration. */
    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
            .name("permissions-umask")
            .displayName("Permissions umask")
            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode")
            .addValidator(HadoopValidators.UMASK_VALIDATOR)
            .build();

    /** Optional owner to set on written files and created directories. */
    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
            .name("remote-owner")
            .displayName("Remote Owner")
            .description("Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Optional group to set on written files and created directories. */
    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
            .name("remote-group")
            .displayName("Remote Group")
            .description("Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Controller service used to parse the incoming flow file into records. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("The service for reading records from incoming flow files.")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Flow Files that have been successfully processed are transferred to this relationship")
            .build();

    public static final Relationship REL_RETRY = new Relationship.Builder()
            .name("retry")
            .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Flow Files that could not be processed due to issue that cannot be retried are transferred to this relationship")
            .build();

    /** Name of the flow file attribute that receives the number of records written. */
    public static final String RECORD_COUNT_ATTR = "record.count";

    /** Umask used when the {@link #UMASK} property is not set: octal 022 (decimal 18). */
    private static final short DEFAULT_UMASK = 022;

    /** Maximum number of times {@link #rename} tries to move the temporary file into place. */
    private static final int RENAME_MAX_ATTEMPTS = 10;

    /** Pause between two consecutive rename attempts, in milliseconds. */
    private static final long RENAME_RETRY_DELAY_MILLIS = 200L;

    // Captured in onScheduled and read on every onTrigger invocation.
    private volatile String remoteOwner;
    private volatile String remoteGroup;

    // Built once in init and exposed unmodifiable through the getters below.
    private volatile Set<Relationship> putHdfsRecordRelationships;
    private volatile List<PropertyDescriptor> putHdfsRecordProperties;

    /**
     * Builds the immutable relationship set and property list of this processor.
     *
     * <p>The property list starts with the properties of the parent class and is followed by the
     * record reader, the directory (with an adjusted description), the compression type (with
     * the subclass-provided allowable values and default), the overwrite/umask/group/owner
     * properties and finally whatever {@link #getAdditionalProperties()} returns.
     *
     * @param context the initialization context, also handed to the subclass hooks
     */
    @Override
    protected final void init(ProcessorInitializationContext context) {
        super.init(context);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_RETRY);
        rels.add(REL_FAILURE);
        this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);

        final List<PropertyDescriptor> props = new ArrayList<>(this.properties);
        props.add(RECORD_READER);
        props.add(new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(DIRECTORY)
                .description("The parent directory to which files should be written. Will be created if it doesn't exist.")
                .build());

        final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
        props.add(new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(COMPRESSION_TYPE)
                .allowableValues(compressionTypes)
                .defaultValue(getDefaultCompressionType(context))
                .build());

        props.add(OVERWRITE);
        props.add(UMASK);
        props.add(REMOTE_GROUP);
        props.add(REMOTE_OWNER);
        props.addAll(getAdditionalProperties());
        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
    }

    /**
     * @param var1 the initialization context
     * @return the compression types offered by the {@link #COMPRESSION_TYPE} property
     */
    public abstract List<AllowableValue> getCompressionTypes(ProcessorInitializationContext var1);

    /**
     * @param var1 the initialization context
     * @return the default value of the {@link #COMPRESSION_TYPE} property
     */
    public abstract String getDefaultCompressionType(ProcessorInitializationContext var1);

    /**
     * Hook for subclasses to contribute extra properties; they are appended after the ones
     * defined here.
     *
     * @return the additional properties, an empty list by default
     */
    public List<PropertyDescriptor> getAdditionalProperties() {
        return Collections.emptyList();
    }

    /** @return the unmodifiable relationship set built in {@link #init} */
    @Override
    public final Set<Relationship> getRelationships() {
        return this.putHdfsRecordRelationships;
    }

    /** @return the unmodifiable property list built in {@link #init} */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.putHdfsRecordProperties;
    }

    /**
     * Applies the configured umask (parsed as an octal number) to the given Hadoop
     * configuration, falling back to octal 022 when the {@link #UMASK} property is not set.
     *
     * @param config the Hadoop configuration to modify
     * @param context the context the {@link #UMASK} property is read from
     */
    @Override
    protected void preProcessConfiguration(Configuration config, ProcessContext context) {
        final PropertyValue umaskProp = context.getProperty(UMASK);
        final short dfsUmask = umaskProp.isSet() ? Short.parseShort(umaskProp.getValue(), 8) : DEFAULT_UMASK;
        FsPermission.setUMask(config, new FsPermission(dfsUmask));
    }

    /**
     * Runs the parent's scheduling logic and captures the configured remote owner and group.
     *
     * @param context the context the properties are read from
     * @throws IOException if the parent's scheduling logic fails
     */
    @OnScheduled
    public final void onScheduled(ProcessContext context) throws IOException {
        super.abstractOnScheduled(context);
        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
    }

    /**
     * Creates the writer that stores records in the given file.
     *
     * @param var1 the process context
     * @param var2 the flow file being written
     * @param var3 the Hadoop configuration
     * @param var4 the (temporary) path to write to
     * @param var5 the schema of the records read from the flow file
     * @return the writer; it is closed by {@link #onTrigger} after use
     * @throws IOException if the writer cannot be created
     * @throws SchemaNotFoundException if a required schema cannot be located
     */
    public abstract HDFSRecordWriter createHDFSRecordWriter(ProcessContext var1, FlowFile var2, Configuration var3, Path var4, RecordSchema var5) throws IOException, SchemaNotFoundException;

    /**
     * Writes the records of one flow file to {@code <directory>/<filename>}.
     *
     * <p>Outcome routing:
     * <ul>
     *   <li>destination or temporary file exists and overwrite is disabled: penalized, failure</li>
     *   <li>{@link IOException} or {@link FlowFileAccessException}: temp file removed, penalized,
     *       retry, processor yields</li>
     *   <li>any other throwable: temp file removed, failure</li>
     *   <li>otherwise: attributes and provenance event added, success</li>
     * </ul>
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final FileSystem fileSystem = getFileSystem();
        final Configuration configuration = getConfiguration();
        final UserGroupInformation ugi = getUserGroupInformation();
        if (configuration == null || fileSystem == null || ugi == null) {
            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
            context.yield();
            return;
        }

        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            context.yield();
            return;
        }

        // The explicit cast selects doAs(PrivilegedAction); a bare lambda is ambiguous with the
        // PrivilegedExceptionAction overload, which additionally throws checked exceptions.
        ugi.doAs((PrivilegedAction<Object>) () -> {
            Path tempDotCopyFile = null;
            FlowFile putFlowFile = flowFile;
            try {
                final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();

                final Path directoryPath = new Path(directoryValue);
                createDirectory(fileSystem, directoryPath, this.remoteOwner, this.remoteGroup);

                final Path tempFile = new Path(directoryPath, "." + filenameValue);
                final Path destFile = new Path(directoryPath, filenameValue);

                final boolean destinationExists = fileSystem.exists(destFile);
                final boolean destinationOrTempExists = destinationExists || fileSystem.exists(tempFile);
                final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();

                if (destinationOrTempExists && !shouldOverwrite) {
                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
                    getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
                    return null;
                }

                // Failures inside the read callback are parked here and rethrown afterwards so
                // that they reach the catch blocks below with their original type.
                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
                final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);

                // Remember the temp file before writing: if session.read itself throws, the
                // catch blocks below must still be able to remove a partially written file.
                tempDotCopyFile = tempFile;

                final StopWatch stopWatch = new StopWatch(true);
                session.read(putFlowFile, in -> {
                    RecordReader recordReader = null;
                    HDFSRecordWriter recordWriter = null;
                    try {
                        try {
                            recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
                        } catch (Exception readerFailure) {
                            exceptionHolder.set(new RecordReaderFactoryException("Unable to create RecordReader", readerFailure));
                            return;
                        }

                        final RecordSet recordSet = recordReader.createRecordSet();
                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema());
                        writeResult.set(recordWriter.write(recordSet));
                    } catch (Exception writeFailure) {
                        exceptionHolder.set(writeFailure);
                    } finally {
                        // NOTE(review): errors raised while closing the writer are discarded here.
                        IOUtils.closeQuietly(recordReader);
                        IOUtils.closeQuietly(recordWriter);
                    }
                });
                stopWatch.stop();

                final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);

                final Throwable parkedFailure = exceptionHolder.get();
                if (parkedFailure != null) {
                    throw parkedFailure;
                }

                // On HDFS a rename onto an existing file returns false instead of replacing it,
                // so an overwrite has to remove the old file first. This happens only now, after
                // the new content has been written completely.
                if (destinationExists && shouldOverwrite && fileSystem.delete(destFile, false)) {
                    getLogger().info("deleted {} in order to replace with the contents of {}", new Object[]{destFile, putFlowFile});
                }

                rename(fileSystem, tempFile, destFile);
                changeOwner(fileSystem, destFile, this.remoteOwner, this.remoteGroup);

                getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});

                putFlowFile = postProcess(context, session, putFlowFile, destFile);

                final WriteResult result = writeResult.get();
                final Map<String, String> attributes = new HashMap<>(result.getAttributes());
                attributes.put(CoreAttributes.FILENAME.key(), destFile.getName());
                attributes.put("absolute.hdfs.path", destFile.getParent().toString());
                attributes.put(RECORD_COUNT_ATTR, String.valueOf(result.getRecordCount()));
                putFlowFile = session.putAllAttributes(putFlowFile, attributes);

                final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
                session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
                session.transfer(putFlowFile, REL_SUCCESS);
            } catch (IOException | FlowFileAccessException e) {
                deleteQuietly(fileSystem, tempDotCopyFile);
                getLogger().error("Failed to write due to {}", new Object[]{e});
                session.transfer(session.penalize(putFlowFile), REL_RETRY);
                context.yield();
            } catch (Throwable t) {
                deleteQuietly(fileSystem, tempDotCopyFile);
                getLogger().error("Failed to write due to {}", new Object[]{t});
                session.transfer(putFlowFile, REL_FAILURE);
                if (t instanceof InterruptedException) {
                    // Restore the flag last so the cleanup calls above are not disturbed by it.
                    Thread.currentThread().interrupt();
                }
            }
            return null;
        });
    }

    /**
     * Hook invoked after the file has been renamed to its final name and before the result
     * attributes are added to the flow file.
     *
     * @param context the process context
     * @param session the process session
     * @param flowFile the flow file that was written
     * @param destFile the final path of the written file
     * @return the flow file to continue with; this implementation returns it unchanged
     */
    protected FlowFile postProcess(ProcessContext context, ProcessSession session, FlowFile flowFile, Path destFile) {
        return flowFile;
    }

    /**
     * Renames {@code srcFile} to {@code destFile}, trying up to ten times with a 200 ms pause
     * between attempts. If every attempt fails, {@code srcFile} is deleted.
     *
     * @param fileSystem the file system holding both paths
     * @param srcFile the file to rename
     * @param destFile the target name
     * @throws IOException if the file system reports an error
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     * @throws FailureException if the file could not be renamed in any attempt
     */
    protected void rename(FileSystem fileSystem, Path srcFile, Path destFile) throws IOException, InterruptedException, FailureException {
        boolean renamed = false;
        for (int attempt = 1; attempt <= RENAME_MAX_ATTEMPTS; attempt++) {
            if (fileSystem.rename(srcFile, destFile)) {
                renamed = true;
                break;
            }
            // Waiting only makes sense when another attempt follows.
            if (attempt < RENAME_MAX_ATTEMPTS) {
                Thread.sleep(RENAME_RETRY_DELAY_MILLIS);
            }
        }

        if (!renamed) {
            fileSystem.delete(srcFile, false);
            throw new FailureException("Could not rename file " + srcFile + " to its final filename");
        }
    }

    /**
     * Deletes the given file (non-recursively), logging instead of propagating any failure.
     *
     * @param fileSystem the file system holding the file
     * @param file the file to delete; {@code null} is ignored
     */
    protected void deleteQuietly(FileSystem fileSystem, Path file) {
        if (file == null) {
            return;
        }
        try {
            fileSystem.delete(file, false);
        } catch (Exception e) {
            getLogger().error("Unable to remove file {} due to {}", new Object[]{file, e});
        }
    }

    /**
     * Sets owner and/or group of the given path when at least one of them is non-null.
     * Failures are logged as warnings and otherwise ignored.
     *
     * @param fileSystem the file system holding the path
     * @param path the file or directory to change
     * @param remoteOwner the new owner, may be {@code null}
     * @param remoteGroup the new group, may be {@code null}
     */
    protected void changeOwner(FileSystem fileSystem, Path path, String remoteOwner, String remoteGroup) {
        if (remoteOwner == null && remoteGroup == null) {
            return;
        }
        try {
            fileSystem.setOwner(path, remoteOwner, remoteGroup);
        } catch (Exception e) {
            getLogger().warn("Could not change owner or group of {} on due to {}", new Object[]{path, e});
        }
    }

    /**
     * Ensures that {@code directory} exists and is a directory. A missing directory is created
     * and then handed to {@link #changeOwner}; an existing one is left untouched.
     *
     * @param fileSystem the file system to operate on
     * @param directory the directory that must exist
     * @param remoteOwner owner for a newly created directory, may be {@code null}
     * @param remoteGroup group for a newly created directory, may be {@code null}
     * @throws IOException if the file system reports an error
     * @throws FailureException if the path exists but is not a directory, or cannot be created
     */
    protected void createDirectory(FileSystem fileSystem, Path directory, String remoteOwner, String remoteGroup) throws IOException, FailureException {
        try {
            if (!fileSystem.getFileStatus(directory).isDirectory()) {
                throw new FailureException(directory.toString() + " already exists and is not a directory");
            }
        } catch (FileNotFoundException fe) {
            // getFileStatus signals a missing path with FileNotFoundException: create it.
            if (!fileSystem.mkdirs(directory)) {
                throw new FailureException(directory.toString() + " could not be created");
            }
            changeOwner(fileSystem, directory, remoteOwner, remoteGroup);
        }
    }
}

