/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.writer;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import org.apache.gobblin.codec.StreamCodec;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metadata.types.GlobalMetadata;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.FsDataWriterBuilder;
import org.apache.gobblin.writer.FsWriterMetrics;
import org.apache.gobblin.writer.MetadataAwareWriter;
import org.apache.gobblin.writer.PartitionIdentifier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for {@link DataWriter}s that write to a single file on a Hadoop {@link FileSystem}.
 *
 * <p>Data is written to a staging file first. {@link #commit()} moves the staging file to the
 * output location, and {@link #close()} publishes the final output path and the writer metrics
 * into the {@link State} this writer was constructed with.
 *
 * @param <D> type of the records accepted by the writer
 */
public abstract class FsDataWriter<D>
implements DataWriter<D>,
FinalState,
MetadataAwareWriter,
SpeculativeAttemptAwareConstruct {
    private static final Logger LOG = LoggerFactory.getLogger(FsDataWriter.class);

    /** Branch-scoped property: when true, {@link #close()} renames the output file to embed the record count. */
    public static final String WRITER_INCLUDE_RECORD_COUNT_IN_FILE_NAMES = "writer.include.record.count.in.file.names";

    /** Property under which {@link #close()} stores the JSON-serialized {@link FsWriterMetrics}. */
    public static final String FS_WRITER_METRICS_KEY = "fs_writer_metrics";

    protected final State properties;
    protected final String id;
    protected final int numBranches;
    protected final int branchId;
    protected final String fileName;
    protected final FileSystem fs;
    protected final Path stagingFile;
    /** Partition path reported by the builder; {@code null} when the builder returns none. */
    protected final String partitionKey;
    private final GlobalMetadata defaultMetadata;
    /** Final output location; replaced by {@link #close()} when the record count is added to the name. */
    protected Path outputFile;
    protected final String allOutputFilesPropName;
    protected final boolean shouldIncludeRecordCountInFileName;
    protected final int bufferSize;
    protected final short replicationFactor;
    protected final long blockSize;
    protected final FsPermission filePermission;
    protected final FsPermission dirPermission;
    protected final Optional<String> group;
    /** Collects the streams opened by this writer so they are closed together. */
    protected final Closer closer = Closer.create();
    protected final Optional<String> writerAttemptIdOptional;
    /** Absent until {@link #commit()} has read the length of the staging file. */
    protected Optional<Long> bytesWritten;
    private final List<StreamCodec> encoders;

    /**
     * Resolves the staging and output paths, removes a pre-existing staging file, reads the
     * branch-scoped writer settings and creates the output directory.
     *
     * @param builder builder supplying writer id, branch information, file name, encoders,
     *                optional writer attempt id and partition path
     * @param properties state the writer settings are read from; also updated with the partition
     *                   path (when present) and, later, with output paths and metrics
     * @throws IOException if a file system operation fails
     */
    public FsDataWriter(FsDataWriterBuilder<?, ?> builder, State properties) throws IOException {
        this.properties = properties;
        this.id = builder.getWriterId();
        this.numBranches = builder.getBranches();
        this.branchId = builder.getBranch();
        this.fileName = builder.getFileName(properties);
        this.writerAttemptIdOptional = Optional.fromNullable(builder.getWriterAttemptId());
        this.encoders = builder.getEncoders();

        this.fs = WriterUtils.getWriterFS(properties, this.numBranches, this.branchId);

        // The staging directory is attempt-specific only when a writer attempt id was supplied.
        Path writerStagingDir = this.writerAttemptIdOptional.isPresent()
                ? WriterUtils.getWriterStagingDir(properties, this.numBranches, this.branchId, this.writerAttemptIdOptional.get())
                : WriterUtils.getWriterStagingDir(properties, this.numBranches, this.branchId);
        this.stagingFile = new Path(writerStagingDir, this.fileName);
        this.outputFile = new Path(WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId), this.fileName);
        this.allOutputFilesPropName = ForkOperatorUtils.getPropertyNameForBranch("writer.final.output.file.paths", this.numBranches, this.branchId);

        // Start from a clean slate: a staging file with the same name is discarded.
        if (this.fs.exists(this.stagingFile)) {
            LOG.warn("Task staging file {} already exists, deleting it", this.stagingFile);
            HadoopUtils.deletePath(this.fs, this.stagingFile, false);
        }

        this.shouldIncludeRecordCountInFileName = properties.getPropAsBoolean(
                ForkOperatorUtils.getPropertyNameForBranch(WRITER_INCLUDE_RECORD_COUNT_IN_FILE_NAMES, this.numBranches, this.branchId), false);
        this.bufferSize = properties.getPropAsInt(
                ForkOperatorUtils.getPropertyNameForBranch("writer.buffer.size", this.numBranches, this.branchId), 4096);
        // Replication factor and block size fall back to the file system defaults for the output path.
        this.replicationFactor = properties.getPropAsShort(
                ForkOperatorUtils.getPropertyNameForBranch("writer.file.replication.factor", this.numBranches, this.branchId),
                this.fs.getDefaultReplication(this.outputFile));
        this.blockSize = properties.getPropAsLong(
                ForkOperatorUtils.getPropertyNameForBranch("writer.file.block.size", this.numBranches, this.branchId),
                this.fs.getDefaultBlockSize(this.outputFile));
        this.filePermission = HadoopUtils.deserializeWriterFilePermissions(properties, this.numBranches, this.branchId);
        this.dirPermission = HadoopUtils.deserializeWriterDirPermissions(properties, this.numBranches, this.branchId);
        this.group = Optional.fromNullable(properties.getProp(
                ForkOperatorUtils.getPropertyNameForBranch("writer.group.name", this.numBranches, this.branchId)));

        WriterUtils.mkdirsWithRecursivePermission(this.fs, this.outputFile.getParent(), this.dirPermission);
        this.bytesWritten = Optional.absent();

        this.defaultMetadata = new GlobalMetadata();
        // NOTE: getEncoders() is overridable and is invoked here before subclass constructors run;
        // kept as-is so that existing overrides continue to be honoured.
        for (StreamCodec codec : this.getEncoders()) {
            this.defaultMetadata.addTransferEncoding(codec.getTag());
        }

        this.partitionKey = builder.getPartitionPath(properties);
        if (this.partitionKey != null) {
            properties.setProp("writer._internal.partition.path_" + this.id, this.partitionKey);
        }
    }

    /**
     * Creates the staging file (overwriting any existing one) and wraps its stream with the
     * configured encoders, applied in reverse list order so the first encoder is outermost.
     *
     * @return the outermost stream, registered with {@link #closer}
     * @throws IOException if the staging file cannot be created or an encoder fails to wrap it
     */
    protected OutputStream createStagingFileOutputStream() throws IOException {
        OutputStream out = this.fs.create(this.stagingFile, this.filePermission, true, this.bufferSize,
                this.replicationFactor, this.blockSize, null);
        for (StreamCodec encoder : Lists.reverse(this.getEncoders())) {
            out = encoder.encodeOutputStream(out);
        }
        return this.closer.register(out);
    }

    /**
     * Applies the configured group, if any, to the staging file.
     *
     * @throws IllegalArgumentException if the staging file does not exist
     * @throws IOException if the file system operation fails
     */
    protected void setStagingFileGroup() throws IOException {
        Preconditions.checkArgument(this.fs.exists(this.stagingFile),
                "Staging output file %s does not exist", this.stagingFile);
        if (this.group.isPresent()) {
            HadoopUtils.setGroup(this.fs, this.stagingFile, this.group.get());
        }
    }

    /**
     * @return the encoders supplied by the builder, in the order the builder returned them
     */
    protected List<StreamCodec> getEncoders() {
        return this.encoders;
    }

    /**
     * @return metadata carrying one transfer-encoding tag per encoder, built at construction time
     */
    @Override
    public GlobalMetadata getDefaultMetadata() {
        return this.defaultMetadata;
    }

    /**
     * @return the length of the staging file as observed by {@link #commit()}, or {@code 0} if
     *         {@code commit()} has not yet recorded it
     * @throws IOException declared for subclasses; not thrown by this implementation
     */
    public long bytesWritten() throws IOException {
        return this.bytesWritten.or(0L);
    }

    /**
     * Closes the registered streams, fixes group and permission on the staging file, records its
     * length and moves it to the output location, replacing an existing output file.
     *
     * @throws IllegalArgumentException if the staging file is missing when
     *         {@link #setStagingFileGroup()} runs
     * @throws IOException if the staging file is missing afterwards or a file system operation fails
     */
    public void commit() throws IOException {
        this.closer.close();
        this.setStagingFileGroup();

        // Only reachable with a missing file if a subclass overrides setStagingFileGroup()
        // without the existence precondition.
        if (!this.fs.exists(this.stagingFile)) {
            throw new IOException(String.format("File %s does not exist", this.stagingFile));
        }

        FileStatus stagingFileStatus = this.fs.getFileStatus(this.stagingFile);
        if (!stagingFileStatus.getPermission().equals(this.filePermission)) {
            this.fs.setPermission(this.stagingFile, this.filePermission);
        }
        this.bytesWritten = Optional.of(stagingFileStatus.getLen());

        LOG.info("Moving data from {} to {}", this.stagingFile, this.outputFile);
        if (this.fs.exists(this.outputFile)) {
            LOG.warn("Task output file {} already exists", this.outputFile);
            HadoopUtils.deletePath(this.fs, this.outputFile, false);
        }
        HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile);
    }

    /**
     * Deletes the staging file if it still exists.
     *
     * @throws IOException if the file system operation fails
     */
    public void cleanup() throws IOException {
        if (this.fs.exists(this.stagingFile)) {
            HadoopUtils.deletePath(this.fs, this.stagingFile, false);
        }
    }

    /**
     * Closes the registered streams, optionally renames the output file to include the record
     * count, appends the final output path to {@link #allOutputFilesPropName} and stores the
     * writer metrics under {@link #FS_WRITER_METRICS_KEY}.
     *
     * @throws IOException if closing a stream or renaming the output file fails
     */
    public void close() throws IOException {
        this.closer.close();

        String finalOutputPath = this.shouldIncludeRecordCountInFileName
                ? this.addRecordCountToFileName()
                : this.getOutputFilePath();
        this.properties.appendToSetProp(this.allOutputFilesPropName, finalOutputPath);

        // Metrics use outputFile after the optional rename so the reported name matches the file on disk.
        FsWriterMetrics metrics = new FsWriterMetrics(
                this.id,
                new PartitionIdentifier(this.partitionKey, this.branchId),
                ImmutableSet.of(new FsWriterMetrics.FileInfo(this.outputFile.getName(), this.recordsWritten())));
        this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson());
    }

    /**
     * Renames the output file to the path produced by {@link IngestionRecordCountProvider} for
     * the current record count and updates {@link #outputFile} accordingly.
     *
     * @return the new output file path
     * @throws IOException if the rename fails
     */
    private synchronized String addRecordCountToFileName() throws IOException {
        String filePath = this.getOutputFilePath();
        String filePathWithRecordCount = IngestionRecordCountProvider.constructFilePath(filePath, this.recordsWritten());
        LOG.info("Renaming {} to {}", filePath, filePathWithRecordCount);
        HadoopUtils.renamePath(this.fs, new Path(filePath), new Path(filePathWithRecordCount));
        this.outputFile = new Path(filePathWithRecordCount);
        return filePathWithRecordCount;
    }

    /**
     * @return a new {@link State} holding {@code RecordsWritten} and, when it can be determined,
     *         {@code BytesWritten}
     */
    public State getFinalState() {
        State state = new State();
        state.setProp("RecordsWritten", this.recordsWritten());
        try {
            state.setProp("BytesWritten", this.bytesWritten());
        }
        catch (Exception e) {
            // Best effort: a writer that cannot report bytes written simply omits the property.
            LOG.debug("Unable to determine bytes written; omitting BytesWritten from final state", e);
        }
        return state;
    }

    /**
     * @return the current output file path as a string
     */
    public String getOutputFilePath() {
        return this.outputFile.toString();
    }

    /**
     * @return the current output file path qualified against this writer's file system
     */
    public String getFullyQualifiedOutputFilePath() {
        return this.fs.makeQualified(this.outputFile).toString();
    }

    /**
     * Requires both a writer attempt id and that the runtime class is exactly
     * {@code FsDataWriter}. Since this class is abstract, every instance is a subclass and this
     * implementation therefore returns {@code false}; subclasses that are safe under speculative
     * attempts must override it.
     */
    @Override
    public boolean isSpeculativeAttemptSafe() {
        return this.writerAttemptIdOptional.isPresent() && this.getClass() == FsDataWriter.class;
    }
}

