package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.class */
public class ConsumerNetworkThread extends KafkaThread implements Closeable {
    static final long MAX_POLL_TIMEOUT_MS = 5000;
    private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
    private final Time time;
    private final Logger log;
    private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
    private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
    private final Supplier<RequestManagers> requestManagersSupplier;
    private ApplicationEventProcessor applicationEventProcessor;
    private NetworkClientDelegate networkClientDelegate;
    private RequestManagers requestManagers;
    private volatile boolean running;
    private final IdempotentCloser closer;
    private volatile Duration closeTimeout;
    private volatile long cachedMaximumTimeToWait;

    public ConsumerNetworkThread(LogContext logContext, Time time, Supplier<ApplicationEventProcessor> supplier, Supplier<NetworkClientDelegate> supplier2, Supplier<RequestManagers> supplier3) {
        super(BACKGROUND_THREAD_NAME, true);
        this.closer = new IdempotentCloser();
        this.closeTimeout = Duration.ofMillis(ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS);
        this.cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
        this.time = time;
        this.log = logContext.logger(getClass());
        this.applicationEventProcessorSupplier = supplier;
        this.networkClientDelegateSupplier = supplier2;
        this.requestManagersSupplier = supplier3;
        this.running = true;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.log.debug("Consumer network thread started");
            initializeResources();
            while (this.running) {
                try {
                    runOnce();
                } catch (Throwable th) {
                    this.log.error("Unexpected error caught in consumer network thread", th);
                }
            }
        } finally {
            cleanup();
        }
    }

    void initializeResources() {
        this.applicationEventProcessor = this.applicationEventProcessorSupplier.get();
        this.networkClientDelegate = this.networkClientDelegateSupplier.get();
        this.requestManagers = this.requestManagersSupplier.get();
    }

    void runOnce() {
        this.applicationEventProcessor.process();
        long milliseconds = this.time.milliseconds();
        Stream map = this.requestManagers.entries().stream().filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).map(requestManager -> {
            return requestManager.poll(milliseconds);
        });
        NetworkClientDelegate networkClientDelegate = this.networkClientDelegate;
        networkClientDelegate.getClass();
        this.networkClientDelegate.poll(((Long) map.map(networkClientDelegate::addAll).reduce(Long.valueOf(MAX_POLL_TIMEOUT_MS), (v0, v1) -> {
            return Math.min(v0, v1);
        })).longValue(), milliseconds);
        this.cachedMaximumTimeToWait = ((Long) this.requestManagers.entries().stream().filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).map(requestManager2 -> {
            return Long.valueOf(requestManager2.maximumTimeToWait(milliseconds));
        }).reduce(Long.valueOf(NetworkClientDelegate.PollResult.WAIT_FOREVER), (v0, v1) -> {
            return Math.min(v0, v1);
        })).longValue();
    }

    static void runAtClose(Collection<Optional<? extends RequestManager>> collection, NetworkClientDelegate networkClientDelegate, Timer timer) {
        Stream map = collection.stream().filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).map((v0) -> {
            return v0.pollOnClose();
        });
        networkClientDelegate.getClass();
        map.forEach(networkClientDelegate::addAll);
    }

    public boolean isRunning() {
        return this.running;
    }

    public void wakeup() {
        if (this.networkClientDelegate != null) {
            this.networkClientDelegate.wakeup();
        }
    }

    public long maximumTimeToWait() {
        return this.cachedMaximumTimeToWait;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(this.closeTimeout);
    }

    public void close(Duration duration) {
        Objects.requireNonNull(duration, "Close timeout for consumer network thread must be non-null");
        this.closer.close(() -> {
            closeInternal(duration);
        }, () -> {
            this.log.warn("The consumer network thread was already closed");
        });
    }

    private void closeInternal(Duration duration) {
        this.log.trace("Signaling the consumer network thread to close in {}ms", Long.valueOf(duration.toMillis()));
        this.running = false;
        this.closeTimeout = duration;
        wakeup();
        try {
            join();
        } catch (InterruptedException e) {
            this.log.error("Interrupted while waiting for consumer network thread to complete", e);
        }
    }

    private void sendUnsentRequests(Timer timer) {
        if (this.networkClientDelegate.unsentRequests().isEmpty()) {
            return;
        }
        do {
            this.networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
            timer.update();
            if (!timer.notExpired()) {
                return;
            }
        } while (!this.networkClientDelegate.unsentRequests().isEmpty());
    }

    void cleanup() {
        this.log.trace("Closing the consumer network thread");
        Timer timer = this.time.timer(this.closeTimeout);
        try {
            runAtClose(this.requestManagers.entries(), this.networkClientDelegate, timer);
        } catch (Exception e) {
            this.log.error("Unexpected error during shutdown.  Proceed with closing.", e);
        } finally {
            sendUnsentRequests(timer);
            org.apache.kafka.common.utils.Utils.closeQuietly(this.requestManagers, "request managers");
            org.apache.kafka.common.utils.Utils.closeQuietly(this.networkClientDelegate, "network client delegate");
            org.apache.kafka.common.utils.Utils.closeQuietly(this.applicationEventProcessor, "application event processor");
            this.log.debug("Closed the consumer network thread");
        }
    }
}
