/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.http.replication;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.replication.AsyncClusterResponse;
import org.apache.nifi.cluster.coordination.http.replication.CompletionCallback;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.coordination.http.replication.ResponseUtils;
import org.apache.nifi.cluster.coordination.http.replication.StandardAsyncClusterResponse;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadPoolRequestReplicator
implements RequestReplicator {
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
    private static final int MAX_CONCURRENT_REQUESTS = 100;
    private final Client client;
    private final int connectionTimeoutMs;
    private final int readTimeoutMs;
    private final HttpResponseMerger responseMerger;
    private final EventReporter eventReporter;
    private final RequestCompletionCallback callback;
    private final ClusterCoordinator clusterCoordinator;
    private ExecutorService executorService;
    private ScheduledExecutorService maintenanceExecutor;
    private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap<String, StandardAsyncClusterResponse>();
    private final ConcurrentMap<NodeIdentifier, AtomicInteger> sequentialLongRequestCounts = new ConcurrentHashMap<NodeIdentifier, AtomicInteger>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.rwLock.readLock();
    private final Lock writeLock = this.rwLock.writeLock();

    public ThreadPoolRequestReplicator(int numThreads, Client client, ClusterCoordinator clusterCoordinator, RequestCompletionCallback callback, EventReporter eventReporter, NiFiProperties nifiProperties) {
        this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
    }

    public ThreadPoolRequestReplicator(int numThreads, Client client, ClusterCoordinator clusterCoordinator, String connectionTimeout, String readTimeout, RequestCompletionCallback callback, EventReporter eventReporter, NiFiProperties nifiProperties) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("The number of threads must be greater than zero.");
        }
        if (client == null) {
            throw new IllegalArgumentException("Client may not be null.");
        }
        this.client = client;
        this.clusterCoordinator = clusterCoordinator;
        this.connectionTimeoutMs = (int)FormatUtils.getTimeDuration((String)connectionTimeout, (TimeUnit)TimeUnit.MILLISECONDS);
        this.readTimeoutMs = (int)FormatUtils.getTimeDuration((String)readTimeout, (TimeUnit)TimeUnit.MILLISECONDS);
        this.responseMerger = new StandardHttpResponseMerger(nifiProperties);
        this.eventReporter = eventReporter;
        this.callback = callback;
        client.getProperties().put("com.sun.jersey.client.property.connectTimeout", this.connectionTimeoutMs);
        client.getProperties().put("com.sun.jersey.client.property.readTimeout", this.readTimeoutMs);
        client.getProperties().put("com.sun.jersey.client.property.followRedirects", Boolean.TRUE);
        AtomicInteger threadId = new AtomicInteger(0);
        this.executorService = Executors.newFixedThreadPool(numThreads, r -> {
            Thread t = Executors.defaultThreadFactory().newThread(r);
            t.setDaemon(true);
            t.setName("Replicate Request Thread-" + threadId.incrementAndGet());
            return t;
        });
        this.maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = Executors.defaultThreadFactory().newThread(r);
                t.setDaemon(true);
                t.setName(ThreadPoolRequestReplicator.class.getSimpleName() + " Maintenance Thread");
                return t;
            }
        });
        this.maintenanceExecutor.scheduleWithFixedDelay(() -> this.purgeExpiredRequests(), 1L, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void shutdown() {
        this.executorService.shutdown();
        this.maintenanceExecutor.shutdown();
    }

    @Override
    public AsyncClusterResponse replicate(String method, URI uri, Object entity, Map<String, String> headers) {
        List nodeIds;
        Map stateMap = this.clusterCoordinator.getConnectionStates();
        boolean mutable = this.isMutableRequest(method, uri.getPath());
        if (mutable) {
            List disconnected = (List)stateMap.get(NodeConnectionState.DISCONNECTED);
            if (disconnected != null && !disconnected.isEmpty()) {
                if (disconnected.size() == 1) {
                    throw new DisconnectedNodeMutableRequestException("Node " + disconnected.iterator().next() + " is currently disconnected");
                }
                throw new DisconnectedNodeMutableRequestException(disconnected.size() + " Nodes are currently disconnected");
            }
            List disconnecting = (List)stateMap.get(NodeConnectionState.DISCONNECTING);
            if (disconnecting != null && !disconnecting.isEmpty()) {
                if (disconnecting.size() == 1) {
                    throw new DisconnectedNodeMutableRequestException("Node " + disconnecting.iterator().next() + " is currently disconnecting");
                }
                throw new DisconnectedNodeMutableRequestException(disconnecting.size() + " Nodes are currently disconnecting");
            }
            List connecting = (List)stateMap.get(NodeConnectionState.CONNECTING);
            if (connecting != null && !connecting.isEmpty()) {
                if (connecting.size() == 1) {
                    throw new ConnectingNodeMutableRequestException("Node " + connecting.iterator().next() + " is currently connecting");
                }
                throw new ConnectingNodeMutableRequestException(connecting.size() + " Nodes are currently connecting");
            }
        }
        if ((nodeIds = (List)stateMap.get(NodeConnectionState.CONNECTED)) == null || nodeIds.isEmpty()) {
            throw new NoConnectedNodesException();
        }
        HashSet<NodeIdentifier> nodeIdSet = new HashSet<NodeIdentifier>(nodeIds);
        return this.replicate(nodeIdSet, method, uri, entity, headers, true, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification) {
        NiFiUser user;
        HashMap<String, String> updatedHeaders = new HashMap<String, String>(headers);
        updatedHeaders.put("X-Cluster-Id-Generation-Seed", ComponentIdGenerator.generateId().toString());
        if (indicateReplicated) {
            updatedHeaders.put("X-Request-Replicated", "true");
        }
        if ((user = NiFiUserUtils.getNiFiUser()) != null && !user.isAnonymous()) {
            String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString((NiFiUser)user);
            updatedHeaders.put("X-ProxiedEntitiesChain", proxiedEntitiesChain);
        }
        if (indicateReplicated) {
            Lock lock = this.isMutableRequest(method, uri.getPath()) ? this.writeLock : this.readLock;
            logger.debug("Obtaining lock {} in order to replicate request {} {}", (Object)method, (Object)uri);
            lock.lock();
            try {
                logger.debug("Lock {} obtained in order to replicate request {} {}", (Object)method, (Object)uri);
                AsyncClusterResponse asyncClusterResponse = this.replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
                return asyncClusterResponse;
            }
            finally {
                lock.unlock();
            }
        }
        return this.replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
    }

    @Override
    public AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> headers) {
        HashMap<String, String> updatedHeaders = new HashMap<String, String>(headers);
        NiFiUser user = NiFiUserUtils.getNiFiUser();
        if (user != null && !user.isAnonymous()) {
            String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString((NiFiUser)user);
            updatedHeaders.put("X-ProxiedEntitiesChain", proxiedEntitiesChain);
        }
        return this.replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false);
    }

    private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification, StandardAsyncClusterResponse response, boolean executionPhase) {
        int numRequests;
        Objects.requireNonNull(nodeIds);
        Objects.requireNonNull(method);
        Objects.requireNonNull(uri);
        Objects.requireNonNull(entity);
        Objects.requireNonNull(headers);
        if (nodeIds.isEmpty()) {
            throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
        }
        for (NodeIdentifier nodeId2 : nodeIds) {
            NodeConnectionStatus status = this.clusterCoordinator.getConnectionStatus(nodeId2);
            if (status == null) {
                throw new UnknownNodeException("Node " + nodeId2 + " does not exist in this cluster");
            }
            if (status.getState() == NodeConnectionState.CONNECTED) continue;
            throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId2 + " because the node is not connected");
        }
        logger.debug("Replicating request {} {} with entity {} to {}; response is {}", new Object[]{method, uri, entity, nodeIds, response});
        HashMap<String, String> updatedHeaders = new HashMap<String, String>(headers);
        String requestId = updatedHeaders.computeIfAbsent("X-RequestTransactionId", key -> UUID.randomUUID().toString());
        if (performVerification) {
            this.verifyClusterState(method, uri.getPath());
        }
        if ((numRequests = this.responseMap.size()) >= 100) {
            numRequests = this.purgeExpiredRequests();
        }
        if (numRequests >= 100) {
            Map<String, Long> countsByUri = this.responseMap.values().stream().collect(Collectors.groupingBy(StandardAsyncClusterResponse::getURIPath, Collectors.counting()));
            logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", new Object[]{method, uri.getPath(), numRequests, countsByUri});
            throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
        }
        CompletionCallback completionCallback = clusterResponse -> this.onCompletedResponse(requestId);
        Runnable responseConsumedCallback = () -> this.onResponseConsumed(requestId);
        if (response == null) {
            response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, this.responseMerger, completionCallback, responseConsumedCallback);
            this.responseMap.put(requestId, response);
        }
        logger.debug("For Request ID {}, response object is {}", (Object)requestId, (Object)response);
        boolean mutableRequest = this.isMutableRequest(method, uri.getPath());
        if (mutableRequest && performVerification) {
            logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", (Object)requestId);
            this.performVerification(nodeIds, method, uri, entity, updatedHeaders, response);
            return response;
        }
        StandardAsyncClusterResponse finalResponse = response;
        NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> {
            logger.debug("Received response from {} for {} {}", new Object[]{nodeResponse.getNodeId(), method, uri.getPath()});
            finalResponse.add(nodeResponse);
        };
        if (mutableRequest && executionPhase) {
            updatedHeaders.put("X-Execution-Continue", "true");
        }
        Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest((NodeIdentifier)nodeId, method, this.createURI(uri, (NodeIdentifier)nodeId), entity, updatedHeaders, nodeCompletionCallback);
        this.replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
        return response;
    }

    private void performVerification(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers, final StandardAsyncClusterResponse clusterResponse) {
        logger.debug("Verifying that mutable request {} {} can be made", (Object)method, (Object)uri.getPath());
        HashMap<String, String> validationHeaders = new HashMap<String, String>(headers);
        validationHeaders.put("X-Validation-Expects", "150-NodeContinue");
        final int numNodes = nodeIds.size();
        NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback(){
            final Set<NodeResponse> nodeResponses = Collections.synchronizedSet(new HashSet());

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onCompletion(NodeResponse nodeResponse) {
                block10: {
                    boolean allNodesResponded;
                    Set<NodeResponse> set = this.nodeResponses;
                    synchronized (set) {
                        this.nodeResponses.add(nodeResponse);
                        allNodesResponded = this.nodeResponses.size() == numNodes;
                    }
                    try {
                        if (!allNodesResponded) break block10;
                        final long dissentingCount = this.nodeResponses.stream().filter(p -> p.getStatus() != 150).count();
                        if (dissentingCount == 0L) {
                            logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", new Object[]{numNodes, method, uri.getPath()});
                            ThreadPoolRequestReplicator.this.replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true);
                            return;
                        }
                        final HashMap<String, String> cancelLockHeaders = new HashMap<String, String>(headers);
                        cancelLockHeaders.put("X-Cancel-Transaction", "true");
                        Thread cancelLockThread = new Thread(new Runnable(){

                            @Override
                            public void run() {
                                logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", new Object[]{dissentingCount, method, uri.getPath()});
                                Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest((NodeIdentifier)nodeId, method, ThreadPoolRequestReplicator.this.createURI(uri, nodeId), entity, cancelLockHeaders, null);
                                ThreadPoolRequestReplicator.this.replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
                            }
                        });
                        cancelLockThread.setName("Cancel Flow Locks");
                        cancelLockThread.start();
                        for (NodeResponse response : this.nodeResponses) {
                            String message;
                            if (response.getStatus() == 150) continue;
                            ClientResponse clientResponse = response.getClientResponse();
                            if (clientResponse == null) {
                                message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
                                logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur", new Object[]{response.getStatus(), response.getNodeId(), method, uri.getPath()});
                            } else {
                                String nodeExplanation = (String)clientResponse.getEntity(String.class);
                                message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
                                logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}", new Object[]{response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation});
                            }
                            Object failure = response.getStatus() == Response.Status.FORBIDDEN.getStatusCode() ? (response.hasThrowable() ? new AccessDeniedException(message, response.getThrowable()) : new AccessDeniedException(message)) : (response.hasThrowable() ? new IllegalClusterStateException(message, response.getThrowable()) : new IllegalClusterStateException(message));
                            clusterResponse.setFailure((RuntimeException)failure);
                            break;
                        }
                    }
                    catch (Exception e) {
                        clusterResponse.add(new NodeResponse(nodeResponse.getNodeId(), method, uri, e));
                        for (NodeResponse otherResponse : this.nodeResponses) {
                            if (otherResponse.getNodeId().equals((Object)nodeResponse.getNodeId())) continue;
                            clusterResponse.add(otherResponse);
                        }
                    }
                }
            }
        };
        Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest((NodeIdentifier)nodeId, method, this.createURI(uri, (NodeIdentifier)nodeId), entity, validationHeaders, completionCallback);
        this.replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders);
    }

    @Override
    public AsyncClusterResponse getClusterResponse(String identifier) {
        AsyncClusterResponse response = (AsyncClusterResponse)this.responseMap.get(identifier);
        if (response == null) {
            return null;
        }
        return response;
    }

    protected NodeResponse replicateRequest(WebResource.Builder resourceBuilder, NodeIdentifier nodeId, String method, URI uri, String requestId, Map<String, String> headers) {
        ClientResponse clientResponse;
        long startNanos = System.nanoTime();
        logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", new Object[]{method, uri, requestId, headers});
        switch (method.toUpperCase()) {
            case "DELETE": {
                clientResponse = (ClientResponse)resourceBuilder.delete(ClientResponse.class);
                break;
            }
            case "GET": {
                clientResponse = (ClientResponse)resourceBuilder.get(ClientResponse.class);
                break;
            }
            case "HEAD": {
                clientResponse = resourceBuilder.head();
                break;
            }
            case "OPTIONS": {
                clientResponse = (ClientResponse)resourceBuilder.options(ClientResponse.class);
                break;
            }
            case "POST": {
                clientResponse = (ClientResponse)resourceBuilder.post(ClientResponse.class);
                break;
            }
            case "PUT": {
                clientResponse = (ClientResponse)resourceBuilder.put(ClientResponse.class);
                break;
            }
            default: {
                throw new IllegalArgumentException("HTTP Method '" + method + "' not supported for request replication.");
            }
        }
        return new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId);
    }

    private boolean isMutableRequest(String method, String uriPath) {
        switch (method.toUpperCase()) {
            case "GET": 
            case "HEAD": 
            case "OPTIONS": {
                return false;
            }
        }
        return true;
    }

    private void verifyClusterState(String httpMethod, String uriPath) {
        boolean mutableRequest;
        boolean bl = mutableRequest = "DELETE".equals(httpMethod) || "POST".equals(httpMethod) || "PUT".equals(httpMethod);
        if (mutableRequest) {
            Map connectionStates = this.clusterCoordinator.getConnectionStates();
            if (connectionStates.containsKey(NodeConnectionState.DISCONNECTED) || connectionStates.containsKey(NodeConnectionState.DISCONNECTING)) {
                throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is disconnected from the cluster");
            }
            if (connectionStates.containsKey(NodeConnectionState.CONNECTING)) {
                throw new ConnectingNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is trying to connect to the cluster");
            }
        }
    }

    private void onResponseConsumed(String requestId) {
        this.responseMap.remove(requestId);
    }

    private void onCompletedResponse(String requestId) {
        AsyncClusterResponse response = (AsyncClusterResponse)this.responseMap.get(requestId);
        if (response != null && this.callback != null) {
            try {
                this.callback.afterRequest(response.getURIPath(), response.getMethod(), response.getCompletedNodeResponses());
            }
            catch (Exception e) {
                logger.warn("Completed request {} {} but failed to properly handle the Request Completion Callback due to {}", new Object[]{response.getMethod(), response.getURIPath(), e.toString()});
                logger.warn("", (Throwable)e);
            }
        }
        if (response != null && logger.isDebugEnabled()) {
            this.logTimingInfo(response);
        }
        Set<NodeIdentifier> slowResponseNodes = ResponseUtils.findLongResponseTimes(response, 1.5);
        for (NodeIdentifier nodeId : response.getNodesInvolved()) {
            AtomicInteger counter = this.sequentialLongRequestCounts.computeIfAbsent(nodeId, id -> new AtomicInteger(0));
            if (slowResponseNodes.contains(nodeId)) {
                int sequentialLongRequests = counter.incrementAndGet();
                if (sequentialLongRequests < 3) continue;
                String message = "Response time from " + nodeId + " was slow for each of the last 3 requests made. " + "To see more information about timing, enable DEBUG logging for " + logger.getName();
                logger.warn(message);
                if (this.eventReporter != null) {
                    this.eventReporter.reportEvent(Severity.WARNING, "Node Response Time", message);
                }
                counter.set(0);
                continue;
            }
            counter.set(0);
        }
    }

    private void logTimingInfo(AsyncClusterResponse response) {
        LongSummaryStatistics stats = response.getNodesInvolved().stream().map(p -> response.getNodeResponse((NodeIdentifier)p).getRequestDuration(TimeUnit.MILLISECONDS)).collect(Collectors.summarizingLong(Long::longValue));
        StringBuilder sb = new StringBuilder();
        sb.append("Node Responses for ").append(response.getMethod()).append(" ").append(response.getURIPath()).append(" (Request ID ").append(response.getRequestIdentifier()).append("):\n");
        for (NodeIdentifier node : response.getNodesInvolved()) {
            sb.append(node).append(": ").append(response.getNodeResponse(node).getRequestDuration(TimeUnit.MILLISECONDS)).append(" millis\n");
        }
        logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms", new Object[]{response.getMethod(), response.getURIPath(), response.getRequestIdentifier(), stats.getMin(), stats.getMax(), stats.getAverage()});
        logger.debug(sb.toString());
    }

    private void replicateRequest(Set<NodeIdentifier> nodeIds, String scheme, String path, Function<NodeIdentifier, NodeHttpRequest> callableFactory, Map<String, String> headers) {
        if (nodeIds.isEmpty()) {
            return;
        }
        for (NodeIdentifier nodeId : nodeIds) {
            NodeHttpRequest callable = callableFactory.apply(nodeId);
            this.executorService.submit(callable);
        }
    }

    private URI createURI(URI exampleUri, NodeIdentifier nodeId) {
        return this.createURI(exampleUri.getScheme(), nodeId.getApiAddress(), nodeId.getApiPort(), exampleUri.getPath(), exampleUri.getQuery());
    }

    private URI createURI(String scheme, String nodeApiAddress, int nodeApiPort, String path, String query) {
        try {
            return new URI(scheme, null, nodeApiAddress, nodeApiPort, path, query, null);
        }
        catch (URISyntaxException e) {
            throw new UriConstructionException(e);
        }
    }

    private synchronized int purgeExpiredRequests() {
        Set<String> expiredRequestIds = this.responseMap.entrySet().stream().filter(entry -> ((StandardAsyncClusterResponse)entry.getValue()).isOlderThan(30L, TimeUnit.SECONDS)).filter(entry -> ((StandardAsyncClusterResponse)entry.getValue()).isComplete()).map(entry -> (String)entry.getKey()).collect(Collectors.toSet());
        expiredRequestIds.forEach(id -> this.onResponseConsumed((String)id));
        return this.responseMap.size();
    }

    private static interface NodeRequestCompletionCallback {
        public void onCompletion(NodeResponse var1);
    }

    private class NodeHttpRequest
    implements Runnable {
        private final NodeIdentifier nodeId;
        private final String method;
        private final URI uri;
        private final Object entity;
        private final Map<String, String> headers = new HashMap<String, String>();
        private final NodeRequestCompletionCallback callback;

        private NodeHttpRequest(NodeIdentifier nodeId, String method, URI uri, Object entity, Map<String, String> headers, NodeRequestCompletionCallback callback) {
            this.nodeId = nodeId;
            this.method = method;
            this.uri = uri;
            this.entity = entity;
            this.headers.putAll(headers);
            this.callback = callback;
        }

        @Override
        public void run() {
            NodeResponse nodeResponse;
            try {
                WebResource.Builder resourceBuilder = this.createResourceBuilder();
                String requestId = this.headers.get("x-nifi-request-id");
                logger.debug("Replicating request {} {} to {}", new Object[]{this.method, this.uri.getPath(), this.nodeId});
                nodeResponse = ThreadPoolRequestReplicator.this.replicateRequest(resourceBuilder, this.nodeId, this.method, this.uri, requestId, this.headers);
            }
            catch (Exception e) {
                nodeResponse = new NodeResponse(this.nodeId, this.method, this.uri, e);
                logger.warn("Failed to replicate request {} {} to {} due to {}", new Object[]{this.method, this.uri.getPath(), this.nodeId, e});
                logger.warn("", (Throwable)e);
            }
            if (this.callback != null) {
                logger.debug("Request {} {} completed for {}", new Object[]{this.method, this.uri.getPath(), this.nodeId});
                this.callback.onCompletion(nodeResponse);
            }
        }

        private WebResource.Builder createResourceBuilder() {
            WebResource.Builder builder;
            MultivaluedMapImpl map = new MultivaluedMapImpl();
            if (this.entity instanceof MultivaluedMap) {
                map.putAll((Map)this.entity);
            }
            WebResource resource = ThreadPoolRequestReplicator.this.client.resource(this.uri);
            if (ThreadPoolRequestReplicator.this.responseMerger.isResponseInterpreted(this.uri, this.method)) {
                resource.addFilter((ClientFilter)new GZIPContentEncodingFilter(false));
            }
            if ("DELETE".equalsIgnoreCase(this.method) || "HEAD".equalsIgnoreCase(this.method) || "GET".equalsIgnoreCase(this.method) || "OPTIONS".equalsIgnoreCase(this.method)) {
                resource = resource.queryParams((MultivaluedMap)map);
                builder = resource.getRequestBuilder();
            } else {
                builder = this.entity == null ? resource.entity((Object)map) : resource.entity(this.entity);
            }
            boolean foundContentType = false;
            for (Map.Entry<String, String> entry : this.headers.entrySet()) {
                builder.header(entry.getKey(), (Object)entry.getValue());
                if (!entry.getKey().equalsIgnoreCase("content-type")) continue;
                foundContentType = true;
            }
            if (!foundContentType) {
                builder.type("application/x-www-form-urlencoded");
            }
            return builder;
        }
    }
}

