/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.EventDrivenWorkerQueue;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.scheduling.AbstractSchedulingAgent;
import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.annotation.OnStopped;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.Connectables;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduling agent that runs components in Event-Driven mode.
 *
 * <p>Components are handed to this agent through {@link #onEvent(Connectable)}, which offers them to an
 * {@link EventDrivenWorkerQueue}. Long-running {@code EventDrivenTask}s, submitted to the
 * {@link FlowEngine}, poll that queue and trigger whichever component they receive. Reporting tasks
 * cannot be scheduled in this mode.</p>
 */
public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
    private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);

    /**
     * Fixed delay, in nanoseconds, passed to the engine when a worker task is submitted. A worker task's
     * {@code run()} only returns once the engine reports shutdown, so this delay is rarely observed.
     */
    private static final long TASK_RESCHEDULE_DELAY_NANOS = 30000L;

    /** How long, in seconds, a worker task waits on the queue before re-checking for engine shutdown. */
    private static final long WORKER_POLL_TIMEOUT_SECONDS = 1L;

    private final ControllerServiceProvider serviceProvider;
    private final StateManagerProvider stateManagerProvider;
    private final EventDrivenWorkerQueue workerQueue;
    private final ProcessContextFactory contextFactory;
    private final AtomicInteger maxThreadCount;
    private final StringEncryptor encryptor;
    private final VariableRegistry variableRegistry;

    /** Duration string used when a component fails with an unexpected error; defaults to one second. */
    private volatile String adminYieldDuration = "1 sec";

    /** Per-connectable index handed to the {@link ProcessContextFactory} when a context is created. */
    private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>();

    /** Schedule state registered for each connectable via {@link #doSchedule(Connectable, ScheduleState)}. */
    private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>();

    /**
     * Creates the agent and immediately submits {@code maxThreadCount} worker tasks to {@code flowEngine}.
     *
     * @param flowEngine engine that executes the worker tasks
     * @param serviceProvider controller service provider passed to each processor's context
     * @param stateManagerProvider source of per-component {@link StateManager}s
     * @param workerQueue queue from which worker tasks obtain components to trigger
     * @param contextFactory factory for the repository-level process context of each component
     * @param maxThreadCount number of worker tasks to start
     * @param encryptor encryptor passed to each component's process context
     * @param variableRegistry variable registry passed to each processor's context
     */
    public EventDrivenSchedulingAgent(FlowEngine flowEngine, ControllerServiceProvider serviceProvider, StateManagerProvider stateManagerProvider, EventDrivenWorkerQueue workerQueue, ProcessContextFactory contextFactory, int maxThreadCount, StringEncryptor encryptor, VariableRegistry variableRegistry) {
        super(flowEngine);
        this.serviceProvider = serviceProvider;
        this.stateManagerProvider = stateManagerProvider;
        this.workerQueue = workerQueue;
        this.contextFactory = contextFactory;
        this.maxThreadCount = new AtomicInteger(maxThreadCount);
        this.encryptor = encryptor;
        this.variableRegistry = variableRegistry;
        this.startWorkerTasks(flowEngine, maxThreadCount);
    }

    /**
     * Submits {@code count} new worker tasks to the given engine. Does nothing when {@code count} is
     * zero or negative.
     */
    private void startWorkerTasks(FlowEngine engine, int count) {
        for (int i = 0; i < count; ++i) {
            engine.scheduleWithFixedDelay(new EventDrivenTask(this.workerQueue), 0L, TASK_RESCHEDULE_DELAY_NANOS, TimeUnit.NANOSECONDS);
        }
    }

    /** Looks up the state manager for the component with the given identifier. */
    private StateManager getStateManager(String componentId) {
        return this.stateManagerProvider.getStateManager(componentId);
    }

    /**
     * Returns the connection index for the given connectable, creating it with an initial value of
     * zero on first use. Concurrent callers all receive the same instance.
     */
    private AtomicLong getConnectionIndex(Connectable connectable) {
        AtomicLong index = this.connectionIndexMap.get(connectable);
        if (index == null) {
            AtomicLong created = new AtomicLong(0L);
            AtomicLong raced = this.connectionIndexMap.putIfAbsent(connectable, created);
            index = raced != null ? raced : created;
        }
        return index;
    }

    /** Shuts down the underlying flow engine; worker tasks stop looping once the engine reports shutdown. */
    @Override
    public void shutdown() {
        this.flowEngine.shutdown();
    }

    /**
     * Always fails: reporting tasks are not supported by this agent.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void doSchedule(ReportingTaskNode taskNode, ScheduleState scheduleState) {
        throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
    }

    /**
     * Always fails: reporting tasks are not supported by this agent.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void doUnschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) {
        throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
    }

    /**
     * Registers the schedule state for the connectable and resumes its work on the worker queue.
     *
     * @param connectable component to run in Event-Driven mode
     * @param scheduleState state used to track the component's active threads and scheduled flag
     */
    @Override
    public void doSchedule(Connectable connectable, ScheduleState scheduleState) {
        // Register the state BEFORE resuming work: a worker task silently drops any connectable it polls
        // that has no registered state, so the reverse order could lose the first event.
        // NOTE(review): assumes resumeWork may make the connectable pollable - confirm against EventDrivenWorkerQueue.
        this.scheduleStates.put(connectable, scheduleState);
        this.workerQueue.resumeWork(connectable);
        logger.info("Scheduled {} to run in Event-Driven mode", connectable);
    }

    /**
     * Suspends the connectable's work on the worker queue. The registered schedule state is left in place.
     *
     * @param connectable component to stop running
     * @param scheduleState not used by this implementation
     */
    @Override
    public void doUnschedule(Connectable connectable, ScheduleState scheduleState) {
        this.workerQueue.suspendWork(connectable);
        logger.info("Stopped scheduling {} to run", connectable);
    }

    /** Offers the connectable to the worker queue so that a worker task can pick it up. */
    @Override
    public void onEvent(Connectable connectable) {
        this.workerQueue.offer(connectable);
    }

    /**
     * Updates the maximum thread count. When the new value is larger than the previous one, the
     * difference is started as additional worker tasks. Lowering the value only updates the stored
     * maximum; already-started worker tasks are not stopped.
     *
     * @param maxThreadCount the new maximum number of worker tasks
     */
    @Override
    public void setMaxThreadCount(int maxThreadCount) {
        int oldMax = this.maxThreadCount.getAndSet(maxThreadCount);
        this.startWorkerTasks(this.flowEngine, maxThreadCount - oldMax);
    }

    /** Sets the duration string (for example {@code "1 sec"}) used for administrative yields. */
    @Override
    public void setAdministrativeYieldDuration(String yieldDuration) {
        this.adminYieldDuration = yieldDuration;
    }

    /** Returns the administrative yield duration exactly as it was configured. */
    @Override
    public String getAdministrativeYieldDuration() {
        return this.adminYieldDuration;
    }

    /** Returns the administrative yield duration converted to the requested time unit. */
    @Override
    public long getAdministrativeYieldDuration(TimeUnit timeUnit) {
        return FormatUtils.getTimeDuration(this.adminYieldDuration, timeUnit);
    }

    /**
     * Worker that repeatedly polls the worker queue and triggers the component it receives, until the
     * flow engine is shut down.
     */
    private class EventDrivenTask
    implements Runnable {
        private final EventDrivenWorkerQueue workerQueue;

        public EventDrivenTask(EventDrivenWorkerQueue workerQueue) {
            this.workerQueue = workerQueue;
        }

        /**
         * Polls for work until the engine is shut down. Each polled component is triggered once (or
         * repeatedly, for batching processors) and re-offered to the queue if it still has FlowFiles queued.
         */
        @Override
        public void run() {
            while (!EventDrivenSchedulingAgent.this.flowEngine.isShutdown()) {
                EventDrivenWorkerQueue.Worker worker = this.workerQueue.poll(WORKER_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (worker == null) {
                    // Poll timed out; loop around to re-check for shutdown.
                    continue;
                }
                Connectable connectable = worker.getConnectable();
                ScheduleState scheduleState = EventDrivenSchedulingAgent.this.scheduleStates.get(connectable);
                if (scheduleState == null) {
                    // No state has been registered for this component; drop the event.
                    continue;
                }
                AtomicLong connectionIndex = EventDrivenSchedulingAgent.this.getConnectionIndex(connectable);
                org.apache.nifi.controller.repository.ProcessContext context = EventDrivenSchedulingAgent.this.contextFactory.newProcessContext(connectable, connectionIndex);
                if (connectable instanceof ProcessorNode) {
                    this.runProcessor(worker, (ProcessorNode)connectable, scheduleState, context);
                } else {
                    this.runConnectable(connectable, scheduleState, context);
                }
                // Anything still queued? Register the component to run again.
                if (Connectables.flowFilesQueued(connectable)) {
                    EventDrivenSchedulingAgent.this.onEvent(connectable);
                }
            }
        }

        /** Triggers a non-processor connectable exactly once with a fresh session factory and context. */
        private void runConnectable(Connectable connectable, ScheduleState scheduleState, org.apache.nifi.controller.repository.ProcessContext context) {
            StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context);
            ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, EventDrivenSchedulingAgent.this.encryptor, EventDrivenSchedulingAgent.this.getStateManager(connectable.getIdentifier()));
            this.trigger(connectable, scheduleState, connectableProcessContext, sessionFactory);
        }

        /**
         * Triggers a processor. When the processor supports high throughput and has a positive run
         * duration, it is triggered repeatedly against one shared session until the run duration
         * elapses, the processor is unscheduled, or the worker has no more pending events; the shared
         * session is then committed. Otherwise the processor is triggered exactly once. In both cases
         * processing time and invocation count are reported to the FlowFile event repository.
         */
        private void runProcessor(EventDrivenWorkerQueue.Worker worker, ProcessorNode procNode, ScheduleState scheduleState, org.apache.nifi.controller.repository.ProcessContext context) {
            StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, EventDrivenSchedulingAgent.this.serviceProvider, EventDrivenSchedulingAgent.this.encryptor, EventDrivenSchedulingAgent.this.getStateManager(procNode.getIdentifier()), EventDrivenSchedulingAgent.this.variableRegistry);
            long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
            boolean batch = procNode.isHighThroughputSupported() && runNanos > 0L;
            StandardProcessSession rawSession;
            ProcessSessionFactory sessionFactory;
            if (batch) {
                rawSession = new StandardProcessSession(context);
                sessionFactory = new BatchingSessionFactory(rawSession);
            } else {
                rawSession = null;
                sessionFactory = new StandardProcessSessionFactory(context);
            }

            long startNanos = System.nanoTime();
            int invocationCount = 0;
            try {
                boolean shouldRun = true;
                while (shouldRun) {
                    this.trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory);
                    ++invocationCount;
                    // Elapsed time is computed by subtraction: nanoTime values may wrap, so absolute
                    // readings must not be compared directly.
                    if (!batch || System.nanoTime() - startNanos > runNanos || !scheduleState.isScheduled()) {
                        break;
                    }
                    int eventCount = worker.decrementEventCount();
                    if (eventCount < 0) {
                        // Went below zero; undo the decrement.
                        worker.incrementEventCount();
                    }
                    shouldRun = eventCount > 0;
                }
            } finally {
                if (rawSession != null) {
                    try {
                        rawSession.commit();
                    } catch (RuntimeException re) {
                        logger.error("Unable to commit process session", re);
                    }
                }
                try {
                    long processingNanos = System.nanoTime() - startNanos;
                    StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
                    procEvent.setProcessingNanos(processingNanos);
                    procEvent.setInvocations(invocationCount);
                    context.getFlowFileEventRepository().updateRepository(procEvent);
                } catch (IOException e) {
                    logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode, e.toString());
                    logger.error("", e);
                }
            }
        }

        /**
         * Triggers a non-processor connectable once, provided doing so does not exceed its maximum
         * number of concurrent tasks (a maximum of zero or less disables the limit).
         *
         * <p>A {@link ProcessException} is logged. Any other failure is logged and followed by a sleep
         * of the administrative yield duration on the calling thread. When the component is no longer
         * scheduled and this is its last active thread, its OnStopped methods are invoked. The active
         * thread count is always decremented before returning.</p>
         */
        private void trigger(Connectable worker, ScheduleState scheduleState, ConnectableProcessContext processContext, ProcessSessionFactory sessionFactory) {
            int newThreadCount = scheduleState.incrementActiveThreadCount();
            if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
                // The limit is only known to be exceeded after the increment, so undo it and skip this trigger.
                scheduleState.decrementActiveThreadCount();
                return;
            }
            try {
                try (NarCloseable ncl = NarCloseable.withNarLoader()) {
                    worker.onTrigger(processContext, sessionFactory);
                } catch (ProcessException pe) {
                    logger.error("{} failed to process session due to {}", worker, pe.toString());
                } catch (Throwable t) {
                    logger.error("{} failed to process session due to {}", worker, t.toString());
                    logger.error("", t);
                    logger.warn("{} Administratively Pausing for {} due to processing failure: {}", new Object[]{worker, EventDrivenSchedulingAgent.this.getAdministrativeYieldDuration(), t.toString()});
                    logger.warn("", t);
                    try {
                        Thread.sleep(FormatUtils.getTimeDuration(EventDrivenSchedulingAgent.this.adminYieldDuration, TimeUnit.MILLISECONDS));
                    } catch (InterruptedException ie) {
                        // Cut the pause short, but keep the interrupt visible to later code on this thread.
                        Thread.currentThread().interrupt();
                    }
                }
            } finally {
                try {
                    if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                        try (NarCloseable x = NarCloseable.withNarLoader()) {
                            ReflectionUtils.quietlyInvokeMethodsWithAnnotations(org.apache.nifi.annotation.lifecycle.OnStopped.class, OnStopped.class, worker, processContext);
                        }
                    }
                } finally {
                    // Always release the thread slot, even if the OnStopped handling itself fails.
                    scheduleState.decrementActiveThreadCount();
                }
            }
        }

        /**
         * Triggers a processor once, provided doing so does not exceed its maximum number of concurrent
         * tasks (a maximum of zero or less disables the limit).
         *
         * <p>A {@link ProcessException} is reported through the processor's own logger. Any other
         * failure is reported and the processor is yielded for the administrative yield duration. When
         * the processor is no longer scheduled and this is its last active thread, its OnStopped
         * methods are invoked. The active thread count is always decremented before returning.</p>
         *
         * @param context not used by this method; kept so the signature is unchanged
         */
        private void trigger(ProcessorNode worker, org.apache.nifi.controller.repository.ProcessContext context, ScheduleState scheduleState, StandardProcessContext processContext, ProcessSessionFactory sessionFactory) {
            int newThreadCount = scheduleState.incrementActiveThreadCount();
            if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
                // The limit is only known to be exceeded after the increment, so undo it and skip this trigger.
                scheduleState.decrementActiveThreadCount();
                return;
            }
            try {
                try (NarCloseable ncl = NarCloseable.withNarLoader()) {
                    worker.onTrigger(processContext, sessionFactory);
                } catch (ProcessException pe) {
                    SimpleProcessLogger procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
                    procLog.error("Failed to process session due to {}", new Object[]{pe});
                } catch (Throwable t) {
                    SimpleProcessLogger procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
                    procLog.error("{} failed to process session due to {}", new Object[]{worker.getProcessor(), t});
                    procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{EventDrivenSchedulingAgent.this.adminYieldDuration});
                    logger.warn("Administratively Yielding {} due to uncaught Exception: ", worker.getProcessor());
                    logger.warn("", t);
                    worker.yield(FormatUtils.getTimeDuration(EventDrivenSchedulingAgent.this.adminYieldDuration, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                }
            } finally {
                try {
                    if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                        try (NarCloseable x = NarCloseable.withNarLoader()) {
                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(org.apache.nifi.annotation.lifecycle.OnStopped.class, worker.getProcessor(), processContext);
                        }
                    }
                } finally {
                    // Always release the thread slot, even if the OnStopped handling itself fails.
                    scheduleState.decrementActiveThreadCount();
                }
            }
        }
    }
}

