/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.scheduling.AbstractSchedulingAgent;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.InvocationResult;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduling agent that runs components on a fixed-delay timer.
 *
 * <p>Each scheduled component is submitted to the {@code flowEngine} via
 * {@code scheduleWithFixedDelay}. For {@link Connectable}s, one periodic task is submitted per
 * concurrent task, and each task re-submits itself with a longer initial delay whenever the
 * component yields.
 */
public class TimerDrivenSchedulingAgent
extends AbstractSchedulingAgent {
    private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);

    /** Delay, in nanoseconds, applied before the next run when an invocation reports a yield. */
    private final long noWorkYieldNanos;
    private final FlowController flowController;
    private final RepositoryContextFactory contextFactory;
    private final StringEncryptor encryptor;
    private volatile String adminYieldDuration = "1 sec";

    /**
     * Creates the agent and parses the bored-yield duration from the given properties.
     *
     * @param flowController controller handed to every {@link ConnectableTask} this agent creates
     * @param flowEngine executor on which all tasks are scheduled
     * @param contextFactory repository context factory handed to every {@link ConnectableTask}
     * @param encryptor encryptor handed to every {@link ConnectableTask}
     * @param nifiProperties source of the bored-yield duration
     * @throws RuntimeException if the bored-yield duration cannot be parsed as a time duration;
     *         the parsing failure is attached as the cause
     */
    public TimerDrivenSchedulingAgent(FlowController flowController, FlowEngine flowEngine, RepositoryContextFactory contextFactory, StringEncryptor encryptor, NiFiProperties nifiProperties) {
        super(flowEngine);
        this.flowController = flowController;
        this.contextFactory = contextFactory;
        this.encryptor = encryptor;
        final String boredYieldDuration = nifiProperties.getBoredYieldDuration();
        try {
            this.noWorkYieldNanos = FormatUtils.getTimeDuration(boredYieldDuration, TimeUnit.NANOSECONDS);
        }
        catch (IllegalArgumentException e) {
            // Keep the original exception as the cause so the parser's diagnostic is not lost.
            throw new RuntimeException("Failed to create SchedulingAgent because the nifi.bored.yield.duration property is set to an invalid time duration: " + boredYieldDuration, e);
        }
    }

    /**
     * Shuts down the underlying flow engine.
     */
    public void shutdown() {
        this.flowEngine.shutdown();
    }

    /**
     * Schedules a reporting task to run repeatedly, starting immediately, with the task's own
     * scheduling period as the delay between runs.
     *
     * @param taskNode the reporting task to schedule
     * @param scheduleState lifecycle state that receives the resulting future
     */
    @Override
    public void doSchedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
        final ReportingTaskWrapper reportingTaskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
        final long schedulingNanos = taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS);
        final ScheduledFuture<?> future = this.flowEngine.scheduleWithFixedDelay(reportingTaskWrapper, 0L, schedulingNanos, TimeUnit.NANOSECONDS);

        final List<ScheduledFuture<?>> futures = new ArrayList<>(1);
        futures.add(future);
        scheduleState.setFutures(futures);
        logger.info("{} started.", taskNode.getReportingTask());
    }

    /**
     * Schedules a connectable to run repeatedly, submitting one periodic trigger per concurrent
     * task. All triggers share a single {@link ConnectableTask}.
     *
     * @param connectable the component to schedule
     * @param scheduleState lifecycle state that receives the resulting futures
     */
    @Override
    public void doSchedule(Connectable connectable, LifecycleState scheduleState) {
        final List<ScheduledFuture<?>> futures = new ArrayList<>();
        final ConnectableTask connectableTask = new ConnectableTask(this, connectable, this.flowController, this.contextFactory, scheduleState, this.encryptor);
        for (int i = 0; i < connectable.getMaxConcurrentTasks(); ++i) {
            // The trigger needs a handle on its own future in order to cancel it when yielding,
            // but the future only exists after scheduling; the reference bridges that gap.
            final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
            final Runnable trigger = this.createTrigger(connectableTask, scheduleState, futureRef);
            final ScheduledFuture<?> future = this.flowEngine.scheduleWithFixedDelay(trigger, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            futureRef.set(future);
            futures.add(future);
        }
        scheduleState.setFutures(futures);
        logger.info("Scheduled {} to run with {} threads", connectable, connectable.getMaxConcurrentTasks());
    }

    /**
     * Builds the periodic trigger for one concurrent task of a connectable. After each invocation
     * the trigger checks whether the component has yielded and, if so, replaces its own periodic
     * schedule with one whose first run is pushed out by the yield.
     *
     * @param connectableTask the task to invoke on every run
     * @param scheduleState lifecycle state whose futures are kept in sync on rescheduling
     * @param futureRef holder of the future currently driving the returned trigger
     * @return the trigger to hand to the flow engine
     */
    private Runnable createTrigger(final ConnectableTask connectableTask, final LifecycleState scheduleState, final AtomicReference<ScheduledFuture<?>> futureRef) {
        final Connectable connectable = connectableTask.getConnectable();
        // Deliberately an anonymous class and not a lambda: 'this' must denote the Runnable itself
        // so that it can be re-submitted to the flow engine.
        return new Runnable() {
            @Override
            public void run() {
                final InvocationResult invocationResult = connectableTask.invoke();
                if (invocationResult.isYield()) {
                    logger.debug("Yielding {} due to {}", connectable, invocationResult.getYieldExplanation());
                }

                final long newYieldExpiration = connectable.getYieldExpiration();
                final long now = System.currentTimeMillis();
                if (newYieldExpiration > now) {
                    // The component has a yield expiration in the future: wait for whichever is
                    // longer, the remaining yield time or the normal scheduling period.
                    final long yieldMillis = newYieldExpiration - now;
                    final long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS);
                    final long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis));
                    TimerDrivenSchedulingAgent.this.rescheduleAfterYield(this, connectable, scheduleState, futureRef, yieldNanos);
                } else if (TimerDrivenSchedulingAgent.this.noWorkYieldNanos > 0L && invocationResult.isYield()) {
                    // The invocation itself reported a yield: back off for the configured
                    // bored-yield duration.
                    TimerDrivenSchedulingAgent.this.rescheduleAfterYield(this, connectable, scheduleState, futureRef, TimerDrivenSchedulingAgent.this.noWorkYieldNanos);
                }
            }
        };
    }

    /**
     * Cancels the future currently driving {@code trigger} and, if the component is still
     * scheduled, submits the trigger again with the given initial delay.
     *
     * @param trigger the runnable to re-submit
     * @param connectable component whose scheduling period is used between subsequent runs
     * @param scheduleState lifecycle state; used as the lock while swapping futures
     * @param futureRef holder of the trigger's current future, updated on success
     * @param initialDelayNanos delay before the first run of the new schedule, in nanoseconds
     */
    private void rescheduleAfterYield(final Runnable trigger, final Connectable connectable, final LifecycleState scheduleState, final AtomicReference<ScheduledFuture<?>> futureRef, final long initialDelayNanos) {
        final ScheduledFuture<?> scheduledFuture = futureRef.get();
        if (scheduledFuture == null) {
            // The trigger is scheduled with zero delay before doSchedule stores its future, so an
            // early run can get here first; in that case the existing schedule is left untouched.
            return;
        }
        // cancel(false) does not interrupt the thread running the trigger. If the cancel fails,
        // nothing is rescheduled.
        if (!scheduledFuture.cancel(false)) {
            return;
        }
        synchronized (scheduleState) {
            if (scheduleState.isScheduled()) {
                final long schedulingNanos = connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS);
                final ScheduledFuture<?> newFuture = this.flowEngine.scheduleWithFixedDelay(trigger, initialDelayNanos, schedulingNanos, TimeUnit.NANOSECONDS);
                scheduleState.replaceFuture(scheduledFuture, newFuture);
                futureRef.set(newFuture);
            }
        }
    }

    /**
     * Cancels, without interruption, every future recorded for the connectable.
     *
     * @param connectable the component being unscheduled (used for logging only)
     * @param scheduleState lifecycle state holding the futures to cancel
     */
    @Override
    public void doUnschedule(Connectable connectable, LifecycleState scheduleState) {
        for (final ScheduledFuture<?> future : scheduleState.getFutures()) {
            future.cancel(false);
        }
        logger.info("Stopped scheduling {} to run", connectable);
    }

    /**
     * Cancels, without interruption, every future recorded for the reporting task.
     *
     * @param taskNode the reporting task being unscheduled (used for logging only)
     * @param scheduleState lifecycle state holding the futures to cancel
     */
    @Override
    public void doUnschedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
        for (final ScheduledFuture<?> future : scheduleState.getFutures()) {
            future.cancel(false);
        }
        logger.info("Stopped scheduling {} to run", taskNode.getReportingTask());
    }

    /**
     * Stores the administrative yield duration as given; the string is not validated here.
     *
     * @param yieldDuration the duration string, e.g. {@code "1 sec"}
     */
    public void setAdministrativeYieldDuration(String yieldDuration) {
        this.adminYieldDuration = yieldDuration;
    }

    /**
     * @return the administrative yield duration string as last set (initially {@code "1 sec"})
     */
    public String getAdministrativeYieldDuration() {
        return this.adminYieldDuration;
    }

    /**
     * Parses the administrative yield duration and converts it to the requested unit.
     *
     * @param timeUnit the unit to express the duration in
     * @return the administrative yield duration in {@code timeUnit}
     */
    public long getAdministrativeYieldDuration(TimeUnit timeUnit) {
        return FormatUtils.getTimeDuration(this.adminYieldDuration, timeUnit);
    }

    /**
     * Does nothing in this agent.
     *
     * @param connectable ignored
     */
    public void onEvent(Connectable connectable) {
    }

    /**
     * Does nothing in this agent; the pool size is adjusted through
     * {@link #incrementMaxThreadCount(int)} instead.
     *
     * @param maxThreads ignored
     */
    public void setMaxThreadCount(int maxThreads) {
    }

    /**
     * Adjusts the flow engine's core pool size by the given amount.
     *
     * @param toAdd number of threads to add; a negative value removes threads
     * @throws IllegalStateException if removing threads would leave fewer than one in the pool
     */
    public void incrementMaxThreadCount(int toAdd) {
        final int corePoolSize = this.flowEngine.getCorePoolSize();
        if (toAdd < 0 && corePoolSize + toAdd < 1) {
            throw new IllegalStateException("Cannot remove " + -toAdd + " threads from pool because there are only " + corePoolSize + " threads in the pool");
        }
        this.flowEngine.setCorePoolSize(corePoolSize + toAdd);
    }
}

