/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
import org.apache.nifi.util.file.monitor.UpdateMonitor;
import org.apache.nifi.util.search.Search;
import org.apache.nifi.util.search.SearchTerm;
import org.apache.nifi.util.search.ahocorasick.AhoCorasick;
import org.apache.nifi.util.search.ahocorasick.SearchState;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags(value={"aho-corasick", "scan", "content", "byte sequence", "search", "find", "dictionary"})
@CapabilityDescription(value="Scans the content of FlowFiles for terms that are found in a user-supplied dictionary. If a term is matched, the UTF-8 encoded version of the term will be added to the FlowFile using the 'matching.term' attribute")
@WritesAttribute(attribute="matching.term", description="The term that caused the Processor to route the FlowFile to the 'matched' relationship; if FlowFile is routed to the 'unmatched' relationship, this attribute is not added")
/**
 * Routes FlowFiles to {@link #REL_MATCH} or {@link #REL_NO_MATCH} depending on whether their
 * content contains any term from a dictionary file, using an Aho-Corasick search.
 *
 * <p>The compiled dictionary is cached in {@link #searchRef}. It is rebuilt lazily on the next
 * trigger when the cache is empty (initially, or after a dictionary-related property changes),
 * and eagerly when the {@link SynchronousFileWatcher} reports that the dictionary file changed.
 */
public class ScanContent
extends AbstractProcessor {
    public static final String TEXT_ENCODING = "text";
    public static final String BINARY_ENCODING = "binary";
    public static final String MATCH_ATTRIBUTE_KEY = "matching.term";
    public static final PropertyDescriptor DICTIONARY = new PropertyDescriptor.Builder().name("Dictionary File").description("The filename of the terms dictionary").required(true).addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).build();
    public static final PropertyDescriptor DICTIONARY_ENCODING = new PropertyDescriptor.Builder().name("Dictionary Encoding").description("Indicates how the dictionary is encoded. If 'text', dictionary terms are new-line delimited and UTF-8 encoded; if 'binary', dictionary terms are denoted by a 4-byte integer indicating the term length followed by the term itself").required(true).allowableValues(new String[]{"text", "binary"}).defaultValue("text").build();
    public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles that match at least one term in the dictionary are routed to this relationship").build();
    public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles that do not match any term in the dictionary are routed to this relationship").build();
    public static final Charset UTF8 = Charset.forName("UTF-8");

    /** How often (in milliseconds) the file watcher is allowed to check the dictionary file for changes. */
    private static final long DICTIONARY_CHECK_INTERVAL_MILLIS = 60000L;

    /** Watches the configured dictionary file; {@code null} until the Dictionary File property is set. */
    private final AtomicReference<SynchronousFileWatcher> fileWatcherRef = new AtomicReference<SynchronousFileWatcher>();
    /** The compiled dictionary; {@code null} means "not loaded yet / must be reloaded". */
    private final AtomicReference<Search<byte[]>> searchRef = new AtomicReference<Search<byte[]>>();
    /** Serializes dictionary rebuilds so only one thread loads the file at a time. */
    private final ReentrantLock dictionaryUpdateLock = new ReentrantLock();
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(DICTIONARY);
        properties.add(DICTIONARY_ENCODING);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_MATCH);
        relationships.add(REL_NO_MATCH);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * Re-targets the file watcher when the dictionary path changes and discards the cached
     * dictionary whenever the path or the encoding changes, so the next trigger rebuilds it
     * from the current configuration.
     *
     * @param descriptor the property that changed
     * @param oldValue   the previous value (unused)
     * @param newValue   the new value; {@code null} when the property was removed
     */
    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor.equals(DICTIONARY)) {
            // A removed property is reported as null; Paths.get(null) would throw an NPE.
            final SynchronousFileWatcher watcher = newValue == null
                    ? null
                    : new SynchronousFileWatcher(Paths.get(newValue), new LastModifiedMonitor(), DICTIONARY_CHECK_INTERVAL_MILLIS);
            this.fileWatcherRef.set(watcher);
            // The fresh watcher does not report the new file as "changed", so force a lazy reload.
            this.searchRef.set(null);
        } else if (descriptor.equals(DICTIONARY_ENCODING)) {
            // Terms must be re-parsed with the new encoding.
            this.searchRef.set(null);
        }
    }

    /**
     * Loads the dictionary file and publishes a freshly built search into {@link #searchRef}.
     *
     * @param context the process context providing the dictionary path and encoding
     * @param force   if {@code true}, block until the update lock is available; otherwise give up
     *                immediately when another thread is already reloading
     * @param logger  logger used to report a successful load
     * @return {@code true} if this call loaded the dictionary, {@code false} if the lock was not obtained
     * @throws IOException if the dictionary file cannot be opened or parsed
     */
    private boolean reloadDictionary(ProcessContext context, boolean force, ProcessorLog logger) throws IOException {
        final boolean obtainedLock;
        if (force) {
            this.dictionaryUpdateLock.lock();
            obtainedLock = true;
        } else {
            obtainedLock = this.dictionaryUpdateLock.tryLock();
        }
        if (!obtainedLock) {
            return false;
        }

        try {
            final String dictionaryFile = context.getProperty(DICTIONARY).getValue();
            final boolean textual = context.getProperty(DICTIONARY_ENCODING).getValue().equalsIgnoreCase(TEXT_ENCODING);

            final Set<SearchTerm<byte[]>> terms = new HashSet<SearchTerm<byte[]>>();
            // Closing the loader also closes the underlying stream; closing it twice is harmless.
            try (InputStream inStream = Files.newInputStream(Paths.get(dictionaryFile), StandardOpenOption.READ);
                 TermLoader termLoader = textual ? new TextualTermLoader(inStream) : new BinaryTermLoader(inStream)) {
                SearchTerm<byte[]> term;
                while ((term = termLoader.nextTerm()) != null) {
                    terms.add(term);
                }
            }

            final Search<byte[]> search = new AhoCorasick<byte[]>();
            search.initializeDictionary(terms);
            this.searchRef.set(search);
            logger.info("Loaded search dictionary from {}", new Object[]{dictionaryFile});
            return true;
        } finally {
            this.dictionaryUpdateLock.unlock();
        }
    }

    /**
     * Ensures a dictionary is loaded, then scans one FlowFile and routes it to
     * {@link #REL_MATCH} (adding the {@value #MATCH_ATTRIBUTE_KEY} attribute) or {@link #REL_NO_MATCH}.
     * Returns without taking a FlowFile if no dictionary is available (for example, when another
     * thread is currently loading it).
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ProcessorLog logger = this.getLogger();
        final SynchronousFileWatcher fileWatcher = this.fileWatcherRef.get();

        Search<byte[]> search;
        try {
            // The watcher is absent until the Dictionary File property has been set.
            if (fileWatcher != null && fileWatcher.checkAndReset()) {
                this.reloadDictionary(context, true, logger);
            }
            search = this.searchRef.get();
            if (search == null && this.reloadDictionary(context, false, logger)) {
                search = this.searchRef.get();
            }
        } catch (IOException e) {
            throw new ProcessException(e);
        }
        if (search == null) {
            return;
        }

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Anonymous classes may only capture final locals.
        final Search<byte[]> finalSearch = search;
        final ObjectHolder<SearchTerm<byte[]>> termRef = new ObjectHolder<SearchTerm<byte[]>>(null);
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream rawIn) throws IOException {
                try (InputStream in = new BufferedInputStream(rawIn)) {
                    final SearchState<byte[]> searchResult = finalSearch.search(in, false);
                    if (searchResult.foundMatch()) {
                        // Only the first reported term is recorded.
                        termRef.set(searchResult.getResults().keySet().iterator().next());
                    }
                }
            }
        });

        final SearchTerm<byte[]> matchingTerm = termRef.get();
        if (matchingTerm == null) {
            logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
            session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
            session.transfer(flowFile, REL_NO_MATCH);
        } else {
            final String matchingTermString = matchingTerm.toString(UTF8);
            logger.info("Routing {} to 'matched' because it matched term {}", new Object[]{flowFile, matchingTermString});
            flowFile = session.putAttribute(flowFile, MATCH_ATTRIBUTE_KEY, matchingTermString);
            session.getProvenanceReporter().route(flowFile, REL_MATCH);
            session.transfer(flowFile, REL_MATCH);
        }
    }

    /**
     * Reads terms stored as a 4-byte big-endian length (as written by
     * {@link DataInputStream#readInt()}'s counterpart) followed by that many term bytes.
     */
    private static class BinaryTermLoader
    implements TermLoader {
        private final DataInputStream inStream;

        public BinaryTermLoader(InputStream inStream) {
            this.inStream = new DataInputStream(new BufferedInputStream(inStream));
        }

        /**
         * @return the next term, or {@code null} at a clean end of stream
         * @throws IOException on a truncated record or a negative length prefix
         */
        @Override
        public SearchTerm<byte[]> nextTerm() throws IOException {
            // Peek one byte so a clean EOF yields null rather than an EOFException from readInt().
            this.inStream.mark(1);
            if (this.inStream.read() == -1) {
                return null;
            }
            this.inStream.reset();

            final int termLength = this.inStream.readInt();
            if (termLength < 0) {
                throw new IOException("Invalid term length " + termLength + " in binary dictionary");
            }
            final byte[] term = new byte[termLength];
            this.inStream.readFully(term);
            return new SearchTerm<byte[]>(term);
        }

        @Override
        public void close() throws IOException {
            this.inStream.close();
        }
    }

    /** Reads one UTF-8 encoded term per line, ignoring blank lines. */
    private static class TextualTermLoader
    implements TermLoader {
        private final BufferedReader reader;

        public TextualTermLoader(InputStream inStream) {
            // Decode explicitly as UTF-8, as documented by DICTIONARY_ENCODING, not the platform default.
            this.reader = new BufferedReader(new InputStreamReader(inStream, UTF8));
        }

        /**
         * @return the next non-empty line as a UTF-8 term, or {@code null} at end of input
         */
        @Override
        public SearchTerm<byte[]> nextTerm() throws IOException {
            String nextLine;
            do {
                nextLine = this.reader.readLine();
                if (nextLine == null) {
                    return null;
                }
                // Skip blank lines instead of treating them as end-of-dictionary,
                // which used to silently drop every term after the first blank line.
            } while (nextLine.isEmpty());
            return new SearchTerm<byte[]>(nextLine.getBytes(UTF8));
        }

        @Override
        public void close() throws IOException {
            this.reader.close();
        }
    }

    /** Source of dictionary terms; {@link #nextTerm()} returns {@code null} when exhausted. */
    private static interface TermLoader
    extends Closeable {
        public SearchTerm<byte[]> nextTerm() throws IOException;
    }
}

