package org.wso2.extension.siddhi.io.file.listeners;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FilenameUtils;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.CarbonCallback;
import org.wso2.carbon.messaging.CarbonMessage;
import org.wso2.carbon.messaging.ServerConnector;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.carbon.messaging.exceptions.ServerConnectorException;
import org.wso2.carbon.transport.file.connector.sender.VFSClientConnector;
import org.wso2.carbon.transport.file.connector.server.FileServerConnector;
import org.wso2.carbon.transport.file.connector.server.FileServerConnectorProvider;
import org.wso2.carbon.transport.remotefilesystem.exception.RemoteFileSystemConnectorException;
import org.wso2.carbon.transport.remotefilesystem.listener.RemoteFileSystemListener;
import org.wso2.carbon.transport.remotefilesystem.message.RemoteFileSystemBaseMessage;
import org.wso2.carbon.transport.remotefilesystem.message.RemoteFileSystemEvent;
import org.wso2.extension.siddhi.io.file.processors.FileProcessor;
import org.wso2.extension.siddhi.io.file.util.Constants;
import org.wso2.extension.siddhi.io.file.util.FileSourceConfiguration;
import org.wso2.extension.siddhi.io.file.util.FileSourceServiceProvider;
import org.wso2.extension.siddhi.io.file.util.VFSClientConnectorCallback;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;

/* loaded from: input_file:org/wso2/extension/siddhi/io/file/listeners/FileSystemListener.class */
public class FileSystemListener implements RemoteFileSystemListener {
    private static final Logger log = Logger.getLogger(FileSystemListener.class);
    private SourceEventListener sourceEventListener;
    private FileSourceConfiguration fileSourceConfiguration;
    private FileSourceServiceProvider fileSourceServiceProvider = FileSourceServiceProvider.getInstance();

    /* loaded from: input_file:org/wso2/extension/siddhi/io/file/listeners/FileSystemListener$FileServerExecutor.class */
    static class FileServerExecutor implements Runnable {
        ServerConnector fileServerConnector;
        CarbonCallback carbonCallback;
        CarbonMessage carbonMessage;
        String fileURI;

        FileServerExecutor(CarbonMessage carbonMessage, CarbonCallback carbonCallback, ServerConnector serverConnector, String str) {
            this.fileServerConnector = null;
            this.carbonCallback = null;
            this.carbonMessage = null;
            this.fileURI = null;
            this.fileURI = str;
            this.fileServerConnector = serverConnector;
            this.carbonCallback = carbonCallback;
            this.carbonMessage = carbonMessage;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.fileServerConnector.start();
            } catch (ServerConnectorException e) {
                FileSystemListener.log.error(String.format("Failed to start the server for file '%s'. Hence starting to process next file.", this.fileURI));
                this.carbonCallback.done(this.carbonMessage);
            }
        }
    }

    public FileSystemListener(SourceEventListener sourceEventListener, FileSourceConfiguration fileSourceConfiguration) {
        this.sourceEventListener = sourceEventListener;
        this.fileSourceConfiguration = fileSourceConfiguration;
    }

    @Override // org.wso2.carbon.transport.remotefilesystem.listener.RemoteFileSystemListener
    public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage) {
        if (!(remoteFileSystemBaseMessage instanceof RemoteFileSystemEvent)) {
            return false;
        }
        String mode = this.fileSourceConfiguration.getMode();
        String uri = ((RemoteFileSystemEvent) remoteFileSystemBaseMessage).getUri();
        if (Constants.TEXT_FULL.equalsIgnoreCase(mode)) {
            VFSClientConnector vFSClientConnector = new VFSClientConnector();
            vFSClientConnector.setMessageProcessor(new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration));
            Map<String, String> hashMap = new HashMap<>();
            hashMap.put("uri", uri);
            hashMap.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE);
            hashMap.put("action", "read");
            hashMap.put("pollingInterval", this.fileSourceConfiguration.getFilePollingInterval());
            VFSClientConnectorCallback vFSClientConnectorCallback = new VFSClientConnectorCallback();
            CarbonMessage binaryCarbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(uri.getBytes(Charset.forName("UTF-8"))), true);
            try {
                vFSClientConnector.send(binaryCarbonMessage, vFSClientConnectorCallback, hashMap);
                try {
                    vFSClientConnectorCallback.waitTillDone(this.fileSourceConfiguration.getTimeout(), uri);
                    reProcessFile(vFSClientConnector, vFSClientConnectorCallback, hashMap, uri);
                    return true;
                } catch (InterruptedException e) {
                    log.error(String.format("Failed to wait until file '%s' is processed.", uri), e);
                    return false;
                }
            } catch (ClientConnectorException e2) {
                log.error(String.format("Failed to provide file '%s' for consuming.", uri), e2);
                vFSClientConnectorCallback.done(binaryCarbonMessage);
                return true;
            }
        }
        if (Constants.BINARY_FULL.equalsIgnoreCase(mode)) {
            VFSClientConnector vFSClientConnector2 = new VFSClientConnector();
            vFSClientConnector2.setMessageProcessor(new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration));
            Map<String, String> hashMap2 = new HashMap<>();
            hashMap2.put("uri", uri);
            hashMap2.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE);
            hashMap2.put("action", "read");
            hashMap2.put("pollingInterval", this.fileSourceConfiguration.getFilePollingInterval());
            VFSClientConnectorCallback vFSClientConnectorCallback2 = new VFSClientConnectorCallback();
            try {
                vFSClientConnector2.send(new BinaryCarbonMessage(ByteBuffer.wrap(uri.getBytes(Charset.forName("UTF-8"))), true), vFSClientConnectorCallback2, hashMap2);
                try {
                    vFSClientConnectorCallback2.waitTillDone(this.fileSourceConfiguration.getTimeout(), uri);
                    reProcessFile(vFSClientConnector2, vFSClientConnectorCallback2, hashMap2, uri);
                    return true;
                } catch (InterruptedException e3) {
                    log.error(String.format("Failed to get callback from vfs-client  for file '%s'.", uri), e3);
                    return false;
                }
            } catch (ClientConnectorException e4) {
                log.error(String.format("Failed to provide file '%s' for consuming.", uri), e4);
                return true;
            }
        }
        if (!Constants.LINE.equalsIgnoreCase(mode) && !Constants.REGEX.equalsIgnoreCase(mode)) {
            return true;
        }
        Map<String, String> hashMap3 = new HashMap<>();
        hashMap3.put("action", "read");
        hashMap3.put("maxLinesPerPoll", "10");
        hashMap3.put("pollingInterval", this.fileSourceConfiguration.getFilePollingInterval());
        if (!this.fileSourceConfiguration.isTailingEnabled()) {
            hashMap3.put("uri", uri);
            VFSClientConnector vFSClientConnector3 = new VFSClientConnector();
            vFSClientConnector3.setMessageProcessor(new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration));
            VFSClientConnectorCallback vFSClientConnectorCallback3 = new VFSClientConnectorCallback();
            try {
                vFSClientConnector3.send(new BinaryCarbonMessage(ByteBuffer.wrap(uri.getBytes(Charset.forName("UTF-8"))), true), vFSClientConnectorCallback3, hashMap3);
                try {
                    vFSClientConnectorCallback3.waitTillDone(this.fileSourceConfiguration.getTimeout(), uri);
                    reProcessFile(vFSClientConnector3, vFSClientConnectorCallback3, hashMap3, uri);
                    return true;
                } catch (InterruptedException e5) {
                    log.error(String.format("Failed to get callback from vfs-client  for file '%s'.", uri), e5);
                    return false;
                }
            } catch (ClientConnectorException e6) {
                log.error(String.format("Failed to provide file '%s' for consuming.", uri), e6);
                return true;
            }
        }
        if (this.fileSourceConfiguration.getTailedFileURI() == null) {
            this.fileSourceConfiguration.setTailedFileURI(uri);
        }
        if (!this.fileSourceConfiguration.getTailedFileURI().equalsIgnoreCase(uri)) {
            return true;
        }
        try {
            this.fileSourceConfiguration.getFileSystemServerConnector().stop();
        } catch (RemoteFileSystemConnectorException e7) {
            log.error(String.format("Failed to stop file system server while processing the file '%s' with tailing enabled.", uri), e7);
        }
        hashMap3.put("startPosition", this.fileSourceConfiguration.getFilePointer());
        hashMap3.put("path", uri);
        FileServerConnectorProvider fileServerConnectorProvider = this.fileSourceServiceProvider.getFileServerConnectorProvider();
        FileProcessor fileProcessor = new FileProcessor(this.sourceEventListener, this.fileSourceConfiguration);
        ServerConnector createConnector = fileServerConnectorProvider.createConnector("file-server-connector", hashMap3);
        createConnector.setMessageProcessor(fileProcessor);
        this.fileSourceConfiguration.setFileServerConnector((FileServerConnector) createConnector);
        this.fileSourceConfiguration.getExecutorService().execute(new FileServerExecutor(new BinaryCarbonMessage(ByteBuffer.wrap(uri.getBytes(Charset.forName("UTF-8"))), true), new VFSClientConnectorCallback(), createConnector, uri));
        return true;
    }

    @Override // org.wso2.carbon.transport.remotefilesystem.listener.RemoteFileSystemListener
    public void onError(Throwable th) {
    }

    @Override // org.wso2.carbon.transport.remotefilesystem.listener.RemoteFileSystemListener
    public void done() {
    }

    private void reProcessFile(VFSClientConnector vFSClientConnector, VFSClientConnectorCallback vFSClientConnectorCallback, Map<String, String> map, String str) {
        String constructPath;
        map.put("uri", str);
        map.put(Constants.ACK_TIME_OUT, "1000");
        BinaryCarbonMessage binaryCarbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(str.getBytes(Charset.forName("UTF-8"))), true);
        String actionAfterProcess = this.fileSourceConfiguration.getActionAfterProcess();
        String moveAfterProcess = this.fileSourceConfiguration.getMoveAfterProcess();
        try {
            if (this.fileSourceConfiguration.getActionAfterProcess() != null) {
                map.put("uri", str);
                map.put("action", actionAfterProcess);
                if (this.fileSourceConfiguration.getMoveAfterProcess() != null && (constructPath = constructPath(moveAfterProcess, getFileName(str, this.fileSourceConfiguration.getProtocolForMoveAfterProcess()))) != null) {
                    map.put("destination", constructPath);
                }
                vFSClientConnector.send(binaryCarbonMessage, vFSClientConnectorCallback, map);
                vFSClientConnectorCallback.waitTillDone(this.fileSourceConfiguration.getTimeout(), str);
            }
        } catch (InterruptedException e) {
            log.error(String.format("Failed to get callback from vfs-client  for file '%s'.", str), e);
        } catch (ClientConnectorException e2) {
            log.error(String.format("Failure occurred in vfs-client while reading the file '%s'.", str), e2);
        }
    }

    private String getFileName(String str, String str2) {
        try {
            return FilenameUtils.getName(new URL(String.format("%s%s%s", str2, File.separator, str)).getPath());
        } catch (MalformedURLException e) {
            log.error(String.format("Failed to extract file name from the uri '%s'.", str), e);
            return null;
        }
    }

    private String constructPath(String str, String str2) {
        if (str == null || str2 == null) {
            return null;
        }
        return str.endsWith(File.separator) ? String.format("%s%s", str, str2) : String.format("%s%s%s", str, File.separator, str2);
    }
}
