package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.class */
public class ConsumerNetworkClient implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
    private static final long MAX_POLL_TIMEOUT_MS = 5000;
    private final KafkaClient client;
    private final Metadata metadata;
    private final Time time;
    private final long retryBackoffMs;
    private final long unsentExpiryMs;
    private final Map<Node, List<ClientRequest>> unsent = new HashMap();
    private int wakeupDisabledCount = 0;
    private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient$PollCondition.class */
    public interface PollCondition {
        boolean shouldBlock();
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient$RequestFutureCompletionHandler.class */
    public class RequestFutureCompletionHandler implements RequestCompletionHandler {
        private final RequestFuture<ClientResponse> future = new RequestFuture<>();
        private ClientResponse response;
        private RuntimeException e;

        public RequestFutureCompletionHandler() {
        }

        public void fireCompletion() {
            if (this.e != null) {
                this.future.raise(this.e);
                return;
            }
            if (!this.response.wasDisconnected()) {
                this.future.complete(this.response);
                return;
            }
            ClientRequest request = this.response.request();
            RequestSend request2 = request.request();
            ConsumerNetworkClient.log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected", new Object[]{ApiKeys.forId(request2.header().apiKey()), request, Integer.valueOf(request2.header().correlationId()), request2.destination()});
            this.future.raise(DisconnectException.INSTANCE);
        }

        public void onFailure(RuntimeException runtimeException) {
            this.e = runtimeException;
            ConsumerNetworkClient.this.pendingCompletion.add(this);
        }

        @Override // org.apache.kafka.clients.RequestCompletionHandler
        public void onComplete(ClientResponse clientResponse) {
            this.response = clientResponse;
            ConsumerNetworkClient.this.pendingCompletion.add(this);
        }
    }

    public ConsumerNetworkClient(KafkaClient kafkaClient, Metadata metadata, Time time, long j, long j2) {
        this.client = kafkaClient;
        this.metadata = metadata;
        this.time = time;
        this.retryBackoffMs = j;
        this.unsentExpiryMs = j2;
    }

    public RequestFuture<ClientResponse> send(Node node, ApiKeys apiKeys, AbstractRequest abstractRequest) {
        return send(node, apiKeys, ProtoUtils.latestVersion(apiKeys.id), abstractRequest);
    }

    private RequestFuture<ClientResponse> send(Node node, ApiKeys apiKeys, short s, AbstractRequest abstractRequest) {
        long milliseconds = this.time.milliseconds();
        RequestFutureCompletionHandler requestFutureCompletionHandler = new RequestFutureCompletionHandler();
        put(node, new ClientRequest(milliseconds, true, new RequestSend(node.idString(), this.client.nextRequestHeader(apiKeys, s), abstractRequest.toStruct()), requestFutureCompletionHandler));
        this.client.wakeup();
        return requestFutureCompletionHandler.future;
    }

    private void put(Node node, ClientRequest clientRequest) {
        synchronized (this) {
            List<ClientRequest> list = this.unsent.get(node);
            if (list == null) {
                list = new ArrayList();
                this.unsent.put(node, list);
            }
            list.add(clientRequest);
        }
    }

    public Node leastLoadedNode() {
        Node leastLoadedNode;
        synchronized (this) {
            leastLoadedNode = this.client.leastLoadedNode(this.time.milliseconds());
        }
        return leastLoadedNode;
    }

    public void awaitMetadataUpdate() {
        awaitMetadataUpdate(Long.MAX_VALUE);
    }

    public boolean awaitMetadataUpdate(long j) {
        long milliseconds = this.time.milliseconds();
        int requestUpdate = this.metadata.requestUpdate();
        do {
            poll(j);
            if (this.metadata.version() != requestUpdate) {
                break;
            }
        } while (this.time.milliseconds() - milliseconds < j);
        return this.metadata.version() > requestUpdate;
    }

    public void ensureFreshMetadata() {
        if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(this.time.milliseconds()) == 0) {
            awaitMetadataUpdate();
        }
    }

    public void wakeup() {
        this.wakeup.set(true);
        this.client.wakeup();
    }

    public void poll(RequestFuture<?> requestFuture) {
        while (!requestFuture.isDone()) {
            poll(MAX_POLL_TIMEOUT_MS, this.time.milliseconds(), requestFuture);
        }
    }

    public boolean poll(RequestFuture<?> requestFuture, long j) {
        long milliseconds = this.time.milliseconds();
        long j2 = j;
        long j3 = milliseconds;
        do {
            poll(j2, j3, requestFuture);
            j3 = this.time.milliseconds();
            j2 = j - (j3 - milliseconds);
            if (requestFuture.isDone()) {
                break;
            }
        } while (j2 > 0);
        return requestFuture.isDone();
    }

    public void poll(long j) {
        poll(j, this.time.milliseconds(), null);
    }

    public void poll(long j, long j2, PollCondition pollCondition) {
        firePendingCompletedRequests();
        synchronized (this) {
            trySend(j2);
            if (pollCondition == null || pollCondition.shouldBlock()) {
                if (this.client.inFlightRequestCount() == 0) {
                    j = Math.min(j, this.retryBackoffMs);
                }
                this.client.poll(Math.min(MAX_POLL_TIMEOUT_MS, j), j2);
                j2 = this.time.milliseconds();
            } else {
                this.client.poll(0L, j2);
            }
            checkDisconnects(j2);
            maybeTriggerWakeup();
            trySend(j2);
            failExpiredRequests(j2);
        }
        firePendingCompletedRequests();
    }

    public void pollNoWakeup() {
        disableWakeups();
        try {
            poll(0L, this.time.milliseconds(), null);
            enableWakeups();
        } catch (Throwable th) {
            enableWakeups();
            throw th;
        }
    }

    public void awaitPendingRequests(Node node) {
        while (pendingRequestCount(node) > 0) {
            poll(this.retryBackoffMs);
        }
    }

    public int pendingRequestCount(Node node) {
        int size;
        synchronized (this) {
            List<ClientRequest> list = this.unsent.get(node);
            size = (list == null ? 0 : list.size()) + this.client.inFlightRequestCount(node.idString());
        }
        return size;
    }

    public int pendingRequestCount() {
        int inFlightRequestCount;
        synchronized (this) {
            int i = 0;
            Iterator<List<ClientRequest>> it = this.unsent.values().iterator();
            while (it.hasNext()) {
                i += it.next().size();
            }
            inFlightRequestCount = i + this.client.inFlightRequestCount();
        }
        return inFlightRequestCount;
    }

    private void firePendingCompletedRequests() {
        boolean z;
        boolean z2 = false;
        while (true) {
            z = z2;
            RequestFutureCompletionHandler poll = this.pendingCompletion.poll();
            if (poll == null) {
                break;
            }
            poll.fireCompletion();
            z2 = true;
        }
        if (z) {
            this.client.wakeup();
        }
    }

    private void checkDisconnects(long j) {
        Iterator<Map.Entry<Node, List<ClientRequest>>> it = this.unsent.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Node, List<ClientRequest>> next = it.next();
            if (this.client.connectionFailed(next.getKey())) {
                it.remove();
                for (ClientRequest clientRequest : next.getValue()) {
                    ((RequestFutureCompletionHandler) clientRequest.callback()).onComplete(new ClientResponse(clientRequest, j, true, null));
                }
            }
        }
    }

    private void failExpiredRequests(long j) {
        Iterator<Map.Entry<Node, List<ClientRequest>>> it = this.unsent.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Node, List<ClientRequest>> next = it.next();
            Iterator<ClientRequest> it2 = next.getValue().iterator();
            while (it2.hasNext()) {
                ClientRequest next2 = it2.next();
                if (next2.createdTimeMs() >= j - this.unsentExpiryMs) {
                    break;
                }
                ((RequestFutureCompletionHandler) next2.callback()).onFailure(new TimeoutException("Failed to send request after " + this.unsentExpiryMs + " ms."));
                it2.remove();
            }
            if (next.getValue().isEmpty()) {
                it.remove();
            }
        }
    }

    public void failUnsentRequests(Node node, RuntimeException runtimeException) {
        synchronized (this) {
            List<ClientRequest> remove = this.unsent.remove(node);
            if (remove != null) {
                Iterator<ClientRequest> it = remove.iterator();
                while (it.hasNext()) {
                    ((RequestFutureCompletionHandler) it.next().callback()).onFailure(runtimeException);
                }
            }
        }
        firePendingCompletedRequests();
    }

    private boolean trySend(long j) {
        boolean z = false;
        for (Map.Entry<Node, List<ClientRequest>> entry : this.unsent.entrySet()) {
            Node key = entry.getKey();
            Iterator<ClientRequest> it = entry.getValue().iterator();
            while (it.hasNext()) {
                ClientRequest next = it.next();
                if (this.client.ready(key, j)) {
                    this.client.send(next, j);
                    it.remove();
                    z = true;
                }
            }
        }
        return z;
    }

    private void maybeTriggerWakeup() {
        if (this.wakeupDisabledCount == 0 && this.wakeup.get()) {
            this.wakeup.set(false);
            throw new WakeupException();
        }
    }

    public void disableWakeups() {
        synchronized (this) {
            this.wakeupDisabledCount++;
        }
    }

    public void enableWakeups() {
        synchronized (this) {
            if (this.wakeupDisabledCount <= 0) {
                throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
            }
            this.wakeupDisabledCount--;
            if (this.wakeupDisabledCount == 0 && this.wakeup.get()) {
                this.client.wakeup();
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        synchronized (this) {
            this.client.close();
        }
    }

    public boolean connectionFailed(Node node) {
        boolean connectionFailed;
        synchronized (this) {
            connectionFailed = this.client.connectionFailed(node);
        }
        return connectionFailed;
    }

    public void tryConnect(Node node) {
        synchronized (this) {
            this.client.ready(node, this.time.milliseconds());
        }
    }
}
