/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.management.NotificationEmitter;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.components.ClassLoaderAwarePythonBridge;
import org.apache.nifi.components.monitor.LongRunningTaskMonitor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.TriggerValidationTask;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.StandardConnection;
import org.apache.nifi.controller.ClusterCoordinatorNodeInformant;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ContentAvailability;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowAnalysisRuleNode;
import org.apache.nifi.controller.GarbageCollectionLog;
import org.apache.nifi.controller.MissingBundleException;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ParameterProviderNode;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.RingBufferGarbageCollectionLog;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SnippetManager;
import org.apache.nifi.controller.StandardReloadComponent;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.queue.clustered.ContentRepositoryFlowFileAccess;
import org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask;
import org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer;
import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer;
import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.ContentRepositoryContext;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.StandardContentRepositoryContext;
import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardQueueProvider;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.scheduling.CronSchedulingAgent;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.service.ControllerServiceApiLookup;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceResolver;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceApiLookup;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceResolver;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.StorageStatus;
import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsModelMapFactory;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StandardGarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.tasks.ExpireFlowFiles;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.diagnostics.SystemDiagnosticsFactory;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flowanalysis.StandardFlowAnalyzer;
import org.apache.nifi.flowanalysis.TriggerFlowAnalysisTask;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.StatelessGroupScheduledState;
import org.apache.nifi.nar.ExtensionDefinition;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.parameter.StandardParameterContextManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.provenance.ComponentIdentifierLookup;
import org.apache.nifi.provenance.IdentifierLookup;
import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.DisabledPythonBridge;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RemoteResourceManager;
import org.apache.nifi.remote.RemoteSiteListener;
import org.apache.nifi.remote.SocketRemoteSiteListener;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.reporting.StandardEventAccess;
import org.apache.nifi.reporting.UserAwareEventAccess;
import org.apache.nifi.repository.encryption.configuration.EncryptionProtocol;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.validation.RuleViolationsManager;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlowController
implements ReportingTaskProvider,
FlowAnalysisRuleProvider,
Authorizable,
NodeTypeProvider {
    private static final String STANDARD_PYTHON_BRIDGE_IMPLEMENTATION_CLASS = "org.apache.nifi.py4j.StandardPythonBridge";
    public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
    public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository";
    public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository";
    public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager";
    private static final String ENCRYPTED_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository";
    private static final String ENCRYPTED_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository";
    private static final String ENCRYPTED_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.EncryptedFileSystemSwapManager";
    public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
    public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10L;
    private final AtomicInteger maxTimerDrivenThreads;
    private final AtomicReference<FlowEngine> timerDrivenEngineRef;
    private final ContentRepository contentRepository;
    private final FlowFileRepository flowFileRepository;
    private final FlowFileEventRepository flowFileEventRepository;
    private final ProvenanceRepository provenanceRepository;
    private final BulletinRepository bulletinRepository;
    private final LifecycleStateManager lifecycleStateManager;
    private final StandardProcessScheduler processScheduler;
    private final SnippetManager snippetManager;
    private final long gracefulShutdownSeconds;
    private final ExtensionDiscoveringManager extensionManager;
    private final NiFiProperties nifiProperties;
    private final Set<RemoteSiteListener> externalSiteListeners = new HashSet<RemoteSiteListener>();
    private final AtomicReference<CounterRepository> counterRepositoryRef;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
    private final StandardControllerServiceProvider controllerServiceProvider;
    private final StandardControllerServiceResolver controllerServiceResolver;
    private final Authorizer authorizer;
    private final AuditService auditService;
    private final StatusHistoryRepository statusHistoryRepository;
    private final StateManagerProvider stateManagerProvider;
    private final long systemStartTime = System.currentTimeMillis();
    private final RevisionManager revisionManager;
    private final ConnectionLoadBalanceServer loadBalanceServer;
    private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
    private final FlowEngine loadBalanceClientThreadPool;
    private final Set<NioAsyncLoadBalanceClientTask> loadBalanceClientTasks = new HashSet<NioAsyncLoadBalanceClientTask>();
    private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<String, ProcessGroup>();
    private final ZooKeeperStateServer zooKeeperStateServer;
    private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new AtomicReference();
    private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false);
    private final Integer remoteInputSocketPort;
    private final Integer remoteInputHttpPort;
    private final Boolean isSiteToSiteSecure;
    private final Set<Connectable> startConnectablesAfterInitialization;
    private final Set<ProcessGroup> startGroupsAfterInitialization;
    private final Set<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
    private final LeaderElectionManager leaderElectionManager;
    private final ClusterCoordinator clusterCoordinator;
    private final FlowEngine validationThreadPool;
    private final FlowEngine flowAnalysisThreadPool;
    private final ValidationTrigger validationTrigger;
    private final ReloadComponent reloadComponent;
    private final ProvenanceAuthorizableFactory provenanceAuthorizableFactory;
    private final UserAwareEventAccess eventAccess;
    private final ParameterContextManager parameterContextManager;
    private final StandardFlowAnalyzer flowAnalyzer;
    private final StandardFlowManager flowManager;
    private final RepositoryContextFactory repositoryContextFactory;
    private final RingBufferGarbageCollectionLog gcLog;
    private final Optional<FlowEngine> longRunningTaskMonitorThreadPool;
    private final boolean configuredForClustering;
    private final int heartbeatDelaySeconds;
    private final PropertyEncryptor encryptor;
    private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks", true);
    private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
    private ScheduledFuture<?> heartbeatSenderFuture;
    private final Heartbeater heartbeater;
    private final HeartbeatMonitor heartbeatMonitor;
    private final PythonBridge pythonBridge;
    private final org.apache.nifi.bundle.Bundle pythonBundle;
    private final AtomicReference<HeartbeatSendTask> heartbeatSendTask = new AtomicReference<Object>(null);
    private volatile NodeIdentifier nodeId;
    private boolean clustered;
    private volatile NodeConnectionStatus connectionStatus;
    private StatusAnalyticsEngine analyticsEngine;
    private String instanceId;
    private volatile boolean shutdown = false;
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final TimedLock readLock = new TimedLock((Lock)this.rwLock.readLock(), "FlowControllerReadLock", 1);
    private final TimedLock writeLock = new TimedLock((Lock)this.rwLock.writeLock(), "FlowControllerWriteLock", 1);
    private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);

    public static FlowController createStandaloneInstance(FlowFileEventRepository flowFileEventRepo, SSLContext sslContext, NiFiProperties properties, Authorizer authorizer, AuditService auditService, PropertyEncryptor encryptor, BulletinRepository bulletinRepo, ExtensionDiscoveringManager extensionManager, StatusHistoryRepository statusHistoryRepository, RuleViolationsManager ruleViolationsManager, StateManagerProvider stateManagerProvider) {
        return new FlowController(flowFileEventRepo, sslContext, properties, authorizer, auditService, encryptor, false, null, bulletinRepo, null, null, null, extensionManager, null, statusHistoryRepository, ruleViolationsManager, stateManagerProvider);
    }

    public static FlowController createClusteredInstance(FlowFileEventRepository flowFileEventRepo, SSLContext sslContext, NiFiProperties properties, Authorizer authorizer, AuditService auditService, PropertyEncryptor encryptor, NodeProtocolSender protocolSender, BulletinRepository bulletinRepo, ClusterCoordinator clusterCoordinator, HeartbeatMonitor heartbeatMonitor, LeaderElectionManager leaderElectionManager, ExtensionDiscoveringManager extensionManager, RevisionManager revisionManager, StatusHistoryRepository statusHistoryRepository, RuleViolationsManager ruleViolationsManager, StateManagerProvider stateManagerProvider) {
        return new FlowController(flowFileEventRepo, sslContext, properties, authorizer, auditService, encryptor, true, protocolSender, bulletinRepo, clusterCoordinator, heartbeatMonitor, leaderElectionManager, extensionManager, revisionManager, statusHistoryRepository, ruleViolationsManager, stateManagerProvider);
    }

    private FlowController(FlowFileEventRepository flowFileEventRepo, SSLContext sslContext, NiFiProperties nifiProperties, Authorizer authorizer, AuditService auditService, PropertyEncryptor encryptor, boolean configuredForClustering, NodeProtocolSender protocolSender, BulletinRepository bulletinRepo, ClusterCoordinator clusterCoordinator, HeartbeatMonitor heartbeatMonitor, LeaderElectionManager leaderElectionManager, ExtensionDiscoveringManager extensionManager, RevisionManager revisionManager, final StatusHistoryRepository statusHistoryRepository, RuleViolationsManager ruleViolationsManager, StateManagerProvider stateManagerProvider) {
        long snapshotMillis;
        long shutdownSecs;
        FlowFileRepository flowFileRepo;
        this.maxTimerDrivenThreads = new AtomicInteger(10);
        this.encryptor = encryptor;
        this.nifiProperties = nifiProperties;
        this.heartbeatMonitor = heartbeatMonitor;
        this.leaderElectionManager = leaderElectionManager;
        this.extensionManager = extensionManager;
        this.clusterCoordinator = clusterCoordinator;
        this.authorizer = authorizer;
        this.auditService = auditService;
        this.configuredForClustering = configuredForClustering;
        this.revisionManager = revisionManager;
        this.statusHistoryRepository = statusHistoryRepository;
        this.stateManagerProvider = stateManagerProvider;
        this.timerDrivenEngineRef = new AtomicReference<FlowEngine>(new FlowEngine(this.maxTimerDrivenThreads.get(), "Timer-Driven Process"));
        this.flowFileRepository = flowFileRepo = FlowController.createFlowFileRepository(nifiProperties, (ExtensionManager)extensionManager, this.resourceClaimManager);
        this.flowFileEventRepository = flowFileEventRepo;
        this.counterRepositoryRef = new AtomicReference<StandardCounterRepository>(new StandardCounterRepository());
        this.gcLog = new RingBufferGarbageCollectionLog(1000, 20L);
        for (GarbageCollectorMXBean mxBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            if (!(mxBean instanceof NotificationEmitter)) continue;
            ((NotificationEmitter)((Object)mxBean)).addNotificationListener(this.gcLog, null, null);
        }
        this.bulletinRepository = bulletinRepo;
        try {
            this.provenanceAuthorizableFactory = new StandardProvenanceAuthorizableFactory(this);
            this.provenanceRepository = this.createProvenanceRepository(nifiProperties);
            ComponentIdentifierLookup identifierLookup = new ComponentIdentifierLookup(this);
            this.provenanceRepository.initialize(this.createEventReporter(), authorizer, this.provenanceAuthorizableFactory, (IdentifierLookup)identifierLookup);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to create Provenance Repository", e);
        }
        try {
            this.contentRepository = this.createContentRepository(nifiProperties);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to create Content Repository", e);
        }
        this.lifecycleStateManager = new StandardLifecycleStateManager();
        this.processScheduler = new StandardProcessScheduler(this.timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties, this.lifecycleStateManager);
        this.parameterContextManager = new StandardParameterContextManager();
        this.repositoryContextFactory = new RepositoryContextFactory(this.contentRepository, this.flowFileRepository, this.flowFileEventRepository, this.counterRepositoryRef.get(), this.provenanceRepository, stateManagerProvider);
        this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow Analysis", true);
        this.flowAnalyzer = ruleViolationsManager != null ? new StandardFlowAnalyzer(ruleViolationsManager, this, (ExtensionManager)extensionManager) : null;
        this.flowManager = new StandardFlowManager(nifiProperties, sslContext, this, this.flowFileEventRepository, this.parameterContextManager);
        this.controllerServiceProvider = new StandardControllerServiceProvider((ProcessScheduler)this.processScheduler, this.bulletinRepository, (FlowManager)this.flowManager, (ExtensionManager)extensionManager);
        this.controllerServiceResolver = new StandardControllerServiceResolver(authorizer, (FlowManager)this.flowManager, new NiFiRegistryFlowMapper((ExtensionManager)extensionManager), (ControllerServiceProvider)this.controllerServiceProvider, (ControllerServiceApiLookup)new StandardControllerServiceApiLookup((ExtensionManager)extensionManager));
        PythonBridge rawPythonBridge = this.createPythonBridge(nifiProperties, (ControllerServiceProvider)this.controllerServiceProvider);
        ClassLoader pythonBridgeClassLoader = rawPythonBridge.getClass().getClassLoader();
        ClassLoaderAwarePythonBridge classloaderAwareBridge = new ClassLoaderAwarePythonBridge(rawPythonBridge, pythonBridgeClassLoader);
        this.pythonBridge = classloaderAwareBridge;
        try {
            this.pythonBridge.start();
        }
        catch (IOException e) {
            throw new IllegalStateException("Failed to communicate with Python Controller", e);
        }
        extensionManager.setPythonBridge(this.pythonBridge);
        this.pythonBundle = PythonBundle.create((NiFiProperties)nifiProperties, (ClassLoader)pythonBridgeClassLoader);
        extensionManager.discoverPythonExtensions(this.pythonBundle);
        this.flowManager.initialize((ControllerServiceProvider)this.controllerServiceProvider, this.pythonBridge, this.flowAnalyzer, ruleViolationsManager);
        if (this.flowAnalyzer != null) {
            this.flowAnalyzer.initialize((ControllerServiceProvider)this.controllerServiceProvider);
        }
        CronSchedulingAgent cronSchedulingAgent = new CronSchedulingAgent(this, this.timerDrivenEngineRef.get(), this.repositoryContextFactory);
        TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, this.timerDrivenEngineRef.get(), this.repositoryContextFactory, this.nifiProperties);
        this.processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
        this.processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, cronSchedulingAgent);
        this.startConnectablesAfterInitialization = new HashSet<Connectable>();
        this.startRemoteGroupPortsAfterInitialization = new HashSet<RemoteGroupPort>();
        this.startGroupsAfterInitialization = new HashSet<ProcessGroup>();
        String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
        try {
            shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal);
            if (shutdownSecs < 1L) {
                shutdownSecs = 10L;
            }
        }
        catch (NumberFormatException nfe) {
            shutdownSecs = 10L;
        }
        this.gracefulShutdownSeconds = shutdownSecs;
        this.remoteInputSocketPort = nifiProperties.getRemoteInputPort();
        this.remoteInputHttpPort = nifiProperties.getRemoteInputHttpPort();
        this.isSiteToSiteSecure = nifiProperties.isSiteToSiteSecure();
        this.heartbeatDelaySeconds = (int)FormatUtils.getTimeDuration((String)nifiProperties.getNodeHeartbeatInterval(), (TimeUnit)TimeUnit.SECONDS);
        this.snippetManager = new SnippetManager();
        this.reloadComponent = new StandardReloadComponent(this);
        ProcessGroup rootGroup = this.flowManager.createProcessGroup(ComponentIdGenerator.generateId().toString());
        rootGroup.setName("NiFi Flow");
        this.setRootGroup(rootGroup);
        this.instanceId = ComponentIdGenerator.generateId().toString();
        this.validationThreadPool = new FlowEngine(5, "Validate Components", true);
        this.validationTrigger = new StandardValidationTrigger((ExecutorService)this.validationThreadPool, this::isInitialized);
        if (this.remoteInputSocketPort == null) {
            LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set");
        } else if (this.isSiteToSiteSecure.booleanValue() && sslContext == null) {
            LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
        } else {
            RemoteResourceManager.setServerProtocolImplementation((String)"SocketFlowFileProtocol", SocketFlowFileServerProtocol.class);
            ClusterCoordinatorNodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null;
            this.externalSiteListeners.add((RemoteSiteListener)new SocketRemoteSiteListener(this.remoteInputSocketPort.intValue(), this.isSiteToSiteSecure != false ? sslContext : null, nifiProperties, (NodeInformant)nodeInformant));
        }
        if (this.remoteInputHttpPort == null) {
            LOG.debug("Not enabling HTTP(S) Site-to-Site functionality because the '{}' property is not true", (Object)"nifi.remote.input.http.enabled");
        } else {
            this.externalSiteListeners.add((RemoteSiteListener)HttpRemoteSiteListener.getInstance((NiFiProperties)nifiProperties));
        }
        for (RemoteSiteListener listener : this.externalSiteListeners) {
            listener.setRootGroup(rootGroup);
        }
        String snapshotFrequency = nifiProperties.getProperty("nifi.components.status.snapshot.frequency", "5 mins");
        try {
            snapshotMillis = FormatUtils.getTimeDuration((String)snapshotFrequency, (TimeUnit)TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            snapshotMillis = FormatUtils.getTimeDuration((String)"5 mins", (TimeUnit)TimeUnit.MILLISECONDS);
        }
        if (nifiProperties.isStartEmbeddedZooKeeper() && configuredForClustering) {
            try {
                this.zooKeeperStateServer = ZooKeeperStateServer.create(nifiProperties);
                this.zooKeeperStateServer.start();
            }
            catch (IOException | QuorumPeerConfig.ConfigException e) {
                throw new IllegalStateException("Unable to initialize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e);
            }
        } else {
            this.zooKeeperStateServer = null;
        }
        boolean analyticsEnabled = Boolean.parseBoolean(nifiProperties.getProperty("nifi.analytics.predict.enabled", "false"));
        if (analyticsEnabled) {
            Double modelScoreThreshold;
            long queryIntervalMillis;
            long predictionIntervalMillis;
            String predictionInterval = nifiProperties.getProperty("nifi.analytics.predict.interval", "3 mins");
            try {
                predictionIntervalMillis = FormatUtils.getTimeDuration((String)predictionInterval, (TimeUnit)TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                LOG.warn("Analytics is enabled however could not retrieve value for {}. This property has been set to '{}'", (Object)"nifi.analytics.predict.interval", (Object)"3 mins");
                predictionIntervalMillis = FormatUtils.getTimeDuration((String)"3 mins", (TimeUnit)TimeUnit.MILLISECONDS);
            }
            String queryInterval = nifiProperties.getProperty("nifi.analytics.query.interval", "3 mins");
            try {
                queryIntervalMillis = FormatUtils.getTimeDuration((String)queryInterval, (TimeUnit)TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                LOG.warn("Analytics is enabled however could not retrieve value for {}. This property has been set to '{}'", (Object)"nifi.analytics.query.interval", (Object)"3 mins");
                queryIntervalMillis = FormatUtils.getTimeDuration((String)"3 mins", (TimeUnit)TimeUnit.MILLISECONDS);
            }
            String modelScoreName = nifiProperties.getProperty("nifi.analytics.connection.model.score.name", "rSquared");
            try {
                modelScoreThreshold = Double.valueOf(nifiProperties.getProperty("nifi.analytics.connection.model.score.threshold", Double.toString(0.9)));
            }
            catch (Exception e) {
                LOG.warn("Analytics is enabled however could not retrieve value for {}. This property has been set to '{}'.", (Object)"nifi.analytics.connection.model.score.threshold", (Object)0.9);
                modelScoreThreshold = 0.9;
            }
            StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory = new StatusAnalyticsModelMapFactory((ExtensionManager)extensionManager, nifiProperties);
            this.analyticsEngine = new CachingConnectionStatusAnalyticsEngine(this.flowManager, statusHistoryRepository, statusAnalyticsModelMapFactory, predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold);
            this.timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    try {
                        Long startTs = System.currentTimeMillis();
                        RepositoryStatusReport statusReport = FlowController.this.flowFileEventRepository.reportTransferEvents(startTs.longValue());
                        FlowController.this.flowManager.findAllConnections().forEach(connection -> {
                            ConnectionStatusAnalytics connectionStatusAnalytics = (ConnectionStatusAnalytics)FlowController.this.analyticsEngine.getStatusAnalytics(connection.getIdentifier());
                            connectionStatusAnalytics.refresh();
                            connectionStatusAnalytics.loadPredictions(statusReport);
                        });
                        Long endTs = System.currentTimeMillis();
                        LOG.debug("Time Elapsed for Prediction for loading all predictions: {}", (Object)(endTs - startTs));
                    }
                    catch (Exception e) {
                        LOG.error("Failed to generate predictions", (Throwable)e);
                    }
                }
            }, 0L, 15L, TimeUnit.SECONDS);
        }
        this.eventAccess = new StandardEventAccess(this.flowManager, this.flowFileEventRepository, this.processScheduler, authorizer, this.provenanceRepository, auditService, this.analyticsEngine, this.flowFileRepository, this.contentRepository);
        this.timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    statusHistoryRepository.capture(FlowController.this.getNodeStatusSnapshot(), FlowController.this.eventAccess.getControllerStatus(), FlowController.this.getGarbageCollectionStatus(), new Date());
                }
                catch (Exception e) {
                    LOG.error("Failed to capture component stats for Stats History", (Throwable)e);
                }
            }
        }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
        this.connectionStatus = new NodeConnectionStatus(this.nodeId, DisconnectionCode.NOT_YET_CONNECTED);
        this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
        if (configuredForClustering) {
            this.heartbeater = new ClusterProtocolHeartbeater(protocolSender, clusterCoordinator, leaderElectionManager);
            LOG.info("Checking for elected Cluster Coordinator...");
            Optional clusterCoordinatorLeader = leaderElectionManager.getLeader("Cluster Coordinator");
            if (!clusterCoordinatorLeader.isPresent()) {
                LOG.info("No Cluster Coordinator elected: Registering for Cluster Coordinator election");
                this.registerForClusterCoordinator(true);
            } else {
                LOG.info("Cluster Coordinator [{}] elected: Not registering for election until after connecting to the cluster and inheriting the flow", clusterCoordinatorLeader.get());
                this.registerForClusterCoordinator(false);
            }
            leaderElectionManager.start();
            heartbeatMonitor.start();
            InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
            EventReporter eventReporter = this.createEventReporter();
            ClusterLoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter);
            StandardLoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, this.contentRepository, this.provenanceRepository, this, authorizeConnection);
            int numThreads = nifiProperties.getIntegerProperty("nifi.cluster.load.balance.max.thread.count", Integer.valueOf(8));
            String timeoutPeriod = nifiProperties.getProperty("nifi.cluster.load.balance.comms.timeout", "30 sec");
            int timeoutMillis = (int)FormatUtils.getTimeDuration((String)timeoutPeriod, (TimeUnit)TimeUnit.MILLISECONDS);
            this.loadBalanceServer = new ConnectionLoadBalanceServer(loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(), sslContext, numThreads, loadBalanceProtocol, eventReporter, timeoutMillis);
            int connectionsPerNode = nifiProperties.getIntegerProperty("nifi.cluster.load.balance.connections.per.node", Integer.valueOf(4));
            NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(this.contentRepository), eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator);
            this.loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode);
            int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty("nifi.cluster.load.balance.max.thread.count", Integer.valueOf(8));
            this.loadBalanceClientThreadPool = new FlowEngine(loadBalanceClientThreadCount, "Load-Balanced Client", true);
            for (int i = 0; i < loadBalanceClientThreadCount; ++i) {
                NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(this.loadBalanceClientRegistry, clusterCoordinator, eventReporter);
                this.loadBalanceClientTasks.add(clientTask);
                this.loadBalanceClientThreadPool.submit((Runnable)clientTask);
            }
        } else {
            this.loadBalanceClientRegistry = null;
            this.heartbeater = null;
            this.loadBalanceServer = null;
            this.loadBalanceClientThreadPool = null;
        }
        this.longRunningTaskMonitorThreadPool = this.isLongRunningTaskMonitorEnabled() ? Optional.of(new FlowEngine(1, "Long Running Task Monitor", true)) : Optional.empty();
    }

    public Authorizable getParentAuthorizable() {
        return null;
    }

    public Resource getResource() {
        return ResourceFactory.getControllerResource();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static FlowFileRepository createFlowFileRepository(NiFiProperties properties, ExtensionManager extensionManager, ResourceClaimManager contentClaimManager) {
        String implementationClassName = properties.getProperty("nifi.flowfile.repository.implementation", DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
        if (implementationClassName == null) {
            throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: nifi.flowfile.repository.implementation");
        }
        try {
            FlowFileRepository created;
            FlowFileRepository flowFileRepository = created = (FlowFileRepository)NarThreadContextClassLoader.createInstance((ExtensionManager)extensionManager, (String)implementationClassName, FlowFileRepository.class, (NiFiProperties)properties);
            synchronized (flowFileRepository) {
                created.initialize(contentClaimManager);
            }
            return created;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private PythonBridge createPythonBridge(NiFiProperties nifiProperties, ControllerServiceProvider serviceProvider) {
        String pythonCommand = nifiProperties.getProperty("nifi.python.command");
        if (pythonCommand == null) {
            LOG.info("Python Extensions disabled because the nifi.python.command property has not been configured in nifi.properties");
            return new DisabledPythonBridge();
        }
        String commsTimeout = nifiProperties.getProperty("nifi.python.comms.timeout");
        File pythonFrameworkSourceDirectory = nifiProperties.getPythonFrameworkSourceDirectory();
        List pythonExtensionsDirectories = nifiProperties.getPythonExtensionsDirectories();
        File pythonWorkingDirectory = new File(nifiProperties.getProperty("nifi.python.working.directory"));
        int maxProcesses = nifiProperties.getIntegerProperty("nifi.python.max.processes", Integer.valueOf(20));
        int maxProcessesPerType = nifiProperties.getIntegerProperty("nifi.python.max.processes.per.extension.type", Integer.valueOf(2));
        boolean enableControllerDebug = Boolean.parseBoolean(nifiProperties.getProperty("nifi.python.controller.debugpy.enabled", "false"));
        int debugPort = nifiProperties.getIntegerProperty("nifi.python.controller.debugpy.port", Integer.valueOf(5678));
        String debugHost = nifiProperties.getProperty("nifi.python.controller.debugpy.host", "localhost");
        if (maxProcessesPerType < 1) {
            LOG.warn("Configured value for {} in nifi.properties is {}, which is invalid. Defaulting to 2.", (Object)"nifi.python.max.processes.per.extension.type", (Object)maxProcessesPerType);
            maxProcessesPerType = 2;
        }
        if (maxProcesses < 0) {
            LOG.warn("Configured value for {} in nifi.properties is {}, which is invalid. Defaulting to 20.", (Object)"nifi.python.max.processes", (Object)maxProcessesPerType);
            maxProcesses = 20;
        }
        if (maxProcesses == 0) {
            LOG.warn("Will not enable Python Extensions because the {} property in nifi.properties is set to 0.", (Object)"nifi.python.max.processes");
            return new DisabledPythonBridge();
        }
        if (maxProcessesPerType > maxProcesses) {
            LOG.warn("Configured values for {} and {} in nifi.properties are {} and {} (respectively), which is invalid. Cannot set max process count per extension type greater than the max number of processors. Setting both to {}", new Object[]{"nifi.python.max.processes.per.extension.type", "nifi.python.max.processes", maxProcessesPerType, maxProcesses, maxProcesses});
            maxProcessesPerType = maxProcesses;
        }
        ArrayList<File> narDirectories = new ArrayList<File>();
        for (org.apache.nifi.bundle.Bundle bundle : this.extensionManager.getAllBundles()) {
            File workingDir = bundle.getBundleDetails().getWorkingDirectory();
            if (!workingDir.exists()) continue;
            narDirectories.add(workingDir);
        }
        final PythonProcessConfig pythonProcessConfig = new PythonProcessConfig.Builder().pythonCommand(pythonCommand).pythonFrameworkDirectory(pythonFrameworkSourceDirectory).pythonExtensionsDirectories((Collection)pythonExtensionsDirectories).narDirectories(narDirectories).pythonWorkingDirectory(pythonWorkingDirectory).commsTimeout(commsTimeout == null ? null : Duration.ofMillis(FormatUtils.getTimeDuration((String)commsTimeout, (TimeUnit)TimeUnit.MILLISECONDS))).maxPythonProcesses(maxProcesses).maxPythonProcessesPerType(maxProcessesPerType).enableControllerDebug(enableControllerDebug).debugPort(debugPort).debugHost(debugHost).build();
        final ControllerServiceTypeLookup serviceTypeLookup = arg_0 -> ((ControllerServiceProvider)serviceProvider).getControllerServiceType(arg_0);
        try {
            PythonBridge bridge = (PythonBridge)NarThreadContextClassLoader.createInstance((ExtensionManager)this.extensionManager, (String)STANDARD_PYTHON_BRIDGE_IMPLEMENTATION_CLASS, PythonBridge.class, null);
            PythonBridgeInitializationContext initializationContext = new PythonBridgeInitializationContext(){

                public PythonProcessConfig getPythonProcessConfig() {
                    return pythonProcessConfig;
                }

                public ControllerServiceTypeLookup getControllerServiceTypeLookup() {
                    return serviceTypeLookup;
                }
            };
            bridge.initialize(initializationContext);
            return bridge;
        }
        catch (Exception e) {
            throw new RuntimeException("Python Bridge initialization failed", e);
        }
    }

    public FlowFileSwapManager createSwapManager() {
        String implementationClassName;
        String string = implementationClassName = this.isEncryptionProtocolVersionConfigured(this.nifiProperties) ? ENCRYPTED_SWAP_MANAGER_IMPLEMENTATION : this.nifiProperties.getProperty("nifi.swap.manager.implementation", DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
        if (implementationClassName == null) {
            return null;
        }
        try {
            FlowFileSwapManager swapManager = (FlowFileSwapManager)NarThreadContextClassLoader.createInstance((ExtensionManager)this.extensionManager, (String)implementationClassName, FlowFileSwapManager.class, (NiFiProperties)this.nifiProperties);
            final EventReporter eventReporter = this.createEventReporter();
            try (NarCloseable narCloseable = NarCloseable.withNarLoader();){
                SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext(){

                    public ResourceClaimManager getResourceClaimManager() {
                        return FlowController.this.resourceClaimManager;
                    }

                    public FlowFileRepository getFlowFileRepository() {
                        return FlowController.this.flowFileRepository;
                    }

                    public EventReporter getEventReporter() {
                        return eventReporter;
                    }
                };
                swapManager.initialize(initializationContext);
            }
            return swapManager;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public EventReporter createEventReporter() {
        return new EventReporter(){
            private static final long serialVersionUID = 1L;

            public void reportEvent(Severity severity, String category, String message) {
                Bulletin bulletin = BulletinFactory.createBulletin((String)category, (String)severity.name(), (String)message);
                FlowController.this.bulletinRepository.addBulletin(bulletin);
            }
        };
    }

    public void purge() {
        this.getFlowManager().purge();
        this.writeLock.lock();
        try {
            this.startConnectablesAfterInitialization.clear();
            this.startRemoteGroupPortsAfterInitialization.clear();
            this.startGroupsAfterInitialization.clear();
        }
        finally {
            this.writeLock.unlock("purge");
        }
    }

    public void initializeFlow() throws IOException {
        this.initializeFlow(new StandardQueueProvider(this.getFlowManager()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void initializeFlow(QueueProvider queueProvider) throws IOException {
        this.writeLock.lock();
        try {
            Set connections = this.flowManager.findAllConnections();
            this.flowFileRepository.loadFlowFiles(queueProvider);
            long maxIdFromSwapFiles = -1L;
            if (this.flowFileRepository.isVolatile()) {
                for (Connection connection : connections) {
                    queue = connection.getFlowFileQueue();
                    queue.purgeSwapFiles();
                }
            } else {
                for (Connection connection : connections) {
                    queue = connection.getFlowFileQueue();
                    SwapSummary swapSummary = queue.recoverSwappedFlowFiles();
                    if (swapSummary == null) continue;
                    Long maxFlowFileId = swapSummary.getMaxFlowFileId();
                    if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
                        maxIdFromSwapFiles = maxFlowFileId;
                    }
                    for (ResourceClaim resourceClaim : swapSummary.getResourceClaims()) {
                        this.resourceClaimManager.incrementClaimantCount(resourceClaim);
                    }
                }
            }
            this.flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1L);
            RepositoryContextFactory contextFactory = new RepositoryContextFactory(this.contentRepository, this.flowFileRepository, this.flowFileEventRepository, this.counterRepositoryRef.get(), this.provenanceRepository, this.stateManagerProvider);
            this.processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
            this.contentRepository.cleanup();
            for (RemoteSiteListener listener : this.externalSiteListeners) {
                listener.start();
            }
            if (this.loadBalanceServer != null) {
                this.loadBalanceServer.start();
            }
            this.notifyComponentsConfigurationRestored();
            this.timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    try {
                        FlowController.this.updateRemoteProcessGroups();
                    }
                    catch (Throwable t) {
                        LOG.warn("Unable to update Remote Process Groups", t);
                    }
                }
            }, 0L, 30L, TimeUnit.SECONDS);
            this.timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    ProcessGroup rootGroup = FlowController.this.flowManager.getRootGroup();
                    List allGroups = rootGroup.findAllProcessGroups();
                    allGroups.add(rootGroup);
                    for (ProcessGroup group : allGroups) {
                        try {
                            group.synchronizeWithFlowRegistry((FlowManager)FlowController.this.flowManager);
                        }
                        catch (Exception e) {
                            LOG.error("Failed to synchronize {} with Flow Registry", (Object)group, (Object)e);
                        }
                    }
                }
            }, 5L, 60L, TimeUnit.SECONDS);
            this.initialized.set(true);
        }
        finally {
            this.writeLock.unlock("initializeFlow");
        }
    }

    private void notifyComponentsConfigurationRestored() {
        NarCloseable nc;
        for (ProcessorNode procNode : this.flowManager.getRootGroup().findAllProcessors()) {
            Processor processor = procNode.getProcessor();
            nc = NarCloseable.withComponentNarLoader((ExtensionManager)this.extensionManager, (Class)processor.getClass(), (String)processor.getIdentifier());
            try {
                StandardProcessContext processContext = new StandardProcessContext(procNode, (ControllerServiceProvider)this.controllerServiceProvider, this.getStateManagerProvider().getStateManager(processor.getIdentifier()), () -> false, (NodeTypeProvider)this);
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, (Object)processor, (Object[])new Object[]{processContext});
            }
            finally {
                if (nc == null) continue;
                nc.close();
            }
        }
        for (ControllerServiceNode serviceNode : this.flowManager.getAllControllerServices()) {
            ControllerService service = serviceNode.getControllerServiceImplementation();
            nc = NarCloseable.withComponentNarLoader((ExtensionManager)this.extensionManager, (Class)service.getClass(), (String)service.getIdentifier());
            try {
                StandardConfigurationContext configurationContext = new StandardConfigurationContext((ComponentNode)serviceNode, (ControllerServiceLookup)this.controllerServiceProvider, null);
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, (Object)service, (Object[])new Object[]{configurationContext});
            }
            finally {
                if (nc == null) continue;
                nc.close();
            }
        }
        for (ReportingTaskNode taskNode : this.getAllReportingTasks()) {
            ReportingTask task = taskNode.getReportingTask();
            nc = NarCloseable.withComponentNarLoader((ExtensionManager)this.extensionManager, (Class)task.getClass(), (String)task.getIdentifier());
            try {
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, (Object)task, (Object[])new Object[]{taskNode.getConfigurationContext()});
            }
            finally {
                if (nc == null) continue;
                nc.close();
            }
        }
        for (FlowAnalysisRuleNode ruleNode : this.getAllFlowAnalysisRules()) {
            FlowAnalysisRule rule = ruleNode.getFlowAnalysisRule();
            nc = NarCloseable.withComponentNarLoader((ExtensionManager)this.extensionManager, (Class)rule.getClass(), (String)rule.getIdentifier());
            try {
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, (Object)rule, (Object[])new Object[]{ruleNode.getConfigurationContext()});
            }
            finally {
                if (nc == null) continue;
                nc.close();
            }
        }
        for (ParameterProviderNode parameterProviderNode : this.flowManager.getAllParameterProviders()) {
            ParameterProvider provider = parameterProviderNode.getParameterProvider();
            nc = NarCloseable.withComponentNarLoader((ExtensionManager)this.extensionManager, (Class)provider.getClass(), (String)provider.getIdentifier());
            try {
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, (Object)provider, (Object[])new Object[0]);
            }
            finally {
                if (nc == null) continue;
                nc.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onFlowInitialized(boolean startDelayedComponents) {
        this.writeLock.lock();
        try {
            LOG.debug("Triggering initial validation of all components");
            long start = System.nanoTime();
            Supplier<VersionedProcessGroup> rootProcessGroupSupplier = () -> {
                ProcessGroup rootProcessGroup = this.getFlowManager().getRootGroup();
                NiFiRegistryFlowMapper mapper = FlowAnalysisUtil.createMapper((ExtensionManager)this.getExtensionManager());
                InstantiatedVersionedProcessGroup versionedRootProcessGroup = mapper.mapNonVersionedProcessGroup(rootProcessGroup, (ControllerServiceProvider)this.controllerServiceProvider);
                return versionedRootProcessGroup;
            };
            ValidationTrigger triggerIfValidating = new ValidationTrigger(){

                public void triggerAsync(ComponentNode component) {
                    ValidationStatus status = component.getValidationStatus();
                    if (component.getValidationStatus() == ValidationStatus.VALIDATING) {
                        LOG.debug("Will trigger async validation for {} because its status is VALIDATING", (Object)component);
                        FlowController.this.validationTrigger.triggerAsync(component);
                    } else {
                        LOG.debug("Will not trigger async validation for {} because its status is {}", (Object)component, (Object)status);
                    }
                }

                public void trigger(ComponentNode component) {
                    ValidationStatus status = component.getValidationStatus();
                    if (component.getValidationStatus() == ValidationStatus.VALIDATING) {
                        LOG.debug("Will trigger immediate validation for {} because its status is VALIDATING", (Object)component);
                        FlowController.this.validationTrigger.trigger(component);
                    } else {
                        LOG.debug("Will not trigger immediate validation for {} because its status is {}", (Object)component, (Object)status);
                    }
                }
            };
            if (this.flowAnalyzer != null) {
                new TriggerFlowAnalysisTask(this.flowAnalyzer, rootProcessGroupSupplier).run();
            }
            new TriggerValidationTask(this.flowManager, triggerIfValidating).run();
            long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            LOG.info("Performed initial validation of all components in {} milliseconds", (Object)millis);
            this.scheduleBackgroundFlowAnalysis(rootProcessGroupSupplier);
            this.validationThreadPool.scheduleWithFixedDelay((Runnable)new TriggerValidationTask(this.flowManager, this.validationTrigger), 5L, 5L, TimeUnit.SECONDS);
            if (startDelayedComponents) {
                LOG.info("Starting {} Stateless Process Groups", (Object)this.startGroupsAfterInitialization.size());
                for (ProcessGroup processGroup : this.startGroupsAfterInitialization) {
                    processGroup.startProcessing();
                }
                this.startGroupsAfterInitialization.clear();
                LOG.info("Starting {} processors/ports/funnels", (Object)(this.startConnectablesAfterInitialization.size() + this.startRemoteGroupPortsAfterInitialization.size()));
                for (Connectable connectable : this.startConnectablesAfterInitialization) {
                    if (connectable.getScheduledState() == ScheduledState.DISABLED) continue;
                    try {
                        if (connectable instanceof ProcessorNode) {
                            connectable.getProcessGroup().startProcessor((ProcessorNode)connectable, true);
                            continue;
                        }
                        this.startConnectable(connectable);
                    }
                    catch (Throwable t) {
                        LOG.error("Unable to start {}", (Object)connectable, (Object)t);
                        if (!LOG.isDebugEnabled()) continue;
                        LOG.error("", t);
                    }
                }
                this.startConnectablesAfterInitialization.clear();
                int startedTransmitting = 0;
                for (RemoteGroupPort remoteGroupPort : this.startRemoteGroupPortsAfterInitialization) {
                    try {
                        remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
                        ++startedTransmitting;
                    }
                    catch (Throwable t) {
                        LOG.error("Unable to start transmitting with {}", (Object)remoteGroupPort, (Object)t);
                    }
                }
                LOG.info("Started {} Remote Group Ports transmitting", (Object)startedTransmitting);
                this.startRemoteGroupPortsAfterInitialization.clear();
            } else {
                for (Connectable connectable : this.startConnectablesAfterInitialization) {
                    try {
                        if (!(connectable instanceof Funnel)) continue;
                        this.startConnectable(connectable);
                    }
                    catch (Throwable t) {
                        LOG.error("Unable to start {}", (Object)connectable, (Object)t);
                    }
                }
                this.startConnectablesAfterInitialization.clear();
                this.startRemoteGroupPortsAfterInitialization.clear();
            }
            this.flowManager.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
            for (Connection connection : this.flowManager.findAllConnections()) {
                connection.getFlowFileQueue().startLoadBalancing();
            }
            this.scheduleLongRunningTaskMonitor();
            Runnable discoverPythonExtensions = () -> this.extensionManager.discoverNewPythonExtensions(this.pythonBundle);
            this.timerDrivenEngineRef.get().scheduleWithFixedDelay(discoverPythonExtensions, 1L, 1L, TimeUnit.MINUTES);
        }
        finally {
            this.writeLock.unlock("onFlowInitialized");
        }
    }

    private void scheduleBackgroundFlowAnalysis(Supplier<VersionedProcessGroup> rootProcessGroupSupplier) {
        if (this.flowAnalyzer != null) {
            try {
                this.flowAnalysisThreadPool.scheduleWithFixedDelay((Runnable)new TriggerFlowAnalysisTask(this.flowAnalyzer, rootProcessGroupSupplier), 5L, 5L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                LOG.warn("Could not initialize TriggerFlowAnalysisTask.", (Throwable)e);
            }
        }
    }

    private void scheduleLongRunningTaskMonitor() {
        this.longRunningTaskMonitorThreadPool.ifPresent(flowEngine -> {
            try {
                long scheduleMillis = this.parseDurationPropertyToMillis("nifi.monitor.long.running.task.schedule");
                long thresholdMillis = this.parseDurationPropertyToMillis("nifi.monitor.long.running.task.threshold");
                LongRunningTaskMonitor longRunningTaskMonitor = new LongRunningTaskMonitor(this.getFlowManager(), this.createEventReporter(), thresholdMillis);
                this.longRunningTaskMonitorThreadPool.get().scheduleWithFixedDelay((Runnable)longRunningTaskMonitor, scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                LOG.warn("Could not initialize LongRunningTaskMonitor.", (Throwable)e);
            }
        });
    }

    private long parseDurationPropertyToMillis(String propertyName) {
        try {
            String duration = this.nifiProperties.getProperty(propertyName);
            return (long)FormatUtils.getPreciseTimeDuration((String)duration, (TimeUnit)TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            LOG.warn("Could not retrieve value for {}. Valid values e.g. 60 secs or 1 min.", (Object)propertyName);
            throw e;
        }
    }

    private boolean isLongRunningTaskMonitorEnabled() {
        return StringUtils.isNotBlank((CharSequence)this.nifiProperties.getProperty("nifi.monitor.long.running.task.schedule")) && StringUtils.isNotBlank((CharSequence)this.nifiProperties.getProperty("nifi.monitor.long.running.task.threshold"));
    }

    public boolean isStartAfterInitialization(Connectable component) {
        return this.startConnectablesAfterInitialization.contains(component) || this.startRemoteGroupPortsAfterInitialization.contains(component);
    }

    public boolean isStartAfterInitialization(ProcessGroup group) {
        return this.startGroupsAfterInitialization.contains(group);
    }

    private ContentRepository createContentRepository(NiFiProperties properties) {
        String implementationClassName;
        String string = implementationClassName = this.isEncryptionProtocolVersionConfigured(properties) ? ENCRYPTED_CONTENT_REPO_IMPLEMENTATION : properties.getProperty("nifi.content.repository.implementation", DEFAULT_CONTENT_REPO_IMPLEMENTATION);
        if (implementationClassName == null) {
            throw new RuntimeException("Cannot create Content Repository because the NiFi Properties is missing the following property: nifi.content.repository.implementation");
        }
        LOG.info("Creating Content Repository [{}]", (Object)implementationClassName);
        try {
            ContentRepository contentRepo = (ContentRepository)NarThreadContextClassLoader.createInstance((ExtensionManager)this.extensionManager, (String)implementationClassName, ContentRepository.class, (NiFiProperties)properties);
            contentRepo.initialize((ContentRepositoryContext)new StandardContentRepositoryContext(this.resourceClaimManager, this.createEventReporter()));
            return contentRepo;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private ProvenanceRepository createProvenanceRepository(NiFiProperties properties) {
        String implementationClassName;
        String string = implementationClassName = this.isEncryptionProtocolVersionConfigured(properties) ? ENCRYPTED_PROVENANCE_REPO_IMPLEMENTATION : properties.getProperty("nifi.provenance.repository.implementation", DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
        if (StringUtils.isBlank((CharSequence)implementationClassName)) {
            throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: nifi.provenance.repository.implementation");
        }
        LOG.info("Creating Provenance Repository [{}]", (Object)implementationClassName);
        try {
            return (ProvenanceRepository)NarThreadContextClassLoader.createInstance((ExtensionManager)this.extensionManager, (String)implementationClassName, ProvenanceRepository.class, (NiFiProperties)properties);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public KerberosConfig createKerberosConfig(NiFiProperties nifiProperties) {
        String principal = nifiProperties.getKerberosServicePrincipal();
        String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation();
        File kerberosConfigFile = nifiProperties.getKerberosConfigurationFile();
        if (principal == null && keytabLocation == null && kerberosConfigFile == null) {
            return KerberosConfig.NOT_CONFIGURED;
        }
        File keytabFile = keytabLocation == null ? null : new File(keytabLocation);
        return new KerberosConfig(principal, keytabFile, kerberosConfigFile);
    }

    public ValidationTrigger getValidationTrigger() {
        return this.validationTrigger;
    }

    public PropertyEncryptor getEncryptor() {
        return this.encryptor;
    }

    public ExtensionManager getExtensionManager() {
        return this.extensionManager;
    }

    public String getInstanceId() {
        this.readLock.lock();
        try {
            String string = this.instanceId;
            return string;
        }
        finally {
            this.readLock.unlock("getInstanceId");
        }
    }

    public Heartbeater getHeartbeater() {
        return this.heartbeater;
    }

    public BulletinRepository getBulletinRepository() {
        return this.bulletinRepository;
    }

    public LifecycleStateManager getLifecycleStateManager() {
        return this.lifecycleStateManager;
    }

    public SnippetManager getSnippetManager() {
        return this.snippetManager;
    }

    public StateManagerProvider getStateManagerProvider() {
        return this.stateManagerProvider;
    }

    public Authorizer getAuthorizer() {
        return this.authorizer;
    }

    public boolean isTerminated() {
        this.readLock.lock();
        try {
            boolean bl = null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated();
            return bl;
        }
        finally {
            this.readLock.unlock("isTerminated");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown(boolean kill) {
        LOG.info("Initiating shutdown of FlowController...");
        this.shutdown = true;
        CompletableFuture rootGroupStopFuture = this.flowManager.getRootGroup().stopProcessing();
        this.readLock.lock();
        try {
            if (this.isTerminated() || this.timerDrivenEngineRef.get().isTerminating()) {
                throw new IllegalStateException("Controller already stopped or still stopping...");
            }
            if (this.leaderElectionManager != null) {
                this.leaderElectionManager.stop();
            }
            if (this.heartbeatMonitor != null) {
                this.heartbeatMonitor.stop();
            }
            this.validationThreadPool.shutdown();
            this.flowAnalysisThreadPool.shutdown();
            this.clusterTaskExecutor.shutdownNow();
            if (this.zooKeeperStateServer != null) {
                this.zooKeeperStateServer.shutdown();
            }
            if (this.loadBalanceClientThreadPool != null) {
                this.loadBalanceClientThreadPool.shutdownNow();
            }
            this.loadBalanceClientTasks.forEach(NioAsyncLoadBalanceClientTask::stop);
            this.flowManager.getRootGroup().shutdown();
            this.stateManagerProvider.shutdown();
            for (ControllerServiceNode serviceNode : this.flowManager.getAllControllerServices()) {
                this.processScheduler.shutdownControllerService(serviceNode, (ControllerServiceProvider)this.controllerServiceProvider);
            }
            for (ReportingTaskNode taskNode : this.getAllReportingTasks()) {
                this.processScheduler.shutdownReportingTask(taskNode);
            }
            long shutdownEnd = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.gracefulShutdownSeconds);
            if (!kill) {
                try {
                    rootGroupStopFuture.get(this.gracefulShutdownSeconds, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    LOG.warn("Failed to wait until all components have gracefully stopped", (Throwable)e);
                }
            }
            if (kill) {
                this.timerDrivenEngineRef.get().shutdownNow();
                LOG.info("Initiated immediate shutdown of flow controller...");
            } else {
                this.timerDrivenEngineRef.get().shutdown();
                LOG.info("Initiated graceful shutdown of flow controller...waiting up to {} seconds", (Object)this.gracefulShutdownSeconds);
            }
            try {
                long millisToWait = Math.max(2000L, shutdownEnd - System.currentTimeMillis());
                this.timerDrivenEngineRef.get().awaitTermination(millisToWait, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ie) {
                LOG.info("Interrupted while waiting for controller termination.");
            }
            try {
                this.flowFileRepository.close();
            }
            catch (Throwable t) {
                LOG.warn("Unable to shut down FlowFileRepository", t);
            }
            if (this.timerDrivenEngineRef.get().isTerminated()) {
                LOG.info("Controller has been terminated successfully.");
            } else {
                LOG.warn("Controller hasn't terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
            }
            for (RemoteSiteListener listener : this.externalSiteListeners) {
                listener.stop();
                listener.destroy();
            }
            if (this.pythonBridge != null) {
                try {
                    this.pythonBridge.shutdown();
                }
                catch (Exception e) {
                    LOG.warn("Failed to cleanly shutdown Py4J Bridge", (Throwable)e);
                }
            }
            if (this.loadBalanceServer != null) {
                this.loadBalanceServer.stop();
            }
            if (this.loadBalanceClientRegistry != null) {
                this.loadBalanceClientRegistry.stop();
            }
            if (this.processScheduler != null) {
                this.processScheduler.shutdown();
            }
            if (this.contentRepository != null) {
                this.contentRepository.shutdown();
            }
            if (this.provenanceRepository != null) {
                try {
                    this.provenanceRepository.close();
                }
                catch (IOException ioe) {
                    LOG.warn("There was a problem shutting down the Provenance Repository", (Throwable)ioe);
                }
            }
            if (this.statusHistoryRepository != null) {
                this.statusHistoryRepository.shutdown();
            }
        }
        finally {
            this.readLock.unlock("shutdown");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized <T> void serialize(FlowSerializer<T> serializer, OutputStream os) throws FlowSerializationException {
        T flowConfiguration;
        this.readLock.lock();
        try {
            ScheduledStateLookup scheduledStateLookup = this.createScheduledStateLookup();
            flowConfiguration = serializer.transform(this, scheduledStateLookup);
        }
        finally {
            this.readLock.unlock("serialize");
        }
        serializer.serialize(flowConfiguration, os);
    }

    public ScheduledStateLookup createScheduledStateLookup() {
        return new ScheduledStateLookup(){

            @Override
            public ScheduledState getScheduledState(ProcessorNode procNode) {
                if (FlowController.this.startConnectablesAfterInitialization.contains(procNode)) {
                    return ScheduledState.RUNNING;
                }
                return procNode.getDesiredState();
            }

            @Override
            public ScheduledState getScheduledState(Port port) {
                if (FlowController.this.startConnectablesAfterInitialization.contains(port)) {
                    return ScheduledState.RUNNING;
                }
                if (FlowController.this.startRemoteGroupPortsAfterInitialization.contains(port)) {
                    return ScheduledState.RUNNING;
                }
                return port.getScheduledState();
            }

            @Override
            public ScheduledState getScheduledState(ProcessGroup processGroup) {
                if (FlowController.this.startGroupsAfterInitialization.contains(processGroup)) {
                    return ScheduledState.RUNNING;
                }
                return processGroup.getDesiredStatelessScheduledState() == StatelessGroupScheduledState.RUNNING ? ScheduledState.RUNNING : ScheduledState.STOPPED;
            }
        };
    }

    public VersionedComponentStateLookup createVersionedComponentStateLookup(final VersionedComponentStateLookup delegate) {
        return new VersionedComponentStateLookup(){

            public org.apache.nifi.flow.ScheduledState getState(ProcessorNode processorNode) {
                if (FlowController.this.isStartAfterInitialization((Connectable)processorNode)) {
                    return org.apache.nifi.flow.ScheduledState.RUNNING;
                }
                return delegate.getState(processorNode);
            }

            public org.apache.nifi.flow.ScheduledState getState(Port port) {
                if (FlowController.this.isStartAfterInitialization((Connectable)port)) {
                    return org.apache.nifi.flow.ScheduledState.RUNNING;
                }
                return delegate.getState(port);
            }

            public org.apache.nifi.flow.ScheduledState getState(ReportingTaskNode taskNode) {
                return delegate.getState(taskNode);
            }

            public org.apache.nifi.flow.ScheduledState getState(FlowAnalysisRuleNode ruleNode) {
                return delegate.getState(ruleNode);
            }

            public org.apache.nifi.flow.ScheduledState getState(ControllerServiceNode serviceNode) {
                return delegate.getState(serviceNode);
            }

            public org.apache.nifi.flow.ScheduledState getState(ProcessGroup group) {
                if (FlowController.this.isStartAfterInitialization(group)) {
                    return org.apache.nifi.flow.ScheduledState.RUNNING;
                }
                return delegate.getState(group);
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void synchronize(FlowSynchronizer synchronizer, DataFlow dataFlow, FlowService flowService, BundleUpdateStrategy bundleUpdateStrategy) throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
        this.writeLock.lock();
        try {
            LOG.debug("Synchronizing controller with proposed flow");
            try {
                synchronizer.sync(this, dataFlow, flowService, bundleUpdateStrategy);
            }
            catch (UninheritableFlowException ufe) {
                NodeIdentifier localNodeId = this.getNodeId();
                if (localNodeId != null) {
                    try {
                        this.clusterCoordinator.requestNodeDisconnect(localNodeId, DisconnectionCode.MISMATCHED_FLOWS, ufe.getMessage());
                    }
                    catch (Exception e) {
                        LOG.error("Failed to synchronize Controller with proposed flow and also failed to notify cluster that the flows do not match. Node's state may remain CONNECTING instead of transitioning to DISCONNECTED.", (Throwable)e);
                    }
                }
                throw ufe;
            }
            this.flowSynchronized.set(true);
            LOG.info("Successfully synchronized controller with proposed flow. Flow contains the following number of components: {}", (Object)this.flowManager.getComponentCounts());
        }
        finally {
            this.writeLock.unlock("synchronize");
        }
    }

    public int getMaxTimerDrivenThreadCount() {
        return this.maxTimerDrivenThreads.get();
    }

    public int getActiveTimerDrivenThreadCount() {
        return this.timerDrivenEngineRef.get().getActiveCount();
    }

    public void setMaxTimerDrivenThreadCount(int maxThreadCount) {
        this.writeLock.lock();
        try {
            this.setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
        }
        finally {
            this.writeLock.unlock("setMaxTimerDrivenThreadCount");
        }
    }

    private void setMaxThreadCount(int maxThreadCount, String poolName, FlowEngine engine, AtomicInteger maxThreads) {
        if (maxThreadCount < 1) {
            throw new IllegalArgumentException("Cannot set max number of threads to less than 1");
        }
        maxThreads.getAndSet(maxThreadCount);
        if (engine == null) {
            LOG.debug("[{}] Engine not found: Maximum Thread Count not updated", (Object)poolName);
        } else {
            int previousCorePoolSize = engine.getCorePoolSize();
            engine.setCorePoolSize(maxThreadCount);
            LOG.info("[{}] Maximum Thread Count updated [{}] previous [{}]", new Object[]{poolName, maxThreadCount, previousCorePoolSize});
        }
    }

    public UserAwareEventAccess getEventAccess() {
        return this.eventAccess;
    }

    public StatusAnalyticsEngine getStatusAnalyticsEngine() {
        return this.analyticsEngine;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setRootGroup(ProcessGroup group) {
        if (Objects.requireNonNull(group).getParent() != null) {
            throw new IllegalArgumentException("A ProcessGroup that has a parent cannot be the Root Group");
        }
        this.writeLock.lock();
        try {
            this.flowManager.setRootGroup(group);
            for (RemoteSiteListener listener : this.externalSiteListeners) {
                listener.setRootGroup(group);
            }
            this.heartbeatBeanRef.set(new HeartbeatBean(group, this.isPrimary()));
            this.allProcessGroups.put(group.getIdentifier(), group);
            this.allProcessGroups.put("root", group);
        }
        finally {
            this.writeLock.unlock("setRootGroup");
        }
    }

    public SystemDiagnostics getSystemDiagnostics() {
        SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory();
        return factory.create(this.flowFileRepository, this.contentRepository, this.provenanceRepository, this.resourceClaimManager);
    }

    public String getContentRepoFileStoreName(String containerName) {
        return this.contentRepository.getContainerFileStoreName(containerName);
    }

    public String getFlowRepoFileStoreName() {
        return this.flowFileRepository.getFileStoreName();
    }

    public String getProvenanceRepoFileStoreName(String containerName) {
        return this.provenanceRepository.getContainerFileStoreName(containerName);
    }

    private void verifyBundleInVersionedFlow(Bundle requiredBundle, Set<BundleCoordinate> supportedBundles) {
        BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
        if (!supportedBundles.contains(requiredCoordinate)) {
            throw new IllegalStateException("Unsupported bundle: " + String.valueOf(requiredCoordinate));
        }
    }

    private void verifyProcessorsInVersionedFlow(VersionedProcessGroup versionedFlow, Map<String, Set<BundleCoordinate>> supportedTypes) {
        if (versionedFlow.getProcessors() != null) {
            versionedFlow.getProcessors().forEach(processor -> {
                if (processor.getBundle() == null) {
                    throw new IllegalArgumentException("Processor bundle must be specified.");
                }
                if (!supportedTypes.containsKey(processor.getType())) {
                    throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
                }
                this.verifyBundleInVersionedFlow(processor.getBundle(), (Set)supportedTypes.get(processor.getType()));
            });
        }
        if (versionedFlow.getProcessGroups() != null) {
            versionedFlow.getProcessGroups().forEach(processGroup -> this.verifyProcessorsInVersionedFlow((VersionedProcessGroup)processGroup, supportedTypes));
        }
    }

    private void verifyControllerServicesInVersionedFlow(VersionedProcessGroup versionedFlow, Map<String, Set<BundleCoordinate>> supportedTypes) {
        if (versionedFlow.getControllerServices() != null) {
            versionedFlow.getControllerServices().forEach(controllerService -> {
                if (supportedTypes.containsKey(controllerService.getType())) {
                    if (controllerService.getBundle() == null) {
                        throw new IllegalArgumentException("Controller Service bundle must be specified.");
                    }
                } else {
                    throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
                }
                this.verifyBundleInVersionedFlow(controllerService.getBundle(), (Set)supportedTypes.get(controllerService.getType()));
            });
        }
        if (versionedFlow.getProcessGroups() != null) {
            versionedFlow.getProcessGroups().forEach(processGroup -> this.verifyControllerServicesInVersionedFlow((VersionedProcessGroup)processGroup, supportedTypes));
        }
    }

    public void verifyComponentTypesInSnippet(VersionedProcessGroup versionedFlow) {
        HashMap<String, Set<BundleCoordinate>> processorClasses = new HashMap<String, Set<BundleCoordinate>>();
        for (Object extensionDefinition : this.extensionManager.getExtensions(Processor.class)) {
            String name = extensionDefinition.getImplementationClassName();
            processorClasses.put(name, this.extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
        }
        this.verifyProcessorsInVersionedFlow(versionedFlow, processorClasses);
        HashMap<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<String, Set<BundleCoordinate>>();
        for (Object extensionDefinition : this.extensionManager.getExtensions(ControllerService.class)) {
            String name = extensionDefinition.getImplementationClassName();
            controllerServiceClasses.put(name, this.extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
        }
        this.verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses);
        HashSet<String> prioritizerClasses = new HashSet<String>();
        for (ExtensionDefinition extensionDefinition : this.extensionManager.getExtensions(FlowFilePrioritizer.class)) {
            String name = extensionDefinition.getImplementationClassName();
            prioritizerClasses.add(name);
        }
        HashSet<VersionedConnection> allConns = new HashSet<VersionedConnection>();
        allConns.addAll(versionedFlow.getConnections());
        for (VersionedProcessGroup childGroup : versionedFlow.getProcessGroups()) {
            allConns.addAll(this.findAllConnections(childGroup));
        }
        for (VersionedConnection conn : allConns) {
            List prioritizers = conn.getPrioritizers();
            if (prioritizers == null) continue;
            for (String prioritizer : prioritizers) {
                if (prioritizerClasses.contains(prioritizer)) continue;
                throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
            }
        }
    }

    private Set<VersionedConnection> findAllConnections(VersionedProcessGroup group) {
        HashSet<VersionedConnection> conns = new HashSet<VersionedConnection>();
        for (VersionedConnection connection : group.getConnections()) {
            conns.add(connection);
        }
        for (VersionedProcessGroup childGroup : group.getProcessGroups()) {
            conns.addAll(this.findAllConnections(childGroup));
        }
        return conns;
    }

    private ProcessGroup lookupGroup(String id) {
        ProcessGroup group = this.flowManager.getGroup(id);
        if (group == null) {
            throw new IllegalStateException("No Group with ID " + id + " exists");
        }
        return group;
    }

    public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
        ArrayList<GarbageCollectionStatus> statuses = new ArrayList<GarbageCollectionStatus>();
        Date now = new Date();
        for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
            String managerName = mbean.getName();
            long count = mbean.getCollectionCount();
            long millis = mbean.getCollectionTime();
            StandardGarbageCollectionStatus status = new StandardGarbageCollectionStatus(managerName, now, count, millis);
            statuses.add((GarbageCollectionStatus)status);
        }
        return statuses;
    }

    public GarbageCollectionHistory getGarbageCollectionHistory() {
        return this.statusHistoryRepository.getGarbageCollectionHistory(new Date(0L), new Date());
    }

    public ReloadComponent getReloadComponent() {
        return this.reloadComponent;
    }

    public void startProcessor(String parentGroupId, String processorId) {
        this.startProcessor(parentGroupId, processorId, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startProcessor(String parentGroupId, String processorId, boolean failIfStopping) {
        ProcessGroup group = this.lookupGroup(parentGroupId);
        ProcessorNode node = group.getProcessor(processorId);
        if (node == null) {
            throw new IllegalStateException("Cannot find ProcessorNode with ID " + processorId + " within ProcessGroup with ID " + parentGroupId);
        }
        this.writeLock.lock();
        try {
            if (this.initialized.get()) {
                group.startProcessor(node, failIfStopping);
            } else {
                this.startConnectablesAfterInitialization.add((Connectable)node);
            }
        }
        finally {
            this.writeLock.unlock("startProcessor");
        }
    }

    public void startProcessGroup(ProcessGroup processGroup) {
        this.writeLock.lock();
        try {
            if (this.initialized.get()) {
                processGroup.startProcessing();
            } else {
                this.startGroupsAfterInitialization.add(processGroup);
            }
        }
        finally {
            this.writeLock.unlock("startProcessGroup");
        }
    }

    public boolean isInitialized() {
        return this.initialized.get();
    }

    public boolean isFlowSynchronized() {
        return this.flowSynchronized.get();
    }

    public void startConnectable(Connectable connectable) {
        block10: {
            ProcessGroup group = Objects.requireNonNull(connectable).getProcessGroup();
            this.writeLock.lock();
            try {
                if (this.initialized.get()) {
                    switch (Objects.requireNonNull(connectable).getConnectableType()) {
                        case FUNNEL: {
                            group.startFunnel((Funnel)connectable);
                            break block10;
                        }
                        case INPUT_PORT: 
                        case REMOTE_INPUT_PORT: {
                            group.startInputPort((Port)connectable);
                            break block10;
                        }
                        case OUTPUT_PORT: 
                        case REMOTE_OUTPUT_PORT: {
                            group.startOutputPort((Port)connectable);
                            break block10;
                        }
                        case PROCESSOR: {
                            group.startProcessor((ProcessorNode)connectable, true);
                            break block10;
                        }
                        default: {
                            throw new IllegalArgumentException();
                        }
                    }
                }
                this.startConnectablesAfterInitialization.add(connectable);
            }
            finally {
                this.writeLock.unlock("startConnectable");
            }
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void stopConnectable(Connectable connectable) {
        ProcessGroup group = Objects.requireNonNull(connectable).getProcessGroup();
        this.writeLock.lock();
        try {
            switch (Objects.requireNonNull(connectable).getConnectableType()) {
                case FUNNEL: {
                    return;
                }
                case INPUT_PORT: 
                case REMOTE_INPUT_PORT: {
                    this.startConnectablesAfterInitialization.remove(connectable);
                    group.stopInputPort((Port)connectable);
                    return;
                }
                case OUTPUT_PORT: 
                case REMOTE_OUTPUT_PORT: {
                    this.startConnectablesAfterInitialization.remove(connectable);
                    group.stopOutputPort((Port)connectable);
                    return;
                }
                case PROCESSOR: {
                    this.startConnectablesAfterInitialization.remove(connectable);
                    group.stopProcessor((ProcessorNode)connectable);
                    return;
                }
                default: {
                    throw new IllegalArgumentException();
                }
            }
        }
        finally {
            this.writeLock.unlock("stopConnectable");
        }
    }

    public void startTransmitting(RemoteGroupPort remoteGroupPort) {
        this.writeLock.lock();
        try {
            if (this.initialized.get()) {
                remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
            } else {
                this.startRemoteGroupPortsAfterInitialization.add(remoteGroupPort);
            }
        }
        finally {
            this.writeLock.unlock("startTransmitting");
        }
    }

    public void stopTransmitting(RemoteGroupPort remoteGroupPort) {
        this.writeLock.lock();
        try {
            if (this.initialized.get()) {
                remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort);
            } else {
                this.startRemoteGroupPortsAfterInitialization.remove(remoteGroupPort);
            }
        }
        finally {
            this.writeLock.unlock("stopTransmitting");
        }
    }

    public void stopProcessor(String parentGroupId, String processorId) {
        ProcessGroup group = this.lookupGroup(parentGroupId);
        ProcessorNode node = group.getProcessor(processorId);
        if (node == null) {
            throw new IllegalStateException("Cannot find ProcessorNode with ID " + processorId + " within ProcessGroup with ID " + parentGroupId);
        }
        group.stopProcessor(node);
        this.startConnectablesAfterInitialization.remove(node);
    }

    public void stopGroup(String parentGroupId, String groupId) {
        ProcessGroup parent = this.lookupGroup(parentGroupId);
        ProcessGroup group = parent.getProcessGroup(groupId);
        if (group == null) {
            throw new IllegalStateException("Cannot find ProcessGroup with ID " + groupId + " within ProcessGroup with ID " + parentGroupId);
        }
        group.stopProcessing();
        this.startGroupsAfterInitialization.remove(group);
    }

    public void startReportingTask(ReportingTaskNode reportingTaskNode) {
        if (this.isTerminated()) {
            throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated");
        }
        reportingTaskNode.verifyCanStart();
        reportingTaskNode.reloadAdditionalResourcesIfNecessary();
        this.processScheduler.schedule(reportingTaskNode);
    }

    public void stopReportingTask(ReportingTaskNode reportingTaskNode) {
        if (this.isTerminated()) {
            return;
        }
        reportingTaskNode.verifyCanStop();
        this.processScheduler.unschedule(reportingTaskNode);
    }

    public FlowManager getFlowManager() {
        return this.flowManager;
    }

    public GarbageCollectionLog getGarbageCollectionLog() {
        return this.gcLog;
    }

    public RepositoryContextFactory getRepositoryContextFactory() {
        return this.repositoryContextFactory;
    }

    public ClusterCoordinator getClusterCoordinator() {
        return this.clusterCoordinator;
    }

    public Connection createConnection(final String id, String name, Connectable source, Connectable destination, Collection<String> relationshipNames) {
        StandardConnection.Builder builder = new StandardConnection.Builder((ProcessScheduler)this.processScheduler);
        ArrayList<Relationship> relationships = new ArrayList<Relationship>();
        for (String relationshipName : Objects.requireNonNull(relationshipNames)) {
            relationships.add(new Relationship.Builder().name(relationshipName).build());
        }
        final FlowFileSwapManager swapManager = this.createSwapManager();
        final EventReporter eventReporter = this.createEventReporter();
        try (NarCloseable narCloseable = NarCloseable.withNarLoader();){
            SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext(){

                public ResourceClaimManager getResourceClaimManager() {
                    return FlowController.this.resourceClaimManager;
                }

                public FlowFileRepository getFlowFileRepository() {
                    return FlowController.this.flowFileRepository;
                }

                public EventReporter getEventReporter() {
                    return eventReporter;
                }
            };
            swapManager.initialize(initializationContext);
        }
        FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory(){

            public FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ProcessGroup processGroup) {
                AbstractFlowFileQueue flowFileQueue;
                if (FlowController.this.clusterCoordinator == null) {
                    flowFileQueue = new StandardFlowFileQueue(id, FlowController.this.flowFileRepository, (ProvenanceEventRepository)FlowController.this.provenanceRepository, FlowController.this.resourceClaimManager, FlowController.this.processScheduler, swapManager, eventReporter, FlowController.this.nifiProperties.getQueueSwapThreshold(), processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
                } else {
                    flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, FlowController.this.processScheduler, FlowController.this.flowFileRepository, (ProvenanceEventRepository)FlowController.this.provenanceRepository, FlowController.this.contentRepository, FlowController.this.resourceClaimManager, FlowController.this.clusterCoordinator, FlowController.this.loadBalanceClientRegistry, swapManager, FlowController.this.nifiProperties.getQueueSwapThreshold(), eventReporter);
                    flowFileQueue.setFlowFileExpiration(processGroup.getDefaultFlowFileExpiration());
                    flowFileQueue.setBackPressureObjectThreshold(processGroup.getDefaultBackPressureObjectThreshold());
                    flowFileQueue.setBackPressureDataSizeThreshold(processGroup.getDefaultBackPressureDataSizeThreshold());
                }
                return flowFileQueue;
            }
        };
        StandardConnection connection = builder.id(Objects.requireNonNull(id).intern()).name(name == null ? null : name.intern()).processGroup(destination.getProcessGroup()).relationships(relationships).source(Objects.requireNonNull(source)).destination(destination).flowFileQueueFactory(flowFileQueueFactory).build();
        return connection;
    }

    public ReportingTaskNode getReportingTaskNode(String identifier) {
        return this.flowManager.getReportingTaskNode(identifier);
    }

    public ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded) {
        return this.flowManager.createReportingTask(type, id, bundleCoordinate, firstTimeAdded);
    }

    public Set<ReportingTaskNode> getAllReportingTasks() {
        return this.flowManager.getAllReportingTasks();
    }

    public void removeReportingTask(ReportingTaskNode reportingTaskNode) {
        this.flowManager.removeReportingTask(reportingTaskNode);
    }

    public ControllerServiceProvider getControllerServiceProvider() {
        return this.controllerServiceProvider;
    }

    public ControllerServiceResolver getControllerServiceResolver() {
        return this.controllerServiceResolver;
    }

    public PythonBridge getPythonBridge() {
        return this.pythonBridge;
    }

    public ProvenanceAuthorizableFactory getProvenanceAuthorizableFactory() {
        return this.provenanceAuthorizableFactory;
    }

    public void enableReportingTask(ReportingTaskNode reportingTaskNode) {
        reportingTaskNode.verifyCanEnable();
        reportingTaskNode.reloadAdditionalResourcesIfNecessary();
        this.processScheduler.enableReportingTask(reportingTaskNode);
    }

    public void disableReportingTask(ReportingTaskNode reportingTaskNode) {
        reportingTaskNode.verifyCanDisable();
        this.processScheduler.disableReportingTask(reportingTaskNode);
    }

    public List<Counter> getCounters() {
        ArrayList<Counter> counters = new ArrayList<Counter>();
        CounterRepository counterRepo = this.counterRepositoryRef.get();
        counters.addAll(counterRepo.getCounters());
        return counters;
    }

    public CounterRepository getCounterRepository() {
        return this.counterRepositoryRef.get();
    }

    public Counter resetCounter(String identifier) {
        CounterRepository counterRepo = this.counterRepositoryRef.get();
        Counter resetValue = counterRepo.resetCounter(identifier);
        return resetValue;
    }

    public GroupStatusCounts getGroupStatusCounts(ProcessGroup group) {
        return new GroupStatusCounts(group);
    }

    public int getActiveThreadCount() {
        return this.timerDrivenEngineRef.get().getActiveCount();
    }

    public void startHeartbeating() throws IllegalStateException {
        if (!this.isConfiguredForClustering()) {
            throw new IllegalStateException("Unable to start heartbeating because heartbeating is not configured.");
        }
        this.writeLock.lock();
        try {
            this.stopHeartbeating();
            HeartbeatSendTask sendTask = new HeartbeatSendTask();
            this.heartbeatSendTask.set(sendTask);
            this.heartbeatSenderFuture = this.clusterTaskExecutor.scheduleWithFixedDelay(sendTask, 0L, this.heartbeatDelaySeconds, TimeUnit.SECONDS);
        }
        finally {
            this.writeLock.unlock("startHeartbeating");
        }
    }

    public void suspendHeartbeats() {
        this.heartbeatsSuspended.set(true);
    }

    public void resumeHeartbeats() {
        this.heartbeatsSuspended.set(false);
    }

    public void stopHeartbeating() throws IllegalStateException {
        if (!this.isConfiguredForClustering()) {
            throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
        }
        LOG.info("Will no longer send heartbeats");
        this.writeLock.lock();
        try {
            if (!this.isHeartbeating()) {
                return;
            }
            if (this.heartbeatSenderFuture != null) {
                LOG.info("FlowController will stop sending heartbeats to Cluster Coordinator");
                this.heartbeatSenderFuture.cancel(false);
            }
        }
        finally {
            this.writeLock.unlock("stopHeartbeating");
        }
    }

    public boolean isHeartbeating() {
        this.readLock.lock();
        try {
            boolean bl = this.heartbeatSenderFuture != null && !this.heartbeatSenderFuture.isCancelled();
            return bl;
        }
        finally {
            this.readLock.unlock("isHeartbeating");
        }
    }

    public int getHeartbeatDelaySeconds() {
        this.readLock.lock();
        try {
            int n = this.heartbeatDelaySeconds;
            return n;
        }
        finally {
            this.readLock.unlock("getHeartbeatDelaySeconds");
        }
    }

    public NodeIdentifier getNodeId() {
        return this.nodeId;
    }

    public void setNodeId(NodeIdentifier nodeId) {
        this.nodeId = nodeId;
    }

    public boolean isClustered() {
        this.readLock.lock();
        try {
            boolean bl = this.clustered;
            return bl;
        }
        finally {
            this.readLock.unlock("isClustered");
        }
    }

    public Set<String> getClusterMembers() {
        if (this.isClustered()) {
            return this.clusterCoordinator.getConnectionStatuses().stream().map(s -> s.getNodeIdentifier().getApiAddress()).collect(Collectors.toSet());
        }
        return Collections.emptySet();
    }

    public Optional<String> getCurrentNode() {
        if (this.isClustered() && this.getNodeId() != null) {
            return Optional.of(this.getNodeId().getApiAddress());
        }
        return Optional.empty();
    }

    public boolean isConfiguredForClustering() {
        return this.configuredForClustering;
    }

    void registerForClusterCoordinator(boolean participate) {
        final String participantId = participate ? this.heartbeatMonitor.getHeartbeatAddress() : null;
        this.leaderElectionManager.register("Cluster Coordinator", new LeaderElectionStateChangeListener(){

            public synchronized void onStopLeading() {
                LOG.info("This node is no longer the elected Active {}", (Object)"Cluster Coordinator");
                String message = String.format("%s is no longer the elected Active %s", participantId, "Cluster Coordinator");
                FlowController.this.bulletinRepository.addBulletin(BulletinFactory.createBulletin((String)"Cluster Coordinator", (String)Severity.INFO.name(), (String)message));
            }

            public synchronized void onStartLeading() {
                LOG.info("This node has been elected Active {}", (Object)"Cluster Coordinator");
                String message = String.format("%s has been elected Active %s", participantId, "Cluster Coordinator");
                FlowController.this.bulletinRepository.addBulletin(BulletinFactory.createBulletin((String)"Cluster Coordinator", (String)Severity.INFO.name(), (String)message));
                FlowController.this.heartbeatMonitor.purgeHeartbeats();
            }
        }, participantId);
    }

    void registerForPrimaryNode() {
        String participantId = this.heartbeatMonitor.getHeartbeatAddress();
        this.leaderElectionManager.register("Primary Node", new LeaderElectionStateChangeListener(){

            public void onStartLeading() {
                FlowController.this.setPrimary(true);
            }

            public void onStopLeading() {
                FlowController.this.setPrimary(false);
            }
        }, participantId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setClustered(boolean clustered, String clusterInstanceId) {
        this.writeLock.lock();
        try {
            boolean isChanging = false;
            if (this.clustered != clustered) {
                isChanging = true;
                if (clustered) {
                    LOG.info("Cluster State changed from Not Clustered to Clustered");
                } else {
                    LOG.info("Cluster State changed from Clustered to Not Clustered");
                }
            }
            this.clustered = clustered;
            if (clusterInstanceId != null) {
                this.instanceId = clusterInstanceId;
            }
            if (isChanging) {
                if (clustered) {
                    this.onClusterConnect();
                    this.leaderElectionManager.start();
                    this.stateManagerProvider.enableClusterProvider();
                    this.loadBalanceClientRegistry.start();
                    this.heartbeat();
                } else {
                    this.stateManagerProvider.disableClusterProvider();
                    this.setPrimary(false);
                }
                List remoteGroups = this.flowManager.getRootGroup().findAllRemoteProcessGroups();
                for (RemoteProcessGroup remoteGroup : remoteGroups) {
                    remoteGroup.reinitialize(clustered);
                }
            }
            if (!clustered) {
                this.onClusterDisconnect();
            }
            this.heartbeatBeanRef.set(new HeartbeatBean(this.flowManager.getRootGroup(), this.isPrimary()));
        }
        finally {
            this.writeLock.unlock("setClustered");
        }
    }

    public void onClusterConnect() {
        this.registerForPrimaryNode();
        this.registerForClusterCoordinator(true);
        this.resumeHeartbeats();
    }

    public void onClusterDisconnect() {
        try {
            this.leaderElectionManager.unregister("Primary Node");
        }
        catch (Exception e) {
            LOG.warn("Failed to unregister this node as a Primary Node candidate", (Throwable)e);
        }
        try {
            this.leaderElectionManager.unregister("Cluster Coordinator");
        }
        catch (Exception e) {
            LOG.warn("Failed to unregister this node as a Cluster Coordinator candidate", (Throwable)e);
        }
    }

    public LeaderElectionManager getLeaderElectionManager() {
        return this.leaderElectionManager;
    }

    public boolean isPrimary() {
        return this.isClustered() && this.leaderElectionManager != null && this.leaderElectionManager.isLeader("Primary Node");
    }

    public boolean isClusterCoordinator() {
        return this.isClustered() && this.leaderElectionManager != null && this.leaderElectionManager.isLeader("Cluster Coordinator");
    }

    public void setPrimary(boolean primary) {
        PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
        ProcessGroup rootGroup = this.flowManager.getRootGroup();
        for (ProcessorNode procNode : rootGroup.findAllProcessors()) {
            this.processScheduler.submitFrameworkTask(() -> this.processScheduler.notifyPrimaryNodeStateChange(procNode, nodeState));
        }
        for (ControllerServiceNode serviceNode : this.flowManager.getAllControllerServices()) {
            this.processScheduler.submitFrameworkTask(() -> this.processScheduler.notifyPrimaryNodeStateChange(serviceNode, nodeState));
        }
        for (ReportingTaskNode reportingTaskNode : this.getAllReportingTasks()) {
            this.processScheduler.submitFrameworkTask(() -> this.processScheduler.notifyPrimaryNodeStateChange(reportingTaskNode, nodeState));
        }
        HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary));
        if (oldBean == null || oldBean.isPrimary() != primary) {
            String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
            Bulletin bulletin = BulletinFactory.createBulletin((String)"Primary Node", (String)Severity.INFO.name(), (String)message);
            this.bulletinRepository.addBulletin(bulletin);
            LOG.info(message);
        }
    }

    static boolean areEqual(String a, String b) {
        if (a == null && b == null) {
            return true;
        }
        if (a == b) {
            return true;
        }
        if (a == null || b == null) {
            return false;
        }
        return a.equals(b);
    }

    static boolean areEqual(Long a, Long b) {
        if (a == null && b == null) {
            return true;
        }
        if (a == b) {
            return true;
        }
        if (a == null || b == null) {
            return false;
        }
        return a.compareTo(b) == 0;
    }

    public ContentAvailability getContentAvailability(final ProvenanceEventRecord event) {
        final String replayFailure = this.getReplayFailureReason(event);
        return new ContentAvailability(){

            public String getReasonNotReplayable() {
                return replayFailure;
            }

            public boolean isContentSame() {
                return FlowController.areEqual(event.getPreviousContentClaimContainer(), event.getContentClaimContainer()) && FlowController.areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection()) && FlowController.areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier()) && FlowController.areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset()) && FlowController.areEqual(event.getPreviousFileSize(), event.getFileSize());
            }

            public boolean isInputAvailable() {
                try {
                    return FlowController.this.contentRepository.isAccessible(this.createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset()));
                }
                catch (IOException e) {
                    return false;
                }
            }

            public boolean isOutputAvailable() {
                try {
                    return FlowController.this.contentRepository.isAccessible(this.createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier(), event.getContentClaimOffset()));
                }
                catch (IOException e) {
                    return false;
                }
            }

            private ContentClaim createClaim(String container, String section, String identifier, Long offset) {
                if (container == null || section == null || identifier == null) {
                    return null;
                }
                ResourceClaim resourceClaim = FlowController.this.resourceClaimManager.newResourceClaim(container, section, identifier, false, false);
                return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset);
            }

            public boolean isReplayable() {
                return replayFailure == null;
            }
        };
    }

    public InputStream getContent(ProvenanceEventRecord provEvent, ContentDirection direction, String requestor, String requestUri) throws IOException {
        long size;
        long offset;
        StandardContentClaim claim;
        Objects.requireNonNull(provEvent);
        Objects.requireNonNull(direction);
        Objects.requireNonNull(requestor);
        Objects.requireNonNull(requestUri);
        if (direction == ContentDirection.INPUT) {
            if (provEvent.getPreviousContentClaimContainer() == null || provEvent.getPreviousContentClaimSection() == null || provEvent.getPreviousContentClaimIdentifier() == null) {
                throw new IllegalArgumentException("Input Content Claim not specified");
            }
            resourceClaim = this.resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), provEvent.getPreviousContentClaimIdentifier(), false, false);
            claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset().longValue());
            offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
            size = provEvent.getPreviousFileSize();
        } else {
            if (provEvent.getContentClaimContainer() == null || provEvent.getContentClaimSection() == null || provEvent.getContentClaimIdentifier() == null) {
                throw new IllegalArgumentException("Output Content Claim not specified");
            }
            resourceClaim = this.resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), false, false);
            claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset().longValue());
            offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
            size = provEvent.getFileSize();
        }
        InputStream rawStream = this.contentRepository.read((ContentClaim)claim);
        ResourceClaim resourceClaim = claim.getResourceClaim();
        StandardProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder().setEventType(ProvenanceEventType.DOWNLOAD).setFlowFileUUID(provEvent.getFlowFileUuid()).setAttributes(provEvent.getAttributes(), Collections.emptyMap()).setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(offset), size).setTransitUri(requestUri).setEventTime(System.currentTimeMillis()).setFlowFileEntryDate(provEvent.getFlowFileEntryDate()).setLineageStartDate(provEvent.getLineageStartDate()).setComponentType(this.flowManager.getRootGroup().getName()).setComponentId(this.flowManager.getRootGroupId()).setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId()).build();
        this.provenanceRepository.registerEvent((ProvenanceEventRecord)sendEvent);
        return new LimitedInputStream(rawStream, size);
    }

    public InputStream getContent(FlowFileRecord flowFile, String requestor, String requestUri) throws IOException {
        InputStream stream;
        ResourceClaim resourceClaim;
        Objects.requireNonNull(flowFile);
        Objects.requireNonNull(requestor);
        Objects.requireNonNull(requestUri);
        ContentClaim contentClaim = flowFile.getContentClaim();
        if (contentClaim == null) {
            resourceClaim = null;
            stream = new ByteArrayInputStream(new byte[0]);
        } else {
            resourceClaim = flowFile.getContentClaim().getResourceClaim();
            stream = this.contentRepository.read(flowFile.getContentClaim());
            long contentClaimOffset = flowFile.getContentClaimOffset();
            if (contentClaimOffset > 0L) {
                StreamUtils.skip((InputStream)stream, (long)contentClaimOffset);
            }
            stream = new LimitingInputStream(stream, flowFile.getSize());
        }
        StandardProvenanceEventRecord.Builder sendEventBuilder = new StandardProvenanceEventRecord.Builder().setEventType(ProvenanceEventType.DOWNLOAD).setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key())).setAttributes(flowFile.getAttributes(), Collections.emptyMap()).setTransitUri(requestUri).setEventTime(System.currentTimeMillis()).setFlowFileEntryDate(flowFile.getEntryDate()).setLineageStartDate(flowFile.getLineageStartDate()).setComponentType(this.flowManager.getRootGroup().getName()).setComponentId(this.flowManager.getRootGroupId()).setDetails("Download of Content requested by " + requestor + " for " + String.valueOf(flowFile));
        if (contentClaim != null) {
            sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(contentClaim.getOffset() + flowFile.getContentClaimOffset()), flowFile.getSize());
        }
        StandardProvenanceEventRecord sendEvent = sendEventBuilder.build();
        this.provenanceRepository.registerEvent((ProvenanceEventRecord)sendEvent);
        return stream;
    }

    private int countNulls(Object ... values) {
        int nullCount = 0;
        for (Object value : values) {
            if (value != null) continue;
            ++nullCount;
        }
        return nullCount;
    }

    private String getReplayFailureReason(ProvenanceEventRecord event) {
        String contentClaimContainer;
        String contentClaimSection;
        String contentClaimId;
        ProvenanceEventType type = event.getEventType();
        if (type == ProvenanceEventType.JOIN) {
            return "Cannot replay events that are created from multiple parents";
        }
        Long contentSize = event.getPreviousFileSize();
        int nullCount = this.countNulls(contentSize, contentClaimId = event.getPreviousContentClaimIdentifier(), contentClaimSection = event.getPreviousContentClaimSection(), contentClaimContainer = event.getPreviousContentClaimContainer());
        if (nullCount > 0 && nullCount < 4) {
            return "Cannot replay data from Provenance Event because the event does not contain the required Content Claim";
        }
        if (nullCount == 0) {
            try {
                ResourceClaim resourceClaim = this.resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false);
                StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset().longValue());
                if (!this.contentRepository.isAccessible((ContentClaim)contentClaim)) {
                    return "Content is no longer available in Content Repository";
                }
            }
            catch (IOException ioe) {
                return "Failed to determine whether or not content was available in Content Repository due to " + ioe.toString();
            }
        }
        if (event.getSourceQueueIdentifier() == null) {
            return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue";
        }
        Connection connection = this.flowManager.getConnection(event.getSourceQueueIdentifier());
        if (connection == null) {
            return "Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists";
        }
        return null;
    }

    public ProvenanceEventRecord replayFlowFile(long provenanceEventRecordId, NiFiUser user) throws IOException {
        ProvenanceEventRecord record = this.provenanceRepository.getEvent(provenanceEventRecordId, user);
        if (record == null) {
            throw new IllegalStateException("Cannot find Provenance Event with ID " + provenanceEventRecordId);
        }
        return this.replayFlowFile(record, user);
    }

    public ProvenanceEventRecord replayFlowFile(ProvenanceEventRecord event, NiFiUser user) throws IOException {
        StandardContentClaim contentClaim;
        int nullCount;
        if (event == null) {
            throw new NullPointerException();
        }
        ProvenanceEventType type = event.getEventType();
        if (type == ProvenanceEventType.JOIN) {
            throw new IllegalArgumentException("Cannot replay events that are created from multiple parents");
        }
        boolean usePrevious = true;
        Long contentSize = event.getPreviousFileSize();
        String contentClaimId = event.getPreviousContentClaimIdentifier();
        String contentClaimSection = event.getPreviousContentClaimSection();
        String contentClaimContainer = event.getPreviousContentClaimContainer();
        Long contentClaimOffset = event.getPreviousContentClaimOffset();
        int previousClaimNulls = this.countNulls(contentSize, contentClaimId, contentClaimSection, contentClaimContainer);
        if (previousClaimNulls == 4) {
            contentClaimId = event.getContentClaimIdentifier();
            contentClaimSection = event.getContentClaimSection();
            contentClaimContainer = event.getContentClaimContainer();
            int currentClaimNullCounts = this.countNulls(contentClaimId, contentClaimSection, contentClaimContainer);
            boolean bl = usePrevious = currentClaimNullCounts == 3;
            if (!usePrevious) {
                contentSize = event.getFileSize();
                contentClaimOffset = event.getContentClaimOffset();
            }
        }
        if ((nullCount = this.countNulls(contentSize, contentClaimId, contentClaimSection, contentClaimContainer)) > 0 && nullCount < 4) {
            throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not contain the required Content Claim");
        }
        if (event.getSourceQueueIdentifier() == null) {
            throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue");
        }
        Connection connection = this.flowManager.getConnection(event.getSourceQueueIdentifier());
        if (connection == null) {
            throw new IllegalStateException("Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists");
        }
        if (contentClaimContainer == null) {
            contentClaim = null;
        } else {
            ResourceClaim resourceClaim = this.resourceClaimManager.getResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId);
            if (resourceClaim == null) {
                resourceClaim = this.resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false);
            }
            this.resourceClaimManager.incrementClaimantCount(resourceClaim);
            long claimOffset = contentClaimOffset == null ? 0L : contentClaimOffset;
            contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
            contentClaim.setLength(contentSize == null ? -1L : contentSize);
            if (!this.contentRepository.isAccessible((ContentClaim)contentClaim)) {
                this.resourceClaimManager.decrementClaimantCount(resourceClaim);
                throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
            }
        }
        String parentUUID = event.getFlowFileUuid();
        String newFlowFileUUID = UUID.randomUUID().toString();
        FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder().addAttributes(usePrevious ? event.getPreviousAttributes() : event.getAttributes()).contentClaim(contentClaim).contentClaimOffset(0L).entryDate(System.currentTimeMillis()).id(this.flowFileRepository.getNextFlowFileSequence()).lineageStart(event.getLineageStartDate(), 0L).size(Optional.ofNullable(contentSize).orElse(0L).longValue()).addAttribute("flowfile.replay", "true").addAttribute("flowfile.replay.timestamp", String.valueOf(new Date())).addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID).removeAttributes(new String[]{CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key()}).build();
        StandardProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder().setEventType(ProvenanceEventType.REPLAY).addChildUuid(newFlowFileUUID).addParentUuid(parentUUID).setFlowFileUUID(parentUUID).setAttributes(Collections.emptyMap(), flowFileRecord.getAttributes()).setCurrentContentClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize()).setDetails("Replay requested by " + user.getIdentity()).setEventTime(System.currentTimeMillis()).setFlowFileEntryDate(System.currentTimeMillis()).setLineageStartDate(event.getLineageStartDate()).setComponentType(event.getComponentType()).setComponentId(event.getComponentId()).setSourceQueueIdentifier(event.getSourceQueueIdentifier()).build();
        this.provenanceRepository.registerEvent((ProvenanceEventRecord)replayEvent);
        FlowFileQueue queue = connection.getFlowFileQueue();
        StandardRepositoryRecord record = new StandardRepositoryRecord(queue);
        record.setWorking(flowFileRecord, false);
        record.setDestination(queue);
        this.flowFileRepository.updateRepository(Collections.singleton(record));
        queue.put(flowFileRecord);
        return replayEvent;
    }

    public ResourceClaimManager getResourceClaimManager() {
        return this.resourceClaimManager;
    }

    public boolean isConnected() {
        this.rwLock.readLock().lock();
        try {
            boolean bl = this.connectionStatus != null && this.connectionStatus.getState() == NodeConnectionState.CONNECTED;
            return bl;
        }
        finally {
            this.rwLock.readLock().unlock();
        }
    }

    public void setConnectionStatus(NodeConnectionStatus connectionStatus) {
        this.rwLock.writeLock().lock();
        try {
            this.connectionStatus = connectionStatus;
            this.heartbeatBeanRef.set(new HeartbeatBean(this.flowManager.getRootGroup(), this.isPrimary()));
        }
        finally {
            this.rwLock.writeLock().unlock();
        }
    }

    public void heartbeat() {
        if (!this.isClustered()) {
            return;
        }
        if (this.shutdown) {
            return;
        }
        HeartbeatSendTask task = this.heartbeatSendTask.get();
        if (task != null) {
            this.clusterTaskExecutor.submit(task);
        }
    }

    HeartbeatMessage createHeartbeatMessage() {
        try {
            HeartbeatBean bean = this.heartbeatBeanRef.get();
            if (bean == null) {
                bean = new HeartbeatBean(this.flowManager.getRootGroup(), this.isPrimary());
            }
            HeartbeatPayload hbPayload = new HeartbeatPayload();
            hbPayload.setSystemStartTime(this.systemStartTime);
            hbPayload.setActiveThreadCount(this.getActiveThreadCount());
            hbPayload.setRevisionUpdateCount(this.revisionManager.getRevisionUpdateCount());
            QueueSize queueSize = bean.getRootGroup().getQueueSize();
            hbPayload.setTotalFlowFileCount((long)queueSize.getObjectCount());
            hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
            hbPayload.setClusterStatus(this.clusterCoordinator.getConnectionStatuses());
            NodeIdentifier nodeId = this.getNodeId();
            if (nodeId == null) {
                LOG.warn("Cannot create Heartbeat Message because node's identifier is not known at this time");
                return null;
            }
            Heartbeat heartbeat = new Heartbeat(nodeId, this.connectionStatus, hbPayload.marshal());
            HeartbeatMessage message = new HeartbeatMessage();
            message.setHeartbeat(heartbeat);
            LOG.debug("Generated heartbeat");
            return message;
        }
        catch (Throwable ex) {
            LOG.warn("Failed to create heartbeat", ex);
            return null;
        }
    }

    private void updateRemoteProcessGroups() {
        List remoteGroups = this.flowManager.getRootGroup().findAllRemoteProcessGroups();
        for (RemoteProcessGroup remoteGroup : remoteGroups) {
            try {
                remoteGroup.refreshFlowContents();
            }
            catch (CommunicationsException e) {
                LOG.warn("Unable to communicate with remote instance {}", (Object)remoteGroup, (Object)e);
            }
        }
    }

    public int getPerformanceTrackingPercentage() {
        return this.nifiProperties.getPerformanceMetricTrackingPercentage();
    }

    public Integer getRemoteSiteListeningPort() {
        return this.remoteInputSocketPort;
    }

    public Integer getRemoteSiteListeningHttpPort() {
        return this.remoteInputHttpPort;
    }

    public Boolean isRemoteSiteCommsSecure() {
        return this.isSiteToSiteSecure;
    }

    public StandardProcessScheduler getProcessScheduler() {
        return this.processScheduler;
    }

    public long getBoredYieldDuration(TimeUnit timeUnit) {
        return (long)FormatUtils.getPreciseTimeDuration((String)this.nifiProperties.getBoredYieldDuration(), (TimeUnit)timeUnit);
    }

    public AuditService getAuditService() {
        return this.auditService;
    }

    public ProvenanceRepository getProvenanceRepository() {
        return this.provenanceRepository;
    }

    public StatusHistoryDTO getConnectionStatusHistory(String connectionId) {
        return this.getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE);
    }

    public StatusHistoryDTO getConnectionStatusHistory(String connectionId, Date startTime, Date endTime, int preferredDataPoints) {
        return StatusHistoryUtil.createStatusHistoryDTO(this.statusHistoryRepository.getConnectionStatusHistory(connectionId, startTime, endTime, preferredDataPoints));
    }

    public StatusHistoryDTO getProcessorStatusHistory(String processorId, boolean includeCounters) {
        return this.getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE, includeCounters);
    }

    public StatusHistoryDTO getProcessorStatusHistory(String processorId, Date startTime, Date endTime, int preferredDataPoints, boolean includeCounters) {
        return StatusHistoryUtil.createStatusHistoryDTO(this.statusHistoryRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints, includeCounters));
    }

    public StatusHistoryDTO getProcessGroupStatusHistory(String processGroupId) {
        return this.getProcessGroupStatusHistory(processGroupId, null, null, Integer.MAX_VALUE);
    }

    public StatusHistoryDTO getProcessGroupStatusHistory(String processGroupId, Date startTime, Date endTime, int preferredDataPoints) {
        return StatusHistoryUtil.createStatusHistoryDTO(this.statusHistoryRepository.getProcessGroupStatusHistory(processGroupId, startTime, endTime, preferredDataPoints));
    }

    public StatusHistoryDTO getRemoteProcessGroupStatusHistory(String remoteGroupId) {
        return this.getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, Integer.MAX_VALUE);
    }

    public StatusHistoryDTO getRemoteProcessGroupStatusHistory(String remoteGroupId, Date startTime, Date endTime, int preferredDataPoints) {
        return StatusHistoryUtil.createStatusHistoryDTO(this.statusHistoryRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startTime, endTime, preferredDataPoints));
    }

    public StatusHistoryDTO getNodeStatusHistory() {
        return StatusHistoryUtil.createStatusHistoryDTO(this.statusHistoryRepository.getNodeStatusHistory(null, null));
    }

    private NodeStatus getNodeStatusSnapshot() {
        SystemDiagnostics systemDiagnostics = this.getSystemDiagnostics();
        NodeStatus result = new NodeStatus();
        result.setCreatedAtInMs(systemDiagnostics.getCreationTimestamp());
        result.setFreeHeap(systemDiagnostics.getFreeHeap());
        result.setUsedHeap(systemDiagnostics.getUsedHeap());
        result.setHeapUtilization((long)systemDiagnostics.getHeapUtilization());
        result.setFreeNonHeap(systemDiagnostics.getFreeNonHeap());
        result.setUsedNonHeap(systemDiagnostics.getUsedNonHeap());
        result.setOpenFileHandlers(systemDiagnostics.getOpenFileHandles());
        result.setProcessorLoadAverage(systemDiagnostics.getProcessorLoadAverage().doubleValue());
        result.setTotalThreads((long)systemDiagnostics.getTotalThreads());
        result.setTimerDrivenThreads((long)this.getActiveTimerDrivenThreadCount());
        result.setFlowFileRepositoryFreeSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getFreeSpace());
        result.setFlowFileRepositoryUsedSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getUsedSpace());
        result.setContentRepositories(systemDiagnostics.getContentRepositoryStorageUsage().entrySet().stream().map(e -> FlowController.getStorageStatus(e)).collect(Collectors.toList()));
        result.setProvenanceRepositories(systemDiagnostics.getProvenanceRepositoryStorageUsage().entrySet().stream().map(e -> FlowController.getStorageStatus(e)).collect(Collectors.toList()));
        return result;
    }

    private static StorageStatus getStorageStatus(Map.Entry<String, StorageUsage> storageUsage) {
        StorageStatus result = new StorageStatus();
        result.setName(storageUsage.getKey());
        result.setFreeSpace(storageUsage.getValue().getFreeSpace());
        result.setUsedSpace(storageUsage.getValue().getUsedSpace());
        return result;
    }

    public FlowFileEventRepository getFlowFileEventRepository() {
        return this.flowFileEventRepository;
    }

    private boolean isEncryptionProtocolVersionConfigured(NiFiProperties properties) {
        String version = properties.getProperty("nifi.repository.encryption.protocol.version");
        return Integer.toString(EncryptionProtocol.VERSION_1.getVersionNumber()).equals(version);
    }

    public FlowAnalysisRuleNode createFlowAnalysisRule(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded) throws FlowAnalysisRuleInstantiationException {
        return this.flowManager.createFlowAnalysisRule(type, id, bundleCoordinate, firstTimeAdded);
    }

    public FlowAnalysisRuleNode getFlowAnalysisRuleNode(String identifier) {
        return this.flowManager.getFlowAnalysisRuleNode(identifier);
    }

    public Set<FlowAnalysisRuleNode> getAllFlowAnalysisRules() {
        return this.flowManager.getAllFlowAnalysisRules();
    }

    public void removeFlowAnalysisRule(FlowAnalysisRuleNode flowAnalysisRule) {
        this.flowManager.removeFlowAnalysisRule(flowAnalysisRule);
    }

    public void enableFlowAnalysisRule(FlowAnalysisRuleNode flowAnalysisRule) {
        flowAnalysisRule.verifyCanEnable();
        flowAnalysisRule.reloadAdditionalResourcesIfNecessary();
        flowAnalysisRule.enable();
    }

    public void disableFlowAnalysisRule(FlowAnalysisRuleNode flowAnalysisRule) {
        flowAnalysisRule.verifyCanDisable();
        flowAnalysisRule.disable();
    }

    private static class HeartbeatBean {
        private final ProcessGroup rootGroup;
        private final boolean primary;

        public HeartbeatBean(ProcessGroup rootGroup, boolean primary) {
            this.rootGroup = rootGroup;
            this.primary = primary;
        }

        public ProcessGroup getRootGroup() {
            return this.rootGroup;
        }

        public boolean isPrimary() {
            return this.primary;
        }
    }

    public class GroupStatusCounts {
        private int queuedCount = 0;
        private long queuedContentSize = 0L;
        private int activeThreadCount = 0;
        private int terminatedThreadCount = 0;

        public GroupStatusCounts(ProcessGroup group) {
            this.calculateCounts(group);
        }

        private void calculateCounts(ProcessGroup group) {
            for (Connection connection : group.getConnections()) {
                Connectable destination;
                QueueSize size = connection.getFlowFileQueue().size();
                this.queuedCount += size.getObjectCount();
                this.queuedContentSize += size.getByteCount();
                Connectable source = connection.getSource();
                if (ConnectableType.REMOTE_OUTPUT_PORT.equals((Object)source.getConnectableType())) {
                    RemoteGroupPort remoteOutputPort = (RemoteGroupPort)source;
                    this.activeThreadCount += FlowController.this.processScheduler.getActiveThreadCount(remoteOutputPort);
                }
                if (!ConnectableType.REMOTE_INPUT_PORT.equals((Object)(destination = connection.getDestination()).getConnectableType())) continue;
                RemoteGroupPort remoteInputPort = (RemoteGroupPort)destination;
                this.activeThreadCount += FlowController.this.processScheduler.getActiveThreadCount(remoteInputPort);
            }
            for (ProcessorNode processor : group.getProcessors()) {
                this.activeThreadCount += FlowController.this.processScheduler.getActiveThreadCount(processor);
                this.terminatedThreadCount += processor.getTerminatedThreadCount();
            }
            for (Port port : group.getInputPorts()) {
                this.activeThreadCount += FlowController.this.processScheduler.getActiveThreadCount(port);
            }
            for (Port port : group.getOutputPorts()) {
                this.activeThreadCount += FlowController.this.processScheduler.getActiveThreadCount(port);
            }
            for (Funnel funnel : group.getFunnels()) {
                this.activeThreadCount += FlowController.this.processScheduler.getActiveThreadCount(funnel);
            }
            for (ProcessGroup childGroup : group.getProcessGroups()) {
                this.calculateCounts(childGroup);
            }
        }

        public int getQueuedCount() {
            return this.queuedCount;
        }

        public long getQueuedContentSize() {
            return this.queuedContentSize;
        }

        public int getActiveThreadCount() {
            return this.activeThreadCount;
        }

        public int getTerminatedThreadCount() {
            return this.terminatedThreadCount;
        }
    }

    private class HeartbeatSendTask
    implements Runnable {
        private HeartbeatSendTask() {
        }

        @Override
        public void run() {
            try (NarCloseable narCloseable = NarCloseable.withFrameworkNar();){
                if (FlowController.this.heartbeatsSuspended.get()) {
                    return;
                }
                HeartbeatMessage message = FlowController.this.createHeartbeatMessage();
                if (message == null) {
                    LOG.debug("No heartbeat to send");
                    return;
                }
                FlowController.this.heartbeater.send(message);
            }
            catch (UnknownServiceAddressException usae) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(usae.getMessage());
                }
            }
            catch (Throwable ex) {
                LOG.warn("Failed to send heartbeat", ex);
            }
        }
    }
}

