/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;

@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"tail", "file", "log", "text", "source", "restricted"})
@CapabilityDescription(value="\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds, rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor does not support ingesting files that have been compressed when 'rolled over'.")
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. State is stored either local or clustered depend on the <File Location> property.")
@WritesAttributes(value={@WritesAttribute(attribute="tailfile.original.path", description="Path of the original file the flow file comes from.")})
@Restricted(value="Provides operator the ability to read from any file that NiFi has access to.")
public class TailFile
extends AbstractProcessor {
    static final String MAP_PREFIX = "file.";
    static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "State is stored locally. Each node in a cluster will tail a different file.");
    static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "State is located on a remote resource. This Processor will store state across the cluster so that it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off.");
    static final AllowableValue MODE_SINGLEFILE = new AllowableValue("Single file", "Single file", "In this mode, only the one file indicated in the 'Files to tail' property will be watched by the processor. In this mode, the file may not exist when starting the processor.");
    static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple files", "Multiple files", "In this mode, the 'Files to tail' property accepts a regular expression and the processor will look for files in 'Base directory' to list the files to tail by the processor. In this mode, only the files existing when starting the processor will be used.");
    static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files where the log messages are appended have always the same name.");
    static final AllowableValue CHANGING_NAME = new AllowableValue("Changing name", "Changing name", "With this rolling strategy, the files where the log messages are appended have not a fixed name (for example: filename contaning the current day.");
    static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time", "Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
    static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File", "Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over");
    static final AllowableValue START_CURRENT_TIME = new AllowableValue("Current Time", "Current Time", "Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any data in the File to Tail that has already been written.");
    static final PropertyDescriptor BASE_DIRECTORY = new PropertyDescriptor.Builder().name("tail-base-directory").displayName("Base directory").description("Base directory used to look for files to tail. This property is required when using Multifile mode.").expressionLanguageSupported(true).addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).required(false).build();
    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder().name("tail-mode").displayName("Tailing mode").description("Mode to use: single file will tail only one file, multiple file will look for a list of file. In Multiple mode the Base directory is required.").expressionLanguageSupported(false).required(true).allowableValues(new AllowableValue[]{MODE_SINGLEFILE, MODE_MULTIFILE}).defaultValue(MODE_SINGLEFILE.getValue()).build();
    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder().displayName("File(s) to Tail").name("File to Tail").description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files to tail in the base directory. In case recursivity is set to true, the regular expression will be used to match the path starting from the base directory (see additional details for examples).").expressionLanguageSupported(true).addValidator(StandardValidators.createRegexValidator((int)0, (int)Integer.MAX_VALUE, (boolean)true)).required(true).build();
    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder().name("Rolling Filename Pattern").description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. This pattern supports wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the name of the file (without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed. The same glob pattern will be used for all files.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).required(false).build();
    static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder().displayName("State Location").name("File Location").description("Specifies where the state is located either local or cluster so that state can be stored appropriately in order to ensure that all data is consumed without duplicating data upon restart of NiFi").required(true).allowableValues(new AllowableValue[]{LOCATION_LOCAL, LOCATION_REMOTE}).defaultValue(LOCATION_LOCAL.getValue()).build();
    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("Initial Start Position").description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from a file, the Processor will continue from the last point from which it has received data.").allowableValues(new AllowableValue[]{START_BEGINNING_OF_TIME, START_CURRENT_FILE, START_CURRENT_TIME}).defaultValue(START_CURRENT_FILE.getValue()).required(true).build();
    static final PropertyDescriptor RECURSIVE = new PropertyDescriptor.Builder().name("tailfile-recursive-lookup").displayName("Recursive lookup").description("When using Multiple files mode, this property defines if files must be listed recursively or not in the base directory.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    static final PropertyDescriptor ROLLING_STRATEGY = new PropertyDescriptor.Builder().name("tailfile-rolling-strategy").displayName("Rolling Strategy").description("Specifies if the files to tail have a fixed name or not.").required(true).allowableValues(new AllowableValue[]{FIXED_NAME, CHANGING_NAME}).defaultValue(FIXED_NAME.getValue()).build();
    static final PropertyDescriptor LOOKUP_FREQUENCY = new PropertyDescriptor.Builder().name("tailfile-lookup-frequency").displayName("Lookup frequency").description("Only used in Multiple files mode and Changing name rolling strategy. It specifies the minimum duration the processor will wait before listing again the files to tail.").required(false).defaultValue("10 minutes").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder().name("tailfile-maximum-age").displayName("Maximum age").description("Only used in Multiple files mode and Changing name rolling strategy. It specifies the necessary minimum duration to consider that no new messages will be appended in a file regarding its last modification date. This should not be set too low to avoid duplication of data in case new messages are appended at a lower frequency.").required(false).defaultValue("24 hours").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to this Relationship.").build();
    private volatile Map<String, TailFileObject> states = new HashMap<String, TailFileObject>();
    private volatile AtomicLong lastLookup = new AtomicLong(0L);
    private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(MODE);
        properties.add(FILENAME);
        properties.add(ROLLING_FILENAME_PATTERN);
        properties.add(BASE_DIRECTORY);
        properties.add(START_POSITION);
        properties.add(STATE_LOCATION);
        properties.add(RECURSIVE);
        properties.add(ROLLING_STRATEGY);
        properties.add(LOOKUP_FREQUENCY);
        properties.add(MAXIMUM_AGE);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (this.isConfigurationRestored() && FILENAME.equals((Object)descriptor)) {
            this.states = new HashMap<String, TailFileObject>();
        }
    }

    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>(super.customValidate(context));
        if (context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
            String path = context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue();
            if (path == null) {
                results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false).explanation("Base directory property cannot be empty in Multifile mode.").build());
            } else if (!new File(path).isDirectory()) {
                results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false).explanation(path + " is not a directory.").build());
            }
            if (context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue())) {
                String maxAge;
                String freq = context.getProperty(LOOKUP_FREQUENCY).getValue();
                if (freq == null) {
                    results.add(new ValidationResult.Builder().subject(LOOKUP_FREQUENCY.getName()).valid(false).explanation("In Multiple files mode and Changing name rolling strategy, lookup frequency property must be specified.").build());
                }
                if ((maxAge = context.getProperty(MAXIMUM_AGE).getValue()) == null) {
                    results.add(new ValidationResult.Builder().subject(MAXIMUM_AGE.getName()).valid(false).explanation("In Multiple files mode and Changing name rolling strategy, maximum age property must be specified.").build());
                }
            } else {
                long max = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
                List<String> filesToTail = this.getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(), context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), context.getProperty(RECURSIVE).asBoolean(), max);
                if (filesToTail.isEmpty()) {
                    results.add(new ValidationResult.Builder().subject(FILENAME.getName()).valid(false).explanation("There is no file to tail. Files must exist when starting this processor.").build());
                }
            }
        }
        return results;
    }

    @OnScheduled
    public void recoverState(ProcessContext context) throws IOException {
        this.isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()) && context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue()));
        this.lastLookup.set(new Date().getTime());
        long maxAge = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        ArrayList<String> filesToTail = new ArrayList<String>();
        if (context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
            filesToTail.addAll(this.getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(), context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), context.getProperty(RECURSIVE).asBoolean(), maxAge));
        } else {
            filesToTail.add(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue());
        }
        Scope scope = this.getStateScope(context);
        StateMap stateMap = context.getStateManager().getState(scope);
        if (stateMap.getVersion() == -1L) {
            this.initStates(filesToTail, Collections.emptyMap(), true);
            this.recoverState(context, filesToTail, Collections.emptyMap());
            return;
        }
        Map statesMap = stateMap.toMap();
        this.initStates(filesToTail, statesMap, false);
        this.recoverState(context, filesToTail, statesMap);
    }

    private void initStates(List<String> filesToTail, Map<String, String> statesMap, boolean isCleared) {
        int i = 0;
        if (isCleared) {
            this.states.clear();
        } else {
            if (this.states.isEmpty() && !statesMap.isEmpty()) {
                for (String string : statesMap.keySet()) {
                    if (!string.endsWith("filename")) continue;
                    int index = Integer.valueOf(string.split("\\.")[1]);
                    this.states.put(statesMap.get(string), new TailFileObject(index, statesMap));
                }
            }
            ArrayList<String> toBeRemoved = new ArrayList<String>();
            for (String file : this.states.keySet()) {
                if (filesToTail.contains(file)) continue;
                toBeRemoved.add(file);
                this.cleanReader(this.states.get(file));
            }
            this.states.keySet().removeAll(toBeRemoved);
            for (String file : this.states.keySet()) {
                if (i > this.states.get(file).getFilenameIndex()) continue;
                i = this.states.get(file).getFilenameIndex() + 1;
            }
        }
        for (String string : filesToTail) {
            if (!isCleared && this.states.containsKey(string)) continue;
            this.states.put(string, new TailFileObject(i));
            ++i;
        }
    }

    private void recoverState(ProcessContext context, List<String> filesToTail, Map<String, String> map) throws IOException {
        for (String file : filesToTail) {
            this.recoverState(context, map, file);
        }
    }

    private List<String> getFilesToTail(String baseDir, String fileRegex, boolean isRecursive, long maxAge) {
        Collection files = FileUtils.listFiles((File)new File(baseDir), null, (boolean)isRecursive);
        ArrayList<String> result = new ArrayList<String>();
        String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() - 1) : baseDir;
        String fullRegex = File.separator.equals("/") ? baseDirNoTrailingSeparator + File.separator + fileRegex : baseDirNoTrailingSeparator + Pattern.quote(File.separator) + fileRegex;
        Pattern p = Pattern.compile(fullRegex);
        for (File file : files) {
            String path = file.getPath();
            if (!p.matcher(path).matches()) continue;
            if (this.isMultiChanging.get()) {
                if (new Date().getTime() - file.lastModified() >= maxAge) continue;
                result.add(path);
                continue;
            }
            result.add(path);
        }
        return result;
    }

    private void recoverState(ProcessContext context, Map<String, String> stateValues, String filePath) throws IOException {
        String prefix = MAP_PREFIX + this.states.get(filePath).getFilenameIndex() + '.';
        if (!stateValues.containsKey(prefix + "filename")) {
            this.resetState(filePath);
            return;
        }
        if (!stateValues.containsKey(prefix + "position")) {
            this.resetState(filePath);
            return;
        }
        if (!stateValues.containsKey(prefix + "timestamp")) {
            this.resetState(filePath);
            return;
        }
        if (!stateValues.containsKey(prefix + "length")) {
            this.resetState(filePath);
            return;
        }
        String checksumValue = stateValues.get(prefix + "checksum");
        boolean checksumPresent = checksumValue != null;
        String storedStateFilename = stateValues.get(prefix + "filename");
        long position = Long.parseLong(stateValues.get(prefix + "position"));
        long timestamp = Long.parseLong(stateValues.get(prefix + "timestamp"));
        long length = Long.parseLong(stateValues.get(prefix + "length"));
        FileChannel reader = null;
        File tailFile = null;
        if (checksumPresent && filePath.equals(storedStateFilename)) {
            this.states.get(filePath).setExpectedRecoveryChecksum(Long.parseLong(checksumValue));
            CRC32 checksum = new CRC32();
            File existingTailFile = new File(storedStateFilename);
            if (existingTailFile.length() >= position) {
                try (FileInputStream tailFileIs = new FileInputStream(existingTailFile);
                     CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum);){
                    StreamUtils.copy((InputStream)in, (OutputStream)new NullOutputStream(), (long)this.states.get(filePath).getState().getPosition());
                    long checksumResult = in.getChecksum().getValue();
                    if (checksumResult == this.states.get(filePath).getExpectedRecoveryChecksum()) {
                        this.getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
                        tailFile = existingTailFile;
                        reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
                        this.getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[]{reader, tailFile});
                        reader.position(position);
                    }
                    this.getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
                }
            } else {
                this.getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[]{existingTailFile.length(), position});
            }
            this.states.get(filePath).setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(65536)));
        } else {
            this.resetState(filePath);
        }
        this.getLogger().debug("Recovered state {}", new Object[]{this.states.get(filePath).getState()});
    }

    private void resetState(String filePath) {
        this.states.get(filePath).setExpectedRecoveryChecksum(null);
        this.states.get(filePath).setState(new TailFileState(filePath, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536)));
    }

    @OnStopped
    public void cleanup() {
        for (TailFileObject tfo : this.states.values()) {
            this.cleanReader(tfo);
            TailFileState state = tfo.getState();
            tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer()));
        }
    }

    private void cleanReader(TailFileObject tfo) {
        if (tfo.getState() == null) {
            return;
        }
        FileChannel reader = tfo.getState().getReader();
        if (reader == null) {
            return;
        }
        try {
            reader.close();
            this.getLogger().debug("Closed FileChannel {}", new Object[]{reader});
        }
        catch (IOException ioe) {
            this.getLogger().warn("Failed to close file handle during cleanup");
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        long timeSinceLastLookup;
        if (this.isMultiChanging.get() && (timeSinceLastLookup = new Date().getTime() - this.lastLookup.get()) > context.getProperty(LOOKUP_FREQUENCY).asTimePeriod(TimeUnit.MILLISECONDS)) {
            try {
                this.recoverState(context);
            }
            catch (IOException e) {
                this.getLogger().error("Exception raised while looking up for new files", (Throwable)e);
                context.yield();
                return;
            }
        }
        if (this.states.isEmpty()) {
            context.yield();
            return;
        }
        for (String tailFile : this.states.keySet()) {
            this.processTailFile(context, session, tailFile);
        }
    }

    private void processTailFile(ProcessContext context, ProcessSession session, String tailFile) {
        boolean rolloverOccurred;
        TailFileObject tfo = this.states.get(tailFile);
        if (tfo.isTailFileChanged()) {
            rolloverOccurred = false;
            String recoverPosition = context.getProperty(START_POSITION).getValue();
            if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
                this.recoverRolledFiles(context, session, tailFile, tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), tfo.getState().getPosition());
            } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
                this.cleanup();
                tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 0L, null, tfo.getState().getBuffer()));
            } else {
                String filename = tailFile;
                File file = new File(filename);
                try {
                    FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
                    this.getLogger().debug("Created FileChannel {} for {}", new Object[]{fileChannel, file});
                    CRC32 checksum = new CRC32();
                    long position = file.length();
                    long timestamp = file.lastModified();
                    try (FileInputStream fis = new FileInputStream(file);
                         CheckedInputStream in = new CheckedInputStream(fis, checksum);){
                        StreamUtils.copy((InputStream)in, (OutputStream)new NullOutputStream(), (long)position);
                    }
                    fileChannel.position(position);
                    this.cleanup();
                    tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
                }
                catch (IOException ioe) {
                    this.getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[]{file, ioe.toString()}, (Throwable)ioe);
                    context.yield();
                    return;
                }
            }
            tfo.setTailFileChanged(false);
        } else {
            Long expectedChecksumValue = tfo.getExpectedRecoveryChecksum();
            if (expectedChecksumValue == null) {
                expectedChecksumValue = tfo.getState().getChecksum() == null ? null : Long.valueOf(tfo.getState().getChecksum().getValue());
            }
            rolloverOccurred = this.recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition());
            tfo.setExpectedRecoveryChecksum(null);
        }
        TailFileState state = tfo.getState();
        File file = state.getFile();
        FileChannel reader = state.getReader();
        Checksum checksum = state.getChecksum();
        if (checksum == null) {
            checksum = new CRC32();
        }
        long position = state.getPosition();
        long timestamp = state.getTimestamp();
        long length = state.getLength();
        if ((file == null || reader == null) && (reader = this.createReader(file = new File(tailFile), position)) == null) {
            context.yield();
            return;
        }
        long startNanos = System.nanoTime();
        if (rolloverOccurred || timestamp <= file.lastModified() && length > file.length() || timestamp < file.lastModified() && length >= file.length()) {
            try {
                reader.close();
                this.getLogger().debug("Closed FileChannel {}", new Object[]{reader, reader});
            }
            catch (IOException ioe) {
                this.getLogger().warn("Failed to close reader for {} due to {}", new Object[]{file, ioe});
            }
            reader = this.createReader(file, 0L);
            position = 0L;
            checksum.reset();
        }
        if (file.length() == position || !file.exists()) {
            this.getLogger().debug("No data to consume; created no FlowFiles");
            tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
            this.persistState(tfo, context);
            context.yield();
            return;
        }
        final TailFileState currentState = state;
        final Checksum chksum = checksum;
        FlowFile flowFile = session.create();
        final AtomicLong positionHolder = new AtomicLong(position);
        final FileChannel fileReader = reader;
        if ((flowFile = session.write(flowFile, new OutputStreamCallback(){

            public void process(OutputStream rawOut) throws IOException {
                try (BufferedOutputStream out = new BufferedOutputStream(rawOut);){
                    positionHolder.set(TailFile.this.readLines(fileReader, currentState.getBuffer(), out, chksum));
                }
            }
        })).getSize() == 0L) {
            session.remove(flowFile);
            this.getLogger().debug("No data to consume; removed created FlowFile");
        } else {
            String tailFilename = file.getName();
            String baseName = StringUtils.substringBeforeLast((String)tailFilename, (String)".");
            String flowFileName = baseName.length() < tailFilename.length() ? baseName + "." + position + "-" + positionHolder.get() + "." + StringUtils.substringAfterLast((String)tailFilename, (String)".") : baseName + "." + position + "-" + positionHolder.get();
            HashMap<String, String> attributes = new HashMap<String, String>(3);
            attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
            attributes.put("tailfile.original.path", tailFile);
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            session.transfer(flowFile, REL_SUCCESS);
            position = positionHolder.get();
            timestamp = Math.max(state.getTimestamp(), file.lastModified());
            length = file.length();
            this.getLogger().debug("Created {} and routed to success", new Object[]{flowFile});
        }
        tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
        session.commit();
        this.persistState(tfo, context);
    }

    private long readLines(FileChannel reader, ByteBuffer buffer, OutputStream out, Checksum checksum) throws IOException {
        this.getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()});
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            int num;
            long pos;
            long rePos = pos = reader.position();
            int linesRead = 0;
            boolean seenCR = false;
            buffer.clear();
            while ((num = reader.read(buffer)) != -1) {
                buffer.flip();
                block14: for (int i = 0; i < num; ++i) {
                    byte ch = buffer.get(i);
                    switch (ch) {
                        case 10: {
                            baos.write((int)ch);
                            seenCR = false;
                            baos.writeTo(out);
                            checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
                            if (this.getLogger().isTraceEnabled()) {
                                this.getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
                            }
                            baos.reset();
                            rePos = pos + (long)i + 1L;
                            ++linesRead;
                            continue block14;
                        }
                        case 13: {
                            baos.write((int)ch);
                            seenCR = true;
                            continue block14;
                        }
                        default: {
                            if (seenCR) {
                                seenCR = false;
                                baos.writeTo(out);
                                checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
                                if (this.getLogger().isTraceEnabled()) {
                                    this.getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
                                }
                                ++linesRead;
                                baos.reset();
                                baos.write((int)ch);
                                rePos = pos + (long)i;
                                continue block14;
                            }
                            baos.write((int)ch);
                        }
                    }
                }
                pos = reader.position();
            }
            if (rePos < reader.position()) {
                this.getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[]{linesRead, pos, rePos});
                reader.position(rePos);
            }
            long l = rePos;
            return l;
        }
    }

    private List<File> getRolledOffFiles(ProcessContext context, long minTimestamp, String tailFilePath) throws IOException {
        String rollingPattern;
        File tailFile = new File(tailFilePath);
        File directory = tailFile.getParentFile();
        if (directory == null) {
            directory = new File(".");
        }
        if ((rollingPattern = context.getProperty(ROLLING_FILENAME_PATTERN).getValue()) == null) {
            return Collections.emptyList();
        }
        rollingPattern = rollingPattern.replace("${filename}", StringUtils.substringBeforeLast((String)tailFile.getName(), (String)"."));
        ArrayList<File> rolledOffFiles = new ArrayList<File>();
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern);){
            for (Path path : dirStream) {
                File file = path.toFile();
                long lastMod = file.lastModified();
                if (file.lastModified() < minTimestamp) {
                    this.getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it", new Object[]{file, lastMod, minTimestamp});
                    continue;
                }
                if (file.equals(tailFile)) continue;
                rolledOffFiles.add(file);
            }
        }
        Collections.sort(rolledOffFiles, new Comparator<File>(){

            @Override
            public int compare(File o1, File o2) {
                int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified());
                if (lastModifiedComp != 0) {
                    return lastModifiedComp;
                }
                return o1.getName().compareTo(o2.getName());
            }
        });
        return rolledOffFiles;
    }

    private Scope getStateScope(ProcessContext context) {
        String location = context.getProperty(STATE_LOCATION).getValue();
        if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) {
            return Scope.CLUSTER;
        }
        return Scope.LOCAL;
    }

    private void persistState(TailFileObject tfo, ProcessContext context) {
        this.persistState(tfo.getState().toStateMap(tfo.getFilenameIndex()), context);
    }

    private void persistState(Map<String, String> state, ProcessContext context) {
        try {
            StateMap oldState = context.getStateManager().getState(this.getStateScope(context));
            HashMap<String, String> updatedState = new HashMap<String, String>();
            for (String key : oldState.toMap().keySet()) {
                updatedState.put(key, oldState.get(key));
            }
            updatedState.putAll(state);
            context.getStateManager().setState(updatedState, this.getStateScope(context));
        }
        catch (IOException e) {
            this.getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[]{e});
        }
    }

    private FileChannel createReader(File file, long position) {
        FileChannel reader;
        try {
            reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
        }
        catch (IOException ioe) {
            this.getLogger().warn("Unable to open file {}; will attempt to access file again after the configured Yield Duration has elapsed: {}", new Object[]{file, ioe});
            return null;
        }
        this.getLogger().debug("Created FileChannel {} for {}", new Object[]{reader, file});
        try {
            reader.position(position);
        }
        catch (IOException ioe) {
            this.getLogger().error("Failed to read from {} due to {}", new Object[]{file, ioe});
            try {
                reader.close();
                this.getLogger().debug("Closed FileChannel {}", new Object[]{reader});
            }
            catch (IOException iOException) {
                // empty catch block
            }
            return null;
        }
        return reader;
    }

    Map<String, TailFileObject> getState() {
        return this.states;
    }

    private boolean recoverRolledFiles(ProcessContext context, ProcessSession session, String tailFile, Long expectedChecksum, long timestamp, long position) {
        try {
            List<File> rolledOffFiles = this.getRolledOffFiles(context, timestamp, tailFile);
            return this.recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, timestamp, position);
        }
        catch (IOException e) {
            this.getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
            return false;
        }
    }

    private boolean recoverRolledFiles(ProcessContext context, ProcessSession session, String tailFile, List<File> rolledOffFiles, Long expectedChecksum, long timestamp, long position) {
        try {
            boolean rolloverOccurred;
            this.getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});
            TailFileObject tfo = this.states.get(tailFile);
            boolean bl = rolloverOccurred = !rolledOffFiles.isEmpty();
            if (rolloverOccurred && expectedChecksum != null && rolledOffFiles.get(0).length() >= position) {
                File firstFile = rolledOffFiles.get(0);
                long startNanos = System.nanoTime();
                if (position > 0L) {
                    try (FileInputStream fis = new FileInputStream(firstFile);
                         CheckedInputStream in = new CheckedInputStream(fis, new CRC32());){
                        StreamUtils.copy((InputStream)in, (OutputStream)new NullOutputStream(), (long)position);
                        long checksumResult = in.getChecksum().getValue();
                        if (checksumResult == expectedChecksum) {
                            this.getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[]{firstFile, position});
                            rolledOffFiles.remove(0);
                            FlowFile flowFile = session.create();
                            flowFile = session.importFrom((InputStream)in, flowFile);
                            if (flowFile.getSize() == 0L) {
                                session.remove(flowFile);
                                this.cleanup();
                                tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
                            } else {
                                HashMap<String, String> attributes = new HashMap<String, String>(3);
                                attributes.put(CoreAttributes.FILENAME.key(), firstFile.getName());
                                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
                                attributes.put("tailfile.original.path", tailFile);
                                flowFile = session.putAllAttributes(flowFile, attributes);
                                session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                session.transfer(flowFile, REL_SUCCESS);
                                this.getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
                                this.cleanup();
                                tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
                                session.commit();
                                this.persistState(tfo, context);
                            }
                        } else {
                            this.getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file", new Object[]{firstFile, checksumResult, expectedChecksum});
                        }
                    }
                }
            }
            for (File file : rolledOffFiles) {
                tfo.setState(this.consumeFileFully(file, context, session, tfo));
            }
            return rolloverOccurred;
        }
        catch (IOException e) {
            this.getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
            return false;
        }
    }

    private TailFileState consumeFileFully(File file, ProcessContext context, ProcessSession session, TailFileObject tfo) {
        FlowFile flowFile = session.create();
        flowFile = session.importFrom(file.toPath(), true, flowFile);
        if (flowFile.getSize() == 0L) {
            session.remove(flowFile);
        } else {
            HashMap<String, String> attributes = new HashMap<String, String>(3);
            attributes.put(CoreAttributes.FILENAME.key(), file.getName());
            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
            attributes.put("tailfile.original.path", tfo.getState().getFilename());
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
            session.transfer(flowFile, REL_SUCCESS);
            this.getLogger().debug("Created {} from {} and routed to success", new Object[]{flowFile, file});
            this.cleanup();
            tfo.setState(new TailFileState(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), null, null, 0L, file.lastModified() + 1L, file.length(), null, tfo.getState().getBuffer()));
            session.commit();
            this.persistState(tfo, context);
        }
        return tfo.getState();
    }

    static class TailFileState {
        private final String filename;
        private final File file;
        private final FileChannel reader;
        private final long position;
        private final long timestamp;
        private final long length;
        private final Checksum checksum;
        private final ByteBuffer buffer;

        public TailFileState(String filename, File file, FileChannel reader, long position, long timestamp, long length, Checksum checksum, ByteBuffer buffer) {
            this.filename = filename;
            this.file = file;
            this.reader = reader;
            this.position = position;
            this.length = length;
            this.timestamp = timestamp;
            this.checksum = checksum;
            this.buffer = buffer;
        }

        public String getFilename() {
            return this.filename;
        }

        public File getFile() {
            return this.file;
        }

        public FileChannel getReader() {
            return this.reader;
        }

        public long getPosition() {
            return this.position;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public long getLength() {
            return this.length;
        }

        public Checksum getChecksum() {
            return this.checksum;
        }

        public ByteBuffer getBuffer() {
            return this.buffer;
        }

        public String toString() {
            return "TailFileState[filename=" + this.filename + ", position=" + this.position + ", timestamp=" + this.timestamp + ", checksum=" + (this.checksum == null ? "null" : Long.valueOf(this.checksum.getValue())) + "]";
        }

        public Map<String, String> toStateMap(int index) {
            String prefix = TailFile.MAP_PREFIX + index + '.';
            HashMap<String, String> map = new HashMap<String, String>(4);
            map.put(prefix + "filename", this.filename);
            map.put(prefix + "position", String.valueOf(this.position));
            map.put(prefix + "length", String.valueOf(this.length));
            map.put(prefix + "timestamp", String.valueOf(this.timestamp));
            map.put(prefix + "checksum", this.checksum == null ? null : String.valueOf(this.checksum.getValue()));
            return map;
        }

        private static class StateKeys {
            public static final String FILENAME = "filename";
            public static final String POSITION = "position";
            public static final String TIMESTAMP = "timestamp";
            public static final String CHECKSUM = "checksum";
            public static final String LENGTH = "length";

            private StateKeys() {
            }
        }
    }

    static class TailFileObject {
        private TailFileState state = new TailFileState(null, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536));
        private Long expectedRecoveryChecksum;
        private int filenameIndex;
        private boolean tailFileChanged = true;

        public TailFileObject(int i) {
            this.filenameIndex = i;
        }

        public TailFileObject(int index, Map<String, String> statesMap) {
            this.filenameIndex = index;
            this.tailFileChanged = false;
            String prefix = TailFile.MAP_PREFIX + index + '.';
            String filename = statesMap.get(prefix + "filename");
            long position = Long.valueOf(statesMap.get(prefix + "position"));
            long timestamp = Long.valueOf(statesMap.get(prefix + "timestamp"));
            long length = Long.valueOf(statesMap.get(prefix + "length"));
            this.state = new TailFileState(filename, new File(filename), null, position, timestamp, length, null, ByteBuffer.allocate(65536));
        }

        public int getFilenameIndex() {
            return this.filenameIndex;
        }

        public void setFilenameIndex(int filenameIndex) {
            this.filenameIndex = filenameIndex;
        }

        public TailFileState getState() {
            return this.state;
        }

        public void setState(TailFileState state) {
            this.state = state;
        }

        public Long getExpectedRecoveryChecksum() {
            return this.expectedRecoveryChecksum;
        }

        public void setExpectedRecoveryChecksum(Long expectedRecoveryChecksum) {
            this.expectedRecoveryChecksum = expectedRecoveryChecksum;
        }

        public boolean isTailFileChanged() {
            return this.tailFileChanged;
        }

        public void setTailFileChanged(boolean tailFileChanged) {
            this.tailFileChanged = tailFileChanged;
        }
    }
}

