/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.file.listeners;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.file.processors.FileProcessor;
import io.siddhi.extension.io.file.util.FileSourceConfiguration;
import io.siddhi.extension.io.file.util.Util;
import io.siddhi.extension.io.file.util.VFSClientConnectorCallback;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.transport.file.connector.sender.VFSClientConnector;

public class FileCronExecutor
implements Job {
    private static final Logger log = LogManager.getLogger(FileCronExecutor.class);

    public static void scheduleJob(FileSourceConfiguration fileSourceConfiguration, SourceEventListener sourceEventListener, SiddhiAppContext siddhiAppContext) {
        try {
            JobKey jobKey = new JobKey("JobName", "JobGroup");
            Scheduler scheduler = new StdSchedulerFactory().getScheduler();
            fileSourceConfiguration.setScheduler(scheduler);
            if (scheduler.checkExists(jobKey)) {
                scheduler.deleteJob(jobKey);
            }
            scheduler.start();
            JobDataMap dataMap = new JobDataMap();
            dataMap.put("FileSourceConfiguration", (Object)fileSourceConfiguration);
            dataMap.put("SourceEventListener", (Object)sourceEventListener);
            JobDetail cron = JobBuilder.newJob(FileCronExecutor.class).usingJobData(dataMap).withIdentity(jobKey).build();
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("TriggerName", "TriggerGroup").withSchedule(CronScheduleBuilder.cronSchedule(fileSourceConfiguration.getCronExpression())).build();
            scheduler.scheduleJob(cron, trigger);
        }
        catch (SchedulerException e) {
            log.error("The error occurs at scheduler start in SiddhiApp " + siddhiAppContext.getName() + " : " + e);
        }
    }

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        FileSourceConfiguration fileSourceConfiguration = (FileSourceConfiguration)dataMap.get("FileSourceConfiguration");
        SourceEventListener sourceEventListener = (SourceEventListener)dataMap.get("SourceEventListener");
        File listeningFileObject = new File(fileSourceConfiguration.getUri());
        if (listeningFileObject.isDirectory()) {
            File[] listOfFiles = listeningFileObject.listFiles();
            if (listOfFiles != null) {
                for (File file : listOfFiles) {
                    if (!file.isFile()) continue;
                    this.processFile(file.toURI().toString(), jobExecutionContext, sourceEventListener);
                }
            }
        } else {
            this.processFile(listeningFileObject.toURI().toString(), jobExecutionContext, sourceEventListener);
        }
    }

    public void processFile(String fileURI, JobExecutionContext jobExecutionContext, SourceEventListener sourceEventListener) {
        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        FileSourceConfiguration fileSourceConfiguration = (FileSourceConfiguration)dataMap.get("FileSourceConfiguration");
        FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, null);
        VFSClientConnector vfsClientConnector = new VFSClientConnector();
        vfsClientConnector.setMessageProcessor(fileProcessor);
        Map<String, String> properties = Util.generateProperties(fileSourceConfiguration, fileURI);
        VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
        this.initialProcessFile(vfsClientConnector, carbonCallback, properties, fileURI, fileSourceConfiguration, fileProcessor);
    }

    public void initialProcessFile(VFSClientConnector vfsClientConnector, VFSClientConnectorCallback carbonCallback, Map<String, String> properties, String fileURI, FileSourceConfiguration fileSourceConfiguration, FileProcessor fileProcessor) {
        vfsClientConnector.setMessageProcessor(fileProcessor);
        BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(fileURI.getBytes(StandardCharsets.UTF_8)), true);
        try {
            vfsClientConnector.send(carbonMessage, carbonCallback, properties);
            try {
                carbonCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileURI);
            }
            catch (InterruptedException e) {
                log.error(String.format("Failed to get callback from vfs-client  for file '%s'.", fileURI), (Throwable)e);
            }
            this.reProcessFile(vfsClientConnector, carbonCallback, properties, fileURI, fileSourceConfiguration);
        }
        catch (ClientConnectorException e) {
            log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), (Throwable)e);
        }
    }

    public void reProcessFile(VFSClientConnector vfsClientConnector, VFSClientConnectorCallback vfsClientConnectorCallback, Map<String, String> properties, String fileUri, FileSourceConfiguration fileSourceConfiguration) {
        BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(fileUri.getBytes(StandardCharsets.UTF_8)), true);
        String moveAfterProcess = fileSourceConfiguration.getMoveAfterProcess();
        Map<String, String> reGeneratedProperties = Util.reProcessFileGenerateProperties(fileSourceConfiguration, fileUri, properties);
        try {
            File file = new File(fileSourceConfiguration.getUri());
            if (file.isFile()) {
                reGeneratedProperties.put("destination", moveAfterProcess);
            } else {
                String destination = Util.constructPath(moveAfterProcess, Util.getFileName(fileUri, fileSourceConfiguration.getProtocolForMoveAfterProcess()));
                if (destination != null) {
                    reGeneratedProperties.put("destination", destination);
                }
            }
            vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, reGeneratedProperties);
            vfsClientConnectorCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileUri);
        }
        catch (ClientConnectorException e) {
            log.error(String.format("Failure occurred in vfs-client while reading the file '%s '.", fileUri), (Throwable)e);
        }
        catch (InterruptedException e) {
            log.error(String.format("Failed to get callback from vfs-client for file '%s '.", fileUri), (Throwable)e);
        }
    }
}

