/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.time.OffsetDateTime;
import java.time.temporal.Temporal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.springframework.scheduling.support.CronExpression;

/**
 * Scheduling agent that runs Reporting Tasks and Connectables at the times produced by a cron
 * expression taken from the component's scheduling period.
 *
 * <p>Each scheduled task is a one-shot submission to the {@link FlowEngine}; when a run finishes,
 * the task computes the next cron time and re-submits itself. The futures of the pending
 * submissions are tracked per component so that they can be cancelled on unschedule/shutdown.
 *
 * <p>Thread-safety: every access to the outer {@code scheduledFutures} map happens while holding
 * this agent's monitor. The per-component inner maps are additionally written by the running
 * tasks themselves when they re-submit (see {@code reschedule}).
 */
public class CronSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
    /** Pending futures, keyed by scheduled component and then by task index (0-based). */
    private final Map<Object, Map<Integer, ScheduledFuture<?>>> scheduledFutures = new HashMap<>();

    /**
     * Creates the agent.
     *
     * @param flowController the controller passed to the superclass and to each {@link ConnectableTask}
     * @param flowEngine the executor on which all cron-driven runs are scheduled
     * @param contextFactory the repository context factory passed to each {@link ConnectableTask}
     */
    public CronSchedulingAgent(FlowController flowController, FlowEngine flowEngine, RepositoryContextFactory contextFactory) {
        super(flowEngine, flowController, contextFactory);
    }

    /**
     * Cancels every tracked future (interrupting tasks that are currently running) and then
     * shuts down the flow engine.
     */
    public void shutdown() {
        // Iterate under the same monitor that guards doSchedule/doUnschedule so a concurrent
        // schedule/unschedule cannot invalidate the iteration and abort the shutdown.
        synchronized (this) {
            for (final Map<Integer, ScheduledFuture<?>> componentFutures : scheduledFutures.values()) {
                for (final ScheduledFuture<?> future : componentFutures.values()) {
                    if (!future.isCancelled()) {
                        future.cancel(true);
                    }
                }
            }
        }
        flowEngine.shutdown();
    }

    /**
     * Schedules a Reporting Task to run on its cron schedule using a single task (index 0).
     *
     * @param taskNode the Reporting Task to schedule; its scheduling period must be a valid cron expression
     * @param scheduleState lifecycle state that is marked scheduled and receives the task's futures
     * @throws IllegalStateException if the task already has futures registered with this agent, or
     *         if its scheduling period cannot be parsed as a cron expression
     */
    @Override
    public synchronized void doSchedule(final ReportingTaskNode taskNode, final LifecycleState scheduleState) {
        final Map<Integer, ScheduledFuture<?>> componentFutures = scheduledFutures.computeIfAbsent(taskNode, k -> new HashMap<>());
        if (!componentFutures.isEmpty()) {
            throw new IllegalStateException("Cannot schedule " + taskNode.getReportingTask().getIdentifier() + " because it is already scheduled to run");
        }

        final String cronSchedule = taskNode.getSchedulingPeriod();
        final CronExpression cronExpression;
        try {
            cronExpression = CronExpression.parse(cronSchedule);
        } catch (Exception pe) {
            throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask().getIdentifier() + " to run because its scheduling period is not valid", pe);
        }

        final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState, flowController.getExtensionManager());
        final OffsetDateTime initialDate = getInitialDate(cronExpression);
        final long initialDelay = getInitialDelay(initialDate);

        final Runnable command = new Runnable() {
            // Cron time of the most recently planned run; advanced after every execution.
            private OffsetDateTime nextSchedule = initialDate;

            @Override
            public void run() {
                taskWrapper.run();

                nextSchedule = getNextSchedule(nextSchedule, cronExpression);
                if (nextSchedule == null) {
                    // The cron expression yields no further execution time; stop re-submitting
                    // instead of failing with a NullPointerException while computing the delay.
                    logger.warn("Reporting Task {} will not be run again because schedule {} has no further execution time", taskNode, cronSchedule);
                    return;
                }

                final long delay = getDelay(nextSchedule);
                logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", taskNode, nextSchedule, delay);
                CronSchedulingAgent.this.reschedule(this, delay, componentFutures, 0, scheduleState);
            }
        };

        final ScheduledFuture<?> future = flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
        componentFutures.put(0, future);
        scheduleState.setScheduled(true);
        scheduleState.setFutures(componentFutures.values());
        logger.info("Scheduled Reporting Task {} to run threads on schedule {}", taskNode, cronSchedule);
    }

    /**
     * Schedules a Connectable to run on its cron schedule, submitting one task per configured
     * concurrent task.
     *
     * @param connectable the component to schedule; its scheduling period (after parameter
     *        evaluation) must be a valid cron expression
     * @param scheduleState lifecycle state that receives the component's futures
     * @throws IllegalStateException if the component already has futures registered with this
     *         agent, or if its scheduling period cannot be parsed as a cron expression
     */
    @Override
    public synchronized void doSchedule(final Connectable connectable, final LifecycleState scheduleState) {
        final Map<Integer, ScheduledFuture<?>> componentFutures = scheduledFutures.computeIfAbsent(connectable, k -> new HashMap<>());
        if (!componentFutures.isEmpty()) {
            throw new IllegalStateException("Cannot schedule " + connectable + " because it is already scheduled to run");
        }

        final String cronSchedule = connectable.evaluateParameters(connectable.getSchedulingPeriod());
        final CronExpression cronExpression;
        try {
            cronExpression = CronExpression.parse(cronSchedule);
        } catch (Exception pe) {
            throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period is not valid", pe);
        }

        for (int i = 0; i < connectable.getMaxConcurrentTasks(); ++i) {
            final int taskIndex = i;
            final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState);
            final OffsetDateTime initialDate = getInitialDate(cronExpression);
            final long initialDelay = getInitialDelay(initialDate);

            final Runnable command = new Runnable() {
                // Cron time of the most recently planned run; advanced after every execution.
                private OffsetDateTime nextSchedule = initialDate;

                @Override
                public void run() {
                    // A failure here propagates out of run(), so the task is not re-submitted.
                    try {
                        continuallyRunTask.invoke();
                    } catch (RuntimeException re) {
                        throw re;
                    } catch (Exception e) {
                        throw new ProcessException(e);
                    }

                    nextSchedule = getNextSchedule(nextSchedule, cronExpression);
                    if (nextSchedule == null) {
                        // The cron expression yields no further execution time; stop re-submitting
                        // instead of failing with a NullPointerException while computing the delay.
                        logger.warn("{} will not be run again because schedule {} has no further execution time", connectable, cronSchedule);
                        return;
                    }

                    final long delay = getDelay(nextSchedule);
                    logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", connectable, nextSchedule, delay);
                    CronSchedulingAgent.this.reschedule(this, delay, componentFutures, taskIndex, scheduleState);
                }
            };

            final ScheduledFuture<?> future = flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
            componentFutures.put(taskIndex, future);
        }

        scheduleState.setFutures(componentFutures.values());
        logger.info("Scheduled {} to run with {} threads on schedule {}", connectable, connectable.getMaxConcurrentTasks(), cronSchedule);
    }

    /**
     * Stops scheduling the given Connectable.
     *
     * @param connectable the component to stop scheduling
     * @param scheduleState lifecycle state whose futures are cancelled
     */
    @Override
    public synchronized void doUnschedule(Connectable connectable, LifecycleState scheduleState) {
        unschedule(connectable, scheduleState);
    }

    /**
     * Stops scheduling the given Reporting Task.
     *
     * @param taskNode the Reporting Task to stop scheduling
     * @param scheduleState lifecycle state whose futures are cancelled
     */
    @Override
    public synchronized void doUnschedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
        unschedule(taskNode, scheduleState);
    }

    /**
     * Forgets the component's futures, cancels those held by the lifecycle state without
     * interrupting a run that is already in progress, and marks the state as not scheduled.
     * Callers hold this agent's monitor.
     */
    private void unschedule(Object scheduled, LifecycleState scheduleState) {
        scheduledFutures.remove(scheduled);
        scheduleState.getFutures().forEach(future -> {
            if (!future.isCancelled()) {
                future.cancel(false);
            }
        });
        scheduleState.setScheduled(false);
        logger.info("Stopped scheduling {} to run", scheduled);
    }

    /** No-op: this agent does not react to component events. */
    public void onEvent(Connectable connectable) {
    }

    /** No-op: this agent keeps no thread limit of its own. */
    public void setMaxThreadCount(int maxThreads) {
    }

    /**
     * Re-submits {@code command} to the flow engine after {@code delayMillis} and replaces the
     * previous future for {@code taskIndex} with the new one, both in this agent's bookkeeping
     * and in the lifecycle state.
     *
     * <p>Runs on the flow engine thread that just finished executing {@code command}.
     * NOTE(review): a run that is in progress while the component is unscheduled is not
     * interrupted and will still reach this method, registering a new future on
     * {@code scheduleState} — confirm that the lifecycle state / task wrappers account for that.
     */
    private void reschedule(final Runnable command, final long delayMillis, final Map<Integer, ScheduledFuture<?>> componentFutures,
                            final int taskIndex, final LifecycleState scheduleState) {
        final ScheduledFuture<?> newFuture = flowEngine.schedule(command, delayMillis, TimeUnit.MILLISECONDS);
        final ScheduledFuture<?> oldFuture = componentFutures.put(taskIndex, newFuture);
        scheduleState.replaceFuture(oldFuture, newFuture);
    }

    /**
     * Returns the first cron time after now, or now itself when the expression has no next time.
     */
    private OffsetDateTime getInitialDate(CronExpression cronExpression) {
        final OffsetDateTime now = OffsetDateTime.now();
        final OffsetDateTime initialDate = cronExpression.next(now);
        return initialDate == null ? now : initialDate;
    }

    /**
     * Returns the number of milliseconds from the current time until {@code initialDate};
     * the value is negative when that instant has already passed.
     */
    private long getInitialDelay(OffsetDateTime initialDate) {
        return initialDate.toInstant().toEpochMilli() - System.currentTimeMillis();
    }

    /**
     * Returns the next cron time after the later of now and {@code currentSchedule}, so that
     * executions missed during a long run are skipped rather than replayed.
     *
     * @return the next execution time, or {@code null} when the expression has none
     */
    private static OffsetDateTime getNextSchedule(OffsetDateTime currentSchedule, CronExpression cronExpression) {
        final OffsetDateTime now = OffsetDateTime.now();
        final OffsetDateTime base = now.isAfter(currentSchedule) ? now : currentSchedule;
        return cronExpression.next(base);
    }

    /**
     * Returns the number of milliseconds from the current time until {@code nextSchedule},
     * never less than zero.
     */
    private static long getDelay(OffsetDateTime nextSchedule) {
        return Math.max(nextSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L);
    }
}

