/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.client.async.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
import org.apache.nifi.controller.queue.clustered.SimpleLimitThreshold;
import org.apache.nifi.controller.queue.clustered.TransactionThreshold;
import org.apache.nifi.controller.queue.clustered.client.LoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
import org.apache.nifi.controller.queue.clustered.client.async.nio.LoadBalanceSession;
import org.apache.nifi.controller.queue.clustered.client.async.nio.PeerChannel;
import org.apache.nifi.controller.queue.clustered.client.async.nio.RegisteredPartition;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * NIO-based {@link AsyncLoadBalanceClient} that transfers FlowFiles from registered partitions
 * to the single peer node identified by {@link #getNodeIdentifier()}.
 *
 * <p>Partitions are registered per connection ID and kept in a queue. Each call to
 * {@link #communicate()} makes sure a channel to the peer exists, selects (or resumes) a
 * {@link LoadBalanceSession}, and drives that session for as long as it reports progress.</p>
 *
 * <p>Locking as visible in this class: registration state, the channel and the selector are
 * guarded by the instance monitor ({@code synchronized}); {@link #communicate()} and
 * {@link #nodeDisconnected()} additionally take {@code loadBalanceSessionLock} via
 * {@code tryLock()}, so a caller that loses the race returns immediately instead of blocking.</p>
 */
public class NioAsyncLoadBalanceClient
implements AsyncLoadBalanceClient {
    private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClient.class);

    /** How long the client stays penalized after {@link #penalize()} is called. */
    private static final long PENALIZATION_MILLIS = TimeUnit.SECONDS.toMillis(1L);

    private final NodeIdentifier nodeIdentifier;
    private final SSLContext sslContext;
    private final int timeoutMillis;
    private final FlowFileContentAccess flowFileContentAccess;
    private final LoadBalanceFlowFileCodec flowFileCodec;
    private final EventReporter eventReporter;
    private final ClusterCoordinator clusterCoordinator;

    private volatile boolean running = false;

    /** Epoch millis at which the current penalization ends; {@code 0} means "not penalized". */
    private final AtomicLong penalizationEnd = new AtomicLong(0L);

    /** Registered partitions keyed by connection ID; guarded by the instance monitor. */
    private final Map<String, RegisteredPartition> registeredPartitions = new HashMap<>();

    /** Partitions waiting to be serviced; a partition handed to a caller is polled off and must be re-offered. */
    private final Queue<RegisteredPartition> partitionQueue = new LinkedBlockingQueue<>();

    private PeerChannel channel;
    private Selector selector;
    private SelectionKey selectionKey;

    private final Lock loadBalanceSessionLock = new ReentrantLock();
    private LoadBalanceSession loadBalanceSession = null;

    /**
     * Creates a client for the given peer. No connection is opened until {@link #communicate()}
     * finds a partition with data to send.
     *
     * @param nodeIdentifier        the peer this client sends to
     * @param sslContext            SSL context used to wrap the channel, or {@code null} for plaintext
     * @param timeoutMillis         socket timeout in milliseconds, also passed to each session
     * @param flowFileContentAccess passed through to each {@link LoadBalanceSession}
     * @param flowFileCodec         passed through to each {@link LoadBalanceSession}
     * @param eventReporter         receives an ERROR event when communication with the peer fails
     * @param clusterCoordinator    consulted for the peer's connection state
     */
    public NioAsyncLoadBalanceClient(NodeIdentifier nodeIdentifier, SSLContext sslContext, int timeoutMillis, FlowFileContentAccess flowFileContentAccess, LoadBalanceFlowFileCodec flowFileCodec, EventReporter eventReporter, ClusterCoordinator clusterCoordinator) {
        this.nodeIdentifier = nodeIdentifier;
        this.sslContext = sslContext;
        this.timeoutMillis = timeoutMillis;
        this.flowFileContentAccess = flowFileContentAccess;
        this.flowFileCodec = flowFileCodec;
        this.eventReporter = eventReporter;
        this.clusterCoordinator = clusterCoordinator;
    }

    @Override
    public NodeIdentifier getNodeIdentifier() {
        return this.nodeIdentifier;
    }

    /**
     * Registers a partition for the given connection and queues it for servicing.
     *
     * @throws IllegalStateException if a partition is already registered under {@code connectionId}
     */
    @Override
    public synchronized void register(String connectionId, BooleanSupplier emptySupplier, Supplier<FlowFileRecord> flowFileSupplier, TransactionFailureCallback failureCallback, TransactionCompleteCallback successCallback, Supplier<LoadBalanceCompression> compressionSupplier, BooleanSupplier honorBackpressureSupplier) {
        if (this.registeredPartitions.containsKey(connectionId)) {
            throw new IllegalStateException("Connection with ID " + connectionId + " is already registered");
        }
        RegisteredPartition partition = new RegisteredPartition(connectionId, emptySupplier, flowFileSupplier, failureCallback, successCallback, compressionSupplier, honorBackpressureSupplier);
        this.registeredPartitions.put(connectionId, partition);
        this.partitionQueue.add(partition);
    }

    /**
     * Removes the partition registered under {@code connectionId}. If the active session belongs
     * to that partition, is not complete and can be canceled, the FlowFiles it has already sent
     * are handed to the partition's failure callback.
     */
    @Override
    public synchronized void unregister(String connectionId) {
        RegisteredPartition removedPartition = this.registeredPartitions.remove(connectionId);
        if (removedPartition == null) {
            logger.debug("{} Unregistered Connection with ID {} but there were no Registered Partitions", this, connectionId);
            return;
        }

        // Also take the partition out of the ready queue so it is not picked for new transactions.
        // A partition that a communicating thread has temporarily polled is not in the queue at
        // this instant, so this removal is best-effort.
        this.partitionQueue.remove(removedPartition);

        logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition);

        // Read the field once: communicate() and nodeDisconnected() may null it without holding this monitor.
        LoadBalanceSession session = this.loadBalanceSession;
        boolean validSession = session != null && connectionId.equals(session.getPartition().getConnectionId());
        if (validSession && !session.isComplete() && session.cancel()) {
            List<FlowFileRecord> flowFilesSent = session.getAndPurgeFlowFilesSent();
            logger.debug("{} Triggering failure callback for {} FlowFiles for Registered Partition {} because partition was unregistered", this, flowFilesSent.size(), removedPartition);
            removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
        }
    }

    @Override
    public synchronized int getRegisteredConnectionCount() {
        return this.registeredPartitions.size();
    }

    /** Returns a snapshot copy so callers can iterate without holding the monitor. */
    private synchronized Map<String, RegisteredPartition> getRegisteredPartitions() {
        return new HashMap<>(this.registeredPartitions);
    }

    @Override
    public void start() {
        this.running = true;
        logger.debug("{} started", this);
    }

    /** Marks the client as not running and closes the selector and channel. */
    @Override
    public void stop() {
        this.running = false;
        logger.debug("{} stopped", this);
        this.close();
    }

    /** Closes the selector and channel if open (failures are logged, not thrown) and clears both fields. */
    private synchronized void close() {
        if (this.selector != null && this.selector.isOpen()) {
            try {
                this.selector.close();
            }
            catch (Exception e) {
                logger.warn("Failed to close NIO Selector", e);
            }
        }
        if (this.channel != null && this.channel.isOpen()) {
            try {
                this.channel.close();
            }
            catch (Exception e) {
                logger.warn("Failed to close Socket Channel to {} for Load Balancing", this.nodeIdentifier, e);
            }
        }
        this.channel = null;
        this.selector = null;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    /**
     * @return {@code true} while the penalization end time lies in the future; an expired end
     *         time is reset to {@code 0} on the way out
     */
    @Override
    public boolean isPenalized() {
        long endTimestamp = this.penalizationEnd.get();
        if (endTimestamp == 0L) {
            return false;
        }
        if (endTimestamp < System.currentTimeMillis()) {
            // CAS so that a penalization set concurrently by another thread is not wiped out.
            this.penalizationEnd.compareAndSet(endTimestamp, 0L);
            return false;
        }
        return true;
    }

    /** Penalizes this client for {@link #PENALIZATION_MILLIS} starting now. */
    private void penalize() {
        logger.debug("Penalizing {}", this);
        this.penalizationEnd.set(System.currentTimeMillis() + PENALIZATION_MILLIS);
    }

    /**
     * Performs one round of communication with the peer: connects if necessary, obtains the
     * active session (creating one from a ready partition when needed) and repeatedly calls
     * {@link LoadBalanceSession#communicate()} until it reports no further progress.
     *
     * @return {@code true} if at least one session call reported progress; {@code false} if the
     *         client is not running, another thread holds the session lock, nothing is ready,
     *         the channel is not ready for the session's desired operation, or communication failed
     * @throws IOException if an I/O error escapes (for example from the selector); the
     *         connection is closed and the active session discarded before rethrowing
     */
    @Override
    public boolean communicate() throws IOException {
        if (!this.running) {
            return false;
        }
        // tryLock, not lock: if another thread is already driving this client, give up immediately.
        if (!this.loadBalanceSessionLock.tryLock()) {
            return false;
        }

        // The lock is released exactly once, by the finally block below. Early returns inside the
        // try must NOT unlock themselves: a second unlock() throws IllegalMonitorStateException.
        try {
            RegisteredPartition readyPartition = null;
            if (!this.isConnectionEstablished()) {
                // Only connect when there is something to send.
                readyPartition = this.getReadyPartition();
                if (readyPartition == null) {
                    logger.debug("{} has no connection with data ready to be transmitted so will penalize Client without communicating", this);
                    this.penalize();
                    return false;
                }
                try {
                    this.establishConnection();
                }
                catch (IOException e) {
                    this.penalize();
                    // getReadyPartition() polled the partition off the queue; put it back.
                    this.partitionQueue.offer(readyPartition);
                    for (RegisteredPartition partition : this.getRegisteredPartitions().values()) {
                        logger.debug("Triggering Transaction Failure Callback for {} with Transaction Phase of CONNECTING", partition);
                        partition.getFailureCallback().onTransactionFailed(Collections.emptyList(), e, TransactionFailureCallback.TransactionPhase.CONNECTING);
                    }
                    return false;
                }
            }

            LoadBalanceSession session = this.getActiveTransaction(readyPartition);
            if (session == null) {
                this.penalize();
                return false;
            }

            this.selector.selectNow();
            boolean ready = (session.getDesiredReadinessFlag() & this.selectionKey.readyOps()) != 0;
            if (!ready) {
                return false;
            }

            boolean anySuccess = false;
            boolean success;
            do {
                try {
                    success = session.communicate();
                }
                catch (Exception e) {
                    logger.error("Failed to communicate with Peer {}", this.nodeIdentifier.toString(), e);
                    this.eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to communicate with Peer " + this.nodeIdentifier + " when load balancing data for Connection with ID " + session.getPartition().getConnectionId() + " due to " + e);
                    this.penalize();
                    session.getPartition().getFailureCallback().onTransactionFailed(session.getAndPurgeFlowFilesSent(), e, TransactionFailureCallback.TransactionPhase.SENDING);
                    this.close();
                    return false;
                }
                anySuccess = anySuccess || success;
            } while (success);

            if (session.isComplete() && !session.isCanceled()) {
                session.getPartition().getSuccessCallback().onTransactionComplete(session.getAndPurgeFlowFilesSent(), this.nodeIdentifier);
            }
            return anySuccess;
        }
        catch (Exception e) {
            this.close();
            this.loadBalanceSession = null;
            throw e;
        }
        finally {
            this.loadBalanceSessionLock.unlock();
        }
    }

    /**
     * Reacts to the peer having disconnected. If an incomplete session exists it is discarded,
     * its already-sent FlowFiles are failed, and the connection is closed. Otherwise a batch of
     * FlowFiles from one non-empty, non-penalized partition whose failure callback reports
     * {@code isRebalanceOnFailure()} is failed. Does nothing if another thread holds the session lock.
     */
    @Override
    public void nodeDisconnected() {
        if (!this.loadBalanceSessionLock.tryLock()) {
            return;
        }
        try {
            LoadBalanceSession session = this.getFailoverSession();
            if (session != null) {
                this.loadBalanceSession = null;
                logger.debug("Node {} disconnected so will terminate the Load Balancing Session", this.nodeIdentifier);
                List<FlowFileRecord> flowFilesSent = session.getAndPurgeFlowFilesSent();
                if (!flowFilesSent.isEmpty()) {
                    session.getPartition().getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
                }
                this.close();
                this.penalize();
                return;
            }

            // Node connectivity is deliberately not required here: the node is known to be gone.
            RegisteredPartition readyPartition = this.getReadyPartition(false, partition -> partition.getFailureCallback().isRebalanceOnFailure());
            if (readyPartition == null) {
                return;
            }
            this.partitionQueue.offer(readyPartition);
            this.failFlowFiles(readyPartition);
            this.penalize();
        }
        finally {
            this.loadBalanceSessionLock.unlock();
        }
    }

    /**
     * Pulls FlowFiles from the partition's supplier until it returns {@code null} or a fresh
     * transaction threshold is met, then reports them all to the partition's failure callback.
     */
    private void failFlowFiles(RegisteredPartition partition) {
        TransactionThreshold threshold = this.newTransactionThreshold();
        List<FlowFileRecord> flowFiles = new ArrayList<>();
        FlowFileRecord flowFile;
        while (!threshold.isThresholdMet() && (flowFile = partition.getFlowFileRecordSupplier().get()) != null) {
            flowFiles.add(flowFile);
            threshold.adjust(1, flowFile.getSize());
        }
        logger.debug("Node {} not connected so failing {} FlowFiles for Load Balancing", this.nodeIdentifier, flowFiles.size());
        partition.getFailureCallback().onTransactionFailed(flowFiles, TransactionFailureCallback.TransactionPhase.SENDING);
    }

    /** @return the current session if one exists and is not complete, otherwise {@code null} */
    private synchronized LoadBalanceSession getFailoverSession() {
        if (this.loadBalanceSession != null && !this.loadBalanceSession.isComplete()) {
            return this.loadBalanceSession;
        }
        return null;
    }

    /** Same as {@link #getReadyPartition(boolean, Predicate)} requiring a connected node and accepting every partition. */
    private RegisteredPartition getReadyPartition() {
        return this.getReadyPartition(true, partition -> true);
    }

    /**
     * Polls the queue for the first partition that is non-empty, not penalized, passes the
     * connectivity check (when required) and is accepted by {@code filter}. Rejected partitions
     * are returned to the queue; the selected partition is NOT, so the caller must re-offer it.
     *
     * @param requireNodeConnected whether to run {@link #checkNodeConnected(RegisteredPartition)}
     * @param filter               additional acceptance test, evaluated last
     * @return the selected partition, or {@code null} if none qualifies
     */
    private synchronized RegisteredPartition getReadyPartition(boolean requireNodeConnected, Predicate<RegisteredPartition> filter) {
        List<RegisteredPartition> polledPartitions = new ArrayList<>();
        try {
            RegisteredPartition partition;
            while ((partition = this.partitionQueue.poll()) != null) {
                // Evaluation order matters: checkNodeConnected has side effects and must only
                // run for non-empty, non-penalized partitions.
                boolean ready = !partition.isEmpty()
                        && !partition.isPenalized()
                        && (!requireNodeConnected || this.checkNodeConnected(partition))
                        && filter.test(partition);
                if (ready) {
                    return partition;
                }
                polledPartitions.add(partition);
            }
            return null;
        }
        finally {
            this.partitionQueue.addAll(polledPartitions);
        }
    }

    /**
     * Asks the cluster coordinator whether the peer is in state {@code CONNECTED}. As a side
     * effect, when it is not, a batch of the partition's FlowFiles is failed.
     */
    private synchronized boolean checkNodeConnected(RegisteredPartition partition) {
        NodeConnectionStatus status = this.clusterCoordinator.getConnectionStatus(this.nodeIdentifier);
        boolean connected = status != null && status.getState() == NodeConnectionState.CONNECTED;
        if (!connected) {
            this.failFlowFiles(partition);
        }
        return connected;
    }

    /**
     * Returns the current session if it is still incomplete; otherwise starts a new session for
     * {@code proposedPartition} (or, when that is {@code null}, for the next ready partition) and
     * puts that partition back on the queue.
     *
     * @return the session to drive, or {@code null} if no partition is ready
     */
    private synchronized LoadBalanceSession getActiveTransaction(RegisteredPartition proposedPartition) {
        if (this.loadBalanceSession != null && !this.loadBalanceSession.isComplete()) {
            return this.loadBalanceSession;
        }
        RegisteredPartition readyPartition = proposedPartition == null ? this.getReadyPartition() : proposedPartition;
        if (readyPartition == null) {
            return null;
        }
        this.loadBalanceSession = new LoadBalanceSession(readyPartition, this.flowFileContentAccess, this.flowFileCodec, this.channel, this.timeoutMillis, this.newTransactionThreshold());
        this.partitionQueue.offer(readyPartition);
        return this.loadBalanceSession;
    }

    /** Creates the per-transaction limit. NOTE(review): presumably (max FlowFile count, max bytes) - confirm against SimpleLimitThreshold. */
    private TransactionThreshold newTransactionThreshold() {
        return new SimpleLimitThreshold(1000, 10000000L);
    }

    private synchronized boolean isConnectionEstablished() {
        return this.selector != null && this.channel != null && this.channel.isConnected();
    }

    /**
     * Opens the selector and channel to the peer, performs the handshake in blocking mode, then
     * switches the channel to non-blocking mode and registers it for read and write readiness.
     * Returns without doing anything if a connection already exists.
     *
     * @throws IOException if connecting or the handshake fails; everything opened so far is closed first
     */
    private void establishConnection() throws IOException {
        SocketChannel socketChannel = null;
        try {
            PeerChannel peerChannel;
            synchronized (this) {
                if (this.isConnectionEstablished()) {
                    return;
                }
                this.selector = Selector.open();
                socketChannel = this.createChannel();
                socketChannel.configureBlocking(true);
                peerChannel = this.createPeerChannel(socketChannel, socketChannel.getLocalAddress() + "::" + socketChannel.getRemoteAddress());
                this.channel = peerChannel;
            }

            // The handshake runs outside the monitor, so other synchronized methods are not
            // blocked while it is in progress.
            peerChannel.performHandshake();

            synchronized (this) {
                socketChannel.configureBlocking(false);
                this.selectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
        }
        catch (Exception e) {
            logger.error("Unable to connect to {} for load balancing", this.nodeIdentifier, e);
            this.close();
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                }
                catch (Exception e1) {
                    e.addSuppressed(e1);
                }
            }
            throw e;
        }
    }

    /**
     * Wraps the socket channel in a {@link PeerChannel}, attaching a client-mode
     * {@link SSLEngine} when an SSL context was supplied.
     */
    private PeerChannel createPeerChannel(SocketChannel channel, String peerDescription) {
        if (this.sslContext == null) {
            logger.debug("No SSL Context is available so will not perform SSL Handshake with Peer {}", peerDescription);
            return new PeerChannel(channel, null, peerDescription);
        }
        logger.debug("Performing SSL Handshake with Peer {}", peerDescription);
        SSLEngine sslEngine = this.sslContext.createSSLEngine();
        sslEngine.setUseClientMode(true);
        // NOTE(review): the JDK documents setNeedClientAuth as only useful in server mode; kept as-is.
        sslEngine.setNeedClientAuth(true);
        return new PeerChannel(channel, sslEngine, peerDescription);
    }

    /**
     * Opens a blocking socket channel and connects it to the peer's load-balance address and port.
     * Both the connection attempt and subsequent socket reads are bounded by {@code timeoutMillis}
     * (a value of {@code 0} means no timeout).
     *
     * @throws IOException if the connection cannot be established; the channel is closed first
     */
    private SocketChannel createChannel() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            Socket socket = socketChannel.socket();
            socket.setSoTimeout(this.timeoutMillis);
            // Bound the connect as well: this method runs while the instance monitor is held, so an
            // unresponsive peer must not be able to stall it for the OS-level connect timeout.
            socket.connect(new InetSocketAddress(this.nodeIdentifier.getLoadBalanceAddress(), this.nodeIdentifier.getLoadBalancePort()), this.timeoutMillis);
            return socketChannel;
        }
        catch (Exception e) {
            try {
                socketChannel.close();
            }
            catch (Exception closeException) {
                e.addSuppressed(closeException);
            }
            throw e;
        }
    }

    @Override
    public String toString() {
        return "NioAsyncLoadBalanceClient[nodeId=" + this.nodeIdentifier + "]";
    }
}

