/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.AuthorizerCapabilityDetection;
import org.apache.nifi.authorization.ManagedAuthorizer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ConnectionException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.ComponentRevisionSnapshot;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.AbstractComponentNode;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowSerializationStrategy;
import org.apache.nifi.controller.MissingBundleException;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SnippetManager;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.StandardFlowSynchronizer;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.lifecycle.LifeCycleStartException;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.persistence.FlowConfigurationDAO;
import org.apache.nifi.persistence.StandardFlowConfigurationDAO;
import org.apache.nifi.persistence.TemplateDeserializer;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.UserAwareEventAccess;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.nifi.web.revision.RevisionSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardFlowService
implements FlowService,
ProtocolHandler {
    private static final String EVENT_CATEGORY = "Controller";
    /** Component id used to look up the state manager that stores this node's cluster configuration. */
    private static final String CLUSTER_NODE_CONFIG = "Cluster Node Configuration";
    /** Key, within the local-scope state, under which the node's UUID is stored. */
    private static final String NODE_UUID = "Node UUID";
    private final FlowController controller;
    private final FlowConfigurationDAO dao;
    /** How long {@code stop} waits for the executor to terminate, in seconds. */
    private final int gracefulShutdownSeconds;
    private final boolean autoResumeState;
    private final Authorizer authorizer;
    // A single read/write lock guards the controller/flow state touched by this service.
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.rwLock.readLock();
    private final Lock writeLock = this.rwLock.writeLock();
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** Executor created by {@code start}; {@code null} until then. */
    private final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<>(null);
    /** Most recent pending save request; {@code null} when none has been made. */
    private final AtomicReference<SaveHolder> saveHolder = new AtomicReference<>(null);
    private final ClusterCoordinator clusterCoordinator;
    private final RevisionManager revisionManager;
    private volatile SaveReportingTask saveReportingTask;
    /** {@code null} when the service is not configured for clustering. */
    private final NodeProtocolSenderListener senderListener;
    private final boolean configuredForClustering;
    /** Assigned by the constructor only when the service is configured for clustering. */
    private NodeIdentifier nodeId;
    private boolean firstControllerInitialization = true;
    private final NiFiProperties nifiProperties;
    private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster";
    private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class);

    /**
     * Creates a flow service that is not configured for clustering.
     *
     * @param controller the flow controller this service manages
     * @param nifiProperties the NiFi configuration
     * @param encryptor encryptor handed to the flow configuration DAO
     * @param revisionManager the revision manager
     * @param authorizer the authorizer
     * @param serializationStrategy strategy handed to the flow configuration DAO
     * @return the new service
     * @throws IOException if the service cannot be constructed
     */
    public static StandardFlowService createStandaloneInstance(FlowController controller, NiFiProperties nifiProperties, PropertyEncryptor encryptor, RevisionManager revisionManager, Authorizer authorizer, FlowSerializationStrategy serializationStrategy) throws IOException {
        // No protocol sender/listener and no coordinator are supplied for a standalone instance.
        final NodeProtocolSenderListener noSenderListener = null;
        final ClusterCoordinator noCoordinator = null;
        final boolean clustered = false;
        return new StandardFlowService(controller, nifiProperties, noSenderListener, encryptor, clustered, noCoordinator, revisionManager, authorizer, serializationStrategy);
    }

    /**
     * Creates a flow service that is configured for clustering. The flow is always
     * serialized with {@link FlowSerializationStrategy#WRITE_XML_AND_JSON}.
     *
     * @param controller the flow controller this service manages
     * @param nifiProperties the NiFi configuration
     * @param senderListener protocol sender/listener this service registers itself with
     * @param coordinator the cluster coordinator
     * @param encryptor encryptor handed to the flow configuration DAO
     * @param revisionManager the revision manager
     * @param authorizer the authorizer
     * @return the new service
     * @throws IOException if the service cannot be constructed
     */
    public static StandardFlowService createClusteredInstance(FlowController controller, NiFiProperties nifiProperties, NodeProtocolSenderListener senderListener, ClusterCoordinator coordinator, PropertyEncryptor encryptor, RevisionManager revisionManager, Authorizer authorizer) throws IOException {
        final boolean clustered = true;
        final FlowSerializationStrategy strategy = FlowSerializationStrategy.WRITE_XML_AND_JSON;
        return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, clustered, coordinator, revisionManager, authorizer, strategy);
    }

    /**
     * Wires the service to its collaborators. When {@code configuredForClustering} is set, the
     * service also registers itself as a protocol handler and builds this node's identifier,
     * reusing a UUID found in local state when one is available.
     *
     * @throws IOException propagated from the collaborators used during construction
     */
    private StandardFlowService(FlowController controller, NiFiProperties nifiProperties, NodeProtocolSenderListener senderListener, PropertyEncryptor encryptor, boolean configuredForClustering, ClusterCoordinator clusterCoordinator, RevisionManager revisionManager, Authorizer authorizer, FlowSerializationStrategy serializationStrategy) throws IOException {
        this.nifiProperties = nifiProperties;
        this.controller = controller;

        final String shutdownPeriod = nifiProperties.getProperty("nifi.flowcontroller.graceful.shutdown.period");
        this.gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(shutdownPeriod, TimeUnit.SECONDS);
        this.autoResumeState = nifiProperties.getAutoResumeState();
        this.dao = new StandardFlowConfigurationDAO(encryptor, nifiProperties, controller.getExtensionManager(), serializationStrategy);

        this.clusterCoordinator = clusterCoordinator;
        if (clusterCoordinator != null) {
            clusterCoordinator.setFlowService(this);
        }
        this.revisionManager = revisionManager;
        this.authorizer = authorizer;
        this.configuredForClustering = configuredForClustering;

        if (!configuredForClustering) {
            // Any listener passed in is ignored when clustering is off.
            this.senderListener = null;
            return;
        }

        this.senderListener = senderListener;
        senderListener.addHandler(this);

        final InetSocketAddress apiAddress = nifiProperties.getNodeApiAddress();
        final InetSocketAddress protocolAddress = nifiProperties.getClusterNodeProtocolAddress();
        final InetSocketAddress balanceAddress = nifiProperties.getClusterLoadBalanceAddress();

        // Prefer the UUID kept in local state; otherwise generate a fresh one.
        final StateManager stateManager = controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG);
        final String storedUuid = stateManager == null ? null : stateManager.getState(Scope.LOCAL).get(NODE_UUID);
        final String nodeUuid = storedUuid == null ? UUID.randomUUID().toString() : storedUuid;

        this.nodeId = new NodeIdentifier(nodeUuid, apiAddress.getHostName(), apiAddress.getPort(), protocolAddress.getHostName(), protocolAddress.getPort(), balanceAddress.getHostName(), balanceAddress.getPort(), nifiProperties.getRemoteInputHost(), nifiProperties.getRemoteInputPort(), nifiProperties.getRemoteInputHttpPort(), nifiProperties.isSiteToSiteSecure().booleanValue());
    }

    /**
     * Saves the controller's flow through the DAO immediately, holding the write lock
     * for the duration of the save.
     *
     * @throws IOException if the DAO fails to save
     */
    public void saveFlowChanges() throws IOException {
        writeLock.lock();
        try {
            dao.save(controller);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Requests a delayed save, taking the archive flag from the NiFi properties.
     *
     * @param delayUnit unit of {@code delay}
     * @param delay how long to wait before saving
     */
    public void saveFlowChanges(TimeUnit delayUnit, long delay) {
        saveFlowChanges(delayUnit, delay, nifiProperties.isFlowConfigurationArchiveEnabled());
    }

    /**
     * Records a request to save the flow at {@code now + delay}. The request replaces any
     * previously recorded one; this method does not perform the save itself.
     *
     * @param delayUnit unit of {@code delay}
     * @param delay how long to wait before saving; a value that does not fit in an
     *              {@code int} number of milliseconds is replaced by a 500 ms default
     * @param archive archive flag stored with the request
     */
    public void saveFlowChanges(TimeUnit delayUnit, long delay, boolean archive) {
        final Calendar saveTime = Calendar.getInstance();
        final long delayInMs = TimeUnit.MILLISECONDS.convert(delay, delayUnit);

        // Calendar.add takes an int amount. Checking both bounds stops a very negative
        // delay from wrapping into an arbitrary value in the narrowing cast.
        int finalDelayMs = 500;
        if (delayInMs >= Integer.MIN_VALUE && delayInMs <= Integer.MAX_VALUE) {
            finalDelayMs = (int) delayInMs;
        }
        saveTime.add(Calendar.MILLISECOND, finalDelayMs);

        if (logger.isTraceEnabled()) {
            logger.trace(" A request to save the flow has been made with delay {} for time {}", (Object) finalDelayMs, (Object) saveTime.getTime());
        }
        saveHolder.set(new SaveHolder(saveTime, archive));
    }

    /**
     * @return {@code true} between a successful {@code start} and the next {@code stop}
     */
    public boolean isRunning() {
        final boolean currentlyRunning = running.get();
        return currentlyRunning;
    }

    /**
     * Starts the service: marks it running, schedules the save task every 500 ms on a new
     * executor and, when configured for clustering, starts the protocol sender/listener.
     * Calling this on an already running service is a no-op.
     *
     * @throws LifeCycleStartException if the sender/listener fails to start; the service is
     *         force-stopped first, and any failure of that stop is attached as a suppressed
     *         exception
     */
    public void start() throws LifeCycleStartException {
        writeLock.lock();
        try {
            if (isRunning()) {
                return;
            }
            running.set(true);

            final FlowEngine newExecutor = new FlowEngine(2, "Flow Service Tasks");
            saveReportingTask = new SaveReportingTask();
            newExecutor.scheduleWithFixedDelay(saveReportingTask, 0L, 500L, TimeUnit.MILLISECONDS);
            executor.set((ScheduledExecutorService) newExecutor);

            if (configuredForClustering) {
                senderListener.start();
            }
        } catch (IOException ioe) {
            try {
                stop(true);
            } catch (Exception stopFailure) {
                // Cleanup is best effort, but do not lose the failure: carry it with the root cause.
                ioe.addSuppressed(stopFailure);
            }
            throw new LifeCycleStartException("Failed to start Flow Service due to: " + ioe, (Throwable) ioe);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Stops the service. Under the write lock it shuts down the cluster coordinator, the
     * controller and the protocol sender/listener. It then shuts down the executor, waits up
     * to the graceful-shutdown period for it to terminate, and runs the save task one last
     * time on the calling thread. Calling this on a stopped service is a no-op.
     *
     * @param force when {@code true} the controller is shut down forcibly and the executor
     *              is stopped with {@code shutdownNow()} instead of {@code shutdown()}
     */
    public void stop(boolean force) {
        writeLock.lock();
        try {
            if (!isRunning()) {
                return;
            }
            running.set(false);

            if (clusterCoordinator != null) {
                try {
                    clusterCoordinator.shutdown();
                } catch (Throwable t) {
                    logger.error("Failed to properly shutdown coordinator", t);
                }
            }
            if (!controller.isTerminated()) {
                controller.shutdown(force);
            }
            if (configuredForClustering && senderListener != null) {
                try {
                    senderListener.stop();
                } catch (IOException ioe) {
                    logger.warn("Protocol sender/listener did not stop gracefully due to: " + ioe);
                }
            }
        } finally {
            writeLock.unlock();
        }

        boolean interrupted = false;
        final ScheduledExecutorService executorService = executor.get();
        if (executorService != null) {
            if (force) {
                executorService.shutdownNow();
            } else {
                executorService.shutdown();
            }

            boolean graceful;
            try {
                graceful = executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                graceful = false;
                interrupted = true;
            }
            if (!graceful) {
                logger.warn("Scheduling service did not gracefully shutdown within configured " + gracefulShutdownSeconds + " second window");
            }
        }

        try {
            // The executor is down, so run the save task once more on this thread.
            // The task may be absent if start() failed before creating it.
            final SaveReportingTask finalSave = saveReportingTask;
            if (finalSave != null) {
                finalSave.run();
            }
        } finally {
            // Restore the interrupt only after the final save so that save is not disturbed.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Tells whether this handler accepts the given message.
     *
     * @param msg the incoming protocol message
     * @return {@code true} for reconnection, offload, disconnection and flow requests
     */
    public boolean canHandle(ProtocolMessage msg) {
        final boolean supported;
        switch (msg.getType()) {
            case RECONNECTION_REQUEST:
            case OFFLOAD_REQUEST:
            case DISCONNECTION_REQUEST:
            case FLOW_REQUEST:
                supported = true;
                break;
            default:
                supported = false;
                break;
        }
        return supported;
    }

    /**
     * Dispatches a protocol message. Flow requests are answered on the calling thread.
     * Reconnection, offload and disconnection requests are handed to a daemon thread and
     * this method returns without waiting for that thread.
     *
     * @param request the message to handle
     * @param nodeIdentities not used by this handler
     * @return the flow response for a flow request, an empty reconnection response for a
     *         reconnection request, and {@code null} for offload and disconnection requests
     * @throws ProtocolException if the message type is not one this handler supports
     */
    public ProtocolMessage handle(final ProtocolMessage request, Set<String> nodeIdentities) throws ProtocolException {
        final long startNanos = System.nanoTime();
        try {
            switch (request.getType()) {
                case FLOW_REQUEST:
                    return handleFlowRequest((FlowRequestMessage) request);
                case RECONNECTION_REQUEST:
                    // Heartbeats are suspended before the reconnect work starts.
                    controller.suspendHeartbeats();
                    launchProtocolHandlerThread(() -> handleReconnectionRequest((ReconnectionRequestMessage) request), "Reconnect to Cluster");
                    return new ReconnectionResponseMessage();
                case OFFLOAD_REQUEST:
                    launchProtocolHandlerThread(() -> {
                        try {
                            handleOffloadRequest((OffloadMessage) request);
                        } catch (InterruptedException e) {
                            throw new ProtocolException("Could not complete offload request", (Throwable) e);
                        }
                    }, "Offload Flow Files from Node");
                    return null;
                case DISCONNECTION_REQUEST:
                    launchProtocolHandlerThread(() -> handleDisconnectionRequest((DisconnectMessage) request), "Disconnect from Cluster");
                    return null;
                default:
                    throw new ProtocolException("Handler cannot handle message type: " + request.getType());
            }
        } finally {
            if (logger.isDebugEnabled()) {
                final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                logger.debug("Finished Processing Protocol Message of type {} in {} millis", (Object) request.getType(), (Object) elapsedMillis);
            }
        }
    }

    /** Runs {@code task} on a newly started daemon thread with the given name. */
    private void launchProtocolHandlerThread(Runnable task, String threadName) {
        final Thread worker = new Thread(task, threadName);
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Loads a flow into the controller and saves the result through the DAO.
     *
     * <p>Standalone: the given flow (or, when {@code null}, the current flow) is loaded and
     * the controller is initialized.
     *
     * <p>Configured for clustering: the initial flow is loaded first, then the node tries to
     * connect to the cluster with the flow serialized from the controller. With no response,
     * or a "try later" response, the node keeps its local flow, starts heartbeating and
     * initializes the controller. Otherwise the flow from the connection response is loaded.
     *
     * @param dataFlow the flow to load; may be {@code null}
     * @throws IOException if reading or saving the flow fails, or if the flow from the
     *         cluster's connection response cannot be loaded
     */
    public void load(DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
        if (configuredForClustering) {
            final DataFlow initialFlow = dataFlow == null ? createDataFlow() : dataFlow;
            if (logger.isTraceEnabled()) {
                logger.trace("InitialFlow = " + new String(initialFlow.getFlow(), StandardCharsets.UTF_8));
            }

            // Load the initial flow into the controller before proposing it to the cluster.
            writeLock.lock();
            try {
                loadFromBytes(initialFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);
            } finally {
                writeLock.unlock();
            }

            final StandardDataFlow proposedFlow = createDataFlowFromController();
            if (logger.isTraceEnabled()) {
                logger.trace("ProposedFlow = " + new String(proposedFlow.getFlow(), StandardCharsets.UTF_8));
            }

            // connect() is deliberately called without the write lock held.
            final boolean localFlowEmpty = StandardFlowSynchronizer.isFlowEmpty((DataFlow) proposedFlow);
            final ConnectionResponse response = connect(true, localFlowEmpty, (DataFlow) proposedFlow);

            writeLock.lock();
            try {
                if (response == null || response.shouldTryLater()) {
                    logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received.");

                    // The node id is set before heartbeating starts.
                    controller.setNodeId(nodeId);
                    clusterCoordinator.setLocalNodeIdentifier(nodeId);

                    controller.setClustered(true, null);
                    clusterCoordinator.setConnected(false);
                    controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));

                    controller.startHeartbeating();
                    initializeController();

                    try {
                        controller.onFlowInitialized(autoResumeState);
                    } catch (Exception ex) {
                        logger.warn("Unable to start all processors due to invalid flow configuration.");
                        if (logger.isDebugEnabled()) {
                            logger.warn("", (Throwable) ex);
                        }
                    }
                } else {
                    // Only a usable response is loaded; a null or try-later response must not reach this call.
                    try {
                        loadFromConnectionResponse(response);
                    } catch (Exception e) {
                        logger.error("Failed to load flow from cluster due to: " + e, (Throwable) e);
                        handleConnectionFailure(e);
                        throw new IOException(e);
                    }
                }

                dao.save(controller, true);
            } finally {
                writeLock.unlock();
            }
        } else {
            // Standalone path only; a clustered node must not run it after the branch above.
            writeLock.lock();
            try {
                loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);
                initializeController();
                dao.save(controller, true);
            } finally {
                writeLock.unlock();
            }
        }
    }

    /**
     * Reports a failed cluster connection to the coordinator with a disconnection code
     * derived from the exception type, then marks the node as neither clustered nor connected.
     *
     * @param ex the failure that prevented the connection
     */
    private void handleConnectionFailure(Exception ex) {
        final DisconnectionCode disconnectionCode;
        if (ex instanceof UninheritableFlowException) {
            disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
        } else if (ex instanceof MissingBundleException) {
            disconnectionCode = DisconnectionCode.MISSING_BUNDLE;
        } else if (ex instanceof FlowSynchronizationException) {
            disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
        } else {
            disconnectionCode = DisconnectionCode.STARTUP_FAILURE;
        }

        clusterCoordinator.disconnectionRequestedByNode(getNodeId(), disconnectionCode, ex.toString());
        controller.setClustered(false, null);
        clusterCoordinator.setConnected(false);
    }

    /**
     * Answers a flow request with the flow serialized from the controller, under the read lock.
     *
     * @param request the flow request (its content is not inspected)
     * @return a response carrying the current flow
     * @throws ProtocolException if the flow cannot be serialized
     */
    private FlowResponseMessage handleFlowRequest(FlowRequestMessage request) throws ProtocolException {
        readLock.lock();
        try {
            logger.info("Received flow request message from cluster coordinator.");
            final FlowResponseMessage reply = new FlowResponseMessage();
            reply.setDataFlow(createDataFlowFromController());
            return reply;
        } catch (Exception ex) {
            throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + ex, (Throwable) ex);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return the UTF-8 bytes of the authorizer's fingerprint when the authorizer is a
     *         managed authorizer, otherwise {@code null}
     */
    private byte[] getAuthorizerFingerprint() {
        if (!AuthorizerCapabilityDetection.isManagedAuthorizer((Authorizer) authorizer)) {
            return null;
        }
        final ManagedAuthorizer managed = (ManagedAuthorizer) authorizer;
        return managed.getFingerprint().getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Builds a data flow from the persisted flow when the DAO reports one is present,
     * otherwise from the controller's current state.
     *
     * @return the data flow; when read through the DAO it carries an empty set of missing components
     * @throws IOException if the flow cannot be read or serialized
     */
    public StandardDataFlow createDataFlow() throws IOException {
        if (!dao.isFlowPresent()) {
            return createDataFlowFromController();
        }

        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        dao.load(baos);
        final byte[] bytes = baos.toByteArray();
        final byte[] snippetBytes = controller.getSnippetManager().export();
        final byte[] authorizerFingerprint = getAuthorizerFingerprint();
        // Diamond instead of a raw HashSet so the element type is checked.
        return new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint, new HashSet<>());
    }

    /**
     * Serializes the controller's current flow and collects the identifiers of every
     * processor, controller service and reporting task whose extension is missing.
     *
     * @return a data flow with the flow bytes, snippets, authorizer fingerprint and missing component ids
     * @throws IOException if the flow cannot be serialized
     */
    public StandardDataFlow createDataFlowFromController() throws IOException {
        final byte[] snippetBytes = controller.getSnippetManager().export();
        final byte[] authorizerFingerprint = getAuthorizerFingerprint();

        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        dao.save(controller, baos);
        final byte[] flowBytes = baos.toByteArray();

        final FlowManager flowManager = controller.getFlowManager();
        // Typed set instead of a raw HashSet so the identifiers are checked as Strings.
        final Set<String> missingComponents = new HashSet<>();
        flowManager.getRootGroup().findAllProcessors().stream().filter(AbstractComponentNode::isExtensionMissing).forEach(p -> missingComponents.add(p.getIdentifier()));
        flowManager.getAllControllerServices().stream().filter(ComponentNode::isExtensionMissing).forEach(cs -> missingComponents.add(cs.getIdentifier()));
        controller.getAllReportingTasks().stream().filter(ComponentNode::isExtensionMissing).forEach(r -> missingComponents.add(r.getIdentifier()));

        return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint, missingComponents);
    }

    /**
     * @return this node's identifier, read under the read lock
     */
    private NodeIdentifier getNodeId() {
        readLock.lock();
        try {
            return nodeId;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Processes a reconnection request. When the request carries no flow, the node tries to
     * connect using its local flow. If that also yields no response, heartbeats are resumed
     * and nothing else happens. Any failure disconnects the node (when it is clustered) and
     * is reported through {@code handleConnectionFailure}.
     *
     * @param request the reconnection request
     */
    private void handleReconnectionRequest(ReconnectionRequestMessage request) {
        try {
            logger.info("Processing reconnection request from cluster coordinator.");
            if (controller.isConnected()) {
                controller.onClusterDisconnect();
            }

            ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), (DataFlow) request.getDataFlow(), request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions());
            if (connectionResponse.getDataFlow() == null) {
                logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow.");
                connectionResponse = connect(false, false, (DataFlow) createDataFlowFromController());
            }

            if (connectionResponse == null) {
                logger.warn("Received a Reconnection Request that contained no DataFlow, and was unable to communicate with an active Cluster Coordinator. Cannot connect to cluster at this time.");
                controller.resumeHeartbeats();
                return;
            }

            loadFromConnectionResponse(connectionResponse);
            clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream().collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status)));
            saveFlowChanges();
            controller.onClusterConnect();
            logger.info("Node reconnected.");
        } catch (Exception ex) {
            if (controller.isClustered()) {
                disconnect("Failed to properly handle Reconnection request due to " + ex.toString());
            }
            logger.error("Handling reconnection request failed due to: " + ex, (Throwable) ex);
            handleConnectionFailure(ex);
        }
    }

    /**
     * Logs the offload request and offloads this node.
     *
     * @param request the offload request
     * @throws InterruptedException if the offload is interrupted
     */
    private void handleOffloadRequest(OffloadMessage request) throws InterruptedException {
        final String explanation = request.getExplanation();
        logger.info("Received offload request message from cluster coordinator with explanation: " + explanation);
        offload(explanation);
    }

    /**
     * Offloads this node while holding the write lock: stops processing, terminates stopped
     * processors, stops transmitting remote process groups, offloads every connection's
     * queue, and polls once per second until the controller reports nothing queued. It then
     * resets the offloaded queues and tells the coordinator the offload finished.
     *
     * @param explanation reason recorded in the node's connection status and in the logs
     * @throws InterruptedException if interrupted while waiting for the queues to drain
     */
    private void offload(String explanation) throws InterruptedException {
        writeLock.lock();
        try {
            logger.info("Offloading node due to " + explanation);
            controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation));

            final FlowManager flowManager = controller.getFlowManager();
            flowManager.getRootGroup().stopProcessing();
            flowManager.getRootGroup().findAllProcessors().stream().filter(pn -> pn.getScheduledState() == ScheduledState.STOPPED).forEach(pn -> pn.getProcessGroup().terminateProcessor(pn));
            flowManager.getRootGroup().findAllRemoteProcessGroups().stream().filter(RemoteProcessGroup::isTransmitting).forEach(rpg -> {
                try {
                    rpg.stopTransmitting().get(rpg.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    logger.warn("Encountered failure while waiting for {} to shutdown", rpg, (Object) e);
                }
            });

            // Typed set: a raw Set cannot be iterated as Connection.
            final Set<Connection> connections = flowManager.findAllConnections();
            for (Connection connection : connections) {
                connection.getFlowFileQueue().offloadQueue();
            }

            // Poll until the controller reports no queued FlowFiles.
            final UserAwareEventAccess eventAccess = controller.getEventAccess();
            ProcessGroupStatus controllerStatus = eventAccess.getControllerStatus();
            while (controllerStatus.getQueuedCount() > 0) {
                logger.debug("Offloading queues on node {}, remaining queued count: {}", (Object) getNodeId(), (Object) controllerStatus.getQueuedCount());
                Thread.sleep(1000L);
                controllerStatus = eventAccess.getControllerStatus();
            }

            for (Connection connection : connections) {
                connection.getFlowFileQueue().resetOffloadedQueue();
            }

            controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation));
            clusterCoordinator.finishNodeOffload(getNodeId());
            logger.info("Node offloaded due to " + explanation);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Logs the disconnection request and disconnects this node.
     *
     * @param request the disconnection request
     */
    private void handleDisconnectionRequest(DisconnectMessage request) {
        final String explanation = request.getExplanation();
        logger.info("Received disconnection request message from cluster coordinator with explanation: " + explanation);
        disconnect(explanation);
    }

    /**
     * Disconnects this node under the write lock: records the disconnected status, clears the
     * primary flag, stops heartbeating and marks the node as neither clustered nor connected.
     *
     * @param explanation reason recorded in the connection status and in the logs
     */
    private void disconnect(String explanation) {
        writeLock.lock();
        try {
            logger.info("Disconnecting node due to " + explanation);

            final NodeConnectionStatus disconnectedStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.UNKNOWN, explanation);
            controller.setConnectionStatus(disconnectedStatus);
            controller.setPrimary(false);
            controller.stopHeartbeating();
            controller.setClustered(false, null);
            clusterCoordinator.setConnected(false);

            logger.info("Node disconnected due to " + explanation);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Loads a flow into the controller through the DAO, then imports any templates returned
     * by {@code loadTemplates()} that the root group does not already contain.
     *
     * @param proposedFlow the flow to load; when {@code null} the current flow is copied and used instead
     * @param allowEmptyFlow when {@code false}, an empty root group after loading is an error
     * @param bundleUpdateStrategy bundle strategy handed to the DAO
     * @throws FlowSynchronizationException if the root group is empty and {@code allowEmptyFlow} is {@code false}
     * @throws IOException if the flow or the templates cannot be read
     */
    private void loadFromBytes(DataFlow proposedFlow, boolean allowEmptyFlow, BundleUpdateStrategy bundleUpdateStrategy) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
        logger.trace("Loading flow from bytes");

        final byte[] flowBytes;
        final byte[] authorizerFingerprint;
        // Declared as Set<String>: the proposed flow supplies a Set, not necessarily a HashSet.
        final Set<String> missingComponents;
        if (proposedFlow == null) {
            final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream();
            copyCurrentFlow(flowOnDisk);
            flowBytes = flowOnDisk.toByteArray();
            authorizerFingerprint = getAuthorizerFingerprint();
            missingComponents = new HashSet<>();
            logger.debug("Loaded Flow from bytes");
        } else {
            flowBytes = proposedFlow.getFlow();
            authorizerFingerprint = proposedFlow.getAuthorizerFingerprint();
            missingComponents = proposedFlow.getMissingComponents();
            logger.debug("Loaded flow from proposed flow");
        }

        // Snippets are deliberately passed as null here.
        final StandardDataFlow actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint, missingComponents);
        logger.debug("Loading proposed flow into FlowController");
        dao.load(controller, (DataFlow) actualProposedFlow, this, bundleUpdateStrategy);

        final ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
        if (rootGroup.isEmpty() && !allowEmptyFlow) {
            throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty");
        }

        for (Template template : loadTemplates()) {
            if (rootGroup.getTemplate(template.getIdentifier()) != null) {
                logger.info("Template '{}' was already present in Root Group so will not import from file", (Object) template.getDetails().getName());
                continue;
            }
            logger.info("Imported Template '{}' to Root Group", (Object) template.getDetails().getName());
            rootGroup.addTemplate(template);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Reads every template file in the configured template directory.
     *
     * <p>A file is considered when its lower-cased name ends with {@code .template} or
     * {@code .xml}. Files that cannot be deserialized are logged and skipped. A template
     * without an id is given a name-based UUID derived from the UTF-8 bytes of its name.</p>
     *
     * @return the templates that were read; an empty list when the directory cannot be listed
     * @throws IOException if a candidate file cannot be opened or closed
     */
    public List<Template> loadTemplates() throws IOException {
        Path templatePath = this.nifiProperties.getTemplateDirectory();
        File[] files = templatePath.toFile().listFiles(pathname -> {
            String lowerName = pathname.getName().toLowerCase();
            return lowerName.endsWith(".template") || lowerName.endsWith(".xml");
        });
        if (files == null) {
            // listFiles returns null when the path is not a directory or an I/O error occurs.
            return Collections.emptyList();
        }

        List<Template> templates = new ArrayList<>();
        for (File file : files) {
            // try-with-resources guarantees both streams are closed on every path,
            // including the "skip this file" path and the success path.
            try (FileInputStream fis = new FileInputStream(file);
                 BufferedInputStream bis = new BufferedInputStream(fis)) {
                TemplateDTO templateDto;
                try {
                    templateDto = TemplateDeserializer.deserialize(bis);
                } catch (Exception e) {
                    logger.error("Unable to interpret " + file + " as a Template. Skipping file.", e);
                    continue;
                }

                if (templateDto.getId() == null) {
                    // Derive a deterministic id from the template name so the same file yields the same id.
                    String uuid = UUID.nameUUIDFromBytes(templateDto.getName().getBytes(StandardCharsets.UTF_8)).toString();
                    templateDto.setId(uuid);
                }
                templates.add(new Template(templateDto));
            }
        }
        return templates;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a connection request for this node and returns the response.
     *
     * <p>The request is attempted up to ten times, or without limit when
     * {@code retryIndefinitely} is {@code true}. A "try later" response, a missing cluster
     * coordinator, and (when {@code retryOnCommsFailure} is {@code true}) any other failure
     * each cause a pause followed by another attempt. An interrupt during any pause restores
     * the thread's interrupt flag and ends the retry loop. The read lock is held for the
     * whole call.</p>
     *
     * <p>On success the node identifier from the response is stored in local state; a failure
     * to store it is logged and does not fail the connection.</p>
     *
     * @param retryOnCommsFailure whether to retry after a failure other than a missing coordinator
     * @param retryIndefinitely whether to ignore the attempt limit
     * @param dataFlow the flow sent with the connection request
     * @return the successful response, or {@code null} when no response was obtained, the
     *         request was rejected, or the last response still asked to try later
     * @throws ConnectionException declared by the signature; not thrown directly by this method
     */
    private ConnectionResponse connect(boolean retryOnCommsFailure, boolean retryIndefinitely, DataFlow dataFlow) throws ConnectionException {
        this.readLock.lock();
        try {
            logger.info("Connecting Node: " + this.nodeId);
            ConnectionRequest request = new ConnectionRequest(this.nodeId, dataFlow);
            ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
            requestMsg.setConnectionRequest(request);

            final int maxAttempts = 10;
            ConnectionResponse response = null;
            for (int attempt = 0; attempt < maxAttempts || retryIndefinitely; ++attempt) {
                try {
                    boolean activeCoordinatorParticipant = this.controller.getLeaderElectionManager().isActiveParticipant("Cluster Coordinator");
                    response = this.senderListener.requestConnection(requestMsg, activeCoordinatorParticipant).getConnectionResponse();

                    if (response.shouldTryLater()) {
                        logger.info("Requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds with explanation: " + response.getRejectionReason());
                        try {
                            // Convert in long arithmetic so a large delay cannot overflow int.
                            Thread.sleep(TimeUnit.SECONDS.toMillis(response.getTryLaterSeconds()));
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                        continue;
                    }

                    if (response.getRejectionReason() != null) {
                        logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason());
                        // A rejection is reported to the caller the same way as "no response".
                        response = null;
                    } else {
                        logger.info("Received successful response from Cluster Coordinator to Connection Request");
                    }
                    break;
                } catch (NoClusterCoordinatorException ncce) {
                    logger.warn("There is currently no Cluster Coordinator. This often happens upon restart of NiFi when running an embedded ZooKeeper. Will register this node to become the active Cluster Coordinator and will attempt to connect to cluster again");
                    this.controller.registerForClusterCoordinator(true);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                } catch (Exception pe) {
                    logger.warn("Failed to connect to cluster due to: " + pe);
                    if (logger.isDebugEnabled()) {
                        logger.warn("", (Throwable)pe);
                    }
                    if (!retryOnCommsFailure) {
                        break;
                    }
                    // The response, if any, is left over from an earlier "try later" answer;
                    // its delay is expressed in seconds and must be converted before sleeping.
                    long retryDelayMillis = response == null ? 5000L : TimeUnit.SECONDS.toMillis(response.getTryLaterSeconds());
                    try {
                        Thread.sleep(retryDelayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }

            if (response == null) {
                return null;
            }
            if (response.shouldTryLater()) {
                logger.info("Received a 'try again' response from Cluster Coordinator when attempting to connect to cluster with explanation '" + response.getRejectionReason() + "'. However, the maximum number of retries have already completed. Will load local flow and connect to the cluster when able.");
                return null;
            }

            try {
                Map<String, String> map = Collections.singletonMap(NODE_UUID, response.getNodeIdentifier().getId());
                this.controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG).setState(map, Scope.LOCAL);
            } catch (IOException ioe) {
                logger.warn("Received successful response from Cluster Manager but failed to persist state about the Node's Unique Identifier and the Node's Index. This node may be assigned a different UUID when the node is restarted.", (Throwable)ioe);
            }
            return response;
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Applies a connection response to this node while holding the write lock.
     *
     * <p>In order: resets the coordinator's node statuses (when the response carries any),
     * adopts the node identifier from the response, loads the response's flow, marks the
     * coordinator and controller as connected/clustered, resets the revision manager from
     * the response's component revisions, initializes the controller, loads snippets and
     * starts heartbeating.</p>
     *
     * @param response the response received from the cluster
     * @throws ConnectionException if the flow is malformed or any otherwise unhandled
     *         exception occurs
     * @throws UninheritableFlowException rethrown with a connection-specific message
     * @throws MissingBundleException rethrown with a connection-specific message
     * @throws FlowSynchronizationException rethrown with a connection-specific message
     */
    private void loadFromConnectionResponse(ConnectionResponse response) throws ConnectionException {
        this.writeLock.lock();
        try {
            if (response.getNodeConnectionStatuses() != null) {
                this.clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream().collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status)));
            }

            DataFlow dataFlow = response.getDataFlow();
            if (logger.isTraceEnabled()) {
                logger.trace("ResponseFlow = " + new String(dataFlow.getFlow(), StandardCharsets.UTF_8));
            }

            // Adopt the identifier first so the log line reports the id actually being set.
            this.nodeId = response.getNodeIdentifier();
            logger.info("Setting Flow Controller's Node ID: " + this.nodeId);
            this.controller.setNodeId(this.nodeId);

            this.loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_FAIL);

            this.clusterCoordinator.setLocalNodeIdentifier(this.nodeId);
            this.clusterCoordinator.setConnected(true);

            ComponentRevisionSnapshot componentRevisionSnapshot = response.getComponentRevisions();
            RevisionSnapshot revisionSnapshot = componentRevisionSnapshot.toRevisionSnapshot();
            this.revisionManager.reset(revisionSnapshot);

            this.controller.setClustered(true, response.getInstanceId());
            this.controller.setConnectionStatus(new NodeConnectionStatus(this.nodeId, NodeConnectionState.CONNECTED));

            this.initializeController();
            this.controller.onFlowInitialized(this.autoResumeState);
            this.loadSnippets(dataFlow.getSnippets());
            this.controller.startHeartbeating();
        }
        catch (UninheritableFlowException ufe) {
            throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX, ufe);
        }
        catch (MissingBundleException mbe) {
            throw new MissingBundleException("Failed to connect node to cluster because cluster flow contains bundles that do not exist on the current node", mbe);
        }
        catch (FlowSerializationException fse) {
            throw new ConnectionException("Failed to connect node to cluster because local or cluster flow is malformed.", fse);
        }
        catch (FlowSynchronizationException fse) {
            throw new FlowSynchronizationException("Failed to connect node to cluster because local flow controller partially updated. Administrator should disconnect node and review flow for corruption.", fse);
        }
        catch (Exception ex) {
            throw new ConnectionException("Failed to connect node to cluster due to: " + ex, ex);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Initializes the controller's flow the first time this method is called and
     * does nothing on later calls.
     *
     * @throws IOException declared by the signature; any such failure would come from
     *         the controller's flow initialization
     */
    private void initializeController() throws IOException {
        if (!this.firstControllerInitialization) {
            return;
        }

        logger.debug("First controller initialization, initializing controller...");
        this.controller.initializeFlow();
        // Cleared only after initializeFlow() returns normally.
        this.firstControllerInitialization = false;
    }

    /**
     * Writes the current flow to the given stream by delegating to the DAO while
     * holding the read lock.
     *
     * @param os destination stream; it is not closed by this method
     * @throws IOException if the DAO fails to write the flow
     */
    public void copyCurrentFlow(OutputStream os) throws IOException {
        final Lock lock = this.readLock;
        lock.lock();
        try {
            this.dao.load(os);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Writes the current flow, GZIP-compressed, to the given file.
     *
     * @param file destination file, opened with {@link FileOutputStream}
     * @throws IOException if the file cannot be opened or the flow cannot be written
     */
    public void copyCurrentFlow(File file) throws IOException {
        // NOTE(review): the meaning of the second GZIPOutputStream argument (1) depends on
        // which GZIPOutputStream class this file imports - confirm before changing it.
        try (OutputStream fileStream = new FileOutputStream(file);
             OutputStream compressedStream = new GZIPOutputStream(fileStream, 1)) {
            this.copyCurrentFlow(compressedStream);
        }
    }

    /**
     * Replaces the controller's snippets with those parsed from the given bytes.
     *
     * <p>When {@code bytes} is {@code null} or empty the existing snippets are left
     * untouched.</p>
     *
     * @param bytes serialized snippets; may be {@code null} or empty
     */
    public void loadSnippets(byte[] bytes) {
        // A flow may carry no snippet bytes at all; treat that the same as an empty array.
        if (bytes == null || bytes.length == 0) {
            return;
        }

        SnippetManager snippetManager = this.controller.getSnippetManager();
        snippetManager.clear();
        for (StandardSnippet snippet : SnippetManager.parseBytes(bytes)) {
            snippetManager.addSnippet(snippet);
        }
    }

    /**
     * Immutable description of a requested save: the time the save is requested for
     * and whether the saved flow should be archived.
     */
    private class SaveHolder {
        /** Moment compared against the current time before a save is performed. */
        private final Calendar saveTime;
        /** Flag handed to the DAO when the save is performed. */
        private final boolean shouldArchive;

        private SaveHolder(Calendar requestedTime, boolean archiveRequested) {
            this.shouldArchive = archiveRequested;
            this.saveTime = requestedTime;
        }
    }

    /**
     * Task that performs a pending save of the flow once the requested save time has passed.
     *
     * <p>While running, the thread's context class loader is switched to the framework
     * bundle's class loader (when a framework bundle is available) and restored afterwards.
     * Any failure is logged and reported as a bulletin instead of being propagated.</p>
     */
    private class SaveReportingTask
    implements Runnable {
        private SaveReportingTask() {
        }

        /**
         * Saves the flow if a save request exists and its save time lies before the current
         * time; otherwise returns without doing anything.
         */
        @Override
        public synchronized void run() {
            ClassLoader currentCl = null;
            Bundle frameworkBundle = NarClassLoadersHolder.getInstance().getFrameworkBundle();
            if (frameworkBundle != null) {
                currentCl = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(frameworkBundle.getClassLoader());
            }

            try {
                SaveHolder holder = (SaveHolder)StandardFlowService.this.saveHolder.get();
                if (holder == null) {
                    return;
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Save request time {} // Current time {}", holder.saveTime.getTime(), new Date());
                }

                Calendar now = Calendar.getInstance();
                if (!holder.saveTime.before(now)) {
                    // The requested save time has not been reached yet.
                    return;
                }

                if (logger.isTraceEnabled()) {
                    logger.trace("Waiting for write lock and then will save");
                }
                StandardFlowService.this.writeLock.lock();
                try {
                    StandardFlowService.this.dao.save(StandardFlowService.this.controller, holder.shouldArchive);
                    // The request is cleared only if it is still the one that was just saved;
                    // a failed swap means a newer request arrived in the meantime.
                    boolean noSavePending = StandardFlowService.this.saveHolder.compareAndSet(holder, null);
                    logger.info("Saved flow controller {} // Another save pending = {}", StandardFlowService.this.controller, !noSavePending);
                }
                finally {
                    StandardFlowService.this.writeLock.unlock();
                }
            }
            catch (Throwable t) {
                logger.error("Unable to save flow controller configuration due to: " + t, t);
                if (logger.isDebugEnabled()) {
                    logger.error("", t);
                }
                Bulletin saveFailureBulletin = BulletinFactory.createBulletin((String)StandardFlowService.EVENT_CATEGORY, (String)LogLevel.ERROR.name(), (String)"Unable to save flow controller configuration.");
                StandardFlowService.this.controller.getBulletinRepository().addBulletin(saveFailureBulletin);
            }
            finally {
                if (currentCl != null) {
                    Thread.currentThread().setContextClassLoader(currentCl);
                }
            }
        }
    }
}

