/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.scheduling.AbstractSchedulingAgent;
import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.tasks.ContinuallyRunConnectableTask;
import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.FormatUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QuartzSchedulingAgent
extends AbstractSchedulingAgent {
    private final Logger logger = LoggerFactory.getLogger(QuartzSchedulingAgent.class);
    private final FlowController flowController;
    private final ProcessContextFactory contextFactory;
    private final StringEncryptor encryptor;
    private final VariableRegistry variableRegistry;
    private volatile String adminYieldDuration = "1 sec";
    private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<Object, List<AtomicBoolean>>();

    public QuartzSchedulingAgent(FlowController flowController, FlowEngine flowEngine, ProcessContextFactory contextFactory, StringEncryptor enryptor, VariableRegistry variableRegistry) {
        super(flowEngine);
        this.flowController = flowController;
        this.contextFactory = contextFactory;
        this.encryptor = enryptor;
        this.variableRegistry = variableRegistry;
    }

    private StateManager getStateManager(String componentId) {
        return this.flowController.getStateManagerProvider().getStateManager(componentId);
    }

    @Override
    public void shutdown() {
    }

    @Override
    public void doSchedule(final ReportingTaskNode taskNode, ScheduleState scheduleState) {
        CronExpression cronExpression;
        List<AtomicBoolean> existingTriggers = this.canceledTriggers.get(taskNode);
        if (existingTriggers != null) {
            throw new IllegalStateException("Cannot schedule " + taskNode.getReportingTask() + " because it is already scheduled to run");
        }
        String cronSchedule = taskNode.getSchedulingPeriod();
        try {
            cronExpression = new CronExpression(cronSchedule);
        }
        catch (Exception pe) {
            throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask() + " to run because its scheduling period is not valid");
        }
        final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
        final AtomicBoolean canceled = new AtomicBoolean(false);
        Runnable command = new Runnable(){

            @Override
            public void run() {
                if (canceled.get()) {
                    return;
                }
                taskWrapper.run();
                if (canceled.get()) {
                    return;
                }
                Date date = cronExpression.getTimeAfter(new Date());
                long delay = date.getTime() - System.currentTimeMillis();
                QuartzSchedulingAgent.this.logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", new Object[]{taskNode, date, delay});
                QuartzSchedulingAgent.this.flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
            }
        };
        ArrayList<AtomicBoolean> triggers = new ArrayList<AtomicBoolean>(1);
        triggers.add(canceled);
        this.canceledTriggers.put(taskNode, triggers);
        Date initialDate = cronExpression.getTimeAfter(new Date());
        long initialDelay = initialDate.getTime() - System.currentTimeMillis();
        this.flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
        scheduleState.setScheduled(true);
        this.logger.info("Scheduled Reporting Task {} to run threads on schedule {}", (Object)taskNode, (Object)cronSchedule);
    }

    @Override
    public synchronized void doSchedule(final Connectable connectable, ScheduleState scheduleState) {
        CronExpression cronExpression;
        List<AtomicBoolean> existingTriggers = this.canceledTriggers.get(connectable);
        if (existingTriggers != null) {
            throw new IllegalStateException("Cannot schedule " + connectable + " because it is already scheduled to run");
        }
        String cronSchedule = connectable.getSchedulingPeriod();
        try {
            cronExpression = new CronExpression(cronSchedule);
        }
        catch (Exception pe) {
            throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period is not valid");
        }
        ArrayList<AtomicBoolean> triggers = new ArrayList<AtomicBoolean>();
        for (int i = 0; i < connectable.getMaxConcurrentTasks(); ++i) {
            Callable<Boolean> continuallyRunTask;
            if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
                ProcessorNode procNode = (ProcessorNode)connectable;
                StandardProcessContext standardProcContext = new StandardProcessContext(procNode, this.flowController, this.encryptor, this.getStateManager(connectable.getIdentifier()), this.variableRegistry);
                ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, this.flowController, this.contextFactory, scheduleState, standardProcContext);
                continuallyRunTask = runnableTask;
            } else {
                ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable, this.encryptor, this.getStateManager(connectable.getIdentifier()));
                continuallyRunTask = new ContinuallyRunConnectableTask(this.contextFactory, connectable, scheduleState, connProcContext);
            }
            final AtomicBoolean canceled = new AtomicBoolean(false);
            Runnable command = new Runnable(){

                @Override
                public void run() {
                    if (canceled.get()) {
                        return;
                    }
                    try {
                        continuallyRunTask.call();
                    }
                    catch (RuntimeException re) {
                        throw re;
                    }
                    catch (Exception e) {
                        throw new ProcessException((Throwable)e);
                    }
                    if (canceled.get()) {
                        return;
                    }
                    Date date = cronExpression.getTimeAfter(new Date());
                    long delay = date.getTime() - System.currentTimeMillis();
                    QuartzSchedulingAgent.this.logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", new Object[]{connectable, date, delay});
                    QuartzSchedulingAgent.this.flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
                }
            };
            Date initialDate = cronExpression.getTimeAfter(new Date());
            long initialDelay = initialDate.getTime() - System.currentTimeMillis();
            this.flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
            triggers.add(canceled);
        }
        this.canceledTriggers.put(connectable, triggers);
        this.logger.info("Scheduled {} to run with {} threads on schedule {}", new Object[]{connectable, connectable.getMaxConcurrentTasks(), cronSchedule});
    }

    @Override
    public synchronized void doUnschedule(Connectable connectable, ScheduleState scheduleState) {
        this.unschedule((Object)connectable, scheduleState);
    }

    @Override
    public synchronized void doUnschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) {
        this.unschedule((Object)taskNode, scheduleState);
    }

    private void unschedule(Object scheduled, ScheduleState scheduleState) {
        List<AtomicBoolean> triggers = this.canceledTriggers.remove(scheduled);
        if (triggers == null) {
            throw new IllegalStateException("Cannot unschedule " + scheduled + " because it was not scheduled to run");
        }
        for (AtomicBoolean trigger : triggers) {
            trigger.set(true);
        }
        scheduleState.setScheduled(false);
        this.logger.info("Stopped scheduling {} to run", scheduled);
    }

    @Override
    public void onEvent(Connectable connectable) {
    }

    @Override
    public void setMaxThreadCount(int maxThreads) {
    }

    @Override
    public void setAdministrativeYieldDuration(String yieldDuration) {
        this.adminYieldDuration = yieldDuration;
    }

    @Override
    public String getAdministrativeYieldDuration() {
        return this.adminYieldDuration;
    }

    @Override
    public long getAdministrativeYieldDuration(TimeUnit timeUnit) {
        return FormatUtils.getTimeDuration((String)this.adminYieldDuration, (TimeUnit)timeUnit);
    }
}

