/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.file;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.file.FileAlterationImpl;
import io.siddhi.extension.io.file.util.Status;
import io.siddhi.extension.io.file.util.Util;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.log4j.Logger;

@Extension(name="fileeventlistener", namespace="source", description="The 'fileeventlistener' component of the 'siddhi-io-fie' extension allows you to get the details of files that have been created, modified or deleted during execution time.", parameters={@Parameter(name="dir.uri", description="The path to the directory to be processed. During execution time, Siddhi by default processes all the files within this directory. However, if you have entered specific files to be processed via the 'file.name.list' parameter, only those files are processed. The URI specified must include the file handling protocol to be used for file processing.\n e.g., If the file handling protocol to be used is 'ftp', the URI must be provided as 'ftp://<DIRECTORY_PATH>>'.\n", optional=false, type={DataType.STRING}), @Parameter(name="monitoring.interval", description="The time duration (in milliseconds) for which the system must monitor changes to the files in the specified directory.\n", type={DataType.STRING}, optional=true, defaultValue="100"), @Parameter(name="file.name.list", description="If you want to carry out processing for only for one or more specific files in the the given directory URI, you can use this parameter to specify those files as a comma-separated list. \ne.g., 'abc.txt,xyz.csv'", optional=true, type={DataType.STRING}, defaultValue="<Empty_String>")}, examples={@Example(syntax="@source(type='fileeventlistener', dir.uri='file://abc/xyz, file.name.list = 'xyz.txt, test') \ndefine stream FileListenerStream (filepath string, filename string, status string);\n@sink(type='log')\ndefine stream FooStream (filepath string, filename string, status string); \nfrom FileListenerStream\nselect *\ninsert into FooStream;", description="In the above configuration, the system monitors the given directory URI to check whether any file named either 'xyz.txt' or 'test' gets created, modified or deleted. 
If any such activity is detected, an input event is generated in the 'FooStream' stream. The information included in the event are the filepath, filename, and the status of the file.\n"), @Example(syntax="@source(type='fileeventlistener',dir.uri='file://abc/xyz') \ndefine stream FileListenerStream (filepath string, filename string, status string);\n@sink(type='log')\ndefine stream FooStream (filepath string, filename string, status string); \nfrom FileListenerStream\nselect *\ninsert into FooStream;", description="In the above configuration, the system monitors the given directory URI to check whether any file gets created, modified or deleted. If any such activity is detected, an input event is generated in the 'FooStream' stream. The information included in the event are the filepath, filename, and the status of the file.\n"), @Example(syntax="@source(type='fileeventlistener',dir.uri='file://abc/xyz', monitoring.interval='200')\ndefine stream FileListenerStream (filepath string, filename string, status string);\n@sink(type='log')\ndefine stream FooStream (filepath string, filename string, status string);\nfrom FileListenerStream\nselect *\ninsert into FooStream;", description="In the above configuration, the system monitors the given directory URI every 200 milliseconds to check whether any file gets created, modified or deleted. If any such activity is detected, an input event is generated in the 'FooStream' stream. The information included in the event are the filepath, filename, and the status of the file.\n")})
public class FileHandler
extends Source<FileHandlerState> {
    private static final Logger log = Logger.getLogger(FileHandler.class);
    private static final String EMPTY_STRING = "";
    private SourceEventListener sourceEventListener;
    private long monitoringInterval = 100L;
    private String listeningDirUri;
    private FileAlterationMonitor monitor;
    private Map<String, Long> fileObjectMap = new ConcurrentHashMap<String, Long>();
    private static final String CURRENT_MAP_KEY = "current.map.key";
    private List<String> fileObjectList;

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public StateFactory<FileHandlerState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requiredProperties, ConfigReader configReader, SiddhiAppContext siddhiAppContext) throws SiddhiAppValidationException {
        this.sourceEventListener = sourceEventListener;
        if (optionHolder.isOptionExists("dir.uri")) {
            this.listeningDirUri = optionHolder.validateAndGetStaticValue("dir.uri");
        }
        if (this.listeningDirUri == null || this.listeningDirUri.isEmpty()) {
            throw new SiddhiAppCreationException("URI must be provided.");
        }
        FileObject listeningFileObject = Utils.getFileObject(this.listeningDirUri);
        try {
            if (!listeningFileObject.exists()) {
                throw new SiddhiAppCreationException("Directory " + listeningFileObject.getPublicURIString() + " is not found.");
            }
            if (listeningFileObject.isFile()) {
                throw new SiddhiAppCreationException("URI must belongs to a folder");
            }
            this.listeningDirUri = listeningFileObject.getName().getPath();
        }
        catch (FileSystemException e) {
            throw new SiddhiAppValidationException("Directory " + listeningFileObject.getPublicURIString() + " is not found.", (Throwable)e);
        }
        String fileNameList = optionHolder.validateAndGetStaticValue("file.name.list", EMPTY_STRING);
        fileNameList = fileNameList.replaceAll("\\s", EMPTY_STRING);
        this.fileObjectList = Arrays.asList(fileNameList.split(","));
        for (int i = 0; i < this.fileObjectList.size(); ++i) {
            String fileObjectPath = this.listeningDirUri + "/" + this.fileObjectList.get(i);
            listeningFileObject = Utils.getFileObject(fileObjectPath);
            try {
                if (!listeningFileObject.exists()) {
                    throw new SiddhiAppCreationException("File/Folder " + listeningFileObject.getPublicURIString() + " is not found.");
                }
            }
            catch (FileSystemException e) {
                log.error((Object)("File/Folder " + listeningFileObject.getPublicURIString() + " is not found."), (Throwable)e);
            }
            this.fileObjectList.set(i, fileObjectPath);
        }
        String monitoringValue = optionHolder.validateAndGetStaticValue("monitoring.interval", "100");
        try {
            this.monitoringInterval = Long.parseLong(monitoringValue);
        }
        catch (NumberFormatException e) {
            throw new SiddhiAppValidationException("Value provided for monitoring, " + monitoringValue + " is invalid.", (Throwable)e);
        }
        return () -> new FileHandlerState();
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{Event.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback, FileHandlerState fileHandlerState) {
        this.initiateFileAlterationObserver();
        File[] listOfFiles = new File(this.listeningDirUri).listFiles();
        if (listOfFiles != null) {
            for (File listOfFile : listOfFiles) {
                if (this.fileObjectMap.containsKey(listOfFile.getAbsolutePath())) continue;
                this.fileObjectMap.put(listOfFile.getAbsolutePath(), listOfFile.length());
                this.sourceEventListener.onEvent((Object)Util.getFileHandlerEvent(listOfFile, this.fileObjectList, Status.STATUS_NEW), null);
            }
            for (Map.Entry entry : this.fileObjectMap.entrySet()) {
                File fileObjectMapEntry;
                List<File> fileList = Arrays.asList(listOfFiles);
                if (fileList.contains(fileObjectMapEntry = new File((String)entry.getKey()))) continue;
                this.fileObjectMap.remove(fileObjectMapEntry.getAbsolutePath());
                this.sourceEventListener.onEvent((Object)Util.getFileHandlerEvent(fileObjectMapEntry, this.fileObjectList, Status.STATUS_REMOVE), null);
            }
        }
    }

    public void initiateFileAlterationObserver() {
        FileAlterationObserver observer = new FileAlterationObserver(this.listeningDirUri);
        observer.addListener((FileAlterationListener)new FileAlterationImpl(this.sourceEventListener, this.fileObjectList));
        this.monitor = new FileAlterationMonitor(this.monitoringInterval);
        this.monitor.addObserver(observer);
        try {
            this.monitor.start();
            log.debug((Object)("Directory monitoring has been started for folder/file : " + this.listeningDirUri + " ."));
        }
        catch (Exception e) {
            throw new SiddhiAppRuntimeException("Exception occurred when starting server to monitor " + this.listeningDirUri + ".", (Throwable)e);
        }
    }

    public void disconnect() {
        if (this.monitor != null) {
            try {
                this.monitor.stop();
                this.fileObjectMap.clear();
                log.debug((Object)("Directory monitoring has been stopped for folder/file : " + this.listeningDirUri + " ."));
            }
            catch (Exception e) {
                log.error((Object)("Exception occurred when stopping server while monitoring " + this.listeningDirUri + " ."), (Throwable)e);
            }
        }
    }

    public void destroy() {
    }

    public void pause() {
        if (this.monitor != null) {
            log.debug((Object)("Directory monitoring has been paused for folder/file : " + this.listeningDirUri + " ."));
        }
    }

    public void resume() {
        if (this.monitor != null) {
            log.debug((Object)("Directory monitoring has been resumed for folder/file : " + this.listeningDirUri + " ."));
        }
    }

    public class FileHandlerState
    extends State {
        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            HashMap<String, Object> currentState = new HashMap<String, Object>();
            currentState.put(FileHandler.CURRENT_MAP_KEY, FileHandler.this.fileObjectMap);
            return currentState;
        }

        public void restore(Map<String, Object> state) {
            FileHandler.this.fileObjectMap = (Map)state.get(FileHandler.CURRENT_MAP_KEY);
        }
    }
}

