/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StandardProcessScheduler
implements ProcessScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class);
    private final ControllerServiceProvider controllerServiceProvider;
    private final Heartbeater heartbeater;
    private final long administrativeYieldMillis;
    private final String administrativeYieldDuration;
    private final StateManagerProvider stateManagerProvider;
    private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<Object, ScheduleState>();
    private final ScheduledExecutorService frameworkTaskExecutor;
    private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<SchedulingStrategy, SchedulingAgent>();
    private final ScheduledExecutorService componentLifeCycleThreadPool = new FlowEngine(8, "StandardProcessScheduler", true);
    private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true);
    private final StringEncryptor encryptor;
    private final VariableRegistry variableRegistry;

    public StandardProcessScheduler(Heartbeater heartbeater, ControllerServiceProvider controllerServiceProvider, StringEncryptor encryptor, StateManagerProvider stateManagerProvider, VariableRegistry variableRegistry) {
        this.heartbeater = heartbeater;
        this.controllerServiceProvider = controllerServiceProvider;
        this.encryptor = encryptor;
        this.stateManagerProvider = stateManagerProvider;
        this.variableRegistry = variableRegistry;
        this.administrativeYieldDuration = NiFiProperties.getInstance().getAdministrativeYieldDuration();
        this.administrativeYieldMillis = FormatUtils.getTimeDuration((String)this.administrativeYieldDuration, (TimeUnit)TimeUnit.MILLISECONDS);
        this.frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
    }

    private StateManager getStateManager(String componentId) {
        return this.stateManagerProvider.getStateManager(componentId);
    }

    public void scheduleFrameworkTask(final Runnable command, final String taskName, long initialDelay, long delay, TimeUnit timeUnit) {
        this.frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                block2: {
                    try {
                        command.run();
                    }
                    catch (Throwable t) {
                        LOG.error("Failed to run Framework Task {} due to {}", (Object)taskName, (Object)t.toString());
                        if (!LOG.isDebugEnabled()) break block2;
                        LOG.error("", t);
                    }
                }
            }
        }, initialDelay, delay, timeUnit);
    }

    public void submitFrameworkTask(Runnable task) {
        this.frameworkTaskExecutor.submit(task);
    }

    public void setMaxThreadCount(SchedulingStrategy schedulingStrategy, int maxThreadCount) {
        SchedulingAgent agent = this.getSchedulingAgent(schedulingStrategy);
        if (agent == null) {
            return;
        }
        agent.setMaxThreadCount(maxThreadCount);
    }

    public void setSchedulingAgent(SchedulingStrategy strategy, SchedulingAgent agent) {
        this.strategyAgentMap.put(strategy, agent);
    }

    public SchedulingAgent getSchedulingAgent(SchedulingStrategy strategy) {
        return (SchedulingAgent)this.strategyAgentMap.get(strategy);
    }

    private SchedulingAgent getSchedulingAgent(Connectable connectable) {
        return this.getSchedulingAgent(connectable.getSchedulingStrategy());
    }

    public void shutdown() {
        for (SchedulingAgent schedulingAgent : this.strategyAgentMap.values()) {
            try {
                schedulingAgent.shutdown();
            }
            catch (Throwable t) {
                LOG.error("Failed to shutdown Scheduling Agent {} due to {}", (Object)schedulingAgent, (Object)t.toString());
                LOG.error("", t);
            }
        }
        this.frameworkTaskExecutor.shutdown();
        this.componentLifeCycleThreadPool.shutdown();
        this.componentMonitoringThreadPool.shutdown();
    }

    public void schedule(final ReportingTaskNode taskNode) {
        final ScheduleState scheduleState = this.getScheduleState(Objects.requireNonNull(taskNode));
        if (scheduleState.isScheduled()) {
            return;
        }
        int activeThreadCount = scheduleState.getActiveThreadCount();
        if (activeThreadCount > 0) {
            throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
        }
        if (!taskNode.isValid()) {
            throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors());
        }
        final SchedulingAgent agent = this.getSchedulingAgent(taskNode.getSchedulingStrategy());
        scheduleState.setScheduled(true);
        Runnable startReportingTaskRunnable = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                long lastStopTime = scheduleState.getLastStopTime();
                ReportingTask reportingTask = taskNode.getReportingTask();
                while (true) {
                    try {
                        ScheduleState scheduleState2 = scheduleState;
                        synchronized (scheduleState2) {
                            if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
                                return;
                            }
                            try (NarCloseable x = NarCloseable.withNarLoader();){
                                ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
                            }
                            agent.schedule(taskNode, scheduleState);
                            return;
                        }
                    }
                    catch (Exception e) {
                        Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
                        SimpleProcessLogger componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
                        componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", (Object)new Object[]{reportingTask, e.toString(), StandardProcessScheduler.this.administrativeYieldDuration}, (Object)e);
                        try {
                            Thread.sleep(StandardProcessScheduler.this.administrativeYieldMillis);
                        }
                        catch (InterruptedException interruptedException) {
                        }
                        continue;
                    }
                    break;
                }
            }
        };
        this.componentLifeCycleThreadPool.execute(startReportingTaskRunnable);
        taskNode.setScheduledState(ScheduledState.RUNNING);
    }

    public void unschedule(final ReportingTaskNode taskNode) {
        final ScheduleState scheduleState = this.getScheduleState(Objects.requireNonNull(taskNode));
        if (!scheduleState.isScheduled()) {
            return;
        }
        taskNode.verifyCanStop();
        final SchedulingAgent agent = this.getSchedulingAgent(taskNode.getSchedulingStrategy());
        final ReportingTask reportingTask = taskNode.getReportingTask();
        taskNode.setScheduledState(ScheduledState.STOPPED);
        Runnable unscheduleReportingTaskRunnable = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                ConfigurationContext configurationContext = taskNode.getConfigurationContext();
                ScheduleState scheduleState2 = scheduleState;
                synchronized (scheduleState2) {
                    scheduleState.setScheduled(false);
                    try (NarCloseable x = NarCloseable.withNarLoader();){
                        ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
                    }
                    catch (Exception e) {
                        Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
                        SimpleProcessLogger componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
                        componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
                        LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", new Object[]{reportingTask, cause.toString(), StandardProcessScheduler.this.administrativeYieldDuration});
                        LOG.error("", cause);
                        try {
                            Thread.sleep(StandardProcessScheduler.this.administrativeYieldMillis);
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                    }
                    agent.unschedule(taskNode, scheduleState);
                    if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, (Object)reportingTask, configurationContext);
                    }
                }
            }
        };
        this.componentLifeCycleThreadPool.execute(unscheduleReportingTaskRunnable);
    }

    public synchronized void startProcessor(final ProcessorNode procNode) {
        StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, this.encryptor, this.getStateManager(procNode.getIdentifier()), this.variableRegistry);
        final ScheduleState scheduleState = this.getScheduleState(Objects.requireNonNull(procNode));
        SchedulingAgentCallback callback = new SchedulingAgentCallback(){

            public void trigger() {
                StandardProcessScheduler.this.getSchedulingAgent((Connectable)procNode).schedule((Connectable)procNode, scheduleState);
                StandardProcessScheduler.this.heartbeater.heartbeat();
            }

            public Future<?> invokeMonitoringTask(Callable<?> task) {
                scheduleState.incrementActiveThreadCount();
                return StandardProcessScheduler.this.componentMonitoringThreadPool.submit(task);
            }

            public void postMonitor() {
                scheduleState.decrementActiveThreadCount();
            }
        };
        procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, (ProcessContext)processContext, callback);
    }

    public synchronized void stopProcessor(final ProcessorNode procNode) {
        StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, this.encryptor, this.getStateManager(procNode.getIdentifier()), this.variableRegistry);
        final ScheduleState state = this.getScheduleState(procNode);
        procNode.stop(this.componentLifeCycleThreadPool, (ProcessContext)processContext, (Callable)new Callable<Boolean>(){

            @Override
            public Boolean call() {
                if (state.isScheduled()) {
                    StandardProcessScheduler.this.getSchedulingAgent((Connectable)procNode).unschedule((Connectable)procNode, state);
                }
                return state.getActiveThreadCount() == 0;
            }
        });
    }

    public void yield(ProcessorNode procNode) {
    }

    public void registerEvent(Connectable worker) {
        this.getSchedulingAgent(worker).onEvent(worker);
    }

    public int getActiveThreadCount(Object scheduled) {
        return this.getScheduleState(scheduled).getActiveThreadCount();
    }

    public void startPort(Port port) {
        if (!port.isValid()) {
            throw new IllegalStateException("Port " + port.getName() + " is not in a valid state");
        }
        port.onSchedulingStart();
        this.startConnectable((Connectable)port);
    }

    public void startFunnel(Funnel funnel) {
        this.startConnectable((Connectable)funnel);
        funnel.setScheduledState(ScheduledState.RUNNING);
    }

    public void stopPort(Port port) {
        this.stopConnectable((Connectable)port);
        port.shutdown();
    }

    public void stopFunnel(Funnel funnel) {
        this.stopConnectable((Connectable)funnel);
        funnel.setScheduledState(ScheduledState.STOPPED);
    }

    private synchronized void startConnectable(Connectable connectable) {
        if (connectable.getScheduledState() == ScheduledState.DISABLED) {
            throw new IllegalStateException(connectable + " is disabled, so it cannot be started");
        }
        ScheduleState scheduleState = this.getScheduleState(Objects.requireNonNull(connectable));
        if (scheduleState.isScheduled()) {
            return;
        }
        int activeThreads = scheduleState.getActiveThreadCount();
        if (activeThreads > 0) {
            throw new IllegalStateException("Port cannot be scheduled to run until its last " + activeThreads + " threads finish");
        }
        this.getSchedulingAgent(connectable).schedule(connectable, scheduleState);
        scheduleState.setScheduled(true);
    }

    private synchronized void stopConnectable(Connectable connectable) {
        ScheduleState state = this.getScheduleState(Objects.requireNonNull(connectable));
        if (!state.isScheduled()) {
            return;
        }
        state.setScheduled(false);
        this.getSchedulingAgent(connectable).unschedule(connectable, state);
        if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
            ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, this.encryptor, this.getStateManager(connectable.getIdentifier()));
            try (NarCloseable x = NarCloseable.withNarLoader();){
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, (Object)connectable, processContext);
                this.heartbeater.heartbeat();
            }
        }
    }

    public synchronized void enableFunnel(Funnel funnel) {
        if (funnel.getScheduledState() != ScheduledState.DISABLED) {
            throw new IllegalStateException("Funnel cannot be enabled because it is not disabled");
        }
        funnel.setScheduledState(ScheduledState.STOPPED);
    }

    public synchronized void disableFunnel(Funnel funnel) {
        if (funnel.getScheduledState() != ScheduledState.STOPPED) {
            throw new IllegalStateException("Funnel cannot be disabled because its state its state is set to " + funnel.getScheduledState());
        }
        funnel.setScheduledState(ScheduledState.DISABLED);
    }

    public synchronized void disablePort(Port port) {
        if (port.getScheduledState() != ScheduledState.STOPPED) {
            throw new IllegalStateException("Port cannot be disabled because its state is set to " + port.getScheduledState());
        }
        if (!(port instanceof AbstractPort)) {
            throw new IllegalArgumentException();
        }
        ((AbstractPort)port).disable();
    }

    public synchronized void enablePort(Port port) {
        if (port.getScheduledState() != ScheduledState.DISABLED) {
            throw new IllegalStateException("Funnel cannot be enabled because it is not disabled");
        }
        if (!(port instanceof AbstractPort)) {
            throw new IllegalArgumentException();
        }
        ((AbstractPort)port).enable();
    }

    public synchronized void enableProcessor(ProcessorNode procNode) {
        procNode.enable();
    }

    public synchronized void disableProcessor(ProcessorNode procNode) {
        procNode.disable();
    }

    public synchronized void enableReportingTask(ReportingTaskNode taskNode) {
        if (taskNode.getScheduledState() != ScheduledState.DISABLED) {
            throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
        }
        taskNode.setScheduledState(ScheduledState.STOPPED);
    }

    public synchronized void disableReportingTask(ReportingTaskNode taskNode) {
        if (taskNode.getScheduledState() != ScheduledState.STOPPED) {
            throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
        }
        taskNode.setScheduledState(ScheduledState.DISABLED);
    }

    public boolean isScheduled(Object scheduled) {
        ScheduleState scheduleState = (ScheduleState)this.scheduleStates.get(scheduled);
        return scheduleState == null ? false : scheduleState.isScheduled();
    }

    private ScheduleState getScheduleState(Object schedulable) {
        ScheduleState scheduleState = (ScheduleState)this.scheduleStates.get(schedulable);
        if (scheduleState == null) {
            scheduleState = new ScheduleState();
            this.scheduleStates.putIfAbsent(schedulable, scheduleState);
        }
        return scheduleState;
    }

    public void enableControllerService(ControllerServiceNode service) {
        service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater);
    }

    public void disableControllerService(ControllerServiceNode service) {
        service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
    }

    public void disableControllerServices(List<ControllerServiceNode> services) {
        if (!Objects.requireNonNull(services).isEmpty()) {
            for (ControllerServiceNode controllerServiceNode : services) {
                this.disableControllerService(controllerServiceNode);
            }
        }
    }
}

