/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.EventDrivenWorkerQueue;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.AbstractSchedulingAgent;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventDrivenSchedulingAgent
extends AbstractSchedulingAgent {
    private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
    private final ControllerServiceProvider serviceProvider;
    private final StateManagerProvider stateManagerProvider;
    private final EventDrivenWorkerQueue workerQueue;
    private final RepositoryContextFactory contextFactory;
    private final AtomicInteger maxThreadCount;
    private final AtomicInteger activeThreadCount = new AtomicInteger(0);
    private final PropertyEncryptor encryptor;
    private final ExtensionManager extensionManager;
    private final NodeTypeProvider nodeTypeProvider;
    private volatile String adminYieldDuration = "1 sec";
    private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<Connectable, AtomicLong>();
    private final ConcurrentMap<Connectable, LifecycleState> scheduleStates = new ConcurrentHashMap<Connectable, LifecycleState>();

    public EventDrivenSchedulingAgent(FlowEngine flowEngine, ControllerServiceProvider serviceProvider, StateManagerProvider stateManagerProvider, EventDrivenWorkerQueue workerQueue, RepositoryContextFactory contextFactory, int maxThreadCount, PropertyEncryptor encryptor, ExtensionManager extensionManager, NodeTypeProvider nodeTypeProvider) {
        super(flowEngine);
        this.serviceProvider = serviceProvider;
        this.stateManagerProvider = stateManagerProvider;
        this.workerQueue = workerQueue;
        this.contextFactory = contextFactory;
        this.maxThreadCount = new AtomicInteger(maxThreadCount);
        this.encryptor = encryptor;
        this.extensionManager = extensionManager;
        this.nodeTypeProvider = nodeTypeProvider;
        for (int i = 0; i < maxThreadCount; ++i) {
            EventDrivenTask eventDrivenTask = new EventDrivenTask(workerQueue, this.activeThreadCount);
            flowEngine.scheduleWithFixedDelay((Runnable)eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
        }
    }

    public int getActiveThreadCount() {
        return this.activeThreadCount.get();
    }

    private StateManager getStateManager(String componentId) {
        return this.stateManagerProvider.getStateManager(componentId);
    }

    public void shutdown() {
        this.flowEngine.shutdown();
    }

    @Override
    public void doSchedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
        throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
    }

    @Override
    public void doUnschedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
        throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
    }

    @Override
    public void doSchedule(Connectable connectable, LifecycleState scheduleState) {
        this.workerQueue.resumeWork(connectable);
        logger.info("Scheduled {} to run in Event-Driven mode", (Object)connectable);
        this.scheduleStates.put(connectable, scheduleState);
    }

    @Override
    protected void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void doUnschedule(Connectable connectable, LifecycleState scheduleState) {
        this.workerQueue.suspendWork(connectable);
        logger.info("Stopped scheduling {} to run", (Object)connectable);
    }

    public void onEvent(Connectable connectable) {
        this.workerQueue.offer(connectable);
    }

    public void setMaxThreadCount(int maxThreadCount) {
        int oldMax = this.maxThreadCount.getAndSet(maxThreadCount);
        if (maxThreadCount > oldMax) {
            int tasksToAdd = maxThreadCount - oldMax;
            for (int i = 0; i < tasksToAdd; ++i) {
                EventDrivenTask eventDrivenTask = new EventDrivenTask(this.workerQueue, this.activeThreadCount);
                this.flowEngine.scheduleWithFixedDelay((Runnable)eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
            }
        }
    }

    public void incrementMaxThreadCount(int toAdd) {
    }

    public void setAdministrativeYieldDuration(String yieldDuration) {
        this.adminYieldDuration = yieldDuration;
    }

    public String getAdministrativeYieldDuration() {
        return this.adminYieldDuration;
    }

    public long getAdministrativeYieldDuration(TimeUnit timeUnit) {
        return FormatUtils.getTimeDuration((String)this.adminYieldDuration, (TimeUnit)timeUnit);
    }

    private class EventDrivenTask
    implements Runnable {
        private final EventDrivenWorkerQueue workerQueue;
        private final AtomicInteger activeThreadCount;

        public EventDrivenTask(EventDrivenWorkerQueue workerQueue, AtomicInteger activeThreadCount) {
            this.workerQueue = workerQueue;
            this.activeThreadCount = activeThreadCount;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!EventDrivenSchedulingAgent.this.flowEngine.isShutdown()) {
                EventDrivenWorkerQueue.Worker worker = this.workerQueue.poll(1L, TimeUnit.SECONDS);
                if (worker == null) continue;
                Connectable connectable = worker.getConnectable();
                LifecycleState scheduleState = (LifecycleState)EventDrivenSchedulingAgent.this.scheduleStates.get(connectable);
                if (scheduleState == null) continue;
                this.activeThreadCount.incrementAndGet();
                try {
                    AtomicLong connectionIndex = (AtomicLong)EventDrivenSchedulingAgent.this.connectionIndexMap.get(connectable);
                    if (connectionIndex == null) {
                        connectionIndex = new AtomicLong(0L);
                        AtomicLong existingConnectionIndex = EventDrivenSchedulingAgent.this.connectionIndexMap.putIfAbsent(connectable, connectionIndex);
                        if (existingConnectionIndex != null) {
                            connectionIndex = existingConnectionIndex;
                        }
                    }
                    RepositoryContext context = EventDrivenSchedulingAgent.this.contextFactory.newProcessContext(connectable, connectionIndex);
                    if (connectable instanceof ProcessorNode) {
                        boolean batch;
                        Object sessionFactory;
                        StandardProcessSession rawSession;
                        ProcessorNode procNode = (ProcessorNode)connectable;
                        TaskTerminationAwareStateManager stateManager = new TaskTerminationAwareStateManager(EventDrivenSchedulingAgent.this.getStateManager(connectable.getIdentifier()), () -> ((LifecycleState)scheduleState).isTerminated());
                        StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, EventDrivenSchedulingAgent.this.serviceProvider, EventDrivenSchedulingAgent.this.encryptor, (StateManager)stateManager, () -> ((LifecycleState)scheduleState).isTerminated(), EventDrivenSchedulingAgent.this.nodeTypeProvider);
                        long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
                        if (procNode.isSessionBatchingSupported() && runNanos > 0L) {
                            rawSession = new StandardProcessSession(context, () -> ((LifecycleState)scheduleState).isTerminated());
                            sessionFactory = new BatchingSessionFactory(rawSession);
                            batch = true;
                        } else {
                            rawSession = null;
                            sessionFactory = new StandardProcessSessionFactory(context, () -> ((LifecycleState)scheduleState).isTerminated());
                            batch = false;
                        }
                        WeakHashMapProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory((ProcessSessionFactory)sessionFactory);
                        long startNanos = System.nanoTime();
                        long finishNanos = startNanos + runNanos;
                        int invocationCount = 0;
                        boolean shouldRun = true;
                        try {
                            while (shouldRun) {
                                this.trigger(procNode, context, scheduleState, standardProcessContext, activeSessionFactory);
                                ++invocationCount;
                                if (!batch) {
                                    break;
                                }
                                if (System.nanoTime() > finishNanos) {
                                    break;
                                }
                                if (!scheduleState.isScheduled()) {
                                    break;
                                }
                                int eventCount = worker.decrementEventCount();
                                if (eventCount < 0) {
                                    worker.incrementEventCount();
                                }
                                shouldRun = eventCount > 0;
                            }
                        }
                        finally {
                            if (batch && rawSession != null) {
                                try {
                                    rawSession.commitAsync();
                                }
                                catch (RuntimeException re) {
                                    logger.error("Unable to commit process session", (Throwable)re);
                                }
                            }
                            try {
                                long processingNanos = System.nanoTime() - startNanos;
                                StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
                                procEvent.setProcessingNanos(processingNanos);
                                procEvent.setInvocations(invocationCount);
                                context.getFlowFileEventRepository().updateRepository((FlowFileEvent)procEvent, connectable.getIdentifier());
                            }
                            catch (IOException e) {
                                logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", (Object)connectable, (Object)e.toString());
                                logger.error("", (Throwable)e);
                            }
                        }
                        if (!Connectables.flowFilesQueued((Connectable)procNode)) continue;
                        EventDrivenSchedulingAgent.this.onEvent((Connectable)procNode);
                        continue;
                    }
                    StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, () -> ((LifecycleState)scheduleState).isTerminated());
                    WeakHashMapProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory((ProcessSessionFactory)sessionFactory);
                    ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, EventDrivenSchedulingAgent.this.encryptor, EventDrivenSchedulingAgent.this.getStateManager(connectable.getIdentifier()));
                    this.trigger(connectable, scheduleState, connectableProcessContext, activeSessionFactory);
                    if (!Connectables.flowFilesQueued((Connectable)connectable)) continue;
                    EventDrivenSchedulingAgent.this.onEvent(connectable);
                }
                finally {
                    this.activeThreadCount.decrementAndGet();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void trigger(Connectable worker, LifecycleState scheduleState, ConnectableProcessContext processContext, ActiveProcessSessionFactory sessionFactory) {
            Throwable throwable;
            int newThreadCount = scheduleState.incrementActiveThreadCount(sessionFactory);
            if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
                scheduleState.decrementActiveThreadCount();
                return;
            }
            try {
                try {
                    throwable = null;
                    try (NarCloseable ncl = NarCloseable.withComponentNarLoader((ExtensionManager)EventDrivenSchedulingAgent.this.extensionManager, worker.getClass(), (String)worker.getIdentifier());){
                        worker.onTrigger((ProcessContext)processContext, (ProcessSessionFactory)sessionFactory);
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                }
                catch (ProcessException pe) {
                    logger.error("{} failed to process session due to {}", (Object)worker, (Object)pe.toString());
                }
                catch (Throwable t) {
                    logger.error("{} failed to process session due to {}", (Object)worker, (Object)t.toString());
                    logger.error("", t);
                    logger.warn("{} Administratively Pausing for {} due to processing failure: {}", new Object[]{worker, EventDrivenSchedulingAgent.this.getAdministrativeYieldDuration(), t.toString()});
                    logger.warn("", t);
                    try {
                        Thread.sleep(FormatUtils.getTimeDuration((String)EventDrivenSchedulingAgent.this.adminYieldDuration, (TimeUnit)TimeUnit.MILLISECONDS));
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
            }
            catch (Throwable throwable3) {
                if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                    try (NarCloseable x = NarCloseable.withComponentNarLoader((ExtensionManager)EventDrivenSchedulingAgent.this.extensionManager, worker.getClass(), (String)worker.getIdentifier());){
                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, (Object)worker, (Object[])new Object[]{processContext});
                    }
                }
                scheduleState.decrementActiveThreadCount();
                throw throwable3;
            }
            if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                throwable = null;
                try (NarCloseable x = NarCloseable.withComponentNarLoader((ExtensionManager)EventDrivenSchedulingAgent.this.extensionManager, worker.getClass(), (String)worker.getIdentifier());){
                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, (Object)worker, (Object[])new Object[]{processContext});
                }
                catch (Throwable throwable4) {
                    throwable = throwable4;
                    throw throwable4;
                }
            }
            scheduleState.decrementActiveThreadCount();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void trigger(ProcessorNode worker, RepositoryContext context, LifecycleState scheduleState, StandardProcessContext processContext, ActiveProcessSessionFactory sessionFactory) {
            Throwable throwable;
            int newThreadCount = scheduleState.incrementActiveThreadCount(sessionFactory);
            if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
                scheduleState.decrementActiveThreadCount();
                return;
            }
            try {
                SimpleProcessLogger procLog;
                try {
                    throwable = null;
                    try (NarCloseable ncl = NarCloseable.withComponentNarLoader((ExtensionManager)EventDrivenSchedulingAgent.this.extensionManager, worker.getProcessor().getClass(), (String)worker.getIdentifier());){
                        worker.onTrigger((ProcessContext)processContext, (ProcessSessionFactory)sessionFactory);
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                }
                catch (ProcessException pe) {
                    procLog = new SimpleProcessLogger(worker.getIdentifier(), (Object)worker.getProcessor());
                    procLog.error("Failed to process session due to {}", new Object[]{pe});
                }
                catch (Throwable t) {
                    procLog = new SimpleProcessLogger(worker.getIdentifier(), (Object)worker.getProcessor());
                    procLog.error("{} failed to process session due to {}", new Object[]{worker.getProcessor(), t});
                    procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{EventDrivenSchedulingAgent.this.adminYieldDuration});
                    logger.warn("Administratively Yielding {} due to uncaught Exception: ", (Object)worker.getProcessor());
                    logger.warn("", t);
                    worker.yield(FormatUtils.getTimeDuration((String)EventDrivenSchedulingAgent.this.adminYieldDuration, (TimeUnit)TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                }
            }
            catch (Throwable throwable3) {
                if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                    try (NarCloseable x = NarCloseable.withComponentNarLoader((ExtensionManager)EventDrivenSchedulingAgent.this.extensionManager, worker.getProcessor().getClass(), (String)worker.getIdentifier());){
                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, (Object)worker.getProcessor(), (Object[])new Object[]{processContext});
                    }
                }
                scheduleState.decrementActiveThreadCount();
                throw throwable3;
            }
            if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                throwable = null;
                try (NarCloseable x = NarCloseable.withComponentNarLoader((ExtensionManager)EventDrivenSchedulingAgent.this.extensionManager, worker.getProcessor().getClass(), (String)worker.getIdentifier());){
                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, (Object)worker.getProcessor(), (Object[])new Object[]{processContext});
                }
                catch (Throwable throwable4) {
                    throwable = throwable4;
                    throw throwable4;
                }
            }
            scheduleState.decrementActiveThreadCount();
        }
    }
}

