/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.util.ArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.InvocationResult;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;

public class TimerDrivenSchedulingAgent
extends AbstractTimeBasedSchedulingAgent {
    private final long noWorkYieldNanos;

    public TimerDrivenSchedulingAgent(FlowController flowController, FlowEngine flowEngine, RepositoryContextFactory contextFactory, NiFiProperties nifiProperties) {
        super(flowEngine, flowController, contextFactory);
        String boredYieldDuration = nifiProperties.getBoredYieldDuration();
        try {
            this.noWorkYieldNanos = FormatUtils.getTimeDuration((String)boredYieldDuration, (TimeUnit)TimeUnit.NANOSECONDS);
        }
        catch (IllegalArgumentException e) {
            throw new RuntimeException("Failed to create SchedulingAgent because the nifi.bored.yield.duration property is set to an invalid time duration: " + boredYieldDuration);
        }
    }

    public void shutdown() {
        this.flowEngine.shutdown();
    }

    @Override
    public void doSchedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
        ReportingTaskWrapper reportingTaskWrapper = new ReportingTaskWrapper(taskNode, scheduleState, this.flowController.getExtensionManager());
        long schedulingNanos = taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS);
        ScheduledFuture future = this.flowEngine.scheduleWithFixedDelay((Runnable)reportingTaskWrapper, 0L, schedulingNanos, TimeUnit.NANOSECONDS);
        ArrayList<ScheduledFuture> futures = new ArrayList<ScheduledFuture>(1);
        futures.add(future);
        scheduleState.setFutures(futures);
        this.logger.info("{} started.", (Object)taskNode.getReportingTask());
    }

    @Override
    public void doSchedule(Connectable connectable, LifecycleState scheduleState) {
        ArrayList<ScheduledFuture> futures = new ArrayList<ScheduledFuture>();
        ConnectableTask connectableTask = new ConnectableTask(this, connectable, this.flowController, this.contextFactory, scheduleState);
        for (int i = 0; i < connectable.getMaxConcurrentTasks(); ++i) {
            AtomicReference futureRef = new AtomicReference();
            Runnable trigger = this.createTrigger(connectableTask, scheduleState, futureRef);
            ScheduledFuture future = this.flowEngine.scheduleWithFixedDelay(trigger, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            futureRef.set(future);
            futures.add(future);
        }
        scheduleState.setFutures(futures);
        this.logger.info("Scheduled {} to run with {} threads", (Object)connectable, (Object)connectable.getMaxConcurrentTasks());
    }

    private Runnable createTrigger(final ConnectableTask connectableTask, final LifecycleState scheduleState, final AtomicReference<ScheduledFuture<?>> futureRef) {
        final Connectable connectable = connectableTask.getConnectable();
        Runnable yieldDetectionRunnable = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                long now;
                long newYieldExpiration;
                InvocationResult invocationResult = connectableTask.invoke();
                if (invocationResult.isYield()) {
                    TimerDrivenSchedulingAgent.this.logger.debug("Yielding {} due to {}", (Object)connectable, (Object)invocationResult.getYieldExplanation());
                }
                if ((newYieldExpiration = connectable.getYieldExpiration()) > (now = System.currentTimeMillis())) {
                    long yieldMillis = newYieldExpiration - now;
                    long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS);
                    ScheduledFuture scheduledFuture = (ScheduledFuture)futureRef.get();
                    if (scheduledFuture == null) {
                        return;
                    }
                    if (scheduledFuture.cancel(false)) {
                        long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis));
                        LifecycleState lifecycleState = scheduleState;
                        synchronized (lifecycleState) {
                            if (scheduleState.isScheduled()) {
                                long schedulingNanos = connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS);
                                ScheduledFuture newFuture = TimerDrivenSchedulingAgent.this.flowEngine.scheduleWithFixedDelay((Runnable)this, yieldNanos, schedulingNanos, TimeUnit.NANOSECONDS);
                                scheduleState.replaceFuture(scheduledFuture, newFuture);
                                futureRef.set(newFuture);
                            }
                        }
                    }
                } else if (TimerDrivenSchedulingAgent.this.noWorkYieldNanos > 0L && invocationResult.isYield()) {
                    ScheduledFuture scheduledFuture = (ScheduledFuture)futureRef.get();
                    if (scheduledFuture == null) {
                        return;
                    }
                    if (scheduledFuture.cancel(false)) {
                        LifecycleState lifecycleState = scheduleState;
                        synchronized (lifecycleState) {
                            if (scheduleState.isScheduled()) {
                                ScheduledFuture newFuture = TimerDrivenSchedulingAgent.this.flowEngine.scheduleWithFixedDelay((Runnable)this, TimerDrivenSchedulingAgent.this.noWorkYieldNanos, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                                scheduleState.replaceFuture(scheduledFuture, newFuture);
                                futureRef.set(newFuture);
                            }
                        }
                    }
                }
            }
        };
        return yieldDetectionRunnable;
    }

    @Override
    public void doUnschedule(Connectable connectable, LifecycleState lifecycleState) {
        for (ScheduledFuture future : lifecycleState.getFutures()) {
            future.cancel(false);
        }
        this.logger.info("Stopped scheduling {} to run", (Object)connectable);
    }

    @Override
    public void doUnschedule(ReportingTaskNode taskNode, LifecycleState lifecycleState) {
        for (ScheduledFuture future : lifecycleState.getFutures()) {
            future.cancel(false);
        }
        this.logger.info("Stopped scheduling {} to run", (Object)taskNode.getReportingTask());
    }

    public void onEvent(Connectable connectable) {
    }

    public void setMaxThreadCount(int maxThreads) {
    }
}

