/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.transport.file.connector.server;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.vfs2.FileNotFoundException;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileSystemManager;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.FileType;
import org.apache.commons.vfs2.RandomAccessContent;
import org.apache.commons.vfs2.VFS;
import org.apache.commons.vfs2.provider.UriParser;
import org.apache.commons.vfs2.provider.ftp.FtpFileSystemConfigBuilder;
import org.apache.commons.vfs2.util.RandomAccessMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.CarbonMessageProcessor;
import org.wso2.carbon.messaging.TextCarbonMessage;
import org.wso2.carbon.messaging.exceptions.ServerConnectorException;
import org.wso2.transport.file.connector.server.exception.FileServerConnectorException;
import org.wso2.transport.file.connector.server.util.Constants;
import org.wso2.transport.file.connector.server.util.FileTransportUtils;

public class FileConsumer {
    private static final Logger log = LoggerFactory.getLogger(FileConsumer.class);
    private Map<String, String> fileProperties;
    private FileSystemManager fsManager = null;
    private CarbonMessageProcessor messageProcessor;
    private String path;
    private String serviceName;
    private FileObject fileObject;
    private FileSystemOptions fso;
    private final byte[] inbuf = new byte[4096];
    private int startPosition;
    private long currentTime = 0L;
    private long position = 0L;
    private RandomAccessContent reader = null;
    private int maxLinesPerPoll;

    public FileConsumer(String id, Map<String, String> fileProperties, CarbonMessageProcessor messageProcessor) throws ServerConnectorException {
        this.serviceName = id;
        this.fileProperties = fileProperties;
        this.messageProcessor = messageProcessor;
        this.setupParams();
        try {
            this.fsManager = VFS.getManager();
        }
        catch (FileSystemException e) {
            throw new ServerConnectorException("Could not initialize File System Manager from the configuration: providers.xml", e);
        }
        Map<String, String> options = this.parseSchemeFileOptions(this.path);
        this.fso = FileTransportUtils.attachFileSystemOptions(options, this.fsManager);
        if (options != null && "ftp".equals(options.get("VFS_SCHEME"))) {
            FtpFileSystemConfigBuilder.getInstance().setPassiveMode(this.fso, true);
        }
        try {
            this.fileObject = this.fsManager.resolveFile(this.path, this.fso);
            this.reader = this.fileObject.getContent().getRandomAccessContent(RandomAccessMode.READ);
            this.position = this.startPosition == -1 ? this.fileObject.getContent().getSize() : (long)this.startPosition;
            this.currentTime = System.currentTimeMillis();
            this.reader.seek(this.position);
        }
        catch (FileSystemException e) {
            throw new FileServerConnectorException("Failed to resolve path: " + FileTransportUtils.maskURLPassword(this.path), e);
        }
        catch (IOException e) {
            throw new FileServerConnectorException("Failed to read File: " + FileTransportUtils.maskURLPassword(this.path), e);
        }
    }

    public void consume() throws FileServerConnectorException {
        boolean isFileReadable;
        boolean isFileExists;
        if (log.isDebugEnabled()) {
            log.debug("Polling for file : " + FileTransportUtils.maskURLPassword(this.path));
        }
        try {
            isFileExists = this.fileObject.exists();
        }
        catch (FileSystemException e) {
            throw new FileServerConnectorException("Error occurred when determining whether the file at URI : " + FileTransportUtils.maskURLPassword(this.path) + " exists. " + e);
        }
        try {
            isFileReadable = this.fileObject.isReadable();
        }
        catch (FileSystemException e) {
            throw new FileServerConnectorException("Error occurred while determining whether the file at URI : " + FileTransportUtils.maskURLPassword(this.path) + " is readable. " + e);
        }
        if (isFileExists && isFileReadable) {
            FileType fileType;
            try {
                fileType = this.fileObject.getType();
            }
            catch (FileSystemException e) {
                throw new FileServerConnectorException("Error occurred while determining whether the file: " + FileTransportUtils.maskURLPassword(this.path) + " is a file or a folder", e);
            }
            if (fileType != FileType.FILE) {
                throw new FileServerConnectorException("Unable to access or read file/directory : " + FileTransportUtils.maskURLPassword(this.path));
            }
            this.processFile(this.fileObject);
            if (log.isDebugEnabled()) {
                log.debug("End : Scanning directory or file : " + FileTransportUtils.maskURLPassword(this.path));
            }
        }
    }

    private void setupParams() throws ServerConnectorException {
        this.path = this.fileProperties.get("path");
        if (this.path == null) {
            throw new ServerConnectorException("path is a mandatory parameter for file transport.");
        }
        if (this.path.trim().equals("")) {
            throw new ServerConnectorException("path parameter cannot be empty for file transport.");
        }
        String startPosition = this.fileProperties.get("startPosition");
        this.startPosition = startPosition != null ? Integer.parseInt(startPosition) : -1;
        String maxLinesPerPoll = this.fileProperties.get("maxLinesPerPoll");
        this.maxLinesPerPoll = maxLinesPerPoll != null ? Integer.parseInt(maxLinesPerPoll) : -1;
    }

    private Map<String, String> parseSchemeFileOptions(String fileURI) {
        String scheme = UriParser.extractScheme(fileURI);
        if (scheme == null) {
            return null;
        }
        HashMap<String, String> schemeFileOptions = new HashMap<String, String>();
        schemeFileOptions.put("VFS_SCHEME", scheme);
        this.addOptions(scheme, schemeFileOptions);
        return schemeFileOptions;
    }

    private void addOptions(String scheme, Map<String, String> schemeFileOptions) {
        if (scheme.equals("sftp")) {
            for (Constants.SftpFileOption option : Constants.SftpFileOption.values()) {
                String strValue = this.fileProperties.get("sftp" + option.toString());
                if (strValue == null || strValue.equals("")) continue;
                schemeFileOptions.put(option.toString(), strValue);
            }
        }
    }

    private FileObject processFile(FileObject file) throws FileServerConnectorException {
        block9: {
            try {
                boolean newer = FileConsumer.isFileNewer(this.fileObject, this.currentTime);
                long length = this.fileObject.getContent().getSize();
                if (length < this.position) {
                    EventListener.fileRotated(this.fileObject, this.messageProcessor, this.serviceName);
                    try {
                        this.reader = this.fileObject.getContent().getRandomAccessContent(RandomAccessMode.READ);
                        this.position = this.fileObject.getContent().getSize();
                        break block9;
                    }
                    catch (FileNotFoundException e) {
                        throw new FileServerConnectorException("File Not Found: " + FileTransportUtils.maskURLPassword(this.path), e);
                    }
                }
                if (length > this.position) {
                    this.position = this.readLines(this.reader);
                    this.currentTime = System.currentTimeMillis();
                } else if (newer) {
                    this.position = this.fileObject.getContent().getSize();
                    this.reader.seek(this.position);
                    this.position = this.readLines(this.reader);
                    this.currentTime = System.currentTimeMillis();
                }
                FileObject parent = this.fileObject.getParent();
                parent.getType();
                parent.refresh();
                this.fileObject.refresh();
                this.reader.close();
                this.reader = this.fileObject.getContent().getRandomAccessContent(RandomAccessMode.READ);
                this.reader.seek(this.position);
            }
            catch (FileSystemException e) {
                throw new FileServerConnectorException("Error in reading file: " + FileTransportUtils.maskURLPassword(this.path), e);
            }
            catch (IOException e) {
                throw new FileServerConnectorException("Error in reading file: " + FileTransportUtils.maskURLPassword(this.path), e);
            }
        }
        return file;
    }

    private long readLines(RandomAccessContent reader) throws IOException, FileServerConnectorException {
        int num;
        long pos;
        long rePos = pos = reader.getFilePointer();
        ArrayList<Byte> list = new ArrayList<Byte>();
        int lines = 0;
        boolean throttled = false;
        while ((num = FileConsumer.read(reader, this.inbuf)) != -1 && !throttled) {
            for (int i = 0; i < num && !throttled; ++i) {
                byte ch = this.inbuf[i];
                if (ch == 10) {
                    Byte[] line = new Byte[list.size()];
                    line = list.toArray(line);
                    rePos = pos + (long)i + 1L;
                    EventListener.fileUpdated(line, this.messageProcessor, this.serviceName, rePos);
                    ++lines;
                    list.clear();
                } else {
                    list.add(ch);
                }
                if (this.maxLinesPerPoll == -1 || lines <= this.maxLinesPerPoll || ch != 10) continue;
                throttled = true;
            }
            pos = reader.getFilePointer();
        }
        reader.seek(rePos);
        return rePos;
    }

    private static boolean isFileNewer(FileObject file, long timeMillis) throws FileSystemException {
        if (file == null) {
            throw new IllegalArgumentException("No specified file");
        }
        return !file.exists() || file.getContent().getLastModifiedTime() > timeMillis;
    }

    private static int read(RandomAccessContent reader, byte[] inbuf) throws IOException {
        InputStream is = reader.getInputStream();
        int count = is.read(inbuf);
        return count;
    }

    private static class EventListener {
        private EventListener() {
        }

        private static void fileRotated(FileObject file, CarbonMessageProcessor messageProcessor, String serviceName) throws FileServerConnectorException {
            try {
                TextCarbonMessage cMessage = new TextCarbonMessage(file.getURL().toString());
                cMessage.setProperty("PROTOCOL", "file");
                cMessage.setProperty("TRANSPORT_FILE_SERVICE_NAME", serviceName);
                cMessage.setProperty("FILE_TRANSPORT_EVENT_NAME", "FILE_ROTATE");
                messageProcessor.receive(cMessage, null);
            }
            catch (Exception e) {
                throw new FileServerConnectorException("Failed to send event message processor. ", e);
            }
        }

        private static void fileUpdated(Byte[] content, CarbonMessageProcessor messageProcessor, String serviceName, long currentPosition) throws FileServerConnectorException {
            try {
                BinaryCarbonMessage cMessage = new BinaryCarbonMessage(ByteBuffer.wrap(EventListener.toPrimitives(content)), true);
                cMessage.setProperty("PROTOCOL", "file");
                cMessage.setProperty("TRANSPORT_FILE_SERVICE_NAME", serviceName);
                cMessage.setProperty("FILE_TRANSPORT_EVENT_NAME", "FILE_UPDATE");
                cMessage.setProperty("SINGLE_THREADED", serviceName);
                cMessage.setProperty("currentPosition", currentPosition);
                messageProcessor.receive(cMessage, null);
            }
            catch (Exception e) {
                throw new FileServerConnectorException("Failed to send event message processor. ", e);
            }
        }

        private static byte[] toPrimitives(Byte[] oBytes) {
            byte[] bytes = new byte[oBytes.length];
            for (int i = 0; i < oBytes.length; ++i) {
                bytes[i] = oBytes[i];
            }
            return bytes;
        }
    }
}

