/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.client.async.nio;

import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NioAsyncLoadBalanceClientTask
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class);
    private static final String EVENT_CATEGORY = "Load-Balanced Connection";
    private final NioAsyncLoadBalanceClientRegistry clientRegistry;
    private final ClusterCoordinator clusterCoordinator;
    private final EventReporter eventReporter;
    private volatile boolean running = true;

    public NioAsyncLoadBalanceClientTask(NioAsyncLoadBalanceClientRegistry clientRegistry, ClusterCoordinator clusterCoordinator, EventReporter eventReporter) {
        this.clientRegistry = clientRegistry;
        this.clusterCoordinator = clusterCoordinator;
        this.eventReporter = eventReporter;
    }

    @Override
    public void run() {
        while (this.running) {
            try {
                boolean success = false;
                for (AsyncLoadBalanceClient client : this.clientRegistry.getAllClients()) {
                    if (!client.isRunning()) {
                        logger.trace("Client {} is not running so will not communicate with it", (Object)client);
                        continue;
                    }
                    if (client.isPenalized()) {
                        logger.trace("Client {} is penalized so will not communicate with it", (Object)client);
                        continue;
                    }
                    NodeIdentifier clientNodeId = client.getNodeIdentifier();
                    NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(clientNodeId);
                    if (connectionStatus == null) {
                        logger.debug("Could not determine Connection Status for Node with ID {}; will not communicate with it", (Object)clientNodeId);
                        continue;
                    }
                    NodeConnectionState connectionState = connectionStatus.getState();
                    if (connectionState != NodeConnectionState.CONNECTED) {
                        logger.debug("Notifying Client {} that node is not connected because current state is {}", (Object)client, (Object)connectionState);
                        client.nodeDisconnected();
                        continue;
                    }
                    try {
                        while (client.communicate()) {
                            success = true;
                            logger.trace("Client {} was able to make progress communicating with peer. Will continue to communicate with peer.", (Object)client);
                        }
                    }
                    catch (Exception e) {
                        this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to communicate with Peer " + client.getNodeIdentifier() + " while trying to load balance data across the cluster due to " + e.toString());
                        logger.error("Failed to communicate with Peer {} while trying to load balance data across the cluster.", (Object)client.getNodeIdentifier(), (Object)e);
                    }
                    logger.trace("Client {} was no longer able to make progress communicating with peer. Will move on to the next client", (Object)client);
                }
                if (success) continue;
                logger.trace("Was unable to communicate with any client. Will sleep for 10 milliseconds.");
                Thread.sleep(10L);
            }
            catch (Exception e) {
                logger.error("Failed to communicate with peer while trying to load balance data across the cluster", (Throwable)e);
                this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to comunicate with Peer while trying to load balance data across the cluster due to " + e);
            }
        }
    }

    public void stop() {
        this.running = false;
    }
}

