/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.http.replication;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.replication.AsyncClusterResponse;
import org.apache.nifi.cluster.coordination.http.replication.CompletionCallback;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.coordination.http.replication.ResponseUtils;
import org.apache.nifi.cluster.coordination.http.replication.StandardAsyncClusterResponse;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.glassfish.jersey.client.filter.EncodingFilter;
import org.glassfish.jersey.message.GZipEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadPoolRequestReplicator
implements RequestReplicator {
    // Logger shared by every instance of this class.
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
    // JAX-RS client; the constructor sets its connect timeout, read timeout and follow-redirects properties.
    private final Client client;
    // Connection timeout in milliseconds, parsed from the constructor's connectionTimeout string.
    private final int connectionTimeoutMs;
    // Read timeout in milliseconds, parsed from the constructor's readTimeout string.
    private final int readTimeoutMs;
    // Once responseMap holds this many entries (after a purge attempt), new replication requests are rejected.
    private final int maxConcurrentRequests;
    // Mapper handed to every StandardAsyncClusterResponse created by this replicator.
    private final HttpResponseMapper responseMapper;
    // Receives a WARNING event when a node has been slow for several requests in a row; null-checked before use.
    private final EventReporter eventReporter;
    // Invoked via afterRequest(...) when a tracked request completes; null-checked before use.
    private final RequestCompletionCallback callback;
    // Source of node connection states used to decide whether a request may be replicated.
    private final ClusterCoordinator clusterCoordinator;
    // Used to create the response mapper and to look up the Knox cookie name when rewriting request headers.
    private final NiFiProperties nifiProperties;
    // Pool of daemon threads named "Replicate Request Thread-N"; shut down by shutdown().
    // Presumably executes the per-node HTTP requests - the submitting code is not visible in this part of the file.
    private ThreadPoolExecutor executorService;
    // Single-threaded scheduler that runs purgeExpiredRequests() with a one second fixed delay.
    private ScheduledExecutorService maintenanceExecutor;
    // Request ID -> in-flight cluster response; entries are added when a response object is created and
    // removed by onResponseConsumed(...).
    private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap<String, StandardAsyncClusterResponse>();
    // Per-node count of consecutive slow responses, maintained by onCompletedResponse(...).
    private final ConcurrentMap<NodeIdentifier, AtomicInteger> sequentialLongRequestCounts = new ConcurrentHashMap<NodeIdentifier, AtomicInteger>();
    // Replication that indicates "replicated" takes the write lock for mutable requests and the read lock otherwise.
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.rwLock.readLock();
    private final Lock writeLock = this.rwLock.writeLock();

    /**
     * Creates a replicator that uses {@code "5 sec"} for both the connection timeout and the read timeout
     * and otherwise delegates to the full constructor.
     *
     * @param corePoolSize core size of the replication thread pool; must be greater than zero
     * @param maxPoolSize maximum size of the replication thread pool; must be at least {@code corePoolSize}
     * @param maxConcurrentRequests number of outstanding requests at which new replication requests are rejected
     * @param client the JAX-RS client used for replication; must not be null
     * @param clusterCoordinator provides the connection state of the cluster's nodes
     * @param callback notified when a request completes
     * @param eventReporter receives warnings about slow nodes
     * @param nifiProperties NiFi configuration
     */
    public ThreadPoolRequestReplicator(int corePoolSize, int maxPoolSize, int maxConcurrentRequests, Client client, ClusterCoordinator clusterCoordinator, RequestCompletionCallback callback, EventReporter eventReporter, NiFiProperties nifiProperties) {
        this(corePoolSize, maxPoolSize, maxConcurrentRequests, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
    }

    /**
     * Creates a replicator backed by a pool of daemon threads and a single maintenance thread that
     * purges expired requests once per second.
     *
     * @param corePoolSize core size of the replication thread pool; must be greater than zero
     * @param maxPoolSize maximum size of the replication thread pool; must be at least {@code corePoolSize}
     * @param maxConcurrentRequests number of outstanding requests at which new replication requests are rejected
     * @param client the JAX-RS client used for replication; must not be null
     * @param clusterCoordinator provides the connection state of the cluster's nodes
     * @param connectionTimeout connection timeout as a duration string (for example {@code "5 sec"})
     * @param readTimeout read timeout as a duration string (for example {@code "5 sec"})
     * @param callback notified when a request completes
     * @param eventReporter receives warnings about slow nodes
     * @param nifiProperties NiFi configuration
     * @throws IllegalArgumentException if the pool sizes are invalid or {@code client} is null
     */
    public ThreadPoolRequestReplicator(int corePoolSize, int maxPoolSize, int maxConcurrentRequests, Client client, ClusterCoordinator clusterCoordinator, String connectionTimeout, String readTimeout, RequestCompletionCallback callback, EventReporter eventReporter, NiFiProperties nifiProperties) {
        if (corePoolSize <= 0) {
            throw new IllegalArgumentException("The Core Pool Size must be greater than zero.");
        } else if (maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException("Max Pool Size must be >= Core Pool Size.");
        } else if (client == null) {
            throw new IllegalArgumentException("Client may not be null.");
        }

        this.client = client;
        this.clusterCoordinator = clusterCoordinator;
        this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
        this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.responseMapper = new StandardHttpResponseMapper(nifiProperties);
        this.eventReporter = eventReporter;
        this.callback = callback;
        this.nifiProperties = nifiProperties;

        // Configure the shared HTTP client with the parsed timeouts.
        client.property("jersey.config.client.connectTimeout", connectionTimeoutMs);
        client.property("jersey.config.client.readTimeout", readTimeoutMs);
        client.property("jersey.config.client.followRedirects", Boolean.TRUE);

        // Worker threads are daemon threads carrying a sequential number in their name.
        final AtomicInteger threadCounter = new AtomicInteger(0);
        final ThreadFactory replicationThreadFactory = runnable -> {
            final Thread thread = Executors.defaultThreadFactory().newThread(runnable);
            thread.setDaemon(true);
            thread.setName("Replicate Request Thread-" + threadCounter.incrementAndGet());
            return thread;
        };
        this.executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), replicationThreadFactory);

        // One daemon maintenance thread purges expired requests with a one second fixed delay.
        final String maintenanceThreadName = ThreadPoolRequestReplicator.class.getSimpleName() + " Maintenance Thread";
        this.maintenanceExecutor = Executors.newScheduledThreadPool(1, runnable -> {
            final Thread thread = Executors.defaultThreadFactory().newThread(runnable);
            thread.setDaemon(true);
            thread.setName(maintenanceThreadName);
            return thread;
        });
        this.maintenanceExecutor.scheduleWithFixedDelay(this::purgeExpiredRequests, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Initiates an orderly shutdown of the replication thread pool and then of the maintenance scheduler.
     */
    @Override
    public void shutdown() {
        executorService.shutdown();
        maintenanceExecutor.shutdown();
    }

    /**
     * Replicates the request on behalf of the user returned by {@code NiFiUserUtils.getNiFiUser()}.
     *
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param entity the request entity
     * @param headers the request headers
     * @return the asynchronous cluster response
     */
    @Override
    public AsyncClusterResponse replicate(String method, URI uri, Object entity, Map<String, String> headers) {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        return replicate(currentUser, method, uri, entity, headers);
    }

    /**
     * Replicates the request to every node that is currently CONNECTED.
     *
     * <p>For a mutable request the cluster must have no DISCONNECTED, DISCONNECTING or CONNECTING
     * nodes; otherwise the request is rejected before anything is sent.</p>
     *
     * @param user the user on whose behalf the request is replicated
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param entity the request entity
     * @param headers the request headers
     * @return the asynchronous cluster response
     * @throws DisconnectedNodeMutableRequestException if the request is mutable and a node is disconnected or disconnecting
     * @throws ConnectingNodeMutableRequestException if the request is mutable and a node is connecting
     * @throws NoConnectedNodesException if no node is connected
     */
    @Override
    public AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers) {
        final Map<NodeConnectionState, List<NodeIdentifier>> nodesByState = clusterCoordinator.getConnectionStates();

        if (isMutableRequest(method, uri.getPath())) {
            final List<NodeIdentifier> disconnectedNodes = nodesByState.get(NodeConnectionState.DISCONNECTED);
            if (disconnectedNodes != null && !disconnectedNodes.isEmpty()) {
                final String explanation = disconnectedNodes.size() == 1
                        ? "Node " + disconnectedNodes.iterator().next() + " is currently disconnected"
                        : disconnectedNodes.size() + " Nodes are currently disconnected";
                throw new DisconnectedNodeMutableRequestException(explanation);
            }

            final List<NodeIdentifier> disconnectingNodes = nodesByState.get(NodeConnectionState.DISCONNECTING);
            if (disconnectingNodes != null && !disconnectingNodes.isEmpty()) {
                final String explanation = disconnectingNodes.size() == 1
                        ? "Node " + disconnectingNodes.iterator().next() + " is currently disconnecting"
                        : disconnectingNodes.size() + " Nodes are currently disconnecting";
                throw new DisconnectedNodeMutableRequestException(explanation);
            }

            final List<NodeIdentifier> connectingNodes = nodesByState.get(NodeConnectionState.CONNECTING);
            if (connectingNodes != null && !connectingNodes.isEmpty()) {
                final String explanation = connectingNodes.size() == 1
                        ? "Node " + connectingNodes.iterator().next() + " is currently connecting"
                        : connectingNodes.size() + " Nodes are currently connecting";
                throw new ConnectingNodeMutableRequestException(explanation);
            }
        }

        final List<NodeIdentifier> connectedNodes = nodesByState.get(NodeConnectionState.CONNECTED);
        if (connectedNodes == null || connectedNodes.isEmpty()) {
            throw new NoConnectedNodesException();
        }

        final Set<NodeIdentifier> targetNodes = new HashSet<>(connectedNodes);
        return replicate(targetNodes, user, method, uri, entity, headers, true, true);
    }

    /**
     * Rewrites the given headers in place so that they are suitable for a node-to-node request:
     * adds the proxied entities chain for the user, removes the {@code Authorization} and {@code Host}
     * headers, and strips the Knox cookie (when a Knox cookie name is configured) from {@code Cookie}.
     *
     * @param headers the mutable header map to update
     * @param user the user on whose behalf the request is made
     * @throws AccessDeniedException if {@code user} is null
     */
    void updateRequestHeaders(Map<String, String> headers, NiFiUser user) {
        if (user == null) {
            throw new AccessDeniedException("Unknown user");
        }

        headers.put("X-ProxiedEntitiesChain", ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user));
        headers.remove("Authorization");

        final String knoxCookieName = nifiProperties.getKnoxCookieName();
        if (headers.containsKey("Cookie") && StringUtils.isNotBlank(knoxCookieName)) {
            final String knoxCookiePrefix = knoxCookieName + "=";
            final Set<String> retainedCookies = Stream.of(headers.get("Cookie").split(";"))
                    .map(String::trim)
                    .filter(cookie -> !cookie.startsWith(knoxCookiePrefix))
                    .collect(Collectors.toSet());

            if (retainedCookies.isEmpty()) {
                // Nothing but the Knox cookie was present, so drop the header entirely.
                headers.remove("Cookie");
            } else {
                headers.put("Cookie", StringUtils.join(retainedCookies, "; "));
            }
        }

        headers.remove("Host");
    }

    /**
     * Replicates the request to the given nodes on behalf of the user returned by
     * {@code NiFiUserUtils.getNiFiUser()}.
     *
     * @param nodeIds the nodes to replicate to
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param entity the request entity
     * @param headers the request headers
     * @param indicateReplicated whether to mark the request as replicated
     * @param performVerification whether to run the verification phase first
     * @return the asynchronous cluster response
     */
    @Override
    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification) {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        return replicate(nodeIds, currentUser, method, uri, entity, headers, indicateReplicated, performVerification);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Replicates the request to the given nodes on behalf of {@code user}.
     *
     * <p>A copy of the headers is extended with an ID generation seed, optionally the
     * {@code X-Request-Replicated} marker, and the user's proxied entities chain. When
     * {@code indicateReplicated} is set, the call holds the write lock (mutable request) or read lock
     * (otherwise) and blocks on a private monitor until it is notified or the thread is interrupted;
     * otherwise the request is submitted without locking or waiting.</p>
     *
     * @param nodeIds the nodes to replicate to
     * @param user the user on whose behalf the request is replicated
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param entity the request entity
     * @param headers the request headers; not modified
     * @param indicateReplicated whether to mark the request as replicated (and lock/wait)
     * @param performVerification whether to run the verification phase first
     * @return the asynchronous cluster response
     */
    @Override
    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification) {
        final Map<String, String> updatedHeaders = new HashMap<>(headers);
        updatedHeaders.put("X-Cluster-Id-Generation-Seed", ComponentIdGenerator.generateId().toString());
        if (indicateReplicated) {
            updatedHeaders.put("X-Request-Replicated", "true");
        }
        updateRequestHeaders(updatedHeaders, user);

        // Without verification the request goes straight to the execution phase.
        final boolean executionPhase = !performVerification;

        if (!indicateReplicated) {
            return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, executionPhase, true, null);
        }

        final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
        logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri);
        lock.lock();
        try {
            logger.debug("Lock {} obtained in order to replicate request {} {}", lock, method, uri);

            // The monitor is held before the request is submitted, so a notification issued by
            // another thread cannot happen before this thread starts waiting.
            final Object monitor = new Object();
            synchronized (monitor) {
                final AsyncClusterResponse response = replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, executionPhase, true, monitor);
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return response;
            }
        } finally {
            lock.unlock();
            logger.debug("Unlocked {} after replication completed for {} {}", lock, method, uri);
        }
    }

    /**
     * Forwards the request to the coordinator node on behalf of the user returned by
     * {@code NiFiUserUtils.getNiFiUser()}.
     *
     * @param coordinatorNodeId the coordinator node
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param entity the request entity
     * @param headers the request headers
     * @return the asynchronous cluster response
     */
    @Override
    public AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> headers) {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        return forwardToCoordinator(coordinatorNodeId, currentUser, method, uri, entity, headers);
    }

    /**
     * Forwards the request to the coordinator node only, on behalf of {@code user}. The headers are
     * copied and rewritten for the user; no verification phase, execution-phase marker or response
     * merging is requested, and no monitor is used.
     *
     * @param coordinatorNodeId the coordinator node
     * @param user the user on whose behalf the request is forwarded
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param entity the request entity
     * @param headers the request headers; not modified
     * @return the asynchronous cluster response
     */
    @Override
    public AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers) {
        final Map<String, String> forwardedHeaders = new HashMap<>(headers);
        updateRequestHeaders(forwardedHeaders, user);

        final Set<NodeIdentifier> coordinatorOnly = Collections.singleton(coordinatorNodeId);
        return replicate(coordinatorOnly, method, uri, entity, forwardedHeaders, false, null, false, false, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Core replication routine used by all public entry points and by the verification callback.
     *
     * <p>Validates the arguments and the connection status of every target node, optionally verifies
     * the cluster state, enforces the limit on outstanding requests, creates and registers the
     * response object when none is supplied, and then either starts the verification phase (mutable
     * request with {@code performVerification}) or submits the request to the nodes.</p>
     *
     * <p>If anything fails, the monitor (when given) is notified, the failure is recorded on the
     * response (when one exists) and the original throwable is rethrown.</p>
     *
     * @param nodeIds the nodes to replicate to; must be non-null and non-empty, and every node must be CONNECTED
     * @param method the HTTP method; must be non-null
     * @param uri the URI of the request; must be non-null
     * @param entity the request entity; must be non-null
     * @param headers the request headers; must be non-null, copied before use
     * @param performVerification whether to verify cluster state and, for mutable requests, run the verification phase
     * @param response an existing response to continue with, or null to create a new one
     * @param executionPhase whether a mutable request should carry the {@code X-Execution-Continue} header
     * @param merge passed to the newly created response and to the verification phase
     * @param monitor object to notify when the request completes or fails; may be null
     * @return the response object tracking this request
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if {@code nodeIds} is empty
     * @throws UnknownNodeException if a node is not known to the cluster coordinator
     * @throws IllegalClusterStateException if a node is not connected
     * @throws IllegalStateException if too many requests are outstanding
     */
    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification, StandardAsyncClusterResponse response, boolean executionPhase, boolean merge, Object monitor) {
        try {
            int numRequests;
            // Fail fast on missing arguments; note that a null entity is rejected as well.
            Objects.requireNonNull(nodeIds);
            Objects.requireNonNull(method);
            Objects.requireNonNull(uri);
            Objects.requireNonNull(entity);
            Objects.requireNonNull(headers);
            if (nodeIds.isEmpty()) {
                throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
            }
            // Every target node must be known to the coordinator and currently CONNECTED.
            for (NodeIdentifier nodeId2 : nodeIds) {
                NodeConnectionStatus status = this.clusterCoordinator.getConnectionStatus(nodeId2);
                if (status == null) {
                    throw new UnknownNodeException("Node " + nodeId2 + " does not exist in this cluster");
                }
                if (status.getState() == NodeConnectionState.CONNECTED) continue;
                throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId2 + " because the node is not connected");
            }
            logger.debug("Replicating request {} {} with entity {} to {}; response is {}", new Object[]{method, uri, entity, nodeIds, response});
            // Work on a copy so the caller's map is untouched; reuse the transaction ID header if present,
            // otherwise generate one and store it in the copy.
            HashMap<String, String> updatedHeaders = new HashMap<String, String>(headers);
            String requestId = updatedHeaders.computeIfAbsent("X-RequestTransactionId", key -> UUID.randomUUID().toString());
            // -1 means "cluster state was not verified", so no timing is recorded below.
            long verifyClusterStateNanos = -1L;
            if (performVerification) {
                long start = System.nanoTime();
                this.verifyClusterState(method, uri.getPath());
                verifyClusterStateNanos = System.nanoTime() - start;
            }
            // When at the limit, purge expired requests first and only reject if that did not free capacity.
            if ((numRequests = this.responseMap.size()) >= this.maxConcurrentRequests) {
                numRequests = this.purgeExpiredRequests();
            }
            if (numRequests >= this.maxConcurrentRequests) {
                Map<String, Long> countsByUri = this.responseMap.values().stream().collect(Collectors.groupingBy(StandardAsyncClusterResponse::getURIPath, Collectors.counting()));
                logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", new Object[]{method, uri.getPath(), numRequests, countsByUri});
                throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
            }
            // No response supplied: this is a new request, so create and register its response object.
            if (response == null) {
                // On completion, run the completion handling and then always notify the monitor (if any),
                // whether or not the completion handling threw.
                CompletionCallback completionCallback = clusterResponse -> {
                    block9: {
                        Object object;
                        try {
                            this.onCompletedResponse(requestId);
                            if (monitor == null) break block9;
                            object = monitor;
                        }
                        catch (Throwable throwable) {
                            if (monitor != null) {
                                Object object2 = monitor;
                                synchronized (object2) {
                                    monitor.notify();
                                }
                                logger.debug("Notified monitor {} because request {} {} has completed", new Object[]{monitor, method, uri});
                            }
                            throw throwable;
                        }
                        synchronized (object) {
                            monitor.notify();
                        }
                        logger.debug("Notified monitor {} because request {} {} has completed", new Object[]{monitor, method, uri});
                    }
                };
                // Once the response has been consumed it is removed from the map of outstanding requests.
                Runnable responseConsumedCallback = () -> this.onResponseConsumed(requestId);
                response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, this.responseMapper, completionCallback, responseConsumedCallback, merge);
                this.responseMap.put(requestId, response);
            }
            if (verifyClusterStateNanos > -1L) {
                response.addTiming("Verify Cluster State", "All Nodes", verifyClusterStateNanos);
            }
            logger.debug("For Request ID {}, response object is {}", (Object)requestId, (Object)response);
            boolean mutableRequest = this.isMutableRequest(method, uri.getPath());
            // Mutable requests are first verified on all nodes; the verification callback calls back
            // into this method to perform the actual request.
            if (mutableRequest && performVerification) {
                logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", (Object)requestId);
                this.performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
                return response;
            }
            if (mutableRequest) {
                response.setPhase("Execution Phase");
            }
            // Each node's response is added to the cluster response as it arrives.
            StandardAsyncClusterResponse finalResponse = response;
            NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> {
                logger.debug("Received response from {} for {} {}", new Object[]{nodeResponse.getNodeId(), method, uri.getPath()});
                finalResponse.add(nodeResponse);
            };
            if (mutableRequest && executionPhase) {
                updatedHeaders.put("X-Execution-Continue", "true");
            }
            Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest((NodeIdentifier)nodeId, method, this.createURI(uri, (NodeIdentifier)nodeId), entity, updatedHeaders, nodeCompletionCallback, finalResponse);
            this.submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
            return response;
        }
        catch (Throwable t) {
            // Wake up a caller waiting on the monitor so it does not block on a request that failed to start.
            if (monitor != null) {
                Object requestId = monitor;
                synchronized (requestId) {
                    monitor.notify();
                }
                logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", new Object[]{monitor, method, uri, t});
            }
            // Record the failure on the response; it is attributed to a freshly constructed NodeIdentifier
            // rather than to a specific node.
            if (response != null) {
                RuntimeException failure = t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException("Failed to submit Replication Request to background thread", t);
                response.setFailure(failure, new NodeIdentifier());
            }
            throw t;
        }
    }

    /**
     * Runs the verification phase for a mutable request: sends the request to every node with the
     * {@code X-Validation-Expects: 150-NodeContinue} header and waits (asynchronously) for all nodes
     * to answer.
     *
     * <p>If every node answers with status 150, the request is replicated again without verification
     * and in the execution phase. Otherwise a cancel request ({@code X-Cancel-Transaction}) is sent to
     * the nodes from a separate thread, a failure is recorded on the cluster response for each
     * dissenting node, and the monitor (when given) is notified.</p>
     *
     * @param nodeIds the nodes taking part in the request
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param entity the request entity
     * @param headers the headers of the request; copied before the validation header is added
     * @param clusterResponse the response object tracking the request
     * @param merge forwarded to the follow-up replication
     * @param monitor object to notify when verification fails; may be null
     */
    private void performVerification(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers, final StandardAsyncClusterResponse clusterResponse, final boolean merge, final Object monitor) {
        logger.debug("Verifying that mutable request {} {} can be made", (Object)method, (Object)uri.getPath());
        HashMap<String, String> validationHeaders = new HashMap<String, String>(headers);
        validationHeaders.put("X-Validation-Expects", "150-NodeContinue");
        final long startNanos = System.nanoTime();
        final int numNodes = nodeIds.size();
        NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback(){
            // Responses collected so far; completion callbacks may arrive from several threads.
            final Set<NodeResponse> nodeResponses = Collections.synchronizedSet(new HashSet());

            /**
             * Records one node's verification response and, once all nodes have answered, either
             * proceeds to the execution phase or cancels the request.
             *
             * Decompiler note: "Removed try catching itself - possible behaviour change."
             */
            @Override
            public void onCompletion(NodeResponse nodeResponse) {
                block19: {
                    boolean allNodesResponded;
                    Set<NodeResponse> set = this.nodeResponses;
                    // Add and count under one lock so that exactly one callback sees the final count.
                    synchronized (set) {
                        this.nodeResponses.add(nodeResponse);
                        allNodesResponded = this.nodeResponses.size() == numNodes;
                    }
                    try {
                        Object object;
                        long nanos = System.nanoTime() - startNanos;
                        clusterResponse.addTiming("Completed Verification", nodeResponse.getNodeId().toString(), nanos);
                        // Nothing more to do until the last node has answered.
                        if (!allNodesResponded) break block19;
                        clusterResponse.addTiming("Verification Completed", "All Nodes", nanos);
                        // Any status other than 150 counts as a dissenting vote.
                        final long dissentingCount = this.nodeResponses.stream().filter(p -> p.getStatus() != 150).count();
                        if (dissentingCount == 0L) {
                            logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", new Object[]{numNodes, method, uri.getPath()});
                            // All nodes agreed: issue the real request, skipping verification this time.
                            ThreadPoolRequestReplicator.this.replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge, monitor);
                            return;
                        }
                        try {
                            // At least one node dissented: tell all nodes to cancel, from a separate thread.
                            final HashMap<String, String> cancelLockHeaders = new HashMap<String, String>(headers);
                            cancelLockHeaders.put("X-Cancel-Transaction", "true");
                            Thread cancelLockThread = new Thread(new Runnable(){

                                @Override
                                public void run() {
                                    logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", new Object[]{dissentingCount, method, uri.getPath()});
                                    Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest((NodeIdentifier)nodeId, method, ThreadPoolRequestReplicator.this.createURI(uri, nodeId), entity, cancelLockHeaders, null, clusterResponse);
                                    ThreadPoolRequestReplicator.this.submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
                                }
                            });
                            cancelLockThread.setName("Cancel Flow Locks");
                            cancelLockThread.start();
                            // Record a failure on the cluster response for every dissenting node.
                            for (NodeResponse response : this.nodeResponses) {
                                String message;
                                if (response.getStatus() == 150) continue;
                                Response clientResponse = response.getClientResponse();
                                if (clientResponse == null) {
                                    message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
                                    logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur", new Object[]{response.getStatus(), response.getNodeId(), method, uri.getPath()});
                                } else {
                                    String nodeExplanation = (String)clientResponse.readEntity(String.class);
                                    message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
                                    logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}", new Object[]{response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation});
                                }
                                // FORBIDDEN maps to AccessDeniedException, anything else to IllegalClusterStateException;
                                // the node's throwable is kept as the cause when there is one.
                                Object failure = response.getStatus() == Response.Status.FORBIDDEN.getStatusCode() ? (response.hasThrowable() ? new AccessDeniedException(message, response.getThrowable()) : new AccessDeniedException(message)) : (response.hasThrowable() ? new IllegalClusterStateException(message, response.getThrowable()) : new IllegalClusterStateException(message));
                                clusterResponse.setFailure((RuntimeException)failure, response.getNodeId());
                            }
                            if (monitor == null) break block19;
                            object = monitor;
                        }
                        catch (Throwable throwable) {
                            // Notify the monitor even when recording the failures threw, then rethrow.
                            if (monitor != null) {
                                Object object2 = monitor;
                                synchronized (object2) {
                                    monitor.notify();
                                }
                                logger.debug("Notified monitor {} because request {} {} has failed due to at least 1 dissenting node", new Object[]{monitor, method, uri});
                            }
                            throw throwable;
                        }
                        synchronized (object) {
                            monitor.notify();
                        }
                        logger.debug("Notified monitor {} because request {} {} has failed due to at least 1 dissenting node", new Object[]{monitor, method, uri});
                    }
                    catch (Exception e) {
                        // Report the exception as this node's response and pass the other nodes' responses through.
                        clusterResponse.add(new NodeResponse(nodeResponse.getNodeId(), method, uri, e));
                        for (NodeResponse otherResponse : this.nodeResponses) {
                            if (otherResponse.getNodeId().equals((Object)nodeResponse.getNodeId())) continue;
                            clusterResponse.add(otherResponse);
                        }
                    }
                }
            }
        };
        Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest((NodeIdentifier)nodeId, method, this.createURI(uri, (NodeIdentifier)nodeId), entity, validationHeaders, completionCallback, clusterResponse);
        this.submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders);
    }

    /**
     * Looks up the response of an outstanding request.
     *
     * @param identifier the request ID
     * @return the response registered under that ID, or null if there is none
     */
    @Override
    public AsyncClusterResponse getClusterResponse(String identifier) {
        return responseMap.get(identifier);
    }

    /**
     * Performs the HTTP request against a single node and wraps the outcome in a {@link NodeResponse}.
     * The time taken is recorded on the cluster response, and a 2xx response with a positive length
     * is buffered when the cluster response grants buffer space for it.
     *
     * @param invocation the prepared HTTP invocation
     * @param nodeId the node the request is sent to
     * @param method the HTTP method
     * @param uri the URI of the request
     * @param requestId the ID of the request
     * @param headers the request headers (used for logging only)
     * @param clusterResponse the response object tracking the overall request
     * @return the node's response
     */
    protected NodeResponse replicateRequest(Invocation invocation, NodeIdentifier nodeId, String method, URI uri, String requestId, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
        final long startNanos = System.nanoTime();
        logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", method, uri, requestId, headers);

        final Response httpResponse = invocation.invoke();

        final long requestNanos = System.nanoTime() - startNanos;
        clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), requestNanos);

        final NodeResponse nodeResponse = new NodeResponse(nodeId, method, uri, httpResponse, System.nanoTime() - startNanos, requestId);
        if (!nodeResponse.is2xx()) {
            return nodeResponse;
        }

        final int length = nodeResponse.getClientResponse().getLength();
        if (length > 0) {
            final boolean canBufferResponse = clusterResponse.requestBuffer(length);
            if (canBufferResponse) {
                nodeResponse.bufferResponse();
            }
        }
        return nodeResponse;
    }

    /**
     * Tells whether a request may change state on the nodes. GET, HEAD and OPTIONS (in any letter
     * case) are read-only; every other method is treated as mutable.
     *
     * @param method the HTTP method; must not be null
     * @param uriPath the path of the request; currently not consulted
     * @return {@code false} for GET, HEAD and OPTIONS, {@code true} otherwise
     */
    private boolean isMutableRequest(String method, String uriPath) {
        // Upper-case with Locale.ROOT: with the default locale, e.g. Turkish, "options" would become
        // "OPTİONS" and a read-only request would wrongly be treated as mutable.
        switch (method.toUpperCase(Locale.ROOT)) {
            case "GET":
            case "HEAD":
            case "OPTIONS":
                return false;
            default:
                return true;
        }
    }

    /**
     * Rejects a DELETE, POST or PUT request when the cluster coordinator reports any node in the
     * DISCONNECTED, DISCONNECTING or CONNECTING state. Other methods are not checked.
     *
     * <p>NOTE(review): the method test here is an exact, case-sensitive match on three methods and is
     * narrower than {@code isMutableRequest} - confirm whether that difference is intended.</p>
     *
     * @param httpMethod the HTTP method of the request
     * @param uriPath the path of the request, used in the exception message
     * @throws DisconnectedNodeMutableRequestException if a node is disconnected or disconnecting
     * @throws ConnectingNodeMutableRequestException if a node is connecting
     */
    private void verifyClusterState(String httpMethod, String uriPath) {
        if (!"DELETE".equals(httpMethod) && !"POST".equals(httpMethod) && !"PUT".equals(httpMethod)) {
            return;
        }

        final Map<?, ?> statesPresent = clusterCoordinator.getConnectionStates();

        final boolean nodeLeaving = statesPresent.containsKey(NodeConnectionState.DISCONNECTED)
                || statesPresent.containsKey(NodeConnectionState.DISCONNECTING);
        if (nodeLeaving) {
            throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is disconnected from the cluster");
        }

        if (statesPresent.containsKey(NodeConnectionState.CONNECTING)) {
            throw new ConnectingNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is trying to connect to the cluster");
        }
    }

    /**
     * Stops tracking a request once its response has been consumed.
     *
     * @param requestId the ID of the request to forget
     */
    private void onResponseConsumed(String requestId) {
        responseMap.remove(requestId);
    }

    /**
     * Handles completion of a request: invokes the completion callback, logs timing information when
     * DEBUG is enabled, and tracks nodes that respond slowly. A node that is slow for three requests
     * in a row triggers a warning (log and event reporter), after which its counter starts over.
     *
     * <p>If the request is no longer tracked (for example because it was already consumed or purged),
     * there is nothing to do and the method returns quietly instead of failing with a
     * {@link NullPointerException}.</p>
     *
     * @param requestId the ID of the completed request
     */
    private void onCompletedResponse(String requestId) {
        final AsyncClusterResponse response = responseMap.get(requestId);
        if (response == null) {
            logger.debug("Request {} completed but is no longer tracked; skipping completion handling", requestId);
            return;
        }

        if (callback != null) {
            try {
                callback.afterRequest(response.getURIPath(), response.getMethod(), response.getCompletedNodeResponses());
            } catch (Exception e) {
                // A failing callback must not prevent the remaining completion handling.
                logger.warn("Completed request {} {} but failed to properly handle the Request Completion Callback due to {}", response.getMethod(), response.getURIPath(), e.toString());
                logger.warn("", e);
            }
        }

        if (logger.isDebugEnabled()) {
            logTimingInfo(response);
        }

        // Count consecutive slow responses per node; any response that is not slow resets the count.
        final Set<NodeIdentifier> slowResponseNodes = ResponseUtils.findLongResponseTimes(response, 1.5);
        for (NodeIdentifier nodeId : response.getNodesInvolved()) {
            final AtomicInteger counter = sequentialLongRequestCounts.computeIfAbsent(nodeId, id -> new AtomicInteger(0));
            if (!slowResponseNodes.contains(nodeId)) {
                counter.set(0);
                continue;
            }

            final int sequentialLongRequests = counter.incrementAndGet();
            if (sequentialLongRequests < 3) {
                continue;
            }

            final String message = "Response time from " + nodeId + " was slow for each of the last 3 requests made. To see more information about timing, enable DEBUG logging for " + logger.getName();
            logger.warn(message);
            if (eventReporter != null) {
                eventReporter.reportEvent(Severity.WARNING, "Node Response Time", message);
            }
            counter.set(0);
        }
    }

    /**
     * Writes two DEBUG log entries describing how long each involved node took to answer:
     * a summary line (minimum, maximum, average in milliseconds) followed by a per-node listing.
     *
     * @param response the cluster response whose node timings are reported
     */
    private void logTimingInfo(AsyncClusterResponse response) {
        LongSummaryStatistics durationStats = response.getNodesInvolved().stream()
                .mapToLong(node -> response.getNodeResponse(node).getRequestDuration(TimeUnit.MILLISECONDS))
                .summaryStatistics();

        StringBuilder perNodeReport = new StringBuilder();
        perNodeReport.append("Node Responses for ");
        perNodeReport.append(response.getMethod());
        perNodeReport.append(" ");
        perNodeReport.append(response.getURIPath());
        perNodeReport.append(" (Request ID ");
        perNodeReport.append(response.getRequestIdentifier());
        perNodeReport.append("):\n");

        for (NodeIdentifier involvedNode : response.getNodesInvolved()) {
            long durationMillis = response.getNodeResponse(involvedNode).getRequestDuration(TimeUnit.MILLISECONDS);
            perNodeReport.append(involvedNode).append(": ").append(durationMillis).append(" millis\n");
        }

        logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms",
                response.getMethod(), response.getURIPath(), response.getRequestIdentifier(),
                durationStats.getMin(), durationStats.getMax(), durationStats.getAverage());
        logger.debug(perNodeReport.toString());
    }

    /**
     * Builds one request task per target node and hands each to the executor service.
     *
     * @param nodeIds the nodes to send the request to; an empty set results in no submissions
     * @param scheme not consulted by this implementation
     * @param path not consulted by this implementation
     * @param callableFactory produces the task to run for a given node
     * @param headers not consulted by this implementation
     */
    private void submitAsyncRequest(Set<NodeIdentifier> nodeIds, String scheme, String path, Function<NodeIdentifier, NodeHttpRequest> callableFactory, Map<String, String> headers) {
        if (!nodeIds.isEmpty()) {
            for (NodeIdentifier targetNode : nodeIds) {
                this.executorService.submit(callableFactory.apply(targetNode));
            }
        }
    }

    /**
     * Derives the URI to use for a specific node: scheme, path and query are taken from
     * {@code exampleUri}, host and port from the node's API address and API port.
     *
     * @param exampleUri the URI supplying scheme, path and query
     * @param nodeId the node supplying host and port
     * @return the URI addressing the given node
     */
    private URI createURI(URI exampleUri, NodeIdentifier nodeId) {
        String scheme = exampleUri.getScheme();
        String host = nodeId.getApiAddress();
        int port = nodeId.getApiPort();
        String path = exampleUri.getPath();
        String query = exampleUri.getQuery();
        return this.createURI(scheme, host, port, path, query);
    }

    /**
     * Assembles a URI from its parts, leaving user-info and fragment unset.
     *
     * @param scheme the URI scheme
     * @param nodeApiAddress the host component
     * @param nodeApiPort the port component
     * @param path the path component
     * @param query the query component
     * @return the assembled URI
     * @throws UriConstructionException wrapping the {@link URISyntaxException} if the parts do
     *         not form a valid URI
     */
    private URI createURI(String scheme, String nodeApiAddress, int nodeApiPort, String path, String query) {
        final URI nodeUri;
        try {
            nodeUri = new URI(scheme, null, nodeApiAddress, nodeApiPort, path, query, null);
        }
        catch (URISyntaxException syntaxFailure) {
            throw new UriConstructionException(syntaxFailure);
        }
        return nodeUri;
    }

    /**
     * Removes every registered response that is both older than 30 seconds and complete.
     * The ids are collected first and removed afterwards, one by one, through
     * {@code onResponseConsumed}.
     *
     * @return the number of responses still registered after the purge
     */
    private synchronized int purgeExpiredRequests() {
        Set<String> purgeableIds = this.responseMap.entrySet().stream()
                .filter(entry -> {
                    StandardAsyncClusterResponse candidate = (StandardAsyncClusterResponse)entry.getValue();
                    // Age is checked before completeness, matching the short-circuit order of two chained filters.
                    return candidate.isOlderThan(30L, TimeUnit.SECONDS) && candidate.isComplete();
                })
                .map(entry -> (String)entry.getKey())
                .collect(Collectors.toSet());

        for (String purgeableId : purgeableIds) {
            this.onResponseConsumed(purgeableId);
        }
        return this.responseMap.size();
    }

    private static interface NodeRequestCompletionCallback {
        public void onCompletion(NodeResponse var1);
    }

    private class NodeHttpRequest
    implements Runnable {
        private final NodeIdentifier nodeId;
        private final String method;
        private final URI uri;
        private final Object entity;
        private final Map<String, String> headers = new HashMap<String, String>();
        private final NodeRequestCompletionCallback callback;
        private final StandardAsyncClusterResponse clusterResponse;
        private final long creationNanos = System.nanoTime();
        private final GZipEncoder gzipEncoder = new GZipEncoder();

        private NodeHttpRequest(NodeIdentifier nodeId, String method, URI uri, Object entity, Map<String, String> headers, NodeRequestCompletionCallback callback, StandardAsyncClusterResponse clusterResponse) {
            this.nodeId = nodeId;
            this.method = method;
            this.uri = uri;
            this.entity = entity;
            this.headers.putAll(headers);
            this.callback = callback;
            this.clusterResponse = clusterResponse;
        }

        @Override
        public void run() {
            NodeResponse nodeResponse;
            long waitForScheduleNanos = System.nanoTime() - this.creationNanos;
            this.clusterResponse.addTiming("Wait for HTTP Request Replication to be triggered", this.nodeId.toString(), waitForScheduleNanos);
            try {
                boolean useGzip;
                String rawAcceptEncoding = this.headers.get("Accept-Encoding");
                if (rawAcceptEncoding == null) {
                    useGzip = false;
                } else {
                    String[] acceptEncodingTokens = rawAcceptEncoding.split(",");
                    Set acceptEncoding = Stream.of(acceptEncodingTokens).map(String::trim).filter(enc -> StringUtils.isNotEmpty((CharSequence)enc)).map(String::toLowerCase).collect(Collectors.toSet());
                    Set supportedEncodings = this.gzipEncoder.getSupportedEncodings();
                    useGzip = supportedEncodings.stream().anyMatch(supportedEncoding -> acceptEncoding.contains(supportedEncoding.toLowerCase()));
                }
                Invocation invocation = this.createInvocation(useGzip);
                String requestId = this.headers.get("x-nifi-request-id");
                logger.debug("Replicating request {} {} to {}", new Object[]{this.method, this.uri.getPath(), this.nodeId});
                nodeResponse = ThreadPoolRequestReplicator.this.replicateRequest(invocation, this.nodeId, this.method, this.uri, requestId, this.headers, this.clusterResponse);
            }
            catch (Exception e) {
                nodeResponse = new NodeResponse(this.nodeId, this.method, this.uri, e);
                logger.warn("Failed to replicate request {} {} to {} due to {}", new Object[]{this.method, this.uri.getPath(), this.nodeId, e.toString()});
                logger.warn("", (Throwable)e);
            }
            if (this.callback != null) {
                logger.debug("Request {} {} completed for {}", new Object[]{this.method, this.uri.getPath(), this.nodeId});
                this.callback.onCompletion(nodeResponse);
            }
        }

        private Invocation createInvocation(boolean useGzip) {
            Invocation invocation;
            MultivaluedHashMap map = new MultivaluedHashMap();
            if (this.entity instanceof MultivaluedMap) {
                map.putAll((Map)this.entity);
            }
            WebTarget webTarget = ThreadPoolRequestReplicator.this.client.target(this.uri);
            if (useGzip) {
                webTarget = (WebTarget)((WebTarget)webTarget.register(EncodingFilter.class)).register((Object)this.gzipEncoder);
            }
            if ("DELETE".equalsIgnoreCase(this.method) || "HEAD".equalsIgnoreCase(this.method) || "GET".equalsIgnoreCase(this.method) || "OPTIONS".equalsIgnoreCase(this.method)) {
                for (Object queryEntry : map.entrySet()) {
                    webTarget = webTarget.queryParam((String)queryEntry.getKey(), ((List)queryEntry.getValue()).toArray());
                }
                Invocation.Builder builder = webTarget.request();
                for (Map.Entry entry : this.headers.entrySet()) {
                    builder = builder.header((String)entry.getKey(), entry.getValue());
                }
                invocation = builder.build(this.method);
            } else {
                Invocation.Builder builder = webTarget.request();
                String contentType = null;
                for (Map.Entry<String, String> entry : this.headers.entrySet()) {
                    builder.header(entry.getKey(), (Object)entry.getValue());
                    if (!entry.getKey().equalsIgnoreCase("content-type")) continue;
                    contentType = entry.getValue();
                }
                if (contentType == null) {
                    contentType = "application/x-www-form-urlencoded";
                }
                invocation = this.entity == null ? builder.build(this.method, Entity.entity((Object)map, (String)contentType)) : builder.build(this.method, Entity.entity((Object)this.entity, (String)contentType));
            }
            return invocation;
        }
    }
}

