/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.scheduling.AbstractSchedulingAgent;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduling agent that triggers components according to the Quartz cron
 * expression returned by their {@code getSchedulingPeriod()}.
 *
 * <p>Each scheduled component gets one self-rescheduling command per
 * concurrent task. A command runs the component once, computes the next fire
 * time from the cron expression and re-submits itself to the
 * {@code flowEngine} with the matching delay. Unscheduling flips a per-command
 * cancellation flag; an already submitted command then returns without doing
 * any work the next time it is invoked.
 *
 * <p>Thread-safety: every access to {@link #canceledTriggers} happens while
 * holding this object's monitor (all schedule/unschedule entry points are
 * {@code synchronized}).
 */
public class QuartzSchedulingAgent
extends AbstractSchedulingAgent {
    private static final Logger logger = LoggerFactory.getLogger(QuartzSchedulingAgent.class);
    private final FlowController flowController;
    private final RepositoryContextFactory contextFactory;
    private final StringEncryptor encryptor;
    private volatile String adminYieldDuration = "1 sec";
    /** Cancellation flags of the commands created for each scheduled component; guarded by {@code this}. */
    private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<Object, List<AtomicBoolean>>();

    /**
     * Creates a new agent.
     *
     * @param flowController controller handed to every {@link ConnectableTask} created by this agent
     * @param flowEngine executor used to run and re-submit the cron commands
     * @param contextFactory repository context factory handed to every {@link ConnectableTask}
     * @param enryptor encryptor handed to every {@link ConnectableTask}
     */
    public QuartzSchedulingAgent(FlowController flowController, FlowEngine flowEngine, RepositoryContextFactory contextFactory, StringEncryptor enryptor) {
        super(flowEngine);
        this.flowController = flowController;
        this.contextFactory = contextFactory;
        this.encryptor = enryptor;
    }

    /** No-op: this agent holds no resources of its own to release. */
    public void shutdown() {
    }

    /**
     * Schedules a reporting task to run on its cron schedule and marks
     * {@code scheduleState} as scheduled.
     *
     * @param taskNode the reporting task to schedule
     * @param scheduleState lifecycle state shared with the task wrapper
     * @throws IllegalStateException if the task is already scheduled, its scheduling period is
     *         not a valid cron expression, or the expression has no future fire time
     */
    @Override
    public synchronized void doSchedule(final ReportingTaskNode taskNode, LifecycleState scheduleState) {
        // synchronized like the other schedule/unschedule methods: canceledTriggers is a plain HashMap.
        if (this.canceledTriggers.get(taskNode) != null) {
            throw new IllegalStateException("Cannot schedule " + taskNode.getReportingTask().getIdentifier() + " because it is already scheduled to run");
        }
        final String cronSchedule = taskNode.getSchedulingPeriod();
        final CronExpression cronExpression;
        try {
            cronExpression = new CronExpression(cronSchedule);
        }
        catch (Exception pe) {
            throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask().getIdentifier() + " to run because its scheduling period is not valid", pe);
        }
        // getTimeAfter yields null when the (valid) expression can never fire again.
        final Date initialDate = cronExpression.getTimeAfter(new Date());
        if (initialDate == null) {
            throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask().getIdentifier() + " to run because its scheduling period " + cronSchedule + " has no future fire time");
        }
        final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
        final AtomicBoolean canceled = new AtomicBoolean(false);
        final Runnable command = new RecurringCronCommand(taskNode, cronExpression, canceled, initialDate,
                "Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds"){

            @Override
            void execute() {
                taskWrapper.run();
            }
        };
        final List<AtomicBoolean> triggers = new ArrayList<AtomicBoolean>(1);
        triggers.add(canceled);
        this.canceledTriggers.put(taskNode, triggers);
        this.flowEngine.schedule(command, QuartzSchedulingAgent.getDelay(initialDate), TimeUnit.MILLISECONDS);
        scheduleState.setScheduled(true);
        logger.info("Scheduled Reporting Task {} to run threads on schedule {}", (Object)taskNode, (Object)cronSchedule);
    }

    /**
     * Schedules a connectable to run on its cron schedule, creating one
     * recurring command per concurrent task.
     *
     * @param connectable the component to schedule
     * @param scheduleState lifecycle state handed to every created {@link ConnectableTask}
     * @throws IllegalStateException if the component is already scheduled, its scheduling period is
     *         not a valid cron expression, or the expression has no future fire time
     */
    @Override
    public synchronized void doSchedule(final Connectable connectable, LifecycleState scheduleState) {
        if (this.canceledTriggers.get(connectable) != null) {
            throw new IllegalStateException("Cannot schedule " + connectable + " because it is already scheduled to run");
        }
        final String cronSchedule = connectable.getSchedulingPeriod();
        final CronExpression cronExpression;
        try {
            cronExpression = new CronExpression(cronSchedule);
        }
        catch (Exception pe) {
            throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period is not valid", pe);
        }
        // Resolved once, before anything is submitted, so a schedule without future fire
        // times is rejected cleanly and all concurrent tasks share the same first fire time.
        final Date initialDate = cronExpression.getTimeAfter(new Date());
        if (initialDate == null) {
            throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period " + cronSchedule + " has no future fire time");
        }
        final int maxConcurrentTasks = connectable.getMaxConcurrentTasks();
        final List<AtomicBoolean> triggers = new ArrayList<AtomicBoolean>();
        for (int i = 0; i < maxConcurrentTasks; ++i) {
            final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, this.flowController, this.contextFactory, scheduleState, this.encryptor);
            final AtomicBoolean canceled = new AtomicBoolean(false);
            final Runnable command = new RecurringCronCommand(connectable, cronExpression, canceled, initialDate,
                    "Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds"){

                @Override
                void execute() {
                    // Unchecked exceptions propagate unchanged; checked ones are wrapped.
                    // Either way the command is not re-submitted after a failure.
                    try {
                        continuallyRunTask.invoke();
                    }
                    catch (RuntimeException re) {
                        throw re;
                    }
                    catch (Exception e) {
                        throw new ProcessException((Throwable)e);
                    }
                }
            };
            this.flowEngine.schedule(command, QuartzSchedulingAgent.getDelay(initialDate), TimeUnit.MILLISECONDS);
            triggers.add(canceled);
        }
        this.canceledTriggers.put(connectable, triggers);
        logger.info("Scheduled {} to run with {} threads on schedule {}", new Object[]{connectable, maxConcurrentTasks, cronSchedule});
    }

    /**
     * Cancels all commands previously created for {@code connectable}.
     *
     * @throws IllegalStateException if the connectable was not scheduled by this agent
     */
    @Override
    public synchronized void doUnschedule(Connectable connectable, LifecycleState scheduleState) {
        this.unschedule(connectable, scheduleState);
    }

    /**
     * Cancels the command previously created for {@code taskNode}.
     *
     * @throws IllegalStateException if the reporting task was not scheduled by this agent
     */
    @Override
    public synchronized void doUnschedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
        this.unschedule(taskNode, scheduleState);
    }

    /**
     * Removes the component's cancellation flags, sets each of them and marks
     * {@code scheduleState} as no longer scheduled. Callers hold this object's monitor.
     *
     * @throws IllegalStateException if no flags are registered for {@code scheduled}
     */
    private void unschedule(Object scheduled, LifecycleState scheduleState) {
        final List<AtomicBoolean> triggers = this.canceledTriggers.remove(scheduled);
        if (triggers == null) {
            throw new IllegalStateException("Cannot unschedule " + scheduled + " because it was not scheduled to run");
        }
        // Commands already queued in the executor observe the flag and return without working.
        for (final AtomicBoolean trigger : triggers) {
            trigger.set(true);
        }
        scheduleState.setScheduled(false);
        logger.info("Stopped scheduling {} to run", scheduled);
    }

    /** No-op: this agent does not react to connectable events. */
    public void onEvent(Connectable connectable) {
    }

    /** No-op: the argument is ignored by this agent. */
    public void setMaxThreadCount(int maxThreads) {
    }

    /**
     * Stores the administrative yield duration as given (for example {@code "1 sec"}).
     *
     * @param yieldDuration textual duration; it is not validated here
     */
    public void setAdministrativeYieldDuration(String yieldDuration) {
        this.adminYieldDuration = yieldDuration;
    }

    /**
     * @return the administrative yield duration exactly as it was last set
     */
    public String getAdministrativeYieldDuration() {
        return this.adminYieldDuration;
    }

    /**
     * @param timeUnit unit to convert the stored duration to
     * @return the administrative yield duration as converted by {@link FormatUtils#getTimeDuration}
     */
    public long getAdministrativeYieldDuration(TimeUnit timeUnit) {
        return FormatUtils.getTimeDuration((String)this.adminYieldDuration, (TimeUnit)timeUnit);
    }

    /**
     * Adjusts the core pool size of the underlying executor by {@code toAdd}.
     *
     * @param toAdd number of threads to add; negative values remove threads
     * @throws IllegalStateException if removing threads would leave fewer than one thread
     */
    public void incrementMaxThreadCount(int toAdd) {
        final int corePoolSize = this.flowEngine.getCorePoolSize();
        if (toAdd < 0 && corePoolSize + toAdd < 1) {
            throw new IllegalStateException("Cannot remove " + -toAdd + " threads from pool because there are only " + corePoolSize + " threads in the pool");
        }
        this.flowEngine.setCorePoolSize(corePoolSize + toAdd);
    }

    /**
     * Computes the fire time following the later of {@code currentSchedule} and now, so that
     * fire times missed while a run was in progress are skipped instead of replayed.
     *
     * @return the next fire time, or {@code null} if the expression has none left
     */
    private static Date getNextSchedule(Date currentSchedule, CronExpression cronExpression) {
        final Date now = new Date();
        return cronExpression.getTimeAfter(now.after(currentSchedule) ? now : currentSchedule);
    }

    /**
     * @return milliseconds from now until {@code nextSchedule}, never negative
     */
    private static long getDelay(Date nextSchedule) {
        return Math.max(nextSchedule.getTime() - System.currentTimeMillis(), 0L);
    }

    /**
     * Command that performs one unit of work and then re-submits itself to the
     * {@code flowEngine} for the next fire time of its cron expression, until
     * its cancellation flag is set or the expression has no fire time left.
     */
    private abstract class RecurringCronCommand
    implements Runnable {
        private final Object scheduled;
        private final CronExpression cronExpression;
        private final AtomicBoolean canceled;
        private final String finishedMessage;
        private Date nextSchedule;

        /**
         * @param scheduled component the command runs for; used in log messages only
         * @param cronExpression expression that determines subsequent fire times
         * @param canceled flag that stops the command once set
         * @param initialDate fire time of the first run
         * @param finishedMessage debug message format with three placeholders:
         *        component, next fire time, delay in milliseconds
         */
        RecurringCronCommand(Object scheduled, CronExpression cronExpression, AtomicBoolean canceled, Date initialDate, String finishedMessage) {
            this.scheduled = scheduled;
            this.cronExpression = cronExpression;
            this.canceled = canceled;
            this.nextSchedule = initialDate;
            this.finishedMessage = finishedMessage;
        }

        /** Performs a single run of the scheduled component. */
        abstract void execute();

        @Override
        public final void run() {
            if (this.canceled.get()) {
                return;
            }
            this.execute();
            // Re-check: the component may have been unscheduled while it was running.
            if (this.canceled.get()) {
                return;
            }
            final Date following = QuartzSchedulingAgent.getNextSchedule(this.nextSchedule, this.cronExpression);
            if (following == null) {
                logger.warn("Cron schedule of {} has no further fire time; it will not be triggered again", this.scheduled);
                return;
            }
            this.nextSchedule = following;
            final long delay = QuartzSchedulingAgent.getDelay(following);
            logger.debug(this.finishedMessage, new Object[]{this.scheduled, following, delay});
            QuartzSchedulingAgent.this.flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
        }
    }
}

