/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.source.extractor.filebased;

import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
import org.apache.gobblin.metrics.Counters;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelper;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.filebased.FileDownloader;
import org.apache.gobblin.source.extractor.filebased.SingleFileDownloader;
import org.apache.gobblin.source.extractor.filebased.SizeAwareFileBasedHelper;
import org.apache.gobblin.source.extractor.filebased.SizeAwareFileBasedHelperDecorator;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Extractor that reads records from a list of files, one file after another.
 *
 * <p>The files to read are taken from the {@code source.filebased.files.to.pull} property of the
 * {@link WorkUnitState}. Each file is turned into an {@link Iterator} of records by a
 * {@link FileDownloader}; the downloader class can be overridden with the
 * {@code source.filebased.downloader.class} property, otherwise a {@link SingleFileDownloader}
 * is used. Resources opened while reading a file are expected to be registered with
 * {@link #getCloser()} so that they can be released when the extractor moves on to the next file.
 *
 * @param <S> type of the schema returned by {@link #getSchema()}
 * @param <D> type of the records returned by {@link #readRecordImpl(Object)}
 */
public class FileBasedExtractor<S, D> extends InstrumentedExtractor<S, D> {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedExtractor.class);

    /** Property holding the list of files this extractor has to read. */
    private static final String FILES_TO_PULL_KEY = "source.filebased.files.to.pull";
    /** Property holding the number of records between two progress log lines. */
    private static final String REPORT_STATUS_ON_COUNT_KEY = "filebased.report.status.on.count";
    /** Property telling downloaders whether the first record of each file must be skipped. */
    private static final String SKIP_FIRST_RECORD_KEY = "source.skip.first.record";
    /** Property holding the fully qualified class name of a custom {@link FileDownloader}. */
    private static final String DOWNLOADER_CLASS_KEY = "source.filebased.downloader.class";
    /** Property holding the schema returned by {@link #getSchema()}. */
    private static final String SCHEMA_KEY = "source.schema";
    /** Default for {@link #REPORT_STATUS_ON_COUNT_KEY}. */
    private static final int DEFAULT_REPORT_STATUS_ON_COUNT = 10000;

    protected final WorkUnit workUnit;
    protected final WorkUnitState workUnitState;
    /** Files still waiting to be read; entries are removed from the head as they are started. */
    protected final List<String> filesToPull;
    protected final FileDownloader<D> fileDownloader;
    private final int statusCount;
    /** Number of records returned to the caller so far. */
    private long totalRecordCount = 0L;
    private Iterator<D> currentFileItr;
    private String currentFile;
    private boolean hasNext = false;
    /** Holds the resources opened for the file currently being read. */
    protected final Closer closer = Closer.create();
    private final boolean shouldSkipFirstRecord;
    protected final SizeAwareFileBasedHelper fsHelper;
    protected Counters<CounterNames> counters = new Counters<>();

    /**
     * Creates the extractor, connects the file system helper and instantiates the file downloader.
     *
     * @param workUnitState state providing the configuration properties and the work unit
     * @param fsHelper helper used to access the files; wrapped in a
     *     {@link SizeAwareFileBasedHelperDecorator} when it is not already size aware
     * @throws RuntimeException if the helper cannot connect or the configured downloader class
     *     cannot be instantiated
     */
    public FileBasedExtractor(WorkUnitState workUnitState, FileBasedHelper fsHelper) {
        super(workUnitState);
        this.workUnitState = workUnitState;
        this.workUnit = workUnitState.getWorkunit();
        this.filesToPull = Lists.newArrayList(workUnitState.getPropAsList(FILES_TO_PULL_KEY, ""));
        this.statusCount = this.workUnit.getPropAsInt(REPORT_STATUS_ON_COUNT_KEY, DEFAULT_REPORT_STATUS_ON_COUNT);
        this.shouldSkipFirstRecord = this.workUnitState.getPropAsBoolean(SKIP_FIRST_RECORD_KEY, false);
        this.fsHelper = fsHelper instanceof SizeAwareFileBasedHelper
            ? (SizeAwareFileBasedHelper) fsHelper
            : new SizeAwareFileBasedHelperDecorator(fsHelper);
        try {
            this.fsHelper.connect();
        } catch (FileBasedHelperException e) {
            throw new RuntimeException(e);
        }

        FileDownloader<D> downloader;
        try {
            downloader = this.createFileDownloader();
        } catch (RuntimeException e) {
            // The helper is already connected and nobody else can close it once construction fails.
            this.closeHelperAfterFailure(e);
            throw e;
        }
        this.fileDownloader = downloader;
        this.counters.initialize(this.getMetricContext(), CounterNames.class, this.getClass());
    }

    /**
     * Instantiates the downloader named by {@code source.filebased.downloader.class}, or a
     * {@link SingleFileDownloader} when that property is absent. The extractor itself is passed
     * as the only constructor argument.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private FileDownloader<D> createFileDownloader() {
        if (!this.workUnitState.contains(DOWNLOADER_CLASS_KEY)) {
            // Raw construction kept on purpose: the generic signature of SingleFileDownloader is
            // not visible from this file.
            return new SingleFileDownloader(this);
        }
        String downloaderClassName = this.workUnitState.getProp(DOWNLOADER_CLASS_KEY);
        try {
            // The configured class is only known at runtime, hence the unchecked cast.
            return (FileDownloader<D>) ConstructorUtils.invokeConstructor(Class.forName(downloaderClassName), this);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    /** Closes the helper after a constructor failure, attaching any close error to {@code failure}. */
    private void closeHelperAfterFailure(RuntimeException failure) {
        try {
            this.fsHelper.close();
        } catch (IOException closeError) {
            failure.addSuppressed(closeError);
        }
    }

    /**
     * Returns the next record, moving on to the next file when the current one is exhausted.
     *
     * <p>A progress line is logged every {@code filebased.report.status.on.count} records that were
     * actually returned; a non-positive value disables that logging.
     *
     * @param reuse ignored
     * @return the next record, or {@code null} once all files have been read
     * @throws IOException if downloading the next file fails
     */
    public D readRecordImpl(@Deprecated D reuse) throws DataRecordException, IOException {
        if (this.currentFile != null && this.currentFileItr != null) {
            this.hasNext = this.currentFileItr.hasNext();
            if (!this.hasNext) {
                this.getNextFileToRead();
            }
        } else {
            this.getNextFileToRead();
        }

        if (!this.hasNext) {
            LOG.info("Finished reading records from all files");
            return null;
        }

        D record = this.currentFileItr.next();
        // Count only records that are really handed out, so the end-of-data call is not counted.
        ++this.totalRecordCount;
        if (this.statusCount > 0 && this.totalRecordCount % this.statusCount == 0L) {
            LOG.info("Total number of records processed so far: {}", this.totalRecordCount);
        }
        return record;
    }

    /**
     * Releases the current file and advances to the next file that has at least one record.
     * Files that turn out to have no records are released immediately instead of staying open
     * until a later file has been fully read.
     */
    private void getNextFileToRead() throws IOException {
        this.releaseCurrentFile();
        while (!this.hasNext && !this.filesToPull.isEmpty()) {
            this.currentFile = this.filesToPull.remove(0);
            // Logged before the download so that a failing download can be attributed to its file.
            LOG.info("Will start downloading file: {}", this.currentFile);
            this.currentFileItr = this.downloadFile(this.currentFile);
            this.hasNext = this.currentFileItr != null && this.currentFileItr.hasNext();
            if (!this.hasNext) {
                this.releaseCurrentFile();
            }
        }
    }

    /**
     * Closes the resources of the current file, adds its size to the bytes-read counter and drops
     * the iterator reference. Does nothing when no file iterator is open.
     */
    private void releaseCurrentFile() {
        if (this.currentFile == null || this.currentFileItr == null) {
            return;
        }
        this.closeCurrentFile();
        this.incrementBytesReadCounter();
        this.currentFileItr = null;
    }

    /**
     * Returns the schema stored in the {@code source.schema} property of the work unit.
     *
     * @return the property value cast to {@code S}; the cast is unchecked
     */
    @SuppressWarnings("unchecked")
    public S getSchema() {
        return (S) this.workUnit.getProp(SCHEMA_KEY);
    }

    /**
     * @return always {@code -1}
     */
    public long getExpectedRecordCount() {
        return -1L;
    }

    /**
     * @return always {@code -1}
     */
    public long getHighWatermark() {
        LOG.info("High Watermark is -1 for file based extractors");
        return -1L;
    }

    /**
     * Delegates to the configured {@link FileDownloader}.
     *
     * @param file the file to download
     * @return the iterator produced by the downloader for {@code file}
     * @throws IOException if the downloader fails
     */
    public Iterator<D> downloadFile(String file) throws IOException {
        return this.fileDownloader.downloadFile(file);
    }

    /**
     * Closes everything registered with {@link #getCloser()}. Failures are logged and not
     * propagated.
     */
    public void closeCurrentFile() {
        try {
            this.closer.close();
        } catch (IOException e) {
            if (this.currentFile != null) {
                LOG.error("Failed to close file: " + this.currentFile, e);
            } else {
                LOG.error("Failed to close open file resources", e);
            }
        }
    }

    /**
     * Releases the resources of a file that may still be open and closes the file system helper.
     * A failure to close the helper is logged and not propagated.
     */
    public void close() throws IOException {
        // NOTE(review): this method does not delegate to super.close(); confirm that
        // InstrumentedExtractor holds nothing that needs to be released here.
        try {
            // The extractor may be closed before the current file was read to its end.
            this.closeCurrentFile();
        } finally {
            try {
                this.fsHelper.close();
            } catch (IOException e) {
                LOG.error("Could not successfully close file system helper due to error: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Adds the size of the current file to {@link CounterNames#FileBytesRead}. When the size cannot
     * be determined the counter is left unchanged.
     */
    private void incrementBytesReadCounter() {
        try {
            this.counters.inc(CounterNames.FileBytesRead, this.fsHelper.getFileSize(this.currentFile));
        } catch (FileBasedHelperException | UnsupportedOperationException e) {
            LOG.info("Unable to get file size. Will skip increment to bytes counter {}", e.getMessage());
            LOG.debug(e.getMessage(), e);
        }
    }

    /**
     * @return the closer that is closed by {@link #closeCurrentFile()}
     */
    public Closer getCloser() {
        return this.closer;
    }

    /**
     * @return the value of the {@code source.skip.first.record} property, {@code false} by default
     */
    public boolean isShouldSkipFirstRecord() {
        return this.shouldSkipFirstRecord;
    }

    /**
     * @return the size aware helper used to access the files
     */
    public SizeAwareFileBasedHelper getFsHelper() {
        return this.fsHelper;
    }

    /** Names of the counters maintained by this extractor. */
    protected enum CounterNames {
        /** Accumulated size of the files that have been released after reading. */
        FileBytesRead
    }
}

