package io.confluent.parallelconsumer.internal;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/ConsumerManager.class */
/**
 * Wraps a Kafka {@link Consumer} and layers a few behaviours on top of it:
 * <ul>
 *   <li>retrying {@code poll} and {@code commitSync} on {@link SaslAuthenticationException}
 *       (and, for commits, on {@link TimeoutException}) within configured time budgets;</li>
 *   <li>tracking whether a broker poll is in flight, so that {@link #wakeup()} only wakes the
 *       consumer while it is polling;</li>
 *   <li>caching the group metadata and the paused-partition count around each poll;</li>
 *   <li>counting in-flight requests so that {@link #close(Duration)} can wait for them.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code commitRequested}, {@code metaCache} and the wakeup counters are plain
 * (non-volatile, non-atomic) fields. Whether they are touched from more than one thread cannot be
 * determined from this class alone - confirm against the callers.
 *
 * @param <K> record key type of the wrapped consumer
 * @param <V> record value type of the wrapped consumer
 */
public class ConsumerManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);

    /** Granularity, in milliseconds, of the sleep slices used while backing off or waiting to close. */
    private static final long WAIT_SLICE_MILLIS = 100L;

    private final Consumer<K, V> consumer;
    private final Duration offsetCommitTimeout;
    private final Duration saslAuthenticationRetryTimeout;
    private final Duration saslAuthenticationRetryBackOff;

    /** Group metadata captured by the most recent {@link #updateCache()}. */
    private ConsumerGroupMetadata metaCache;

    /** Set by {@link #onCommitRequested()}; makes the next poll use a 1 ms timeout. */
    private boolean commitRequested;

    /** True while {@link #poll(Duration)} is executing; gates {@link #wakeup()}. */
    private final AtomicBoolean pollingBroker = new AtomicBoolean(false);
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    /** Number of poll / commitSync calls currently in flight. */
    private final AtomicLong pendingRequests = new AtomicLong(0);

    /** Paused-partition count captured by the most recent {@link #updateCache()}. */
    private volatile int pausedPartitionSizeCache = 0;

    // Diagnostic counters, only ever reported through debug logging.
    private int erroneousWakeups = 0;
    private int correctPollWakeups = 0;
    private int noWakeups = 0;

    /**
     * @param consumer                       the Kafka consumer to delegate to
     * @param offsetCommitTimeout            time budget for retrying a synchronous commit after a
     *                                       {@link TimeoutException}
     * @param saslAuthenticationRetryTimeout time budget for retrying after a
     *                                       {@link SaslAuthenticationException}
     * @param saslAuthenticationRetryBackOff pause between SASL authentication retries
     */
    public ConsumerManager(Consumer<K, V> consumer,
                           Duration offsetCommitTimeout,
                           Duration saslAuthenticationRetryTimeout,
                           Duration saslAuthenticationRetryBackOff) {
        this.consumer = consumer;
        this.offsetCommitTimeout = offsetCommitTimeout;
        this.saslAuthenticationRetryTimeout = saslAuthenticationRetryTimeout;
        this.saslAuthenticationRetryBackOff = saslAuthenticationRetryBackOff;
    }

    /**
     * Polls the broker, retrying on {@link SaslAuthenticationException} until
     * {@code saslAuthenticationRetryTimeout} has elapsed since the poll started.
     *
     * <p>If a commit has been requested via {@link #onCommitRequested()}, the poll uses a 1 ms
     * timeout instead of {@code requestedTimeout} and the request flag is cleared.
     *
     * @param requestedTimeout the poll timeout to use when no commit is pending
     * @return the polled records; an empty instance (never {@code null}) if the poll was woken up
     *         or if shutdown was requested before any poll attempt succeeded
     * @throws SaslAuthenticationException if authentication keeps failing past the retry timeout
     * @throws RuntimeException            if the thread is interrupted while backing off
     */
    public ConsumerRecords<K, V> poll(Duration requestedTimeout) {
        Duration timeoutToUse = requestedTimeout;
        ConsumerRecords<K, V> records = null;
        try {
            if (this.commitRequested) {
                log.debug("Commit requested, so will not long poll as need to perform the commit");
                timeoutToUse = Duration.ofMillis(1L);
                this.commitRequested = false;
            }
            this.pollingBroker.set(true);
            updateCache();
            log.debug("Poll starting with timeout: {}", timeoutToUse);

            final Instant startedAt = Instant.now();
            long tries = 0;
            boolean polledSuccessfully = false;
            this.pendingRequests.incrementAndGet();
            try {
                while (!this.shutdownRequested.get()) {
                    tries++;
                    try {
                        records = this.consumer.poll(timeoutToUse);
                        polledSuccessfully = true;
                        break;
                    } catch (SaslAuthenticationException e) {
                        if (elapsedMillisSince(startedAt) >= this.saslAuthenticationRetryTimeout.toMillis()) {
                            log.error("Poll error: SaslAuthenticationException. {} tries attempted, since {}", tries, startedAt, e);
                            throw e;
                        }
                        log.warn("Poll error: SaslAuthenticationException. Retrying ({})", tries);
                        try {
                            retryBackOff(this.saslAuthenticationRetryBackOff.toMillis());
                        } catch (InterruptedException interrupted) {
                            // Restore the flag so code further up the stack can still observe the interrupt.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Poll interrupted", interrupted);
                        }
                    }
                }
            } finally {
                // Runs on both the normal and the exceptional path, reporting the real attempt count.
                if (polledSuccessfully) {
                    log.debug("Poll completed normally (after timeout of {} on try {}) and returned {}...", timeoutToUse, tries, records.count());
                } else {
                    log.debug("Poll did not completed (after timeout of {} and tries {}), shutdownRequested {}", timeoutToUse, tries, this.shutdownRequested.get());
                }
                this.pendingRequests.decrementAndGet();
            }
            updateCache();
        } catch (WakeupException e) {
            this.correctPollWakeups++;
            log.debug("Awoken from broker poll");
            log.trace("Wakeup caller is:", e);
            records = new ConsumerRecords<>(UniMaps.of());
        } finally {
            this.pollingBroker.set(false);
        }
        return records != null ? records : new ConsumerRecords<>(UniMaps.of());
    }

    /** Refreshes the cached group metadata and paused-partition count from the consumer. */
    protected void updateCache() {
        this.metaCache = this.consumer.groupMetadata();
        this.pausedPartitionSizeCache = this.consumer.paused().size();
    }

    /** Wakes the consumer up, but only if a broker poll is currently in progress. */
    public void wakeup() {
        if (this.pollingBroker.get()) {
            log.debug("Waking up consumer");
            this.consumer.wakeup();
        }
    }

    /**
     * Synchronously commits the given offsets, starting over if the commit is woken up.
     *
     * <p>At least one commit attempt is always made, even if shutdown has already been requested.
     * Attempts are retried on {@link SaslAuthenticationException} (with back-off) and on
     * {@link TimeoutException}; the respective time budgets are measured from the start of this
     * call, across all attempts. A {@link CommitFailedException} is logged and ends the commit
     * without being rethrown.
     *
     * @param offsetsToSend the offsets to commit, per partition
     * @throws SaslAuthenticationException if authentication keeps failing past
     *                                     {@code saslAuthenticationRetryTimeout}
     * @throws TimeoutException            if commits keep timing out past {@code offsetCommitTimeout}
     * @throws RuntimeException            if the thread is interrupted while backing off
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        this.noWakeups++;
        // Taken once, so the retry budgets bound the whole operation rather than a single attempt.
        final Instant startedAt = Instant.now();
        boolean inProgress = true;
        while (inProgress) {
            this.pendingRequests.incrementAndGet();
            try {
                commitSyncWithRetries(offsetsToSend, startedAt);
                inProgress = false;
            } catch (WakeupException e) {
                // A wakeup is only meant to interrupt polls; go around and commit again.
                logErroneousWakeup(e);
            } finally {
                this.pendingRequests.decrementAndGet();
            }
        }
    }

    /** Runs the commitSync attempt loop; returns once committed, abandoned, or shut down after one try. */
    private void commitSyncWithRetries(Map<TopicPartition, OffsetAndMetadata> offsetsToSend, Instant startedAt) {
        long tries = 0;
        while (tries == 0 || !this.shutdownRequested.get()) {
            tries++;
            try {
                this.consumer.commitSync(offsetsToSend);
                return;
            } catch (SaslAuthenticationException e) {
                if (elapsedMillisSince(startedAt) > this.saslAuthenticationRetryTimeout.toMillis()) {
                    log.error("Offset commit failed due to SaslAuthenticationException (tried {} times)", tries);
                    throw e;
                }
                log.warn("Encountered SaslAuthenticationException while committing offset. Retrying ({})", tries);
                try {
                    retryBackOff(this.saslAuthenticationRetryBackOff.toMillis());
                } catch (InterruptedException interrupted) {
                    // Restore the flag so code further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    log.warn("Offset Commit was interrupted", interrupted);
                    throw new RuntimeException("Offset Commit was interrupted", interrupted);
                }
            } catch (CommitFailedException e) {
                log.warn("Failed to commit offset due to group rebalancing. Will ignore the error for now.", e);
                // Kafka documents this failure as non-retriable, so stop here instead of spinning.
                return;
            } catch (TimeoutException e) {
                if (elapsedMillisSince(startedAt) > this.offsetCommitTimeout.toMillis()) {
                    log.error("Offset commit took too long due to TimeoutException (tried {} times)", tries);
                    throw e;
                }
                log.warn("Encountered timeout while committing offset. Retrying ({})", tries);
            }
        }
    }

    /**
     * Sleeps for up to {@code backOffMillis}, in slices, checking for shutdown after each slice.
     *
     * @param backOffMillis how long to back off for, in milliseconds
     * @return {@code true} if the full back-off elapsed, {@code false} if shutdown was requested
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private boolean retryBackOff(long backOffMillis) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + backOffMillis;
        long remaining = deadline - System.currentTimeMillis();
        while (remaining > 0) {
            // Never sleep past the deadline, even when the back-off is shorter than one slice.
            Thread.sleep(Math.min(WAIT_SLICE_MILLIS, remaining));
            if (this.shutdownRequested.get()) {
                return false;
            }
            remaining = deadline - System.currentTimeMillis();
        }
        return true;
    }

    /**
     * Asynchronously commits the given offsets, re-issuing the call if it is woken up.
     *
     * @param offsets  the offsets to commit, per partition
     * @param callback callback handed to the consumer's {@code commitAsync}
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.noWakeups++;
        boolean inProgress = true;
        while (inProgress) {
            try {
                this.consumer.commitAsync(offsets, callback);
                inProgress = false;
            } catch (WakeupException e) {
                logErroneousWakeup(e);
            }
        }
    }

    /** @return the group metadata captured by the most recent {@link #updateCache()} */
    public ConsumerGroupMetadata groupMetadata() {
        return this.metaCache;
    }

    /** Requests shutdown so that retry loops stop retrying; logs once, on the first request. */
    public void signalStop() {
        if (this.shutdownRequested.compareAndSet(false, true)) {
            log.info("Signaling Consumer Manager to stop");
        }
    }

    /**
     * Requests shutdown, waits up to {@code timeout} for in-flight requests to finish, then closes
     * the consumer, passing it the same {@code timeout}.
     *
     * @param timeout maximum time to wait for pending requests, also passed to the consumer's close
     * @throws RuntimeException if the thread is interrupted while waiting; the consumer is not
     *                          closed in that case
     */
    public void close(Duration timeout) {
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        log.debug("Consumer Manager Closing...");
        this.shutdownRequested.set(true);
        log.debug("ConsumerManager close waiting for max of {} for pending requests to complete", timeout);
        while (this.pendingRequests.get() > 0 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(WAIT_SLICE_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Wait interrupted", e);
            }
        }
        log.debug("ConsumerManager close wait completed.");
        this.consumer.close(timeout);
        log.debug("ConsumerManager closed");
    }

    /** @return the consumer's current partition assignment */
    public Set<TopicPartition> assignment() {
        return this.consumer.assignment();
    }

    /** Pauses the given partitions on the consumer. */
    public void pause(Set<TopicPartition> partitions) {
        this.consumer.pause(partitions);
    }

    /** @return the partitions currently paused, queried directly from the consumer */
    public Set<TopicPartition> paused() {
        return this.consumer.paused();
    }

    /** @return the paused-partition count captured by the most recent {@link #updateCache()} */
    public int getPausedPartitionSize() {
        return this.pausedPartitionSizeCache;
    }

    /** Resumes the given partitions on the consumer. */
    public void resume(Set<TopicPartition> partitions) {
        this.consumer.resume(partitions);
    }

    /** Flags that a commit is wanted, so the next {@link #poll(Duration)} uses a 1 ms timeout. */
    public void onCommitRequested() {
        this.commitRequested = true;
    }

    /** Logs the wakeup counters for a wakeup received during a commit, then counts it. */
    private void logErroneousWakeup(WakeupException e) {
        log.debug("Got woken up, retry. errors: {} none: {} correct:{}", this.erroneousWakeups, this.noWakeups, this.correctPollWakeups, e);
        this.erroneousWakeups++;
    }

    /** Milliseconds elapsed between {@code start} and now. */
    private static long elapsedMillisSince(Instant start) {
        return Duration.between(start, Instant.now()).toMillis();
    }
}
