/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.file.listeners;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.file.metrics.SourceMetrics;
import io.siddhi.extension.io.file.metrics.StreamStatus;
import io.siddhi.extension.io.file.processors.FileProcessor;
import io.siddhi.extension.io.file.util.FileSourceConfiguration;
import io.siddhi.extension.io.file.util.FileSourceServiceProvider;
import io.siddhi.extension.io.file.util.Util;
import io.siddhi.extension.io.file.util.VFSClientConnectorCallback;
import io.siddhi.extension.util.Utils;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.commons.io.FilenameUtils;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.CarbonCallback;
import org.wso2.carbon.messaging.CarbonMessage;
import org.wso2.carbon.messaging.ServerConnector;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.carbon.messaging.exceptions.ServerConnectorException;
import org.wso2.transport.file.connector.sender.VFSClientConnector;
import org.wso2.transport.file.connector.server.FileServerConnector;
import org.wso2.transport.file.connector.server.FileServerConnectorProvider;
import org.wso2.transport.remotefilesystem.listener.RemoteFileSystemListener;
import org.wso2.transport.remotefilesystem.message.RemoteFileSystemBaseMessage;
import org.wso2.transport.remotefilesystem.message.RemoteFileSystemEvent;

/**
 * Listener that reacts to files reported as added by a remote file system and
 * hands each new file to the reader that matches the configured source mode.
 *
 * <p>For the {@code text.full}, {@code binary.full}, {@code binary.chunked} modes and for
 * non-tailing {@code line}/{@code regex} modes the file is read through a
 * {@link VFSClientConnector}. When tailing is enabled for {@code line}/{@code regex}, a
 * file server connector is created and started on the configured executor service instead.
 */
public class FileSystemListener
implements RemoteFileSystemListener {
    private static final Logger log = Logger.getLogger(FileSystemListener.class);

    /** Log format used when the wait is interrupted in {@code text.full} mode. */
    private static final String WAIT_INTERRUPTED_FORMAT = "Failed to wait until file '%s' is processed.";
    /** Log format used when the wait is interrupted in the binary and line/regex modes. */
    private static final String CALLBACK_INTERRUPTED_FORMAT = "Failed to get callback from vfs-client  for file '%s'.";

    private final SourceEventListener sourceEventListener;
    private final FileSourceConfiguration fileSourceConfiguration;
    private final FileSourceServiceProvider fileSourceServiceProvider;
    private final SourceMetrics metrics;
    private final Map<String, Object> schemeFileOptions;

    /**
     * Creates a listener bound to one file source.
     *
     * @param sourceEventListener     listener handed to every {@link FileProcessor} created here
     * @param fileSourceConfiguration configuration of the file source (mode, timeout, post-process action, ...)
     * @param sourceMetrics           metrics holder; may be {@code null}, in which case no metrics are recorded
     * @param schemeFileOptions       options passed to {@link VFSClientConnector#init}
     */
    public FileSystemListener(SourceEventListener sourceEventListener, FileSourceConfiguration fileSourceConfiguration, SourceMetrics sourceMetrics, Map<String, Object> schemeFileOptions) {
        this.sourceEventListener = sourceEventListener;
        this.fileSourceConfiguration = fileSourceConfiguration;
        this.fileSourceServiceProvider = FileSourceServiceProvider.getInstance();
        this.metrics = sourceMetrics;
        this.schemeFileOptions = schemeFileOptions;
    }

    /**
     * Processes every added file carried by the event that has not been registered before.
     *
     * @param remoteFileSystemBaseEvent the message received from the remote file system
     * @return {@code false} if the message is not a {@link RemoteFileSystemEvent} or if waiting
     *         for a file to be consumed was interrupted; {@code true} otherwise
     */
    @Override
    public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent) {
        if (!(remoteFileSystemBaseEvent instanceof RemoteFileSystemEvent)) {
            return false;
        }
        String mode = fileSourceConfiguration.getMode();
        String actionAfterProcess = fileSourceConfiguration.getActionAfterProcess();
        RemoteFileSystemEvent remoteFileSystemEvent = (RemoteFileSystemEvent) remoteFileSystemBaseEvent;
        for (int i = 0; i < remoteFileSystemEvent.getAddedFiles().size(); ++i) {
            String fileURI = remoteFileSystemEvent.getAddedFiles().get(i).getPath();
            // Skip files that are already registered with the configuration.
            if (!fileSourceConfiguration.addFileToListIfAbsent(fileURI)) {
                continue;
            }
            fileSourceConfiguration.setCurrentlyReadingFileURI(fileURI);
            String shortenFilePath = Utils.getShortFilePath(fileURI);
            if (metrics != null) {
                metrics.getSourceFileStatusMap().putIfAbsent(shortenFilePath, StreamStatus.PROCESSING);
                metrics.setFilePath(fileURI);
            }

            boolean notInterrupted = true;
            if ("text.full".equalsIgnoreCase(mode)) {
                Map<String, String> properties = Util.generateProperties(fileSourceConfiguration, fileURI);
                notInterrupted = readThroughVfsClient(fileURI, shortenFilePath, properties, actionAfterProcess,
                        WAIT_INTERRUPTED_FORMAT, true);
            } else if ("binary.chunked".equalsIgnoreCase(mode) || "binary.full".equalsIgnoreCase(mode)) {
                Map<String, String> properties = Util.generateProperties(fileSourceConfiguration, fileURI);
                notInterrupted = readThroughVfsClient(fileURI, shortenFilePath, properties, actionAfterProcess,
                        CALLBACK_INTERRUPTED_FORMAT, false);
            } else if ("line".equalsIgnoreCase(mode) || "regex".equalsIgnoreCase(mode)) {
                Map<String, String> properties = Util.generateProperties(fileSourceConfiguration, fileURI);
                if (fileSourceConfiguration.isTailingEnabled()) {
                    startTailing(fileURI, shortenFilePath, properties);
                } else {
                    notInterrupted = readThroughVfsClient(fileURI, shortenFilePath, properties, actionAfterProcess,
                            CALLBACK_INTERRUPTED_FORMAT, false);
                }
            }
            // Any other mode is ignored for this file.

            if (!notInterrupted) {
                return false;
            }
        }
        return true;
    }

    /**
     * Logs an error reported by the remote file system transport so that it is not lost silently.
     *
     * @param throwable the reported error
     */
    @Override
    public void onError(Throwable throwable) {
        log.error("Error reported by the remote file system listener.", throwable);
    }

    /** No completion work is required by this listener. */
    @Override
    public void done() {
    }

    /**
     * Reads one file through a fresh {@link VFSClientConnector}, waits on the callback using the
     * configured timeout and, unless the post-process action is {@code keep}, triggers the
     * post-process step.
     *
     * @param fileURI                       URI of the file to read
     * @param shortenFilePath               short path used as key in the metrics maps
     * @param properties                    connector properties generated for this file
     * @param actionAfterProcess            configured post-process action; may be {@code null}
     * @param interruptedLogFormat          log format (one {@code %s} for the URI) used when the wait is interrupted
     * @param completeCallbackOnSendFailure whether the callback is completed when init/send fails
     * @return {@code false} if the wait was interrupted, {@code true} otherwise
     */
    private boolean readThroughVfsClient(String fileURI, String shortenFilePath, Map<String, String> properties,
                                         String actionAfterProcess, String interruptedLogFormat,
                                         boolean completeCallbackOnSendFailure) {
        VFSClientConnector vfsClientConnector = new VFSClientConnector();
        FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
        vfsClientConnector.setMessageProcessor(fileProcessor);
        VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
        BinaryCarbonMessage carbonMessage =
                new BinaryCarbonMessage(ByteBuffer.wrap(fileURI.getBytes(StandardCharsets.UTF_8)), true);
        try {
            vfsClientConnector.init(null, null, schemeFileOptions);
            vfsClientConnector.send(carbonMessage, carbonCallback, properties);
            try {
                carbonCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileURI);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
                log.error(String.format(interruptedLogFormat, fileURI), e);
                markFileAsFailed(shortenFilePath);
                return false;
            }
            // Constant-first comparison: a missing action must not raise a NullPointerException.
            if (!"keep".equalsIgnoreCase(actionAfterProcess)) {
                reProcessFile(vfsClientConnector, carbonCallback, properties, fileURI);
            }
        } catch (ClientConnectorException e) {
            log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), e);
            if (completeCallbackOnSendFailure) {
                carbonCallback.done(carbonMessage);
            }
            markFileAsFailed(shortenFilePath);
        }
        return true;
    }

    /**
     * Registers the file as the tailed file and, if it is present in the tailed-file collection of
     * the configuration, creates a file server connector for it and submits it to the executor service.
     *
     * @param fileURI         URI of the file to tail
     * @param shortenFilePath short path used as key in the metrics maps
     * @param properties      connector properties generated for this file; extended in place
     */
    private void startTailing(String fileURI, String shortenFilePath, Map<String, String> properties) {
        fileSourceConfiguration.setTailedFileURI(fileURI);
        if (metrics != null) {
            metrics.getTailEnabledFilesMap().putIfAbsent(shortenFilePath, System.currentTimeMillis());
        }
        if (!fileSourceConfiguration.getTailedFileURIMap().contains(fileURI)) {
            return;
        }
        properties.put("startPosition", fileSourceConfiguration.getFilePointer());
        properties.put("path", fileURI);
        FileServerConnectorProvider fileServerConnectorProvider =
                fileSourceServiceProvider.getFileServerConnectorProvider();
        FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
        ServerConnector fileServerConnector =
                fileServerConnectorProvider.createConnector("file-server-connector", properties);
        fileServerConnector.setMessageProcessor(fileProcessor);
        fileSourceConfiguration.setFileServerConnector((FileServerConnector) fileServerConnector);
        VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
        BinaryCarbonMessage carbonMessage =
                new BinaryCarbonMessage(ByteBuffer.wrap(fileURI.getBytes(StandardCharsets.UTF_8)), true);
        FileServerExecutor fileServerExecutor =
                new FileServerExecutor(carbonMessage, carbonCallback, fileServerConnector, fileURI, metrics);
        if (log.isDebugEnabled()) {
            log.debug("fileServerExecutor started with file tailing for file: " + fileURI);
        }
        fileSourceConfiguration.getExecutorService().execute(fileServerExecutor);
    }

    /**
     * Flags the file as failed in the metrics and bumps the total error counter.
     * Does nothing when metrics are disabled.
     *
     * @param shortenFilePath short path used as key in the file status map
     */
    private void markFileAsFailed(String shortenFilePath) {
        if (metrics == null) {
            return;
        }
        metrics.getSourceFileStatusMap().replace(shortenFilePath, StreamStatus.ERROR);
        metrics.getTotalErrorCount().inc();
    }

    /**
     * Sends the file through the connector a second time with regenerated properties so that the
     * configured post-process action is applied, then records the outcome in the metrics.
     *
     * @param vfsClientConnector         connector that already read the file
     * @param vfsClientConnectorCallback callback used to wait for the second send
     * @param properties                 properties used for the initial read
     * @param fileUri                    URI of the processed file
     */
    private void reProcessFile(VFSClientConnector vfsClientConnector, VFSClientConnectorCallback vfsClientConnectorCallback, Map<String, String> properties, String fileUri) {
        Map<String, String> reGeneratedProperties =
                Util.reProcessFileGenerateProperties(fileSourceConfiguration, fileUri, properties);
        BinaryCarbonMessage carbonMessage =
                new BinaryCarbonMessage(ByteBuffer.wrap(fileUri.getBytes(StandardCharsets.UTF_8)), true);
        String moveAfterProcess = fileSourceConfiguration.getMoveAfterProcess();
        if (metrics != null) {
            metrics.setFilePath(fileUri);
        }
        try {
            if (fileSourceConfiguration.getActionAfterProcess() != null) {
                if (moveAfterProcess != null) {
                    String fileName = getFileName(fileUri, fileSourceConfiguration.getProtocolForMoveAfterProcess());
                    String destination = constructPath(moveAfterProcess, fileName);
                    if (destination != null) {
                        reGeneratedProperties.put("destination", destination);
                    }
                }
                vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, reGeneratedProperties);
                vfsClientConnectorCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileUri);
                if (metrics != null) {
                    // The success metrics are updated on the executor service, not on the calling thread.
                    fileSourceConfiguration.getExecutorService().execute(() -> {
                        metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileUri), StreamStatus.COMPLETED);
                        increaseMetricsAfterProcess(fileSourceConfiguration.getMoveAfterProcess(), 1, fileUri);
                    });
                }
            }
        } catch (ClientConnectorException e) {
            if (metrics != null) {
                increaseMetricsAfterProcess(fileSourceConfiguration.getMoveAfterProcess(), 0, fileUri);
            }
            log.error(String.format("Failure occurred in vfs-client while reading the file '%s'.", fileUri), e);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error(String.format("Failed to get callback from vfs-client for file '%s'.", fileUri), e);
        } finally {
            if (metrics != null) {
                metrics.setFilePath(fileUri);
                metrics.getCompletedTimeMetric(System.currentTimeMillis());
            }
        }
    }

    /**
     * Extracts the last path segment of {@code uri} by parsing it as a URL under the given protocol.
     * The {@code smb:} protocol is parsed as {@code ftp:}.
     *
     * @param uri      file URI
     * @param protocol protocol prefix placed in front of the URI
     * @return the file name, or {@code null} if the resulting URL is malformed
     */
    private String getFileName(String uri, String protocol) {
        String parsingProtocol = "smb:".equalsIgnoreCase(protocol) ? "ftp:" : protocol;
        try {
            URL url = new URL(String.format("%s%s%s", parsingProtocol, File.separator, uri));
            return FilenameUtils.getName(url.getPath());
        } catch (MalformedURLException e) {
            log.error(String.format("Failed to extract file name from the uri '%s'.", uri), e);
            return null;
        }
    }

    /**
     * Joins a base URI and a file name with exactly one {@link File#separator} between them.
     *
     * @param baseUri  directory part; may be {@code null}
     * @param fileName file name; may be {@code null}
     * @return the joined path, or {@code null} if either argument is {@code null}
     */
    private String constructPath(String baseUri, String fileName) {
        if (baseUri == null || fileName == null) {
            return null;
        }
        if (baseUri.endsWith(File.separator)) {
            return baseUri + fileName;
        }
        return baseUri + File.separator + fileName;
    }

    /**
     * Records the post-process outcome: a delete metric when no move destination is configured,
     * a move metric otherwise, and sets the read percentage of the file to 100.
     * Callers must ensure that metrics are enabled.
     *
     * @param moveAfterProcess configured move destination, or {@code null} for delete
     * @param value            value passed to the delete/move metric
     * @param fileUri          URI of the processed file
     */
    private void increaseMetricsAfterProcess(String moveAfterProcess, int value, String fileUri) {
        if (moveAfterProcess == null) {
            metrics.getFileDeleteMetrics().setTime(System.currentTimeMillis());
            metrics.getFileDeleteMetrics().getDeleteMetric(value);
        } else {
            metrics.getFileMoveMetrics().setDestination(Utils.getShortFilePath(moveAfterProcess));
            metrics.getFileMoveMetrics().setTime(System.currentTimeMillis());
            metrics.getFileMoveMetrics().getMoveMetric(value);
        }
        metrics.setReadPercentage(100.0, fileUri);
    }

    /**
     * Task that starts a file server connector; submitted to the executor service when tailing a file.
     */
    static class FileServerExecutor
    implements Runnable {
        ServerConnector fileServerConnector;
        CarbonCallback carbonCallback;
        CarbonMessage carbonMessage;
        String fileURI;
        SourceMetrics metrics;

        /**
         * @param carbonMessage       message handed to the callback if the connector fails to start
         * @param carbonCallback      callback completed if the connector fails to start
         * @param fileServerConnector connector to start
         * @param fileURI             URI of the tailed file, used for logging and metrics
         * @param metrics             metrics holder; may be {@code null}
         */
        FileServerExecutor(CarbonMessage carbonMessage, CarbonCallback carbonCallback, ServerConnector fileServerConnector, String fileURI, SourceMetrics metrics) {
            this.fileURI = fileURI;
            this.fileServerConnector = fileServerConnector;
            this.carbonCallback = carbonCallback;
            this.carbonMessage = carbonMessage;
            this.metrics = metrics;
        }

        /**
         * Starts the connector. On failure the error is logged with its cause, the callback is
         * completed and, when metrics are enabled, the file is flagged as failed.
         */
        @Override
        public void run() {
            try {
                fileServerConnector.start();
            } catch (ServerConnectorException e) {
                log.error(String.format("Failed to start the server for file '%s'. Hence starting to process next file.", fileURI), e);
                carbonCallback.done(carbonMessage);
                if (metrics != null) {
                    metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileURI), StreamStatus.ERROR);
                    metrics.getTotalErrorCount().inc();
                }
            }
        }
    }
}

