/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.scheduling.AbstractSchedulingAgent;
import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.tasks.ContinuallyRunConnectableTask;
import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimerDrivenSchedulingAgent
extends AbstractSchedulingAgent {
    private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
    private final long noWorkYieldNanos;
    private final FlowController flowController;
    private final ProcessContextFactory contextFactory;
    private final StringEncryptor encryptor;
    private final VariableRegistry variableRegistry;
    private volatile String adminYieldDuration = "1 sec";

    public TimerDrivenSchedulingAgent(FlowController flowController, FlowEngine flowEngine, ProcessContextFactory contextFactory, StringEncryptor encryptor, VariableRegistry variableRegistry, NiFiProperties nifiProperties) {
        super(flowEngine);
        this.flowController = flowController;
        this.contextFactory = contextFactory;
        this.encryptor = encryptor;
        this.variableRegistry = variableRegistry;
        String boredYieldDuration = nifiProperties.getBoredYieldDuration();
        try {
            this.noWorkYieldNanos = FormatUtils.getTimeDuration((String)boredYieldDuration, (TimeUnit)TimeUnit.NANOSECONDS);
        }
        catch (IllegalArgumentException e) {
            throw new RuntimeException("Failed to create SchedulingAgent because the nifi.bored.yield.duration property is set to an invalid time duration: " + boredYieldDuration);
        }
    }

    private StateManager getStateManager(String componentId) {
        return this.flowController.getStateManagerProvider().getStateManager(componentId);
    }

    public void shutdown() {
        this.flowEngine.shutdown();
    }

    @Override
    public void doSchedule(ReportingTaskNode taskNode, ScheduleState scheduleState) {
        ReportingTaskWrapper reportingTaskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
        long schedulingNanos = taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS);
        ScheduledFuture<?> future = this.flowEngine.scheduleWithFixedDelay(reportingTaskWrapper, 0L, schedulingNanos, TimeUnit.NANOSECONDS);
        ArrayList futures = new ArrayList(1);
        futures.add(future);
        scheduleState.setFutures(futures);
        logger.info("{} started.", (Object)taskNode.getReportingTask());
    }

    @Override
    public void doSchedule(final Connectable connectable, final ScheduleState scheduleState) {
        ArrayList futures = new ArrayList();
        for (int i = 0; i < connectable.getMaxConcurrentTasks(); ++i) {
            Object processContext;
            Callable<Boolean> continuallyRunTask;
            if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
                ProcessorNode procNode = (ProcessorNode)connectable;
                StandardProcessContext standardProcContext = new StandardProcessContext(procNode, this.flowController, this.encryptor, this.getStateManager(connectable.getIdentifier()), this.variableRegistry);
                ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, this.flowController, this.contextFactory, scheduleState, standardProcContext);
                continuallyRunTask = runnableTask;
                processContext = standardProcContext;
            } else {
                processContext = new ConnectableProcessContext(connectable, this.encryptor, this.getStateManager(connectable.getIdentifier()));
                continuallyRunTask = new ContinuallyRunConnectableTask(this.contextFactory, connectable, scheduleState, (ProcessContext)processContext);
            }
            final AtomicReference futureRef = new AtomicReference();
            Runnable yieldDetectionRunnable = new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    boolean shouldYield;
                    try {
                        shouldYield = (Boolean)continuallyRunTask.call();
                    }
                    catch (RuntimeException re) {
                        throw re;
                    }
                    catch (Exception e) {
                        throw new ProcessException((Throwable)e);
                    }
                    long newYieldExpiration = connectable.getYieldExpiration();
                    long now = System.currentTimeMillis();
                    if (newYieldExpiration > now) {
                        long yieldMillis = newYieldExpiration - now;
                        long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS);
                        ScheduledFuture scheduledFuture = (ScheduledFuture)futureRef.get();
                        if (scheduledFuture == null) {
                            return;
                        }
                        if (scheduledFuture.cancel(false)) {
                            long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis));
                            ScheduleState scheduleState2 = scheduleState;
                            synchronized (scheduleState2) {
                                if (scheduleState.isScheduled()) {
                                    ScheduledFuture<?> newFuture = TimerDrivenSchedulingAgent.this.flowEngine.scheduleWithFixedDelay(this, yieldNanos, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                                    scheduleState.replaceFuture(scheduledFuture, newFuture);
                                    futureRef.set(newFuture);
                                }
                            }
                        }
                    } else if (TimerDrivenSchedulingAgent.this.noWorkYieldNanos > 0L && shouldYield) {
                        ScheduledFuture scheduledFuture = (ScheduledFuture)futureRef.get();
                        if (scheduledFuture == null) {
                            return;
                        }
                        if (scheduledFuture.cancel(false)) {
                            ScheduleState scheduleState3 = scheduleState;
                            synchronized (scheduleState3) {
                                if (scheduleState.isScheduled()) {
                                    ScheduledFuture<?> newFuture = TimerDrivenSchedulingAgent.this.flowEngine.scheduleWithFixedDelay(this, TimerDrivenSchedulingAgent.this.noWorkYieldNanos, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                                    scheduleState.replaceFuture(scheduledFuture, newFuture);
                                    futureRef.set(newFuture);
                                }
                            }
                        }
                    }
                }
            };
            ScheduledFuture<?> future = this.flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            futureRef.set(future);
            futures.add(future);
        }
        scheduleState.setFutures(futures);
        logger.info("Scheduled {} to run with {} threads", (Object)connectable, (Object)connectable.getMaxConcurrentTasks());
    }

    @Override
    public void doUnschedule(Connectable connectable, ScheduleState scheduleState) {
        for (ScheduledFuture future : scheduleState.getFutures()) {
            future.cancel(false);
        }
        logger.info("Stopped scheduling {} to run", (Object)connectable);
    }

    @Override
    public void doUnschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) {
        for (ScheduledFuture future : scheduleState.getFutures()) {
            future.cancel(false);
        }
        logger.info("Stopped scheduling {} to run", (Object)taskNode.getReportingTask());
    }

    public void setAdministrativeYieldDuration(String yieldDuration) {
        this.adminYieldDuration = yieldDuration;
    }

    public String getAdministrativeYieldDuration() {
        return this.adminYieldDuration;
    }

    public long getAdministrativeYieldDuration(TimeUnit timeUnit) {
        return FormatUtils.getTimeDuration((String)this.adminYieldDuration, (TimeUnit)timeUnit);
    }

    public void onEvent(Connectable connectable) {
    }

    public void setMaxThreadCount(int maxThreads) {
    }
}

