/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.file;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.file.FileSourcePoller;
import io.siddhi.extension.io.file.listeners.FileCronExecutor;
import io.siddhi.extension.io.file.listeners.FileSystemListener;
import io.siddhi.extension.io.file.metrics.SourceMetrics;
import io.siddhi.extension.io.file.metrics.StreamStatus;
import io.siddhi.extension.io.file.processors.FileProcessor;
import io.siddhi.extension.io.file.util.FileSourceConfiguration;
import io.siddhi.extension.io.file.util.FileSourceServiceProvider;
import io.siddhi.extension.io.file.util.VFSClientConnectorCallback;
import io.siddhi.extension.util.Utils;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.commons.vfs2.FileObject;
import org.apache.log4j.Logger;
import org.quartz.CronExpression;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.wso2.carbon.messaging.ServerConnector;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.carbon.messaging.exceptions.ServerConnectorException;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.file.connector.sender.VFSClientConnector;
import org.wso2.transport.file.connector.server.FileServerConnector;
import org.wso2.transport.file.connector.server.FileServerConnectorProvider;
import org.wso2.transport.remotefilesystem.RemoteFileSystemConnectorFactory;
import org.wso2.transport.remotefilesystem.exception.RemoteFileSystemConnectorException;
import org.wso2.transport.remotefilesystem.server.connector.contract.RemoteFileSystemServerConnector;

@Extension(name="file", namespace="source", description="The File Source component of the 'siddhi-io-fie' extension allows you to receive the input data to be processed by Siddhi via files. Both text files and binary files are supported.", parameters={@Parameter(name="dir.uri", description="The path to the directory to be processed. During execution time, Siddhi by default processes all the files within this directory. However, if you have entered specific files to be processed via the 'file.name.list' parameter, only those files are processed. The URI specified must include the file handling protocol to be used for file processing.\ne.g., If the file handling protocol to be used is 'ftp', the URI must be provided as 'ftp://<DIRECTORY_PATH>>'.\nAt a given time, you should provide a value only for one out of the 'dir.uri' and 'file.uri' parameters. You can provide the directory URI if you have multiple files that you want to process within a directory. You can provide the file URI if you only need to process one file.", type={DataType.STRING}, optional=true, defaultValue="file:/var/tmp"), @Parameter(name="file.uri", description="The path to the file to be processed. The URI specified must include the file handling protocol to be used for file processing.\n  Only one of 'dir.uri' and 'file.uri' should be provided.\n e.g., If the file handling protocol to be used is 'ftp', the URI must be provided as 'ftp://<FILE_PATH>>'.\nAt a given time, you should provide a value only for one out of the 'dir.uri' and 'file.uri' parameters. You can provide the directory URI if you have multiple files that you want to process within a directory. You can provide the file URI if you only need to process one file.", type={DataType.STRING}, optional=true, defaultValue="file:/var/temp/tmp.text"), @Parameter(name="mode", description="This specifies the mode in which the files in given directory must be read.Possible values for this parameter are as follows:\n- TEXT.FULL : to read a text file completely at once.\n- BINARY.FULL : to read a binary file completely at once.\n- BINARY.CHUNKED : to read a binary file chunk by chunk.\n- LINE : to read a text file line by line.\n- REGEX : to read a text file and extract data using a regex.\n", type={DataType.STRING}, optional=true, defaultValue="line"), @Parameter(name="tailing", description="If this parameter is set to 'true', the file/the first file of the directory is tailed. \nDo not set the parameter to 'true' and enable tailing if the mode is 'binary.full', 'text.full' or 'binary.chunked'.\n", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="action.after.process", description="The action to be carried out after processing the file/directory. Possible values are 'DELETE' and 'MOVE'. 'DELETE' is default. If you specify 'MOVE', you need to specify a value for the 'move.after.process' parameter to indicate the location to which the consumed files should be moved.", type={DataType.STRING}, optional=true, defaultValue="delete"), @Parameter(name="action.after.failure", description="The action to be taken if a failure occurs while the file/directory is being processed. Possible values are 'DELETE' and 'MOVE'. 'DELETE' is default. If you specify 'MOVE', you need to specify a value for the 'move.after.failure' parameter to indicate the location to which the files that could not be read need to be moved", type={DataType.STRING}, optional=true, defaultValue="delete"), @Parameter(name="move.after.process", description="If you specify 'MOVE' as the value for the 'action.after.process' parameter, use this parameter to specify the location to which the consumed files need to be moved.This should be the absolute path of the file that is going to be created after the moving is done.\nThis URI must include the file handling protocol used for file processing.\ne.g., If the file handling protocol is 'ftp', the URI must be provided as 'ftp://<FILE_PATH>>'.", type={DataType.STRING}, optional=true, defaultValue="<Empty_String>"), @Parameter(name="move.after.failure", description="If you specify 'MOVE' as the value for the 'action.after.failure' parameter, use this parameter to specify the location to which the files should be moved after the failure\nThis should be the absolute path of the file that is going to be created after the failure.\nThis URI must include the file handling protocol used for file processing.\ne.g., If the file handling protocol is 'ftp', the URI must be provided as 'ftp://<FILE_PATH>>'.", type={DataType.STRING}, optional=true, defaultValue="<Empty_String>"), @Parameter(name="begin.regex", description="The regex to be matched at the beginning of the retrieved content.\n", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="end.regex", description="The regex to be matched at the end of the retrieved content.\n", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="file.polling.interval", description="The time interval (in milliseconds) of a polling cycle for a file.\n", type={DataType.STRING}, optional=true, defaultValue="1000"), @Parameter(name="dir.polling.interval", description="The time period (in milliseconds) of a polling cycle for a directory.\n", type={DataType.STRING}, optional=true, defaultValue="1000"), @Parameter(name="timeout", description="The maximum time duration (in milliseconds) that the system should wait until a file is processed.\n", type={DataType.STRING}, optional=true, defaultValue="5000"), @Parameter(name="file.read.wait.timeout", description="The maximum time duration (in milliseconds) that the system should wait before retrying to read the full file content.\n", type={DataType.STRING}, optional=true, defaultValue="1000"), @Parameter(name="header.present", description="If this parameter is set to 'true', it indicates the file(s) to be processed includes a header line. In such a scenario, the header line is not processed.\n", optional=true, defaultValue="false", type={DataType.BOOL}), @Parameter(name="read.only.header", description="This parameter is applicable only if the value for the 'mode' parameter is 'LINE'. If this parameter is set to 'true', only the first line (i.e., the header line) of a text file (e.g., CSV) is read. If it is set to 'false', the full content of the file is read line by line.", optional=true, type={DataType.BOOL}, defaultValue="false"), @Parameter(name="buffer.size", description="This parameter used to get the buffer size for binary.chunked mode.", optional=true, type={DataType.STRING}, defaultValue="65536"), @Parameter(name="cron.expression", description="This is used to specify a timestamp in cron expression. The file or files in the given dir.uri or file.uri will be processed when the given expression satisfied by the system time.", optional=true, type={DataType.STRING}, defaultValue="None"), @Parameter(name="file.name.pattern", description="Regex pattern for the filenames that should be read from the directory. Note: This parameter is applicable only if the connector is reading from a directory", optional=true, type={DataType.STRING}, defaultValue="<Empty_String>"), @Parameter(name="file.system.options", description="The file options in key:value pairs separated by commas. \neg:'USER_DIR_IS_ROOT:false,PASSIVE_MODE:true,AVOID_PERMISSION_CHECK:true,IDENTITY:file://demo/.ssh/id_rsa,IDENTITY_PASS_PHRASE:wso2carbon'\nNote: when IDENTITY is used, use a RSA PRIVATE KEY", type={DataType.STRING}, optional=true, defaultValue="<Empty_String>")}, examples={@Example(syntax="@source(type='file',\nmode='text.full',\ntailing='false'\n dir.uri='file://abc/xyz',\naction.after.process='delete',\n@map(type='json')) \ndefine stream FooStream (symbol string, price float, volume long); \n", description="In the above configuration, all the files in the given directory are picked and read one by one.\nHere, it is assumed that all the files contain valid json strings with 'symbol', 'price', and 'volume' keys.\nOnce a file is read, its content is converted to events via the 'siddhi-map-json' extension. Those events are then received as input events in the the 'FooStream' stream.\nFinally, after the reading is completed, the file is deleted.\n"), @Example(syntax="@source(type='file',\nmode='files.repo.line',\ntailing='true',\ndir.uri='file://abc/xyz',\n@map(type='json')) \ndefine stream FooStream (symbol string, price float, volume long);\n ", description="In the above configuration, the first file in '/abc/xyz' directory is picked and read line by line.\nHere, it is assumed that the file contains lines json strings.\nFor each line, the line content is converted to an event via the 'siddhi-map-json' extension. Those events are then received as input events in the the 'FooStream' stream.\nOnce the file content is completely read, the system keeps checking for new entries added to the file. If it detects a new entry, it immediately picks it up and processes it.\n"), @Example(syntax="@source(type='file',\nmode='text.full',\ntailing='false'\n dir.uri='file://abc/xyz',\naction.after.process='delete',\n@map(type='csv' @attributes(eof = 'trp:eof', fp = 'trp:file.path'))) \ndefine stream FooStream (symbol string, price float, volume long); \n", description="In the above configuration, all the files in the given directory are picked and read one by one.\nHere, it is assumed that each file contains valid json strings with 'symbol', and 'price' keys.\nOnce a file is read, its content is converted to an event via the 'siddhi-map-json' extension with the additional 'eof' attribute. Then, that event is received as an input event in the 'FooStream' stream.\nOnce a file is completely read, it is deleted.\n")})
public class FileSource
extends Source<FileSourceState> {
    private static final Logger log = Logger.getLogger(FileSource.class);
    private SourceEventListener sourceEventListener;
    private FileSourceConfiguration fileSourceConfiguration;
    private RemoteFileSystemConnectorFactory fileSystemConnectorFactory;
    private FileSourceServiceProvider fileSourceServiceProvider;
    private String filePointer = "0";
    private String[] requiredProperties;
    private boolean isTailingEnabled = true;
    private SiddhiAppContext siddhiAppContext;
    private String mode;
    private String actionAfterProcess;
    private String actionAfterFailure = null;
    private String moveAfterProcess;
    private String moveAfterFailure = null;
    private String tailing;
    private String beginRegex;
    private String endRegex;
    private List<String> tailedFileURIMap;
    private String uri;
    private String dirUri;
    private String fileUri;
    private String dirPollingInterval;
    private String filePollingInterval;
    private String fileReadWaitTimeout;
    private long timeout = 5000L;
    private boolean fileServerConnectorStarted = false;
    private ScheduledFuture scheduledFuture;
    private FileSourcePoller fileSourcePoller;
    private Source.ConnectionCallback connectionCallback;
    private String headerPresent;
    private String readOnlyHeader;
    private String bufferSizeInBinaryChunked;
    private SourceMetrics metrics;
    private String cronExpression;
    private String fileNamePattern;
    private String fileSystemOptions;

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public StateFactory<FileSourceState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requiredProperties, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        FileObject listeningFileObject;
        this.sourceEventListener = sourceEventListener;
        this.siddhiAppContext = siddhiAppContext;
        this.requiredProperties = (String[])requiredProperties.clone();
        this.fileSourceConfiguration = new FileSourceConfiguration();
        this.fileSourceServiceProvider = FileSourceServiceProvider.getInstance();
        this.fileSystemConnectorFactory = this.fileSourceServiceProvider.getFileSystemConnectorFactory();
        this.fileSystemOptions = optionHolder.validateAndGetStaticValue("file.system.options", null);
        if (optionHolder.isOptionExists("dir.uri")) {
            this.dirUri = optionHolder.validateAndGetStaticValue("dir.uri");
            this.validateURL(this.dirUri, "dir.uri");
            listeningFileObject = Utils.getFileObject(this.dirUri, this.fileSystemOptions);
            this.uri = listeningFileObject.getName().getPath();
        }
        if (optionHolder.isOptionExists("file.uri")) {
            this.fileUri = optionHolder.validateAndGetStaticValue("file.uri");
            this.validateURL(this.fileUri, "file.uri");
            listeningFileObject = Utils.getFileObject(this.fileUri, this.fileSystemOptions);
            this.uri = listeningFileObject.getName().getPath();
        }
        if (this.dirUri != null && this.fileUri != null) {
            throw new SiddhiAppCreationException("Only one of directory uri or file uri should be provided. But both have been provided.");
        }
        if (this.dirUri == null && this.fileUri == null) {
            throw new SiddhiAppCreationException("Either directory uri or file uri must be provided. But none of themfound.");
        }
        this.mode = optionHolder.validateAndGetStaticValue("mode", "line");
        List annotations = this.getMapper().getStreamDefinition().getAnnotations();
        annotations.forEach(annotation -> {
            if (annotation.getName().equalsIgnoreCase("source")) {
                List sourceElements = annotation.getElements();
                sourceElements.forEach(element -> {
                    String sourceType;
                    if (element.getKey().equalsIgnoreCase("type") && (sourceType = element.getValue()).equalsIgnoreCase("file")) {
                        List sourceAnnotations = annotation.getAnnotations();
                        sourceAnnotations.forEach(sourceAnnotation -> {
                            if (sourceAnnotation.getName().equalsIgnoreCase("map")) {
                                List mapElements = sourceAnnotation.getElements();
                                mapElements.forEach(mapElement -> {
                                    String mapType;
                                    if (mapElement.getKey().equalsIgnoreCase("type") && (mapType = mapElement.getValue()).equalsIgnoreCase("binary") && this.mode.equalsIgnoreCase("line")) {
                                        throw new SiddhiAppCreationException("'Binary' file mapping cannot be used with file mode 'binary.full'");
                                    }
                                });
                            }
                        });
                    }
                });
            }
        });
        this.tailing = "text.full".equalsIgnoreCase(this.mode) || "binary.full".equalsIgnoreCase(this.mode) || "binary.chunked".equalsIgnoreCase(this.mode) ? optionHolder.validateAndGetStaticValue("tailing", "false") : optionHolder.validateAndGetStaticValue("tailing", "true");
        this.isTailingEnabled = Boolean.parseBoolean(this.tailing);
        this.readOnlyHeader = optionHolder.validateAndGetStaticValue("read.only.header", "false");
        if (this.isTailingEnabled) {
            this.actionAfterProcess = optionHolder.validateAndGetStaticValue("action.after.process", "none");
            if (Boolean.parseBoolean(this.readOnlyHeader)) {
                throw new SiddhiAppCreationException("Either 'tailing' or 'read.only.header' should be true. But both of them set to 'true'.");
            }
        } else {
            this.actionAfterProcess = optionHolder.validateAndGetStaticValue("action.after.process", "keep");
        }
        this.actionAfterFailure = optionHolder.validateAndGetStaticValue("action.after.failure", "delete");
        if (optionHolder.isOptionExists("move.after.process")) {
            this.moveAfterProcess = optionHolder.validateAndGetStaticValue("move.after.process");
            this.validateURL(this.moveAfterProcess, "moveAfterProcess");
        }
        if (optionHolder.isOptionExists("move.after.failure")) {
            this.moveAfterFailure = optionHolder.validateAndGetStaticValue("move.after.failure");
            this.validateURL(this.moveAfterFailure, "moveAfterFailure");
        }
        this.dirPollingInterval = optionHolder.validateAndGetStaticValue("dir.polling.interval", "1000");
        this.filePollingInterval = optionHolder.validateAndGetStaticValue("file.polling.interval", "1000");
        String timeoutValue = optionHolder.validateAndGetStaticValue("timeout", "5000");
        try {
            this.timeout = Long.parseLong(timeoutValue);
        }
        catch (NumberFormatException e) {
            throw new SiddhiAppRuntimeException("Value provided for timeout, " + timeoutValue + " is invalid.", (Throwable)e);
        }
        this.beginRegex = optionHolder.validateAndGetStaticValue("begin.regex", null);
        this.endRegex = optionHolder.validateAndGetStaticValue("end.regex", null);
        this.fileReadWaitTimeout = optionHolder.validateAndGetStaticValue("file.read.wait.timeout", "1000");
        this.headerPresent = optionHolder.validateAndGetStaticValue("header.present", "false");
        this.bufferSizeInBinaryChunked = optionHolder.validateAndGetStaticValue("buffer.size", "65536");
        this.fileNamePattern = optionHolder.validateAndGetStaticValue("file.name.pattern", null);
        if (optionHolder.isOptionExists("cron.expression")) {
            this.cronExpression = optionHolder.validateAndGetStaticValue("cron.expression", null);
            if (!CronExpression.isValidExpression(this.cronExpression)) {
                throw new SiddhiAppCreationException("Cron Expression " + this.cronExpression + " is not valid.");
            }
        } else {
            this.cronExpression = null;
        }
        this.validateParameters();
        this.createInitialSourceConf();
        this.updateSourceConf();
        this.getPattern();
        if (MetricsDataHolder.getInstance().getMetricService() != null && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
            try {
                if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning("prometheus")) {
                    this.metrics = new SourceMetrics(siddhiAppContext.getName(), Utils.capitalizeFirstLetter(this.mode), sourceEventListener.getStreamDefinition().getId());
                }
            }
            catch (IllegalArgumentException e) {
                log.debug((Object)"Prometheus reporter is not running. Hence file metrics will not be initialized.");
            }
        }
        return () -> new FileSourceState();
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, byte[].class};
    }

    public void connect(Source.ConnectionCallback connectionCallback, FileSourceState fileSourceState) throws ConnectionUnavailableException {
        this.connectionCallback = connectionCallback;
        if (this.metrics != null) {
            this.metrics.updateMetrics(this.siddhiAppContext.getExecutorService());
        }
        this.updateSourceConf();
        this.deployServers();
    }

    public void disconnect() {
        try {
            Scheduler scheduler;
            ExecutorService executorService;
            if (this.isTailingEnabled && this.fileSourceConfiguration.getFileServerConnector() != null) {
                this.fileSourceConfiguration.getFileServerConnector().stop();
                this.fileSourceConfiguration.setFileServerConnector(null);
            }
            if ((executorService = this.fileSourceConfiguration.getExecutorService()) != null && !executorService.isShutdown()) {
                executorService.shutdown();
            }
            if ((scheduler = this.fileSourceConfiguration.getScheduler()) != null) {
                scheduler.deleteJob(new JobKey("JobName", "JobGroup"));
            }
        }
        catch (ServerConnectorException e) {
            throw new SiddhiAppRuntimeException("Failed to stop the file server when shutting down the siddhi app '" + this.siddhiAppContext.getName() + "' due to " + e.getMessage(), (Throwable)e);
        }
        catch (SchedulerException e) {
            throw new SiddhiAppRuntimeException("Failed to delete the cron job of the siddhi app '" + this.siddhiAppContext.getName() + "' due to " + e.getMessage(), (Throwable)e);
        }
    }

    public void destroy() {
    }

    public void pause() {
        try {
            if (this.isTailingEnabled && this.fileSourceConfiguration.getFileServerConnector() != null) {
                this.fileSourceConfiguration.getFileServerConnector().stop();
                this.fileServerConnectorStarted = false;
            }
            if (this.dirUri != null && this.scheduledFuture != null) {
                this.scheduledFuture.cancel(true);
            }
        }
        catch (ServerConnectorException e) {
            throw new SiddhiAppRuntimeException("Failed to stop the file server when pausing the siddhi app '" + this.siddhiAppContext.getName() + "'.", (Throwable)e);
        }
    }

    public void resume() {
        if (this.dirUri != null && this.scheduledFuture != null) {
            this.scheduledFuture = this.siddhiAppContext.getScheduledExecutorService().scheduleAtFixedRate(this.fileSourcePoller, 0L, 1L, TimeUnit.SECONDS);
        }
        if (this.isTailingEnabled && this.fileSourceConfiguration.getFileServerConnector() != null) {
            FileServerConnector fileServerConnector = this.fileSourceConfiguration.getFileServerConnector();
            Runnable runnableServer = () -> {
                try {
                    fileServerConnector.start();
                }
                catch (ServerConnectorException e) {
                    log.error((Object)String.format("For the siddhi app '" + this.siddhiAppContext.getName() + ",' failed to resume the server for file '%s'.Hence starting to process next file.", this.fileUri));
                }
            };
            this.fileSourceConfiguration.getExecutorService().execute(runnableServer);
            this.fileServerConnectorStarted = true;
        }
    }

    private void createInitialSourceConf() {
        this.fileSourceConfiguration.setUri(this.uri);
        this.fileSourceConfiguration.setBeginRegex(this.beginRegex);
        this.fileSourceConfiguration.setEndRegex(this.endRegex);
        this.fileSourceConfiguration.setMode(this.mode);
        this.fileSourceConfiguration.setTailingEnabled(Boolean.parseBoolean(this.tailing));
        this.fileSourceConfiguration.setFilePollingInterval(this.filePollingInterval);
        this.fileSourceConfiguration.setRequiredProperties(this.requiredProperties);
        this.fileSourceConfiguration.setActionAfterProcess(this.actionAfterProcess);
        this.fileSourceConfiguration.setActionAfterFailure(this.actionAfterFailure);
        this.fileSourceConfiguration.setMoveAfterProcess(this.moveAfterProcess);
        this.fileSourceConfiguration.setTimeout(this.timeout);
        this.fileSourceConfiguration.setFileReadWaitTimeout(this.fileReadWaitTimeout);
        this.fileSourceConfiguration.setHeaderPresent(this.headerPresent);
        this.fileSourceConfiguration.setReadOnlyHeader(this.readOnlyHeader);
        this.fileSourceConfiguration.setBufferSize(this.bufferSizeInBinaryChunked);
        this.fileSourceConfiguration.setCronExpression(this.cronExpression);
    }

    private void updateSourceConf() {
        this.fileSourceConfiguration.setFilePointer(this.filePointer);
        this.fileSourceConfiguration.setTailedFileURIMap(this.tailedFileURIMap);
    }

    private Map<String, String> getFileSystemServerProperties() {
        Map<String, String> map = Utils.getFileSystemOptionMap(this.dirUri, this.fileSystemOptions);
        map.put("uri", this.dirUri);
        map.put("mode", this.mode);
        if (this.actionAfterProcess != null) {
            map.put("actionAfterProcess", this.actionAfterProcess.toUpperCase(Locale.ENGLISH));
        }
        map.put("moveAfterProcess".toUpperCase(Locale.ENGLISH), this.moveAfterProcess);
        map.put("pollingInterval", this.dirPollingInterval);
        map.put("fileSortAttribute", "name");
        map.put("fileSortAscending", "true".toUpperCase(Locale.ENGLISH));
        map.put("createMoveDir", "true".toUpperCase(Locale.ENGLISH));
        map.put("ackTimeOut", "5000");
        map.put("fileReadWaitTimeout", this.fileReadWaitTimeout);
        map.put("buffer.size", this.bufferSizeInBinaryChunked);
        map.put("cron.expression", this.cronExpression);
        map.put("fileNamePattern", this.fileNamePattern);
        if ("binary.full".equalsIgnoreCase(this.mode) || "text.full".equalsIgnoreCase(this.mode) || "binary.chunked".equalsIgnoreCase(this.mode)) {
            map.put("readFromBeginning", "true".toUpperCase(Locale.ENGLISH));
        } else {
            map.put("readFromBeginning", "false".toUpperCase(Locale.ENGLISH));
        }
        if (this.actionAfterFailure != null) {
            map.put("actionAfterFailure", this.actionAfterFailure.toUpperCase(Locale.ENGLISH));
        }
        if (this.moveAfterFailure != null) {
            map.put("moveAfterFailure", this.moveAfterFailure.toUpperCase(Locale.ENGLISH));
        }
        return map;
    }

    private void validateParameters() {
        if ("text.full".equalsIgnoreCase(this.mode) || "binary.full".equalsIgnoreCase(this.mode) || "binary.chunked".equalsIgnoreCase(this.mode)) {
            if (this.isTailingEnabled) {
                throw new SiddhiAppCreationException("In 'file' source of the siddhi app '" + this.siddhiAppContext.getName() + "', tailing has been enabled by user or by default. But tailing can't be enabled in '" + this.mode + "' mode.");
            }
            if (("binary.full".equalsIgnoreCase(this.mode) || "binary.chunked".equalsIgnoreCase(this.mode)) && this.beginRegex != null && this.endRegex != null) {
                throw new SiddhiAppCreationException("'begin.regex' and 'end.regex' can be only provided if the mode is 'regex'. But in 'file' source of the siddhi app '" + this.siddhiAppContext.getName() + "', provided mode is '" + this.mode + "'.");
            }
        }
        if (this.isTailingEnabled && this.cronExpression != null) {
            throw new SiddhiAppCreationException("Tailing has been enabled by user or by default. 'cron.expression' cannot be used when tailing is enabled. Hence stopping the siddhi app '" + this.siddhiAppContext.getName() + "'.");
        }
        if (this.isTailingEnabled && this.moveAfterProcess != null) {
            throw new SiddhiAppCreationException("Tailing has been enabled by user or by default.'moveAfterProcess' cannot be used when tailing is enabled. Hence stopping the siddhi app '" + this.siddhiAppContext.getName() + "'.");
        }
        if ("delete".equalsIgnoreCase(this.actionAfterProcess) && this.moveAfterProcess != null) {
            throw new SiddhiAppCreationException("'moveAfterProcess' can only be used when 'action.after.process' is 'move'. But it has been used when 'action.after.process' is 'delete'.Hence stopping the siddhi app '" + this.siddhiAppContext.getName() + "'.");
        }
        if (!"move".equalsIgnoreCase(this.actionAfterProcess) && this.cronExpression != null) {
            throw new SiddhiAppCreationException("'cronExpression' can only be used when 'action.after.process' is 'move'. Hence stopping the siddhi app '" + this.siddhiAppContext.getName() + "'.");
        }
        if (this.cronExpression != null && this.moveAfterProcess == null) {
            throw new SiddhiAppCreationException("'move.after.process' has not been provided where it is mandatory when 'cron.expression' is given. Hence stopping the siddhi app " + this.siddhiAppContext.getName() + ".");
        }
        if ("move".equalsIgnoreCase(this.actionAfterProcess) && this.moveAfterProcess == null) {
            throw new SiddhiAppCreationException("'moveAfterProcess' has not been provided where it is mandatory when 'actionAfterProcess' is 'move'. Hence stopping the siddhi app '" + this.siddhiAppContext.getName() + "'.");
        }
        if ("regex".equalsIgnoreCase(this.mode) && this.beginRegex == null && this.endRegex == null) {
            this.mode = "line";
        }
    }

    private void deployServers() throws ConnectionUnavailableException {
        ExecutorService executorService = this.siddhiAppContext.getExecutorService();
        this.createInitialSourceConf();
        this.fileSourceConfiguration.setExecutorService(executorService);
        if (this.fileSourceConfiguration.getCronExpression() != null) {
            FileCronExecutor.scheduleJob(this.fileSourceConfiguration, this.sourceEventListener, this.siddhiAppContext);
        } else {
            if (this.dirUri != null) {
                Map<String, String> properties = this.getFileSystemServerProperties();
                Map<String, Object> schemeFileOptions = Utils.getFileSystemOptionObjectMap(this.dirUri, this.fileSystemOptions);
                FileSystemListener fileSystemListener = new FileSystemListener(this.sourceEventListener, this.fileSourceConfiguration, this.metrics, schemeFileOptions);
                try {
                    RemoteFileSystemServerConnector fileSystemServerConnector = this.fileSystemConnectorFactory.createServerConnector(this.siddhiAppContext.getName(), properties, fileSystemListener);
                    this.fileSourceConfiguration.setFileSystemServerConnector(fileSystemServerConnector);
                    FileSourcePoller.CompletionCallback fileSourceCompletionCallback = error -> {
                        if (!error.getClass().equals(RemoteFileSystemConnectorException.class)) {
                            throw new SiddhiAppRuntimeException("File Polling mode run failed.", error);
                        }
                        this.connectionCallback.onError(new ConnectionUnavailableException("Connection to the file directory is lost.", error));
                    };
                    this.fileSourcePoller = new FileSourcePoller(fileSystemServerConnector, this.siddhiAppContext.getName());
                    this.fileSourcePoller.setCompletionCallback(fileSourceCompletionCallback);
                    this.scheduledFuture = this.siddhiAppContext.getScheduledExecutorService().scheduleAtFixedRate(this.fileSourcePoller, 0L, 1L, TimeUnit.SECONDS);
                }
                catch (RemoteFileSystemConnectorException e) {
                    throw new ConnectionUnavailableException("Connection to the file directory is lost.", (Throwable)e);
                }
            }
            if (this.fileUri != null && !this.fileServerConnectorStarted) {
                HashMap<String, String> properties = new HashMap<String, String>();
                properties.put("action", "read");
                properties.put("maxLinesPerPoll", "10");
                properties.put("pollingInterval", this.filePollingInterval);
                properties.put("header.present", this.headerPresent);
                properties.put("read.only.header", this.readOnlyHeader);
                if (this.actionAfterFailure != null) {
                    properties.put("actionAfterFailure", this.actionAfterFailure);
                }
                if (this.moveAfterFailure != null) {
                    properties.put("moveAfterFailure", this.moveAfterFailure);
                }
                if (this.metrics != null) {
                    this.fileSourceConfiguration.setCurrentlyReadingFileURI(this.fileUri);
                    this.metrics.setFilePath(this.fileUri);
                    this.metrics.getSourceFileStatusMap().putIfAbsent(Utils.getShortFilePath(this.fileUri), StreamStatus.CONNECTING);
                    this.metrics.getStartedTimeMetric(System.currentTimeMillis());
                }
                if (this.fileSourceConfiguration.isTailingEnabled()) {
                    if (this.fileSourceConfiguration.getTailedFileURIMap() == null) {
                        this.fileSourceConfiguration.setTailedFileURI(this.fileUri);
                    }
                    if (this.fileSourceConfiguration.getTailedFileURIMap().get(0).toString().equalsIgnoreCase(this.fileUri)) {
                        properties.put("startPosition", this.fileSourceConfiguration.getFilePointer());
                        properties.put("path", this.fileUri);
                        FileServerConnectorProvider fileServerConnectorProvider = this.fileSourceServiceProvider.getFileServerConnectorProvider();
                        FileProcessor fileProcessor = new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration, this.metrics);
                        ServerConnector fileServerConnector = fileServerConnectorProvider.createConnector("file-server-connector", properties);
                        fileServerConnector.setMessageProcessor(fileProcessor);
                        this.fileSourceConfiguration.setFileServerConnector((FileServerConnector)fileServerConnector);
                        Runnable runnableServer = () -> {
                            try {
                                fileServerConnector.start();
                            }
                            catch (ServerConnectorException e) {
                                log.error((Object)String.format("For the siddhi app '" + this.siddhiAppContext.getName() + ",' failed to start the server for file '%s'.Hence starting to process next file.", this.fileUri));
                            }
                        };
                        this.fileSourceConfiguration.getExecutorService().execute(runnableServer);
                        this.fileServerConnectorStarted = true;
                        if (this.metrics != null) {
                            this.metrics.getTailEnabledFilesMap().putIfAbsent(Utils.getShortFilePath(this.fileUri), System.currentTimeMillis());
                        }
                    }
                } else {
                    properties.put("uri", this.fileUri);
                    properties.put("ackTimeOut", "1000");
                    properties.put("mode", this.fileSourceConfiguration.getMode());
                    properties.put("header.present", this.headerPresent);
                    properties.put("read.only.header", this.readOnlyHeader);
                    properties.put("buffer.size", this.bufferSizeInBinaryChunked);
                    VFSClientConnector vfsClientConnector = new VFSClientConnector();
                    Map<String, Object> schemeFileOptions = Utils.getFileSystemOptionObjectMap(this.fileUri, this.fileSystemOptions);
                    try {
                        vfsClientConnector.init(null, null, schemeFileOptions);
                        FileProcessor fileProcessor = new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration, this.metrics);
                        vfsClientConnector.setMessageProcessor(fileProcessor);
                        VFSClientConnectorCallback vfsClientConnectorCallback = new VFSClientConnectorCallback();
                        Runnable runnableClient = () -> {
                            try {
                                vfsClientConnector.send(null, vfsClientConnectorCallback, properties);
                                vfsClientConnectorCallback.waitTillDone(this.timeout, this.fileUri);
                                if (this.actionAfterProcess != null) {
                                    properties.put("uri", this.fileUri);
                                    properties.put("action", this.actionAfterProcess);
                                    if (this.moveAfterProcess != null) {
                                        properties.put("destination", this.moveAfterProcess);
                                    }
                                    vfsClientConnector.send(null, vfsClientConnectorCallback, properties);
                                    vfsClientConnectorCallback.waitTillDone(this.timeout, this.fileUri);
                                    if (this.metrics != null) {
                                        this.metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(this.fileUri), StreamStatus.COMPLETED);
                                        if (this.actionAfterProcess.equals("delete")) {
                                            this.metrics.getFileDeleteMetrics().setSource(Utils.getShortFilePath(this.fileUri));
                                            this.metrics.getFileDeleteMetrics().setTime(System.currentTimeMillis());
                                            this.metrics.getFileDeleteMetrics().getDeleteMetric(1);
                                        } else if (this.actionAfterProcess.equals("move")) {
                                            this.metrics.getFileMoveMetrics().setTime(System.currentTimeMillis());
                                            this.metrics.getFileMoveMetrics().set_source(Utils.getShortFilePath(this.fileUri));
                                            this.metrics.getFileMoveMetrics().setDestination(Utils.getShortFilePath(this.moveAfterProcess));
                                            this.metrics.getFileMoveMetrics().getMoveMetric(1);
                                        }
                                        this.metrics.setReadPercentage(100.0);
                                        this.metrics.getCompletedTimeMetric(System.currentTimeMillis());
                                    }
                                }
                            }
                            catch (ClientConnectorException e) {
                                log.error((Object)String.format("Failure occurred in vfs-client while reading the file '%s' through siddhi app '%s'.", this.fileUri, this.siddhiAppContext.getName()), (Throwable)e);
                            }
                            catch (InterruptedException e) {
                                log.error((Object)String.format("Failed to get callback from vfs-client  for file '%s' through siddhi app '%s'.", this.fileUri, this.siddhiAppContext.getName()), (Throwable)e);
                            }
                        };
                        this.fileSourceConfiguration.getExecutorService().execute(runnableClient);
                    }
                    catch (ClientConnectorException e) {
                        log.error((Object)String.format("Failure occurred when initializing vfs-client for the file '%s' through siddhi app '%s'.", this.fileUri, this.siddhiAppContext.getName()), (Throwable)e);
                    }
                }
                if (this.metrics != null) {
                    this.metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(this.fileUri), StreamStatus.PROCESSING);
                }
            }
        }
    }

    private void getPattern() {
        Pattern pattern;
        String beginRegex = this.fileSourceConfiguration.getBeginRegex();
        String endRegex = this.fileSourceConfiguration.getEndRegex();
        try {
            pattern = beginRegex != null && endRegex != null ? Pattern.compile(beginRegex + "((.|\n)*?)" + endRegex) : (beginRegex != null ? Pattern.compile(beginRegex + "((.|\n)*?)" + beginRegex) : (endRegex != null ? Pattern.compile("((.|\n)*?)(" + endRegex + ")") : Pattern.compile("(\n$)")));
        }
        catch (PatternSyntaxException e) {
            throw new SiddhiAppCreationException("Cannot compile the regex '" + beginRegex + "' and '" + endRegex + "'. Hence shutting down the siddhi app '" + this.siddhiAppContext.getName() + "'.");
        }
        this.fileSourceConfiguration.setPattern(pattern);
    }

    private void validateURL(String uri, String parameterName) {
        try {
            if (uri.startsWith("sftp")) {
                uri = uri.replaceFirst("s", "");
            }
            new URL(uri);
            String splitRegex = File.separatorChar == '\\' ? "\\\\" : File.separator;
            this.fileSourceConfiguration.setProtocolForMoveAfterProcess(uri.split(splitRegex)[0]);
        }
        catch (MalformedURLException e) {
            throw new SiddhiAppCreationException(String.format("In 'file' source of siddhi app '" + this.siddhiAppContext.getName() + "', provided uri for '%s' parameter '%s' is invalid.", parameterName, uri), (Throwable)e);
        }
    }

    class FileSourceState
    extends State {
        private final Map<String, Object> state = new HashMap<String, Object>();

        private FileSourceState() {
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            FileSource.this.filePointer = FileSource.this.fileSourceConfiguration.getFilePointer();
            this.state.put("filePointer", FileSource.this.fileSourceConfiguration.getFilePointer());
            this.state.put("tailedFile", FileSource.this.fileSourceConfiguration.getTailedFileURIMap());
            this.state.put("regexStringBuilder", FileSource.this.fileSourceConfiguration.getTailingRegexStringBuilder());
            this.state.put("processedFileList", FileSource.this.fileSourceConfiguration.getProcessedFileList());
            return this.state;
        }

        public void restore(Map<String, Object> map) {
            FileSource.this.filePointer = map.get("filePointer").toString();
            FileSource.this.tailedFileURIMap = (List)map.get("tailedFile");
            FileSource.this.fileSourceConfiguration.setFilePointer(FileSource.this.filePointer);
            FileSource.this.fileSourceConfiguration.setTailedFileURIMap(FileSource.this.tailedFileURIMap);
            FileSource.this.fileSourceConfiguration.updateTailingRegexStringBuilder((StringBuilder)map.get("regexStringBuilder"));
            FileSource.this.fileSourceConfiguration.setProcessedFileList((List)map.get("processedFileList"));
        }
    }
}

