/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.source.extractor.filebased;

import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class FileBasedSource<S, D>
extends AbstractSource<S, D> {
    private static final Logger log = LoggerFactory.getLogger(FileBasedSource.class);
    protected TimestampAwareFileBasedHelper fsHelper;
    protected String splitPattern = ":::";
    protected Optional<LineageInfo> lineageInfo;

    protected void initLogger(SourceState state) {
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        sb.append(Strings.nullToEmpty((String)state.getProp("source.entity")));
        sb.append("]");
        MDC.put((String)"sourceInfo", (String)sb.toString());
    }

    public List<WorkUnit> getWorkunits(SourceState state) {
        this.initLogger(state);
        this.lineageInfo = LineageInfo.getLineageInfo((SharedResourcesBroker)state.getBroker());
        try {
            this.initFileSystemHelper((State)state);
        }
        catch (FileBasedHelperException e) {
            Throwables.propagate((Throwable)e);
        }
        log.info("Getting work units");
        String nameSpaceName = state.getProp("extract.namespace");
        String entityName = state.getProp("source.entity");
        String extractTableName = state.getProp("extract.table.name");
        if (Strings.isNullOrEmpty((String)extractTableName)) {
            extractTableName = entityName;
        }
        Extract.TableType tableType = Extract.TableType.valueOf((String)state.getProp("extract.table.type").toUpperCase());
        ArrayList previousWorkunits = Lists.newArrayList((Iterable)state.getPreviousWorkUnitStates());
        HashSet prevFsSnapshot = Sets.newHashSet();
        if (!previousWorkunits.isEmpty()) {
            if (((WorkUnitState)previousWorkunits.get(0)).getWorkunit().contains("source.filebased.fs.snapshot")) {
                prevFsSnapshot.addAll(((WorkUnitState)previousWorkunits.get(0)).getWorkunit().getPropAsSet("source.filebased.fs.snapshot"));
            } else if (state.getPropAsBoolean("source.filebased.fs.prior.snapshot.required", false)) {
                throw new RuntimeException(String.format("No '%s' found on state of prior job", "source.filebased.fs.snapshot"));
            }
        }
        ArrayList workUnits = Lists.newArrayList();
        List previousWorkUnitsForRetry = this.getPreviousWorkUnitsForRetry(state);
        log.info("Total number of work units from the previous failed runs: " + previousWorkUnitsForRetry.size());
        for (WorkUnit previousWorkUnitForRetry : previousWorkUnitsForRetry) {
            prevFsSnapshot.addAll(previousWorkUnitForRetry.getPropAsSet("source.filebased.files.to.pull"));
            workUnits.add(previousWorkUnitForRetry);
        }
        List<String> currentFsSnapshot = this.getcurrentFsSnapshot((State)state);
        ArrayList effectiveSnapshot = Lists.newArrayList();
        ArrayList filesToPull = Lists.newArrayList();
        int maxFilesToPull = state.getPropAsInt("source.filebased.maxFilesPerRun", Integer.MAX_VALUE);
        int filesSelectedForPull = 0;
        if (currentFsSnapshot.size() > maxFilesToPull) {
            Collections.sort(currentFsSnapshot);
        }
        for (String file : currentFsSnapshot) {
            if (prevFsSnapshot.contains(file)) {
                effectiveSnapshot.add(file);
                continue;
            }
            if (filesSelectedForPull++ >= maxFilesToPull) continue;
            filesToPull.add(file.split(this.splitPattern)[0]);
            effectiveSnapshot.add(file);
        }
        for (WorkUnit workUnit : previousWorkUnitsForRetry) {
            workUnit.setProp("source.filebased.fs.snapshot", (Object)StringUtils.join((Iterable)effectiveSnapshot, (String)","));
        }
        if (!filesToPull.isEmpty()) {
            int numPartitions;
            this.logFilesToPull(filesToPull);
            int n = numPartitions = state.contains("source.max.number.of.partitions") && state.getPropAsInt("source.max.number.of.partitions") <= filesToPull.size() ? state.getPropAsInt("source.max.number.of.partitions") : filesToPull.size();
            if (numPartitions <= 0) {
                throw new IllegalArgumentException("The number of partitions should be positive");
            }
            int filesPerPartition = filesToPull.size() % numPartitions == 0 ? filesToPull.size() / numPartitions : filesToPull.size() / numPartitions + 1;
            for (int fileOffset = 0; fileOffset < filesToPull.size(); fileOffset += filesPerPartition) {
                SourceState extractState = new SourceState();
                extractState.setProp("extract.extractIdTimeZone", (Object)state.getProp("extract.extractIdTimeZone", "UTC"));
                extractState.setProp("extract.is.full", (Object)state.getProp("extract.is.full", "false"));
                Extract extract = new Extract(extractState, tableType, nameSpaceName, extractTableName);
                WorkUnit workUnit = WorkUnit.create((Extract)extract);
                workUnit.setProp("source.filebased.fs.snapshot", (Object)StringUtils.join((Iterable)effectiveSnapshot, (String)","));
                List partitionFilesToPull = filesToPull.subList(fileOffset, fileOffset + filesPerPartition > filesToPull.size() ? filesToPull.size() : fileOffset + filesPerPartition);
                workUnit.setProp("source.filebased.files.to.pull", (Object)StringUtils.join(partitionFilesToPull, (String)","));
                if (state.getPropAsBoolean("source.filebased.preserve.file.name", false)) {
                    if (partitionFilesToPull.size() != 1) {
                        throw new RuntimeException("Cannot preserve the file name if a workunit is given multiple files");
                    }
                    workUnit.setProp("data.publisher.final.dir", (Object)workUnit.getProp("source.filebased.files.to.pull"));
                }
                workUnits.add(workUnit);
            }
            log.info("Total number of work units for the current run: " + (workUnits.size() - previousWorkUnitsForRetry.size()));
        }
        this.addLineageSourceInfo(workUnits, (State)state);
        return workUnits;
    }

    protected void addLineageSourceInfo(List<WorkUnit> workUnits, State state) {
        workUnits.forEach(workUnit -> {
            if (workUnit instanceof MultiWorkUnit) {
                ((MultiWorkUnit)workUnit).getWorkUnits().forEach(wu -> this.addLineageSourceInfo((WorkUnit)wu, state));
            } else {
                this.addLineageSourceInfo((WorkUnit)workUnit, state);
            }
        });
    }

    protected void addLineageSourceInfo(WorkUnit workUnit, State state) {
        if (!this.lineageInfo.isPresent()) {
            log.info("Lineage is not enabled");
            return;
        }
        String platform = state.getProp("source.filebased.platform", "hdfs");
        Path dataDir = new Path(state.getProp("source.filebased.data.directory"));
        String dataset = Path.getPathWithoutSchemeAndAuthority((Path)dataDir).toString();
        DatasetDescriptor source = new DatasetDescriptor(platform, dataset);
        ((LineageInfo)this.lineageInfo.get()).setSource((Descriptor)source, (State)workUnit);
    }

    public List<String> getcurrentFsSnapshot(State state) {
        List<String> results;
        String path = this.getLsPattern(state);
        try {
            log.info("Running ls command with input " + path);
            results = this.fsHelper.ls(path);
            for (int i = 0; i < results.size(); ++i) {
                URI uri = new URI(results.get(i));
                String filePath = uri.toString();
                if (!uri.isAbsolute()) {
                    File file = new File(state.getProp("source.filebased.data.directory"), uri.toString());
                    filePath = file.getAbsolutePath();
                }
                results.set(i, filePath + this.splitPattern + this.fsHelper.getFileMTime(filePath));
            }
        }
        catch (URISyntaxException | FileBasedHelperException e) {
            String errMsg = String.format("Not able to fetch the filename/file modified time to %s. Will not pull any files", e.getMessage());
            log.error(errMsg, (Throwable)e);
            throw new RuntimeException(errMsg, e);
        }
        return results;
    }

    protected String getLsPattern(State state) {
        return state.getProp("source.filebased.data.directory") + "/*" + state.getProp("source.entity") + "*";
    }

    public void shutdown(SourceState state) {
        if (this.fsHelper != null) {
            log.info("Shutting down the FileSystemHelper connection");
            try {
                this.fsHelper.close();
            }
            catch (IOException e) {
                log.error("Unable to shutdown FileSystemHelper", (Throwable)e);
            }
        }
    }

    public abstract void initFileSystemHelper(State var1) throws FileBasedHelperException;

    private void logFilesToPull(List<String> filesToPull) {
        int filesToLog = Math.min(2000, filesToPull.size());
        String remainingString = "";
        if (filesToLog < filesToPull.size()) {
            remainingString = "and " + (filesToPull.size() - filesToLog) + " more ";
        }
        log.info(String.format("Will pull the following files %s in this run: %s", remainingString, Arrays.toString(filesToPull.subList(0, filesToLog).toArray())));
    }
}

