package org.wso2.extension.siddhi.io.file;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.ServerConnector;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.carbon.messaging.exceptions.ServerConnectorException;
import org.wso2.carbon.transport.file.connector.sender.VFSClientConnector;
import org.wso2.carbon.transport.file.connector.server.FileServerConnector;
import org.wso2.carbon.transport.file.connector.server.FileServerConnectorProvider;
import org.wso2.carbon.transport.remotefilesystem.RemoteFileSystemConnectorFactory;
import org.wso2.carbon.transport.remotefilesystem.exception.RemoteFileSystemConnectorException;
import org.wso2.carbon.transport.remotefilesystem.server.connector.contract.RemoteFileSystemServerConnector;
import org.wso2.extension.siddhi.io.file.listeners.FileSystemListener;
import org.wso2.extension.siddhi.io.file.processors.FileProcessor;
import org.wso2.extension.siddhi.io.file.util.Constants;
import org.wso2.extension.siddhi.io.file.util.FileSourceConfiguration;
import org.wso2.extension.siddhi.io.file.util.FileSourceServiceProvider;
import org.wso2.extension.siddhi.io.file.util.VFSClientConnectorCallback;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(name = "file", namespace = "source", description = "File Source provides the functionality for user to feed data to siddhi from files. Both text and binary files are supported by file source.", parameters = {@Parameter(name = Constants.DIR_URI, description = "Used to specify a directory to be processed. \nAll the files inside this directory will be processed. \nOnly one of 'dir.uri' and 'file.uri' should be provided.\nThis uri MUST have the respective protocol specified.", type = {DataType.STRING}), @Parameter(name = Constants.FILE_URI, description = "Used to specify a file to be processed. \n Only one of 'dir.uri' and 'file.uri' should be provided.\nThis uri MUST have the respective protocol specified.\n", type = {DataType.STRING}), @Parameter(name = Constants.MODE, description = "This parameter is used to specify how files in given directory should.Possible values for this parameter are,\n1. TEXT.FULL : to read a text file completely at once.\n2. BINARY.FULL : to read a binary file completely at once.\n3. LINE : to read a text file line by line.\n4. REGEX : to read a text file and extract data using a regex.\n", type = {DataType.STRING}, optional = true, defaultValue = Constants.LINE), @Parameter(name = Constants.TAILING, description = "This can either have value true or false. By default it will be true. \nThis attribute allows user to specify whether the file should be tailed or not. \nIf tailing is enabled, the first file of the directory will be tailed.\nAlso tailing should not be enabled in 'binary.full' or 'text.full' modes.\n", type = {DataType.BOOL}, optional = true, defaultValue = Constants.TRUE), @Parameter(name = Constants.ACTION_AFTER_PROCESS, description = "This parameter is used to specify the action which should be carried out \nafter processing a file in the given directory. 
\nIt can be either DELETE or MOVE and default value will be 'DELETE'.\nIf the action.after.process is MOVE, user must specify the location to move consumed files using 'move.after.process' parameter.\n", type = {DataType.STRING}, optional = true, defaultValue = "delete"), @Parameter(name = Constants.ACTION_AFTER_FAILURE, description = "This parameter is used to specify the action which should be carried out if a failure occurred during the process. \nIt can be either DELETE or MOVE and default value will be 'DELETE'.\nIf the action.after.failure is MOVE, user must specify the location to move consumed files using 'move.after.failure' parameter.\n", type = {DataType.STRING}, optional = true, defaultValue = "delete"), @Parameter(name = Constants.MOVE_AFTER_PROCESS, description = "If action.after.process is MOVE, user must specify the location to move consumed files using 'move.after.process' parameter.\nThis should be the absolute path of the file that going to be created after moving is done.\nThis uri MUST have the respective protocol specified.\n", type = {DataType.STRING}), @Parameter(name = Constants.MOVE_AFTER_FAILURE, description = "If action.after.failure is MOVE, user must specify the location to move consumed files using 'move.after.failure' parameter.\nThis should be the absolute path of the file that going to be created after moving is done.\nThis uri MUST have the respective protocol specified.\n", type = {DataType.STRING}), @Parameter(name = Constants.BEGIN_REGEX, description = "This will define the regex to be matched at the beginning of the retrieved content.\n", type = {DataType.STRING}, optional = true, defaultValue = "None"), @Parameter(name = Constants.END_REGEX, description = "This will define the regex to be matched at the end of the retrieved content.\n", type = {DataType.STRING}, optional = true, defaultValue = "None"), @Parameter(name = Constants.FILE_POLLING_INTERVAL, description = "This parameter is used to specify the time period (in 
milliseconds) of a polling cycle for a file.\n", type = {DataType.STRING}, optional = true, defaultValue = "1000"), @Parameter(name = Constants.DIRECTORY_POLLING_INTERVAL, description = "This parameter is used to specify the time period (in milliseconds) of a polling cycle for a directory.\n", type = {DataType.STRING}, optional = true, defaultValue = "1000"), @Parameter(name = "timeout", description = "This parameter is used to specify the maximum time period (in milliseconds)  for waiting until a file is processed.\n", type = {DataType.STRING}, optional = true, defaultValue = "5000")}, examples = {@Example(syntax = "@source(type='file',\nmode='text.full',\ntailing='false'\n dir.uri='file://abc/xyz',\naction.after.process='delete',\n@map(type='json')) \ndefine stream FooStream (symbol string, price float, volume long); \n", description = "Under above configuration, all the files in directory will be picked and read one by one.\nIn this case, it's assumed that all the files contains json valid json strings with keys 'symbol','price' and 'volume'.\nOnce a file is read, its content will be converted to an event using siddhi-map-json extension and then, that event will be received to the FooStream.\nFinally, after reading is finished, the file will be deleted.\n"), @Example(syntax = "@source(type='file',\nmode='files.repo.line',\ntailing='true',\ndir.uri='file://abc/xyz',\n@map(type='json')) \ndefine stream FooStream (symbol string, price float, volume long);\n ", description = "Under above configuration, the first file in directory '/abc/xyz'  will be picked and read line by line.\nIn this case, it is assumed that the file contains lines json strings.\nFor each line, line content will be converted to an event using siddhi-map-json extension and then, that event will be received to the FooStream.\nOnce file content is completely read, it will keep checking whether a new entry is added to the file or not.\nIf such entry is added, it will be immediately picked up and 
processed.\n")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/file/FileSource.class */
public class FileSource extends Source {
    private static final Logger log = Logger.getLogger(FileSource.class);
    // Listener received in init(); handed to every FileProcessor / FileSystemListener created in deployServers().
    private SourceEventListener sourceEventListener;
    // Configuration object created in init() and shared with the processors and listeners built in deployServers().
    private FileSourceConfiguration fileSourceConfiguration;
    // Factory obtained from the FileSourceServiceProvider; used to create the directory server connector.
    private RemoteFileSystemConnectorFactory fileSystemConnectorFactory;
    // Provider singleton that supplies the connector factory and the file server connector provider.
    private FileSourceServiceProvider fileSourceServiceProvider;
    // Directory server connector created when 'dir.uri' is set; stopped in pause() and disconnect().
    private RemoteFileSystemServerConnector fileSystemServerConnector;
    // Defensive copy of the String[] passed to init(); forwarded to the configuration as required properties.
    private String[] requiredProperties;
    // Application context received in init(); supplies the executor service, the app name and the snapshot service.
    private SiddhiAppContext siddhiAppContext;
    // Reading mode option (defaults to Constants.LINE); validateParameters() may reset it to LINE.
    private String mode;
    // Action applied after a file has been processed; its default depends on whether tailing is enabled.
    private String actionAfterProcess;
    // Destination URI for processed files; only read from the options when the option exists, otherwise null.
    private String moveAfterProcess;
    // Raw string value of the tailing option; parsed into isTailingEnabled.
    private String tailing;
    // Optional regex options; null when not supplied. Combined into a Pattern by getPattern().
    private String beginRegex;
    private String endRegex;
    // URI of the tailed file as restored by restoreState(); pushed into the configuration by updateSourceConf().
    private String tailedFileURI;
    // Exactly one of dirUri / fileUri is non-null once init() has completed successfully.
    private String dirUri;
    private String fileUri;
    // Polling intervals kept as strings because they are passed on as connector properties.
    private String dirPollingInterval;
    private String filePollingInterval;
    // Read position pushed into the configuration by updateSourceConf(); replaced by restoreState().
    private String filePointer = "0";
    // Parsed form of 'tailing'; init() overwrites this initial value.
    private boolean isTailingEnabled = true;
    // Failure handling options; actionAfterFailure defaults to "delete" in init(), moveAfterFailure stays null unless supplied.
    private String actionAfterFailure = null;
    private String moveAfterFailure = null;
    // Value of the 'timeout' option parsed in init(); passed to VFSClientConnectorCallback.waitTillDone().
    private long timeout = 5000;

    /**
     * Reads and validates the source options, populates the shared {@link FileSourceConfiguration}, compiles the
     * content pattern and registers this source with the snapshot service.
     *
     * @param sourceEventListener listener that is handed to the file processors created later
     * @param optionHolder        holder of the options declared on the source annotation
     * @param strArr              required property names; a copy is stored in the configuration
     * @param configReader        not used by this implementation
     * @param siddhiAppContext    context supplying the executor service, app name and snapshot service
     * @throws SiddhiAppCreationException if both or neither of 'dir.uri' / 'file.uri' are given, a URI is malformed,
     *                                    the option combination is invalid or a regex cannot be compiled
     * @throws SiddhiAppRuntimeException  if the 'timeout' option is not a valid long
     */
    @Override
    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.siddhiAppContext = siddhiAppContext;
        this.requiredProperties = strArr.clone();
        this.fileSourceConfiguration = new FileSourceConfiguration();
        this.fileSourceServiceProvider = FileSourceServiceProvider.getInstance();
        this.fileSystemConnectorFactory = this.fileSourceServiceProvider.getFileSystemConnectorFactory();

        if (optionHolder.isOptionExists(Constants.DIR_URI)) {
            this.dirUri = optionHolder.validateAndGetStaticValue(Constants.DIR_URI);
            validateURL(this.dirUri, Constants.DIR_URI);
        }
        if (optionHolder.isOptionExists(Constants.FILE_URI)) {
            this.fileUri = optionHolder.validateAndGetStaticValue(Constants.FILE_URI);
            validateURL(this.fileUri, Constants.FILE_URI);
        }
        if (this.dirUri != null && this.fileUri != null) {
            throw new SiddhiAppCreationException("Only one of directory uri or file uri should be provided. But both have been provided.");
        }
        if (this.dirUri == null && this.fileUri == null) {
            throw new SiddhiAppCreationException("Either directory uri or file uri must be provided. But none of themfound.");
        }

        this.mode = optionHolder.validateAndGetStaticValue(Constants.MODE, Constants.LINE);
        // Full-read modes default to tailing disabled; every other mode defaults to tailing enabled.
        boolean fullReadMode = Constants.TEXT_FULL.equalsIgnoreCase(this.mode)
                || Constants.BINARY_FULL.equalsIgnoreCase(this.mode);
        this.tailing = optionHolder.validateAndGetStaticValue(Constants.TAILING, fullReadMode ? Constants.FALSE : Constants.TRUE);
        this.isTailingEnabled = Boolean.parseBoolean(this.tailing);

        // A tailed file defaults to no post-processing action; otherwise the default action is "delete".
        this.actionAfterProcess = optionHolder.validateAndGetStaticValue(Constants.ACTION_AFTER_PROCESS, this.isTailingEnabled ? Constants.NONE : "delete");
        this.actionAfterFailure = optionHolder.validateAndGetStaticValue(Constants.ACTION_AFTER_FAILURE, "delete");
        if (optionHolder.isOptionExists(Constants.MOVE_AFTER_PROCESS)) {
            this.moveAfterProcess = optionHolder.validateAndGetStaticValue(Constants.MOVE_AFTER_PROCESS);
            validateURL(this.moveAfterProcess, "moveAfterProcess");
        }
        if (optionHolder.isOptionExists(Constants.MOVE_AFTER_FAILURE)) {
            this.moveAfterFailure = optionHolder.validateAndGetStaticValue(Constants.MOVE_AFTER_FAILURE);
            validateURL(this.moveAfterFailure, "moveAfterFailure");
        }

        this.dirPollingInterval = optionHolder.validateAndGetStaticValue(Constants.DIRECTORY_POLLING_INTERVAL, "1000");
        this.filePollingInterval = optionHolder.validateAndGetStaticValue(Constants.FILE_POLLING_INTERVAL, "1000");

        String timeoutValue = optionHolder.validateAndGetStaticValue("timeout", "5000");
        try {
            // Only the parse is guarded, so a NumberFormatException raised by a later step is not
            // misreported as an invalid timeout.
            this.timeout = Long.parseLong(timeoutValue);
        } catch (NumberFormatException e) {
            throw new SiddhiAppRuntimeException("Value provided for timeout, " + timeoutValue + " is invalid.", e);
        }

        this.beginRegex = optionHolder.validateAndGetStaticValue(Constants.BEGIN_REGEX, (String) null);
        this.endRegex = optionHolder.validateAndGetStaticValue(Constants.END_REGEX, (String) null);
        validateParameters();
        createInitialSourceConf();
        updateSourceConf();
        getPattern();
        siddhiAppContext.getSnapshotService().addSnapshotable("siddhi-io-file", this);
    }

    /**
     * Lists the payload types this source emits.
     *
     * @return an array containing {@code String.class} and {@code byte[].class}
     */
    @Override
    public Class[] getOutputEventClasses() {
        Class[] emittedTypes = {String.class, byte[].class};
        return emittedTypes;
    }

    /**
     * Pushes the current file pointer and tailed file URI into the configuration, then deploys the connectors.
     *
     * @param connectionCallback not used by this implementation
     * @throws ConnectionUnavailableException if the directory server connector cannot be created or started
     */
    @Override
    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        // The configuration must carry the (possibly restored) pointer before any connector is started.
        updateSourceConf();
        deployServers();
    }

    /**
     * Stops the directory server connector and, when tailing is enabled, the tailing file server connector, then
     * shuts down the executor service held by the configuration if it is still running.
     *
     * @throws SiddhiAppRuntimeException if stopping a connector fails
     */
    @Override
    public void disconnect() {
        try {
            if (this.fileSystemServerConnector != null) {
                this.fileSystemServerConnector.stop();
                this.fileSystemServerConnector = null;
            }
            // Same guard as pause(): the tailing connector may never have been created (for example when
            // disconnect() runs before a successful connect()), so it must be null-checked before stopping.
            if (this.isTailingEnabled && this.fileSourceConfiguration.getFileServerConnector() != null) {
                this.fileSourceConfiguration.getFileServerConnector().stop();
                this.fileSourceConfiguration.setFileServerConnector(null);
            }
            ExecutorService executorService = this.fileSourceConfiguration.getExecutorService();
            if (executorService != null && !executorService.isShutdown()) {
                executorService.shutdown();
            }
        } catch (ServerConnectorException e) {
            throw new SiddhiAppRuntimeException("Failed to stop the file server when shutting down the siddhi app '" + this.siddhiAppContext.getName() + "' due to " + e.getMessage(), e);
        }
    }

    /**
     * No-op: this implementation does nothing on destroy. Connectors are stopped in {@code disconnect()}.
     */
    public void destroy() {
    }

    /**
     * Stops the running connectors without discarding them: the directory server connector if one exists and,
     * when tailing is enabled, the tailing file server connector if one exists.
     *
     * @throws SiddhiAppRuntimeException if stopping a connector fails
     */
    @Override
    public void pause() {
        try {
            if (fileSystemServerConnector != null) {
                fileSystemServerConnector.stop();
            }
            boolean tailingConnectorPresent =
                    isTailingEnabled && fileSourceConfiguration.getFileServerConnector() != null;
            if (tailingConnectorPresent) {
                fileSourceConfiguration.getFileServerConnector().stop();
            }
        } catch (ServerConnectorException stopFailure) {
            throw new SiddhiAppRuntimeException("Failed to stop the file server.", stopFailure);
        }
    }

    /**
     * Re-applies the file pointer and tailed file URI to the configuration and redeploys the connectors.
     *
     * @throws SiddhiAppRuntimeException if the connectors cannot be redeployed
     */
    @Override
    public void resume() {
        // updateSourceConf() declares no checked exception, so it does not need to sit inside the try block.
        updateSourceConf();
        try {
            deployServers();
        } catch (ConnectionUnavailableException unavailable) {
            throw new SiddhiAppRuntimeException("Failed to resume siddhi app runtime.", unavailable);
        }
    }

    /**
     * Captures the tailing progress held by the configuration.
     *
     * @return a new map holding the file pointer, the tailed file URI and the tailing regex string builder,
     *         keyed by the corresponding {@code Constants} entries
     */
    @Override
    public Map<String, Object> currentState() {
        Map<String, Object> snapshot = new HashMap<>();
        snapshot.put(Constants.FILE_POINTER, fileSourceConfiguration.getFilePointer());
        snapshot.put(Constants.TAILED_FILE, fileSourceConfiguration.getTailedFileURI());
        snapshot.put(Constants.TAILING_REGEX_STRING_BUILDER, fileSourceConfiguration.getTailingRegexStringBuilder());
        return snapshot;
    }

    /**
     * Restores the tailing progress from a snapshot produced by {@code currentState()}.
     *
     * <p>{@code currentState()} stores whatever the configuration holds, so the tailed file URI can be
     * {@code null} in a snapshot taken before any file was tailed. Null entries are therefore tolerated: a missing
     * file pointer leaves the current pointer untouched and a missing tailed file URI is restored as {@code null}.
     *
     * @param map snapshot keyed by {@code Constants.FILE_POINTER}, {@code Constants.TAILED_FILE} and
     *            {@code Constants.TAILING_REGEX_STRING_BUILDER}
     */
    @Override
    public void restoreState(Map<String, Object> map) {
        Object restoredPointer = map.get(Constants.FILE_POINTER);
        if (restoredPointer != null) {
            this.filePointer = restoredPointer.toString();
        }
        Object restoredTailedFile = map.get(Constants.TAILED_FILE);
        this.tailedFileURI = restoredTailedFile == null ? null : restoredTailedFile.toString();

        this.fileSourceConfiguration.setFilePointer(this.filePointer);
        this.fileSourceConfiguration.setTailedFileURI(this.tailedFileURI);
        this.fileSourceConfiguration.updateTailingRegexStringBuilder((StringBuilder) map.get(Constants.TAILING_REGEX_STRING_BUILDER));
    }

    /**
     * Copies the option values held by this source into the shared {@link FileSourceConfiguration}.
     */
    private void createInitialSourceConf() {
        FileSourceConfiguration conf = fileSourceConfiguration;
        conf.setBeginRegex(beginRegex);
        conf.setEndRegex(endRegex);
        conf.setMode(mode);
        boolean tailingFlag = Boolean.parseBoolean(tailing);
        conf.setTailingEnabled(tailingFlag);
        conf.setFilePollingInterval(filePollingInterval);
        conf.setRequiredProperties(requiredProperties);
        conf.setActionAfterProcess(actionAfterProcess);
        conf.setMoveAfterProcess(moveAfterProcess);
        conf.setTimeout(timeout);
    }

    /**
     * Pushes the file pointer and tailed file URI held by this source into the shared configuration.
     */
    private void updateSourceConf() {
        FileSourceConfiguration conf = fileSourceConfiguration;
        conf.setFilePointer(filePointer);
        conf.setTailedFileURI(tailedFileURI);
    }

    /**
     * Builds the property map handed to the directory server connector.
     *
     * <p>Action names and boolean flags are upper-cased with {@link Locale#ENGLISH}. The move-after-failure URI is
     * passed through unchanged, because upper-casing a URI alters the destination on case-sensitive file systems.
     *
     * @return a new map of connector properties
     */
    private Map<String, String> getFileSystemServerProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("dirURI", this.dirUri);
        if (this.actionAfterProcess != null) {
            properties.put("actionAfterProcess", this.actionAfterProcess.toUpperCase(Locale.ENGLISH));
        }
        // NOTE(review): this key is upper-cased ("MOVEAFTERPROCESS") while every other key is camelCase, and the
        // value may be null. Kept as-is because the connector's expected key is not visible here - confirm.
        properties.put("moveAfterProcess".toUpperCase(Locale.ENGLISH), this.moveAfterProcess);
        properties.put("pollingInterval", this.dirPollingInterval);
        properties.put("fileSortAttribute", "name");
        properties.put("fileSortAscending", Constants.TRUE.toUpperCase(Locale.ENGLISH));
        properties.put("createMoveDir", Constants.TRUE.toUpperCase(Locale.ENGLISH));
        properties.put(Constants.ACK_TIME_OUT, "5000");

        boolean fullReadMode = Constants.BINARY_FULL.equalsIgnoreCase(this.mode)
                || Constants.TEXT_FULL.equalsIgnoreCase(this.mode);
        String readFromBeginning = fullReadMode ? Constants.TRUE : Constants.FALSE;
        properties.put(Constants.READ_FILE_FROM_BEGINNING, readFromBeginning.toUpperCase(Locale.ENGLISH));

        if (this.actionAfterFailure != null) {
            properties.put("actionAfterFailure", this.actionAfterFailure.toUpperCase(Locale.ENGLISH));
        }
        if (this.moveAfterFailure != null) {
            // The URI is passed verbatim, matching how deployServers() forwards it for single files.
            properties.put("moveAfterFailure", this.moveAfterFailure);
        }
        return properties;
    }

    /**
     * Rejects option combinations that cannot work together and falls back from regex mode to line mode when no
     * regex was supplied.
     *
     * @throws SiddhiAppCreationException if tailing is enabled in a full-read mode, if both regexes are given in
     *                                    binary full mode, or if 'moveAfterProcess' is inconsistent with tailing or
     *                                    with the configured action
     */
    private void validateParameters() {
        boolean binaryFull = Constants.BINARY_FULL.equalsIgnoreCase(mode);
        boolean fullRead = Constants.TEXT_FULL.equalsIgnoreCase(mode) || binaryFull;
        boolean moveTargetGiven = moveAfterProcess != null;

        if (fullRead && isTailingEnabled) {
            throw new SiddhiAppCreationException("Tailing has been enabled by user or by default.But tailing can't be enabled in '" + mode + "' mode.");
        }
        if (binaryFull && beginRegex != null && endRegex != null) {
            throw new SiddhiAppCreationException("'begin.regex' and 'end.regex' can be only provided if the mode is 'regex'. But provided mode is '" + mode + "'.");
        }
        if (isTailingEnabled && moveTargetGiven) {
            throw new SiddhiAppCreationException("Tailing has been enabled by user or by default.'moveAfterProcess' cannot be used when tailing is enabled. Hence stopping the SiddhiApp. ");
        }
        if (moveTargetGiven && "delete".equalsIgnoreCase(actionAfterProcess)) {
            throw new SiddhiAppCreationException("'moveAfterProcess' can only be used when 'action.after.process' is 'move'. But it has been used when 'action.after.process' is 'delete'.Hence stopping the SiddhiApp. ");
        }
        if (!moveTargetGiven && "move".equalsIgnoreCase(actionAfterProcess)) {
            throw new SiddhiAppCreationException("'moveAfterProcess' has not been provided where it is mandatory when 'actionAfterProcess' is 'move'. Hence stopping the SiddhiApp. ");
        }

        // Regex mode without any regex degrades to plain line-by-line reading.
        boolean noRegexSupplied = beginRegex == null && endRegex == null;
        if (noRegexSupplied && Constants.REGEX.equalsIgnoreCase(mode)) {
            mode = Constants.LINE;
        }
    }

    /**
     * Refreshes the configuration and starts the connector that matches the configured input: a directory server
     * connector for 'dir.uri', a one-shot read for a non-tailed 'file.uri', or a tailing file server connector for
     * a tailed 'file.uri'.
     *
     * @throws ConnectionUnavailableException if the directory server connector cannot be created or started
     */
    private void deployServers() throws ConnectionUnavailableException {
        ExecutorService executorService = this.siddhiAppContext.getExecutorService();
        createInitialSourceConf();
        this.fileSourceConfiguration.setExecutorService(executorService);

        if (this.dirUri != null) {
            startDirectoryServer();
            return;
        }
        if (this.fileUri == null) {
            return;
        }

        Map<String, String> properties = new HashMap<>();
        properties.put("action", "read");
        properties.put("maxLinesPerPoll", "10");
        properties.put("pollingInterval", this.filePollingInterval);
        if (this.actionAfterFailure != null) {
            properties.put("actionAfterFailure", this.actionAfterFailure);
        }
        if (this.moveAfterFailure != null) {
            properties.put("moveAfterFailure", this.moveAfterFailure);
        }

        if (this.fileSourceConfiguration.isTailingEnabled()) {
            tailFile(properties);
        } else {
            readFileOnce(properties);
        }
    }

    /** Creates and starts the directory server connector for {@code dirUri}. */
    private void startDirectoryServer() throws ConnectionUnavailableException {
        try {
            this.fileSystemServerConnector = this.fileSystemConnectorFactory.createServerConnector(this.siddhiAppContext.getName(), getFileSystemServerProperties(), new FileSystemListener(this.sourceEventListener, this.fileSourceConfiguration));
            this.fileSourceConfiguration.setFileSystemServerConnector(this.fileSystemServerConnector);
            this.fileSystemServerConnector.start();
        } catch (RemoteFileSystemConnectorException e) {
            throw new ConnectionUnavailableException("Failed to connect to the remote file system server. ", e);
        }
    }

    /** Reads {@code fileUri} once on the executor, then applies the configured after-process action. */
    private void readFileOnce(Map<String, String> properties) {
        properties.put("uri", this.fileUri);
        properties.put(Constants.ACK_TIME_OUT, "1000");
        VFSClientConnector vfsClientConnector = new VFSClientConnector();
        vfsClientConnector.setMessageProcessor(new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration));
        VFSClientConnectorCallback callback = new VFSClientConnectorCallback();
        this.fileSourceConfiguration.getExecutorService().execute(() -> {
            try {
                vfsClientConnector.send(null, callback, properties);
                callback.waitTillDone(this.timeout, this.fileUri);
                if (this.actionAfterProcess != null) {
                    // The same property map is reused for the follow-up action (delete / move) on the same file.
                    properties.put("uri", this.fileUri);
                    properties.put("action", this.actionAfterProcess);
                    if (this.moveAfterProcess != null) {
                        properties.put("destination", this.moveAfterProcess);
                    }
                    vfsClientConnector.send(null, callback, properties);
                    callback.waitTillDone(this.timeout, this.fileUri);
                }
            } catch (InterruptedException e) {
                log.error(String.format("Failed to get callback from vfs-client  for file '%s'.", this.fileUri), e);
                // Restore the interrupt flag so the executor thread can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (ClientConnectorException e) {
                log.error(String.format("Failure occurred in vfs-client while reading the file '%s'.", this.fileUri), e);
            }
        });
    }

    /** Starts a tailing file server connector for {@code fileUri} when it is the file recorded as being tailed. */
    private void tailFile(Map<String, String> properties) {
        if (this.fileSourceConfiguration.getTailedFileURI() == null) {
            this.fileSourceConfiguration.setTailedFileURI(this.fileUri);
        }
        if (!this.fileSourceConfiguration.getTailedFileURI().equalsIgnoreCase(this.fileUri)) {
            return;
        }
        properties.put("startPosition", this.fileSourceConfiguration.getFilePointer());
        properties.put("path", this.fileUri);
        FileServerConnectorProvider connectorProvider = this.fileSourceServiceProvider.getFileServerConnectorProvider();
        FileProcessor fileProcessor = new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration);
        ServerConnector connector = connectorProvider.createConnector("file-server-connector", properties);
        connector.setMessageProcessor(fileProcessor);
        this.fileSourceConfiguration.setFileServerConnector((FileServerConnector) connector);
        this.fileSourceConfiguration.getExecutorService().execute(() -> {
            try {
                connector.start();
            } catch (ServerConnectorException e) {
                // Pass the exception along so the cause of the failed start is not lost from the log.
                log.error(String.format("Failed to start the server for file '%s'. Hence starting to process next file.", this.fileUri), e);
            }
        });
    }

    /**
     * Compiles the content-extraction pattern from the configured begin/end regexes and stores it in the
     * configuration.
     *
     * <ul>
     *   <li>both regexes: content between the begin and end expressions</li>
     *   <li>begin only: content between two occurrences of the begin expression</li>
     *   <li>end only: content up to and including the end expression</li>
     *   <li>neither: a newline at the end of the input</li>
     * </ul>
     *
     * @throws SiddhiAppCreationException if the resulting expression is not a valid regex
     */
    private void getPattern() {
        String beginRegex = this.fileSourceConfiguration.getBeginRegex();
        String endRegex = this.fileSourceConfiguration.getEndRegex();
        try {
            Pattern pattern;
            if (beginRegex != null && endRegex != null) {
                pattern = Pattern.compile(beginRegex + "((.|\n)*?)" + endRegex);
            } else if (beginRegex != null) {
                pattern = Pattern.compile(beginRegex + "((.|\n)*?)" + beginRegex);
            } else if (endRegex != null) {
                pattern = Pattern.compile("((.|\n)*?)(" + endRegex + ")");
            } else {
                pattern = Pattern.compile("(\n$)");
            }
            this.fileSourceConfiguration.setPattern(pattern);
        } catch (PatternSyntaxException e) {
            // Keep the cause so the offending position in the regex is still reported.
            throw new SiddhiAppCreationException("Cannot compile the regex '" + beginRegex + "' and '" + endRegex + "'. Hence shutting down the siddhiApp. ", e);
        }
    }

    /**
     * Checks that the given value parses as a {@link URL} and records its leading segment in the configuration.
     *
     * <p>The segment before the first file separator is stored through {@code setProtocolForMoveAfterProcess} for
     * every URI validated, so the most recently validated URI determines the stored value.
     *
     * @param str  the URI to validate
     * @param str2 the parameter name used in the error message
     * @throws SiddhiAppCreationException if the value is not a well-formed URL
     */
    private void validateURL(String str, String str2) {
        // String.split takes a regex, so a backslash separator has to be escaped.
        final String separatorRegex;
        if (File.separatorChar == '\\') {
            separatorRegex = "\\\\";
        } else {
            separatorRegex = File.separator;
        }
        try {
            new URL(str);
        } catch (MalformedURLException malformed) {
            throw new SiddhiAppCreationException(String.format("Provided uri for '%s' parameter '%s' is invalid.", str2, str), malformed);
        }
        String leadingSegment = str.split(separatorRegex)[0];
        fileSourceConfiguration.setProtocolForMoveAfterProcess(leadingSegment);
    }
}
