/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.LongHolder;

@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"tail", "file", "log", "text", "source"})
@CapabilityDescription(value="\"Tails\" a file, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds, rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor does not support ingesting files that have been compressed when 'rolled over'.")
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. State is stored either local or clustered depend on the <File Location> property.")
public class TailFile
extends AbstractProcessor {
    // Allowable values for the "File Location" property; getStateScope() maps Remote to cluster-scoped state and anything else to local state.
    static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "File is located on a local disk drive. Each node in a cluster will tail a different file.");
    static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "File is located on a remote resource. This Processor will store state across the cluster so that it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off.");
    // Allowable values for the "Initial Start Position" property; consulted by onTrigger() after the tailed filename has been changed.
    static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time", "Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
    static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File", "Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over");
    static final AllowableValue START_CURRENT_TIME = new AllowableValue("Current Time", "Current Time", "Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any data in the File to Tail that has already been written.");
    // Property descriptors exposed through getSupportedPropertyDescriptors().
    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder().name("File to Tail").description("Fully-qualified filename of the file that should be tailed").expressionLanguageSupported(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder().name("Rolling Filename Pattern").description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. This pattern supports wildcard characters * and ? and will assume that the files that have rolled over live in the same directory as the file being tailed.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).required(false).build();
    static final PropertyDescriptor FILE_LOCATION = new PropertyDescriptor.Builder().name("File Location").description("Specifies where the file is located, so that state can be stored appropriately in order to ensure that all data is consumed without duplicating data upon restart of NiFi").required(true).allowableValues(new AllowableValue[]{LOCATION_LOCAL, LOCATION_REMOTE}).defaultValue(LOCATION_LOCAL.getValue()).build();
    // Optional legacy state file; it is only read (never written) by recoverStateValuesFromFile().
    static final PropertyDescriptor STATE_FILE = new PropertyDescriptor.Builder().name("State File").description("Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).required(false).build();
    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("Initial Start Position").description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from the file, the Processor will continue from the last point from which it has received data.").allowableValues(new AllowableValue[]{START_BEGINNING_OF_TIME, START_CURRENT_FILE, START_CURRENT_TIME}).defaultValue(START_CURRENT_FILE.getValue()).required(true).build();
    // The only relationship this processor declares (see getRelationships()).
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to this Relationship.").build();
    // Current tailing state; always replaced wholesale with a new TailFileState instance. Starts empty with a 65536-byte read buffer.
    private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
    // Checksum recovered from stored state by recoverState(); onTrigger() uses it once and then resets it to null.
    private volatile Long expectedRecoveryChecksum;
    // Set by onPropertyModified() when the "File to Tail" property changes; cleared by onTrigger() once the new start position is applied.
    private volatile boolean tailFileChanged = false;

    /**
     * Lists the properties this processor supports.
     *
     * @return a new, mutable list containing File to Tail, Rolling Filename Pattern, State File,
     *         Initial Start Position and File Location, in that order
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors, FILENAME, ROLLING_FILENAME_PATTERN, STATE_FILE, START_POSITION, FILE_LOCATION);
        return descriptors;
    }

    /**
     * Lists the relationships this processor can route to.
     *
     * @return an immutable single-element set containing {@code REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        final Set<Relationship> onlySuccess = Collections.singleton(REL_SUCCESS);
        return onlySuccess;
    }

    /**
     * Reacts to a property change. Only a change to the File to Tail property, made after the
     * configuration has been restored, has any effect: the tailing state is replaced with an
     * empty state for the new filename and the processor is flagged so that the next
     * {@code onTrigger} applies the Initial Start Position.
     *
     * @param descriptor the property that was modified
     * @param oldValue   the previous value (unused)
     * @param newValue   the new value; becomes the filename of the fresh state
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (!this.isConfigurationRestored() || !FILENAME.equals(descriptor)) {
            return;
        }

        final ByteBuffer freshBuffer = ByteBuffer.allocate(65536);
        this.state = new TailFileState(newValue, null, null, 0L, 0L, null, freshBuffer);
        this.tailFileChanged = true;
    }

    /**
     * Restores the tailing state when the processor is scheduled.
     * <p>
     * If the state manager reports a version of -1 for the applicable scope, the legacy State
     * File is consulted instead; any values found there are first stored in the state manager
     * and then applied. Otherwise the values held by the state manager are applied directly.
     *
     * @param context the process context supplying properties and the state manager
     * @throws IOException if the stored state or the legacy state file cannot be read
     */
    @OnScheduled
    public void recoverState(ProcessContext context) throws IOException {
        final Scope scope = this.getStateScope(context);
        final StateMap managedState = context.getStateManager().getState(scope);

        if (managedState.getVersion() != -1L) {
            this.recoverState(context, managedState.toMap());
            return;
        }

        final Map<String, String> legacyState = this.recoverStateValuesFromFile(context);
        if (legacyState.isEmpty()) {
            return;
        }
        this.persistState(legacyState, context);
        this.recoverState(context, legacyState);
    }

    /**
     * Reads tailing state from the file named by the State File property.
     * <p>
     * The file starts with an int encoding version. Version 0 is followed by a UTF filename,
     * a long position, a long timestamp, a boolean "checksum present" flag and, when that flag
     * is set, a long checksum.
     *
     * @param context the process context supplying the State File property
     * @return the recovered values keyed by "filename", "position", "timestamp" and "checksum"
     *         (the checksum value is {@code null} when none was stored); an empty map when the
     *         property is unset, the file does not exist, or the encoding version is negative
     * @throws IOException if the encoding version is greater than 0 or the file cannot be read
     */
    private Map<String, String> recoverStateValuesFromFile(ProcessContext context) throws IOException {
        final String stateFilename = context.getProperty(STATE_FILE).getValue();
        if (stateFilename == null) {
            return Collections.emptyMap();
        }

        final HashMap<String, String> recovered = new HashMap<>(4);
        final File stateFile = new File(stateFilename);
        try (FileInputStream rawIn = new FileInputStream(stateFile);
             DataInputStream dataIn = new DataInputStream(rawIn)) {
            final int encodingVersion = dataIn.readInt();
            if (encodingVersion > 0) {
                throw new IOException("Unable to recover state because State File was encoded in a more recent version than Version 1");
            }
            if (encodingVersion != 0) {
                // Negative versions are not understood; report "nothing recovered".
                return recovered;
            }

            final String filename = dataIn.readUTF();
            final long position = dataIn.readLong();
            final long timestamp = dataIn.readLong();
            String checksum = null;
            if (dataIn.readBoolean()) {
                checksum = String.valueOf(dataIn.readLong());
            }

            recovered.put("filename", filename);
            recovered.put("position", String.valueOf(position));
            recovered.put("timestamp", String.valueOf(timestamp));
            recovered.put("checksum", checksum);
        } catch (FileNotFoundException ignored) {
            // A missing state file simply means there is nothing to recover.
        }
        return recovered;
    }

    /**
     * Applies previously stored state values to this processor.
     * <p>
     * Nothing happens unless the map contains "filename", "position" and "timestamp". When a
     * checksum was stored and the stored filename equals the configured File to Tail, the first
     * {@code position} bytes of that file are checksummed and compared with the stored value.
     * On a match a reader is opened and positioned at {@code position} so tailing resumes in
     * place; otherwise no reader is opened here. In every other case the state is reset to an
     * empty state for the configured filename.
     *
     * @param context     the process context supplying the File to Tail property
     * @param stateValues the stored values; may be {@code null}
     * @throws IOException if the tailed file cannot be read or positioned
     */
    private void recoverState(ProcessContext context, Map<String, String> stateValues) throws IOException {
        if (stateValues == null) {
            return;
        }
        if (!stateValues.containsKey("filename")) {
            return;
        }
        if (!stateValues.containsKey("position")) {
            return;
        }
        if (!stateValues.containsKey("timestamp")) {
            return;
        }

        final String currentFilename = context.getProperty(FILENAME).getValue();
        final String checksumValue = stateValues.get("checksum");
        final boolean checksumPresent = checksumValue != null;
        final String storedStateFilename = stateValues.get("filename");
        final long position = Long.parseLong(stateValues.get("position"));
        final long timestamp = Long.parseLong(stateValues.get("timestamp"));

        FileChannel reader = null;
        File tailFile = null;
        if (checksumPresent && currentFilename.equals(storedStateFilename)) {
            final long expectedChecksum = Long.parseLong(checksumValue);
            this.expectedRecoveryChecksum = expectedChecksum;

            final CRC32 checksum = new CRC32();
            final File existingTailFile = new File(storedStateFilename);
            if (existingTailFile.length() >= position) {
                try (FileInputStream tailFileIs = new FileInputStream(existingTailFile);
                     CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
                    // Checksum exactly the bytes that had been consumed when the state was stored,
                    // i.e. the recovered position (the length guard above uses the same value).
                    StreamUtils.copy(in, new NullOutputStream(), position);
                    final long checksumResult = in.getChecksum().getValue();
                    if (checksumResult == expectedChecksum) {
                        this.getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
                        tailFile = existingTailFile;
                        reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
                        this.getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[]{reader, tailFile});
                        try {
                            reader.position(position);
                        } catch (IOException ioe) {
                            // Do not leak the channel when it cannot be positioned.
                            try {
                                reader.close();
                            } catch (IOException closeFailure) {
                                ioe.addSuppressed(closeFailure);
                            }
                            throw ioe;
                        }
                    } else {
                        // Only report a mismatch when there actually is one.
                        this.getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
                    }
                }
            } else {
                this.getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[]{existingTailFile.length(), position});
            }
            this.state = new TailFileState(currentFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536));
        } else {
            this.expectedRecoveryChecksum = null;
            this.state = new TailFileState(currentFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
        }
        this.getLogger().debug("Recovered state {}", new Object[]{this.state});
    }

    /**
     * Closes the reader held by the current state, if any, and replaces the state with a copy
     * that has no reader. A failure to close is logged as a warning and otherwise ignored.
     */
    @OnStopped
    public void cleanup() {
        final TailFileState snapshot = this.state;
        final FileChannel openReader = snapshot == null ? null : snapshot.getReader();
        if (openReader == null) {
            return;
        }

        try {
            openReader.close();
        } catch (IOException ioe) {
            this.getLogger().warn("Failed to close file handle during cleanup");
        }
        this.getLogger().debug("Closed FileChannel {}", new Object[]{openReader});

        // Same state as before, minus the reader.
        this.state = new TailFileState(
                snapshot.getFilename(),
                snapshot.getFile(),
                null,
                snapshot.getPosition(),
                snapshot.getTimestamp(),
                snapshot.getChecksum(),
                snapshot.getBuffer());
    }

    /**
     * Performs one tailing iteration.
     * <ol>
     *   <li>If the tailed filename was changed, applies the Initial Start Position (recover all
     *       rolled files, start at the beginning of the file, or position at the current end of
     *       the file). Otherwise recovers any files that rolled over since the last run.</li>
     *   <li>Opens a reader if the state holds none; after a rollover the reader is re-opened at
     *       position 0 and the checksum is reset.</li>
     *   <li>Copies every complete line that is available into a new FlowFile, routes it to
     *       {@code success}, commits the session and then persists the updated state.</li>
     * </ol>
     * The processor yields whenever the file cannot be opened or has no new data.
     *
     * @param context the process context
     * @param session the process session used to create and transfer FlowFiles
     * @throws ProcessException if processing fails
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        boolean rolloverOccurred;
        if (this.tailFileChanged) {
            rolloverOccurred = false;
            final String recoverPosition = context.getProperty(START_POSITION).getValue();
            if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
                this.recoverRolledFiles(context, session, this.expectedRecoveryChecksum, this.state.getTimestamp(), this.state.getPosition());
            } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
                this.cleanup();
                this.state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, this.state.getBuffer());
            } else {
                // "Current Time": checksum everything already in the file and start at its end.
                final String filename = context.getProperty(FILENAME).getValue();
                final File tailedFile = new File(filename);
                FileChannel fileChannel = null;
                try {
                    fileChannel = FileChannel.open(tailedFile.toPath(), StandardOpenOption.READ);
                    this.getLogger().debug("Created FileChannel {} for {}", new Object[]{fileChannel, tailedFile});
                    final CRC32 endChecksum = new CRC32();
                    final long endPosition = tailedFile.length();
                    final long lastModified = tailedFile.lastModified();
                    try (FileInputStream fis = new FileInputStream(tailedFile);
                         CheckedInputStream in = new CheckedInputStream(fis, endChecksum)) {
                        StreamUtils.copy(in, new NullOutputStream(), endPosition);
                    }
                    fileChannel.position(endPosition);
                    this.cleanup();
                    this.state = new TailFileState(filename, tailedFile, fileChannel, endPosition, lastModified, endChecksum, this.state.getBuffer());
                } catch (IOException ioe) {
                    // The channel has not been stored in the state yet, so close it here to avoid a leak.
                    if (fileChannel != null) {
                        try {
                            fileChannel.close();
                        } catch (IOException closeFailure) {
                            ioe.addSuppressed(closeFailure);
                        }
                    }
                    this.getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[]{tailedFile, ioe.toString()}, (Throwable) ioe);
                    context.yield();
                    return;
                }
            }
            this.tailFileChanged = false;
        } else {
            // Use the checksum recovered at schedule time once; afterwards fall back to the live checksum.
            Long expectedChecksumValue = this.expectedRecoveryChecksum;
            if (expectedChecksumValue == null) {
                expectedChecksumValue = this.state.getChecksum() == null ? null : Long.valueOf(this.state.getChecksum().getValue());
            }
            rolloverOccurred = this.recoverRolledFiles(context, session, expectedChecksumValue, this.state.getTimestamp(), this.state.getPosition());
            this.expectedRecoveryChecksum = null;
        }

        // Work on local copies and publish a new state object at the end.
        TailFileState state = this.state;
        File file = state.getFile();
        FileChannel reader = state.getReader();
        Checksum checksum = state.getChecksum();
        if (checksum == null) {
            checksum = new CRC32();
        }
        long position = state.getPosition();
        long timestamp = state.getTimestamp();

        if (file == null || reader == null) {
            file = new File(context.getProperty(FILENAME).getValue());
            reader = this.createReader(file, position);
            if (reader == null) {
                context.yield();
                return;
            }
        }

        final long startNanos = System.nanoTime();
        if (rolloverOccurred) {
            // The file was rotated: drop the old reader and start over at the top of the new file.
            try {
                reader.close();
                this.getLogger().debug("Closed FileChannel {}", new Object[]{reader});
            } catch (IOException ioe) {
                this.getLogger().warn("Failed to close reader for {} due to {}", new Object[]{file, ioe});
            }
            reader = this.createReader(file, 0L);
            position = 0L;
            checksum.reset();

            if (reader == null) {
                // The new file could not be opened. Record that no reader is held (the old one is
                // closed) so the next run retries from position 0 instead of using a dead channel.
                state = this.state = new TailFileState(context.getProperty(FILENAME).getValue(), file, null, position, timestamp, checksum, state.getBuffer());
                this.persistState(state, context);
                context.yield();
                return;
            }
        }

        if (file.length() == position) {
            this.getLogger().debug("No data to consume; created no FlowFiles");
            state = this.state = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
            this.persistState(state, context);
            context.yield();
            return;
        }

        final TailFileState currentState = state;
        final Checksum chksum = checksum;
        final FileChannel fileReader = reader;
        final LongHolder positionHolder = new LongHolder(position);

        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, new OutputStreamCallback() {

            public void process(OutputStream rawOut) throws IOException {
                try (BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
                    positionHolder.set(TailFile.this.readLines(fileReader, currentState.getBuffer(), out, chksum));
                }
            }
        });

        if (flowFile.getSize() == 0L) {
            session.remove(flowFile);
            this.getLogger().debug("No data to consume; removed created FlowFile");
        } else {
            // FlowFile name: <base name>.<start offset>-<end offset>[.<extension>]
            final String tailFilename = file.getName();
            final String baseName = StringUtils.substringBeforeLast(tailFilename, ".");
            final String flowFileName;
            if (baseName.length() < tailFilename.length()) {
                flowFileName = baseName + "." + position + "-" + positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
            } else {
                flowFileName = baseName + "." + position + "-" + positionHolder.get();
            }

            final HashMap<String, String> attributes = new HashMap<String, String>(2);
            attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            session.transfer(flowFile, REL_SUCCESS);
            position = positionHolder.get();
            // Never move the timestamp backwards; rolled-file recovery may have advanced it.
            timestamp = Math.max(state.getTimestamp(), file.lastModified());
            this.getLogger().debug("Created {} and routed to success", new Object[]{flowFile});
        }

        final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
        this.state = updatedState;
        // Commit before persisting the state.
        session.commit();
        this.persistState(updatedState, context);
    }

    /**
     * Copies every complete line available from {@code reader} to {@code out}, updating
     * {@code checksum} with exactly the bytes written.
     * <p>
     * A line ends at a line feed, or at a carriage return that is followed by any byte other
     * than a line feed. Bytes after the last line ending are not written; the reader is moved
     * back so that they are read again on the next call.
     *
     * @param reader   channel positioned at the first unread byte
     * @param buffer   scratch buffer used for channel reads
     * @param out      destination for complete lines
     * @param checksum running checksum of all bytes written to {@code out}
     * @return the file position immediately after the last complete line
     * @throws IOException if reading or writing fails
     */
    private long readLines(FileChannel reader, ByteBuffer buffer, OutputStream out, Checksum checksum) throws IOException {
        this.getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()});
        try (ByteArrayOutputStream pendingLine = new ByteArrayOutputStream()) {
            long chunkStart = reader.position();
            long resumeAt = chunkStart;
            int lineCount = 0;
            boolean carriageReturnPending = false;
            buffer.clear();

            int bytesRead = reader.read(buffer);
            while (bytesRead != -1) {
                buffer.flip();
                for (int offset = 0; offset < bytesRead; offset++) {
                    final byte current = buffer.get(offset);
                    if (current == '\n') {
                        pendingLine.write(current);
                        carriageReturnPending = false;
                        this.emitBufferedLine(pendingLine, out, checksum);
                        resumeAt = chunkStart + offset + 1L;
                        lineCount++;
                    } else if (current == '\r') {
                        pendingLine.write(current);
                        carriageReturnPending = true;
                    } else if (carriageReturnPending) {
                        // A lone CR ended the previous line; this byte starts the next one.
                        carriageReturnPending = false;
                        this.emitBufferedLine(pendingLine, out, checksum);
                        lineCount++;
                        pendingLine.write(current);
                        resumeAt = chunkStart + offset;
                    } else {
                        pendingLine.write(current);
                    }
                }
                chunkStart = reader.position();
                bytesRead = reader.read(buffer);
            }

            if (resumeAt < reader.position()) {
                this.getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[]{lineCount, chunkStart, resumeAt});
                reader.position(resumeAt);
            }
            return resumeAt;
        }
    }

    /** Writes the buffered line to {@code out}, folds it into {@code checksum}, then empties the buffer. */
    private void emitBufferedLine(ByteArrayOutputStream pendingLine, OutputStream out, Checksum checksum) throws IOException {
        pendingLine.writeTo(out);
        checksum.update(pendingLine.getUnderlyingBuffer(), 0, pendingLine.size());
        if (this.getLogger().isTraceEnabled()) {
            this.getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
        }
        pendingLine.reset();
    }

    /**
     * Finds files in the tailed file's directory that match the Rolling Filename Pattern and
     * were last modified at or after {@code minTimestamp}, excluding the tailed file itself.
     * <p>
     * Each file's last-modified time is read exactly once and that snapshot is used for both
     * filtering and sorting. Sorting on live {@code File.lastModified()} values would let a file
     * that is modified during the sort produce inconsistent comparisons.
     *
     * @param context      the process context supplying the filename and rolling pattern
     * @param minTimestamp files last modified before this time are skipped
     * @return the matching files ordered by last-modified time, then by name; an empty list
     *         when no Rolling Filename Pattern is configured
     * @throws IOException if the directory cannot be listed
     */
    private List<File> getRolledOffFiles(ProcessContext context, long minTimestamp) throws IOException {
        final String tailFilename = context.getProperty(FILENAME).getValue();
        final File tailFile = new File(tailFilename);
        File directory = tailFile.getParentFile();
        if (directory == null) {
            directory = new File(".");
        }

        final String rollingPattern = context.getProperty(ROLLING_FILENAME_PATTERN).getValue();
        if (rollingPattern == null) {
            return Collections.emptyList();
        }

        final ArrayList<File> rolledOffFiles = new ArrayList<File>();
        final Map<File, Long> lastModifiedSnapshot = new HashMap<File, Long>();
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern)) {
            for (Path path : dirStream) {
                final File file = path.toFile();
                final long lastMod = file.lastModified();
                if (lastMod < minTimestamp) {
                    this.getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it", new Object[]{file, lastMod, minTimestamp});
                    continue;
                }
                if (file.equals(tailFile)) {
                    continue;
                }
                rolledOffFiles.add(file);
                lastModifiedSnapshot.put(file, lastMod);
            }
        }

        Collections.sort(rolledOffFiles, new Comparator<File>() {

            @Override
            public int compare(File o1, File o2) {
                final int lastModifiedComp = Long.compare(lastModifiedSnapshot.get(o1), lastModifiedSnapshot.get(o2));
                if (lastModifiedComp != 0) {
                    return lastModifiedComp;
                }
                return o1.getName().compareTo(o2.getName());
            }
        });
        return rolledOffFiles;
    }

    /**
     * Chooses the state scope from the File Location property.
     *
     * @param context the process context supplying the File Location property
     * @return {@code Scope.CLUSTER} when the location equals "Remote" (ignoring case),
     *         otherwise {@code Scope.LOCAL}
     */
    private Scope getStateScope(ProcessContext context) {
        final String configuredLocation = context.getProperty(FILE_LOCATION).getValue();
        final boolean remote = LOCATION_REMOTE.getValue().equalsIgnoreCase(configuredLocation);
        return remote ? Scope.CLUSTER : Scope.LOCAL;
    }

    /**
     * Persists the given tailing state by converting it to its map form.
     *
     * @param state   the state to store
     * @param context the process context supplying the state manager
     */
    private void persistState(TailFileState state, ProcessContext context) {
        final Map<String, String> stateValues = state.toStateMap();
        this.persistState(stateValues, context);
    }

    /**
     * Stores the given values in the state manager under the scope chosen by
     * {@code getStateScope}. A failure is logged as a warning and not propagated.
     *
     * @param state   the values to store
     * @param context the process context supplying the state manager
     */
    private void persistState(Map<String, String> state, ProcessContext context) {
        try {
            final Scope targetScope = this.getStateScope(context);
            context.getStateManager().setState(state, targetScope);
        } catch (IOException storeFailure) {
            this.getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[]{storeFailure});
        }
    }

    /**
     * Opens a read-only channel on {@code file} and moves it to {@code position}.
     *
     * @param file     the file to open
     * @param position the byte offset at which reading should start
     * @return the positioned channel, or {@code null} if the file could not be opened or
     *         positioned (the failure is logged and a half-opened channel is closed)
     */
    private FileChannel createReader(File file, long position) {
        final FileChannel channel;
        try {
            channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
        } catch (IOException openFailure) {
            this.getLogger().warn("Unable to open file {}; will attempt to access file again after the configured Yield Duration has elapsed: {}", new Object[]{file, openFailure});
            return null;
        }
        this.getLogger().debug("Created FileChannel {} for {}", new Object[]{channel, file});

        try {
            channel.position(position);
            return channel;
        } catch (IOException seekFailure) {
            this.getLogger().error("Failed to read from {} due to {}", new Object[]{file, seekFailure});
        }

        // Positioning failed: release the channel on a best-effort basis.
        try {
            channel.close();
            this.getLogger().debug("Closed FileChannel {}", new Object[]{channel});
        } catch (IOException ignored) {
            // Nothing more can be done with a channel that fails to close.
        }
        return null;
    }

    /**
     * Exposes the current tailing state to code in the same package.
     *
     * @return the state object currently held by this processor
     */
    TailFileState getState() {
        final TailFileState current = this.state;
        return current;
    }

    /**
     * Looks up the rolled-over files that are at least as new as {@code timestamp} and ingests
     * them.
     *
     * @param context          the process context
     * @param session          the session used to create FlowFiles
     * @param expectedChecksum checksum of the bytes already consumed, or {@code null} if unknown
     * @param timestamp        cutoff; older rolled files are ignored
     * @param position         number of bytes already consumed from the file being tailed
     * @return {@code true} if at least one rolled-over file was found; {@code false} if none
     *         was found or the lookup failed
     */
    private boolean recoverRolledFiles(ProcessContext context, ProcessSession session, Long expectedChecksum, long timestamp, long position) {
        final List<File> candidates;
        try {
            candidates = this.getRolledOffFiles(context, timestamp);
        } catch (IOException e) {
            this.getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
            return false;
        }
        return this.recoverRolledFiles(context, session, candidates, expectedChecksum, timestamp, position);
    }

    /**
     * Ingests the given rolled-over files.
     * <p>
     * The first file may be the one that was being tailed before it rolled over. When a
     * checksum is expected, {@code position} is positive and the file is at least that long,
     * its first {@code position} bytes are checksummed; on a match those bytes are skipped and
     * only the remainder is imported. Every file still in the list afterwards is consumed in
     * full.
     *
     * @param context          the process context
     * @param session          the session used to create FlowFiles
     * @param rolledOffFiles   candidate files, oldest first; the first entry is removed from
     *                         this list when it is recognised as partially consumed
     * @param expectedChecksum checksum of the bytes already consumed, or {@code null}
     * @param timestamp        unused by this method
     * @param position         number of bytes already consumed
     * @return {@code true} if the list was non-empty; {@code false} if it was empty or an
     *         I/O error occurred
     */
    private boolean recoverRolledFiles(ProcessContext context, ProcessSession session, List<File> rolledOffFiles, Long expectedChecksum, long timestamp, long position) {
        try {
            this.getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});

            final boolean rolloverOccurred = !rolledOffFiles.isEmpty();
            final boolean firstFileMayBePartiallyConsumed =
                    rolloverOccurred && expectedChecksum != null && rolledOffFiles.get(0).length() >= position;

            if (firstFileMayBePartiallyConsumed) {
                final File firstFile = rolledOffFiles.get(0);
                final long startNanos = System.nanoTime();
                if (position > 0L) {
                    try (FileInputStream fis = new FileInputStream(firstFile);
                         CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
                        // Skip (and checksum) the bytes that were consumed before the rollover.
                        StreamUtils.copy(in, new NullOutputStream(), position);
                        final long checksumResult = in.getChecksum().getValue();

                        if (checksumResult != expectedChecksum) {
                            this.getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file", new Object[]{firstFile, checksumResult, expectedChecksum});
                        } else {
                            this.getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[]{firstFile, position});
                            rolledOffFiles.remove(0);

                            FlowFile remainder = session.create();
                            remainder = session.importFrom(in, remainder);
                            final boolean nothingLeft = remainder.getSize() == 0L;
                            if (nothingLeft) {
                                session.remove(remainder);
                            } else {
                                remainder = session.putAttribute(remainder, "filename", firstFile.getName());
                                // NOTE(review): the imported content starts at 'position', yet the details text says "0 through position" - confirm intent.
                                session.getProvenanceReporter().receive(remainder, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                session.transfer(remainder, REL_SUCCESS);
                                this.getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{remainder, firstFile});
                            }

                            // Advance the timestamp past this file so it is not picked up again.
                            this.cleanup();
                            this.state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, this.state.getBuffer());

                            if (!nothingLeft) {
                                session.commit();
                                this.persistState(this.state, context);
                            }
                        }
                    }
                }
            }

            for (File remaining : rolledOffFiles) {
                this.state = this.consumeFileFully(remaining, context, session, this.state);
            }
            return rolloverOccurred;
        } catch (IOException e) {
            this.getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
            return false;
        }
    }

    /**
     * Imports the whole of {@code file} into a new FlowFile and routes it to {@code success}.
     * An empty file produces no FlowFile and leaves the state untouched.
     *
     * @param file    the rolled-over file to ingest
     * @param context the process context
     * @param session the session used to create the FlowFile
     * @param state   the state in effect before this file is consumed
     * @return {@code state} unchanged when the file was empty; otherwise a new state with no
     *         file or reader, position 0 and a timestamp one millisecond after the file's
     *         last-modified time (already committed and persisted)
     */
    private TailFileState consumeFileFully(File file, ProcessContext context, ProcessSession session, TailFileState state) {
        FlowFile imported = session.create();
        imported = session.importFrom(file.toPath(), true, imported);
        if (imported.getSize() == 0L) {
            session.remove(imported);
            return state;
        }

        imported = session.putAttribute(imported, "filename", file.getName());
        session.getProvenanceReporter().receive(imported, file.toURI().toString());
        session.transfer(imported, REL_SUCCESS);
        this.getLogger().debug("Created {} from {} and routed to success", new Object[]{imported, file});

        this.cleanup();
        final TailFileState advanced = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
        session.commit();
        this.persistState(advanced, context);
        return advanced;
    }

    /**
     * Immutable snapshot of where tailing currently stands: which file is configured, the open
     * reader (if any), how far it has been read, the relevant timestamp, the running checksum
     * and the scratch buffer used for reads.
     */
    static class TailFileState {
        private final String filename;
        private final File file;
        private final FileChannel reader;
        private final long position;
        private final long timestamp;
        private final Checksum checksum;
        private final ByteBuffer buffer;

        /**
         * @param filename  configured name of the file being tailed
         * @param file      the file being tailed; may be {@code null}
         * @param reader    open channel on {@code file}; may be {@code null}
         * @param position  byte offset up to which data has been consumed
         * @param timestamp timestamp associated with the consumed data
         * @param checksum  running checksum; may be {@code null}
         * @param buffer    scratch buffer for channel reads
         */
        public TailFileState(String filename, File file, FileChannel reader, long position, long timestamp, Checksum checksum, ByteBuffer buffer) {
            this.filename = filename;
            this.file = file;
            this.reader = reader;
            this.position = position;
            this.timestamp = timestamp;
            this.checksum = checksum;
            this.buffer = buffer;
        }

        /** @return the configured filename */
        public String getFilename() {
            return filename;
        }

        /** @return the file being tailed, or {@code null} */
        public File getFile() {
            return file;
        }

        /** @return the open reader, or {@code null} */
        public FileChannel getReader() {
            return reader;
        }

        /** @return the byte offset consumed so far */
        public long getPosition() {
            return position;
        }

        /** @return the stored timestamp */
        public long getTimestamp() {
            return timestamp;
        }

        /** @return the running checksum, or {@code null} */
        public Checksum getChecksum() {
            return checksum;
        }

        /** @return the scratch read buffer */
        public ByteBuffer getBuffer() {
            return buffer;
        }

        /** Renders filename, position, timestamp and checksum value (or "null") for logging. */
        public String toString() {
            final String checksumText = checksum == null ? "null" : String.valueOf(checksum.getValue());
            return "TailFileState[filename=" + filename
                    + ", position=" + position
                    + ", timestamp=" + timestamp
                    + ", checksum=" + checksumText + "]";
        }

        /**
         * Converts this state to the string map that is persisted.
         *
         * @return a new map with the keys defined in {@link StateKeys}; the checksum entry is
         *         present with a {@code null} value when no checksum is held
         */
        public Map<String, String> toStateMap() {
            final Map<String, String> values = new HashMap<>(4);
            values.put(StateKeys.FILENAME, filename);
            values.put(StateKeys.POSITION, String.valueOf(position));
            values.put(StateKeys.TIMESTAMP, String.valueOf(timestamp));
            String checksumText = null;
            if (checksum != null) {
                checksumText = String.valueOf(checksum.getValue());
            }
            values.put(StateKeys.CHECKSUM, checksumText);
            return values;
        }

        /** Keys used in the persisted state map. */
        private static class StateKeys {
            public static final String FILENAME = "filename";
            public static final String POSITION = "position";
            public static final String TIMESTAMP = "timestamp";
            public static final String CHECKSUM = "checksum";

            private StateKeys() {
            }
        }
    }
}

