/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.source;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.PartitionAwareFileRetriever;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
import org.apache.gobblin.source.extractor.hadoop.AvroFsHelper;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnitWeightedQueue;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.DatePartitionType;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PartitionedFileSourceBase<SCHEMA, DATA>
extends FileBasedSource<SCHEMA, DATA> {
    private static final Logger log = LoggerFactory.getLogger(PartitionedFileSourceBase.class);
    public static final String DATE_PARTITIONED_SOURCE_PREFIX = "date.partitioned.source";
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_PREFIX = "date.partitioned.source.partition.prefix";
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX = "date.partitioned.source.partition.suffix";
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN = "date.partitioned.source.partition.pattern";
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY = "date.partitioned.source.partition.granularity";
    public static final DatePartitionType DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY = DatePartitionType.HOUR;
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME = "date.partitioned.source.partition.lead_time.size";
    public static final Duration DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME = new Duration(0L);
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY = "date.partitioned.source.partition.lead_time.granularity";
    public static final DatePartitionType DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY = DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY;
    public static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = "date.partitioned.source.min.watermark.value";
    private static final String DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB = "date.partitioned.source.max.files.per.job";
    private static final String DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB = "date.partitioned.source.max.workunits.per.job";
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB = 2000;
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB = 500;
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = 0;
    private static final Logger LOG = LoggerFactory.getLogger(PartitionedFileSourceBase.class);
    private SourceState sourceState;
    private FileSystem fs;
    private long lowWaterMark;
    private int maxFilesPerJob;
    private int maxWorkUnitsPerJob;
    private int fileCount;
    private Extract.TableType tableType;
    private Path sourceDir;
    private final PartitionAwareFileRetriever retriever;

    protected PartitionedFileSourceBase(PartitionAwareFileRetriever retriever) {
        this.retriever = retriever;
    }

    protected void init(SourceState state) {
        this.retriever.init(state);
        try {
            this.initFileSystemHelper((State)state);
        }
        catch (FileBasedHelperException e) {
            Throwables.propagate((Throwable)e);
        }
        AvroFsHelper fsHelper = (AvroFsHelper)this.fsHelper;
        this.fs = fsHelper.getFileSystem();
        this.sourceState = state;
        this.lowWaterMark = this.getLowWaterMark(state.getPreviousWorkUnitStates(), state.getProp(DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE, String.valueOf(0)));
        this.maxFilesPerJob = state.getPropAsInt(DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB, 2000);
        this.maxWorkUnitsPerJob = state.getPropAsInt(DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB, 500);
        this.tableType = Extract.TableType.valueOf((String)state.getProp("extract.table.type").toUpperCase());
        this.fileCount = 0;
        this.sourceDir = new Path(state.getProp("source.filebased.data.directory"));
    }

    @Override
    public void initFileSystemHelper(State state) throws FileBasedHelperException {
        this.fsHelper = new AvroFsHelper(state);
        this.fsHelper.connect();
    }

    public abstract Extractor<SCHEMA, DATA> getExtractor(WorkUnitState var1) throws IOException;

    @Override
    public List<WorkUnit> getWorkunits(SourceState state) {
        this.lineageInfo = LineageInfo.getLineageInfo((SharedResourcesBroker)state.getBroker());
        DateTimeFormatter formatter = DateTimeFormat.fullDateTime();
        this.init(state);
        LOG.info("Will pull data from " + formatter.print(this.lowWaterMark) + " until " + this.maxFilesPerJob + " files have been processed, or until there is no more data to consume");
        LOG.info("Creating workunits");
        MultiWorkUnitWeightedQueue multiWorkUnitWeightedQueue = new MultiWorkUnitWeightedQueue(this.maxWorkUnitsPerJob);
        this.addFailedWorkUnits(this.getPreviousWorkUnitsForRetry(this.sourceState), multiWorkUnitWeightedQueue);
        if (this.fileCount >= this.maxFilesPerJob) {
            LOG.info("The number of work units from previous job has already reached the upper limit, no more workunits will be made");
            return multiWorkUnitWeightedQueue.getQueueAsList();
        }
        this.addNewWorkUnits(multiWorkUnitWeightedQueue);
        List<WorkUnit> workUnits = multiWorkUnitWeightedQueue.getQueueAsList();
        this.addLineageSourceInfo(workUnits, (State)state);
        return workUnits;
    }

    @Override
    protected void addLineageSourceInfo(WorkUnit workUnit, State state) {
        if (!this.lineageInfo.isPresent()) {
            log.info("Lineage is not enabled");
            return;
        }
        String platform = state.getProp("source.filebased.platform", "hdfs");
        Path dataDir = new Path(state.getProp("source.filebased.data.directory"));
        String dataset = Path.getPathWithoutSchemeAndAuthority((Path)dataDir).toString();
        URI fileSystemUrl = URI.create(state.getProp("source.filebased.fs.uri", "file:///"));
        DatasetDescriptor datasetDescriptor = new DatasetDescriptor(platform, fileSystemUrl, dataset);
        String partitionName = workUnit.getProp("workunit.source.date.partitionName");
        PartitionDescriptor descriptor = new PartitionDescriptor(partitionName, datasetDescriptor);
        ((LineageInfo)this.lineageInfo.get()).setSource((Descriptor)descriptor, (State)workUnit);
    }

    private void addFailedWorkUnits(List<WorkUnit> previousWorkUnitsForRetry, MultiWorkUnitWeightedQueue multiWorkUnitWeightedQueue) {
        for (WorkUnit wu : previousWorkUnitsForRetry) {
            try {
                multiWorkUnitWeightedQueue.addWorkUnit(wu, this.fs.getFileStatus(new Path(wu.getProp("source.filebased.files.to.pull"))).getLen());
            }
            catch (IOException e) {
                Throwables.propagate((Throwable)e);
            }
            LOG.info("Will process file from previous workunit: " + wu.getProp("source.filebased.files.to.pull"));
            ++this.fileCount;
        }
    }

    private Extract getExtractForFile(PartitionAwareFileRetriever.FileInfo file, String topicName, String namespace, Map<Long, Extract> extractMap) {
        Extract extract = extractMap.get(file.getWatermarkMsSinceEpoch());
        if (extract == null) {
            extract = new Extract(this.tableType, namespace, topicName);
            LOG.info("Created extract: " + extract.getExtractId() + " for path " + topicName);
            extractMap.put(file.getWatermarkMsSinceEpoch(), extract);
        }
        return extract;
    }

    private void addNewWorkUnits(MultiWorkUnitWeightedQueue multiWorkUnitWeightedQueue) {
        try {
            List<PartitionAwareFileRetriever.FileInfo> filesToPull = this.retriever.getFilesToProcess(this.lowWaterMark, this.maxFilesPerJob - this.fileCount);
            Collections.sort(filesToPull);
            String topicName = this.sourceDir.getName();
            String namespace = this.sourceState.getProp("extract.namespace");
            HashMap<Long, Extract> extractMap = new HashMap<Long, Extract>();
            for (PartitionAwareFileRetriever.FileInfo file : filesToPull) {
                Extract extract = this.getExtractForFile(file, topicName, namespace, extractMap);
                LOG.info("Will process file " + file.getFilePath());
                WorkUnit singleWorkUnit = WorkUnit.create((Extract)extract);
                singleWorkUnit.setProp("source.entity", (Object)topicName);
                singleWorkUnit.setProp("source.filebased.files.to.pull", (Object)file.getFilePath());
                singleWorkUnit.setProp("workunit.low.water.mark", (Object)file.getWatermarkMsSinceEpoch());
                singleWorkUnit.setProp("workunit.high.water.mark", (Object)file.getWatermarkMsSinceEpoch());
                singleWorkUnit.setProp("workunit.source.date.partition", (Object)file.getWatermarkMsSinceEpoch());
                singleWorkUnit.setProp("workunit.source.date.partitionName", (Object)file.getPartitionName());
                if (this.sourceState.getPropAsBoolean("schema.in.source.dir", false)) {
                    this.addSchemaFile(file, singleWorkUnit);
                }
                multiWorkUnitWeightedQueue.addWorkUnit(singleWorkUnit, file.getFileSize());
                ++this.fileCount;
            }
            LOG.info("Total number of files extracted for the current run: " + filesToPull.size());
        }
        catch (IOException e) {
            Throwables.propagate((Throwable)e);
        }
    }

    private void addSchemaFile(PartitionAwareFileRetriever.FileInfo dataFile, WorkUnit workUnit) throws IOException {
        Path schemaFile = new Path(new Path(dataFile.getFilePath()).getParent(), workUnit.getProp("schema.filename", "metadata.json"));
        if (!this.fs.exists(schemaFile)) {
            throw new IOException("Schema file " + schemaFile + " does not exist.");
        }
        workUnit.setProp("source.schema", (Object)schemaFile.toString());
    }

    private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String lowWaterMark) {
        long lowWaterMarkValue = this.retriever.getWatermarkFromString(lowWaterMark);
        for (WorkUnitState previousState : previousStates) {
            long previousHighWaterMark;
            if (!previousState.getWorkingState().equals((Object)WorkUnitState.WorkingState.COMMITTED) || (previousHighWaterMark = previousState.getWorkunit().getHighWaterMark()) <= lowWaterMarkValue) continue;
            lowWaterMarkValue = previousHighWaterMark;
        }
        return lowWaterMarkValue + this.getRetriever().getWatermarkIncrementMs();
    }

    protected PartitionAwareFileRetriever getRetriever() {
        return this.retriever;
    }
}

