package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.SupplierUtils;
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ExceptionInUserFunctionException;
import io.confluent.parallelconsumer.PCRetriableException;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.class */
public abstract class AbstractParallelEoSStreamProcessor<K, V> implements ParallelConsumer<K, V>, ConsumerRebalanceListener, Closeable {
    public static final String MDC_INSTANCE_ID = "pcId";
    private static final String MDC_WORK_CONTAINER_DESCRIPTOR = "offset";
    protected final ParallelConsumerOptions<K, V> options;
    private Clock clock;
    private Instant lastCommitCheckTime;
    private final Optional<ProducerManager<K, V>> producerManager;
    private final Consumer<K, V> consumer;
    protected final Supplier<ThreadPoolExecutor> workerThreadPool;
    private Optional<Future<Boolean>> controlThreadFuture;
    protected WorkManager<K, V> wm;
    private final BlockingQueue<ControllerEventMessage<K, V>> workMailBox;
    private final AtomicBoolean isRebalanceInProgress;
    private final BrokerPollSystem<K, V> brokerPollSubsystem;
    private final List<Runnable> controlLoopHooks;
    private Thread blockableControlThread;
    private final AtomicBoolean currentlyPollingWorkCompleteMailBox;
    private final AtomicBoolean awaitingInflightProcessingCompletionOnShutdown;
    private final OffsetCommitter committer;
    private final AtomicBoolean commitCommand;
    protected final DynamicLoadFactor dynamicExtraLoadFactor;
    private Exception failureReason;
    private Instant lastCommitTime;
    private State state;
    private Optional<ConsumerRebalanceListener> usersConsumerRebalanceListener;
    private int numberOfAssignedPartitions;
    private final RateLimiter queueStatsLimiter;
    PCModule<K, V> module;
    private boolean lastWorkRequestWasFulfilled;
    private Timer userProcessingTimer;
    private Gauge loadFactorGauge;
    private Gauge statusGauge;
    private Duration shutdownTimeout;
    private Duration drainTimeout;
    private PCMetrics pcMetrics;
    private Optional<String> myId;
    private static final Logger log = LoggerFactory.getLogger(AbstractParallelEoSStreamProcessor.class);
    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
    public static final Duration GRACE_PERIOD_FOR_OVERALL_SHUTDOWN = Duration.ofSeconds(10);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$parallelconsumer$internal$DrainingCloseable$DrainingMode;
        static final /* synthetic */ int[] $SwitchMap$io$confluent$parallelconsumer$internal$State = new int[State.values().length];

        static {
            try {
                $SwitchMap$io$confluent$parallelconsumer$internal$State[State.DRAINING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$confluent$parallelconsumer$internal$State[State.CLOSING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$io$confluent$parallelconsumer$internal$DrainingCloseable$DrainingMode = new int[DrainingCloseable.DrainingMode.values().length];
            try {
                $SwitchMap$io$confluent$parallelconsumer$internal$DrainingCloseable$DrainingMode[DrainingCloseable.DrainingMode.DRAIN.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$confluent$parallelconsumer$internal$DrainingCloseable$DrainingMode[DrainingCloseable.DrainingMode.DONT_DRAIN.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor$ControllerEventMessage.class */
    public static final class ControllerEventMessage<K, V> {
        private final WorkContainer<K, V> workContainer;
        private final EpochAndRecordsMap<K, V> consumerRecords;

        private boolean isWorkResult() {
            return this.workContainer != null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isNewConsumerRecords() {
            return !isWorkResult();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static <K, V> ControllerEventMessage<K, V> of(EpochAndRecordsMap<K, V> epochAndRecordsMap) {
            return new ControllerEventMessage<>(null, epochAndRecordsMap);
        }

        public static <K, V> ControllerEventMessage<K, V> of(WorkContainer<K, V> workContainer) {
            return new ControllerEventMessage<>(workContainer, null);
        }

        public WorkContainer<K, V> getWorkContainer() {
            return this.workContainer;
        }

        public EpochAndRecordsMap<K, V> getConsumerRecords() {
            return this.consumerRecords;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ControllerEventMessage)) {
                return false;
            }
            ControllerEventMessage controllerEventMessage = (ControllerEventMessage) obj;
            WorkContainer<K, V> workContainer = getWorkContainer();
            WorkContainer<K, V> workContainer2 = controllerEventMessage.getWorkContainer();
            if (workContainer == null) {
                if (workContainer2 != null) {
                    return false;
                }
            } else if (!workContainer.equals(workContainer2)) {
                return false;
            }
            EpochAndRecordsMap<K, V> consumerRecords = getConsumerRecords();
            EpochAndRecordsMap<K, V> consumerRecords2 = controllerEventMessage.getConsumerRecords();
            return consumerRecords == null ? consumerRecords2 == null : consumerRecords.equals(consumerRecords2);
        }

        public int hashCode() {
            WorkContainer<K, V> workContainer = getWorkContainer();
            int hashCode = (1 * 59) + (workContainer == null ? 43 : workContainer.hashCode());
            EpochAndRecordsMap<K, V> consumerRecords = getConsumerRecords();
            return (hashCode * 59) + (consumerRecords == null ? 43 : consumerRecords.hashCode());
        }

        public String toString() {
            return "AbstractParallelEoSStreamProcessor.ControllerEventMessage(workContainer=" + getWorkContainer() + ", consumerRecords=" + getConsumerRecords() + ")";
        }

        private ControllerEventMessage(WorkContainer<K, V> workContainer, EpochAndRecordsMap<K, V> epochAndRecordsMap) {
            this.workContainer = workContainer;
            this.consumerRecords = epochAndRecordsMap;
        }
    }

    @Deprecated
    public void setTimeBetweenCommits(Duration duration) {
        this.options.setCommitInterval(duration);
    }

    @Deprecated
    public Duration getTimeBetweenCommits() {
        return this.options.getCommitInterval();
    }

    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public boolean isClosedOrFailed() {
        boolean z = this.state == State.CLOSED;
        boolean z2 = false;
        if (this.controlThreadFuture.isPresent()) {
            Future<Boolean> future = this.controlThreadFuture.get();
            z2 = future.isDone() || future.isCancelled();
        }
        return z || z2;
    }

    public Exception getFailureCause() {
        return this.failureReason;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        this(parallelConsumerOptions, new PCModule(parallelConsumerOptions));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions, PCModule<K, V> pCModule) {
        this.clock = TimeUtils.getClock();
        this.lastCommitCheckTime = Instant.now();
        this.controlThreadFuture = Optional.empty();
        this.workMailBox = new LinkedBlockingQueue();
        this.isRebalanceInProgress = new AtomicBoolean(false);
        this.controlLoopHooks = new ArrayList();
        this.currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
        this.awaitingInflightProcessingCompletionOnShutdown = new AtomicBoolean();
        this.commitCommand = new AtomicBoolean(false);
        this.state = State.UNUSED;
        this.usersConsumerRebalanceListener = Optional.empty();
        this.queueStatsLimiter = new RateLimiter();
        this.lastWorkRequestWasFulfilled = false;
        this.myId = Optional.empty();
        Objects.requireNonNull(parallelConsumerOptions, "Options must be supplied");
        this.module = pCModule;
        this.options = parallelConsumerOptions;
        this.shutdownTimeout = this.options.getShutdownTimeout();
        this.drainTimeout = this.options.getDrainTimeout();
        this.consumer = this.options.getConsumer();
        validateConfiguration();
        pCModule.setParallelEoSStreamProcessor(this);
        log.info("Confluent Parallel Consumer initialise... groupId: {}, Options: {}", parallelConsumerOptions.getConsumer().groupMetadata().groupId(), parallelConsumerOptions);
        this.pcMetrics = pCModule.pcMetrics();
        this.dynamicExtraLoadFactor = pCModule.dynamicExtraLoadFactor();
        this.workerThreadPool = SupplierUtils.memoize(() -> {
            return setupWorkerPool(parallelConsumerOptions.getMaxConcurrency());
        });
        this.wm = pCModule.workManager();
        this.brokerPollSubsystem = pCModule.brokerPoller(this);
        if (this.options.isProducerSupplied()) {
            this.producerManager = Optional.of(pCModule.producerManager());
            if (this.options.isUsingTransactionalProducer()) {
                this.committer = this.producerManager.get();
            } else {
                this.committer = this.brokerPollSubsystem;
            }
        } else {
            this.producerManager = Optional.empty();
            this.committer = this.brokerPollSubsystem;
        }
        initMetrics();
    }

    private void initMetrics() {
        this.userProcessingTimer = this.pcMetrics.getTimerFromMetricDef(PCMetricsDef.USER_FUNCTION_PROCESSING_TIME, new Tag[0]);
        this.loadFactorGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.DYNAMIC_EXTRA_LOAD_FACTOR, this.dynamicExtraLoadFactor, (v0) -> {
            return v0.getCurrentFactor();
        }, new Tag[0]);
        this.statusGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PC_STATUS, this, abstractParallelEoSStreamProcessor -> {
            return abstractParallelEoSStreamProcessor.state.getValue();
        }, new Tag[0]);
        new ExecutorServiceMetrics(getWorkerThreadPool().get(), "pc-user-function-executor", PCMetricsDef.USER_FUNCTION_EXECUTOR_PREFIX, this.pcMetrics.getCommonTags()).bindTo(this.pcMetrics.getMeterRegistry());
    }

    private void validateConfiguration() {
        this.options.validate();
        checkGroupIdConfigured(this.consumer);
        checkNotSubscribed(this.consumer);
        checkAutoCommitIsDisabled(this.consumer);
    }

    private void checkGroupIdConfigured(Consumer<K, V> consumer) {
        try {
            consumer.groupMetadata();
        } catch (RuntimeException e) {
            throw new IllegalArgumentException("Error validating Consumer configuration - no group metadata - missing a configured GroupId on your Consumer?", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ThreadPoolExecutor setupWorkerPool(int i) {
        ThreadFactory defaultThreadFactory;
        try {
            defaultThreadFactory = (ThreadFactory) InitialContext.doLookup(this.options.getManagedThreadFactory());
        } catch (NamingException e) {
            log.debug("Using Java SE Thread", e);
            defaultThreadFactory = Executors.defaultThreadFactory();
        }
        ThreadFactory threadFactory = defaultThreadFactory;
        ThreadFactory threadFactory2 = runnable -> {
            Thread newThread = threadFactory.newThread(runnable);
            String name = newThread.getName();
            newThread.setName("pc-" + name);
            getMyId().ifPresent(str -> {
                newThread.setName("pc-" + name + "-" + str);
            });
            return newThread;
        };
        ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
        return new ThreadPoolExecutor(i, i, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory2, abortPolicy);
    }

    private void checkNotSubscribed(Consumer<K, V> consumer) {
        if (consumer instanceof MockConsumer) {
            return;
        }
        Set subscription = consumer.subscription();
        Set assignment = consumer.assignment();
        if (!subscription.isEmpty() || !assignment.isEmpty()) {
            throw new IllegalStateException("Consumer subscription must be managed by the Parallel Consumer. Use " + getClass().getName() + "#subcribe methods instead.");
        }
    }

    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Collection<String> collection) {
        log.debug("Subscribing to {}", collection);
        this.consumer.subscribe(collection, this);
    }

    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Pattern pattern) {
        log.debug("Subscribing to {}", pattern);
        this.consumer.subscribe(pattern, this);
    }

    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        log.debug("Subscribing to {}", collection);
        this.usersConsumerRebalanceListener = Optional.of(consumerRebalanceListener);
        this.consumer.subscribe(collection, this);
    }

    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        log.debug("Subscribing to {}", pattern);
        this.usersConsumerRebalanceListener = Optional.of(consumerRebalanceListener);
        this.consumer.subscribe(pattern, this);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.debug("Partitions revoked {}, state: {}", collection, this.state);
        this.isRebalanceInProgress.set(true);
        while (isTransactionCommittingInProgress()) {
            Thread.sleep(100L);
        }
        this.numberOfAssignedPartitions -= collection.size();
        try {
            try {
                commitOffsetsThatAreReady();
                this.wm.onPartitionsRevoked(collection);
                this.isRebalanceInProgress.set(false);
                try {
                    this.usersConsumerRebalanceListener.ifPresent(consumerRebalanceListener -> {
                        consumerRebalanceListener.onPartitionsRevoked(collection);
                    });
                } catch (Exception e) {
                    throw new ExceptionInUserFunctionException("Error from rebalance listener function after #onPartitionsRevoked", e);
                }
            } catch (Exception e2) {
                throw new InternalRuntimeException("onPartitionsRevoked event error", e2);
            }
        } catch (Throwable th) {
            this.isRebalanceInProgress.set(false);
            throw th;
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        this.numberOfAssignedPartitions += collection.size();
        log.info("Assigned {} total ({} new) partition(s) {}", new Object[]{Integer.valueOf(this.numberOfAssignedPartitions), Integer.valueOf(collection.size()), collection});
        this.wm.onPartitionsAssigned(collection);
        this.usersConsumerRebalanceListener.ifPresent(consumerRebalanceListener -> {
            consumerRebalanceListener.onPartitionsAssigned(collection);
        });
        notifySomethingToDo();
    }

    public void onPartitionsLost(Collection<TopicPartition> collection) {
        this.numberOfAssignedPartitions -= collection.size();
        this.wm.onPartitionsLost(collection);
        this.usersConsumerRebalanceListener.ifPresent(consumerRebalanceListener -> {
            consumerRebalanceListener.onPartitionsLost(collection);
        });
    }

    private void checkAutoCommitIsDisabled(Consumer<K, V> consumer) {
        try {
            Optional<Boolean> autoCommitEnabled = getAutoCommitEnabled(consumer);
            if (autoCommitEnabled.isPresent() && autoCommitEnabled.get().booleanValue()) {
                throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library.");
            }
            if (autoCommitEnabled.isPresent()) {
                return;
            }
            if (!this.options.isIgnoreReflectiveAccessExceptionsForAutoCommitDisabledCheck()) {
                throw new ParallelConsumerException("Unable to check whether auto commit is enabled for consumer type " + consumer.getClass() + ". This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.");
            }
            log.warn("Unable to check whether auto commit is enabled for consumer type {}. Ignoring because ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck is enabled.", consumer.getClass());
        } catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException | NullPointerException e) {
            if (!this.options.isIgnoreReflectiveAccessExceptionsForAutoCommitDisabledCheck()) {
                throw new ParallelConsumerException("Failed to check whether auto commit is enabled for consumer type " + consumer.getClass() + ". This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.");
            }
            log.warn("Failed to check whether auto commit is enabled for consumer type {}. Ignoring because ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck is enabled.", consumer.getClass(), e);
        }
    }

    private static Optional<Boolean> getAutoCommitEnabled(Consumer<?, ?> consumer) throws ClassNotFoundException, IllegalAccessException, NoSuchFieldException {
        Field field;
        if (consumer instanceof MockConsumer) {
            log.debug("Detected MockConsumer class which doesn't do auto commits");
            return Optional.of(false);
        }
        if (!(consumer instanceof KafkaConsumer)) {
            log.warn("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: {}", consumer.getClass());
            return Optional.of(false);
        }
        KafkaConsumer kafkaConsumer = (KafkaConsumer) consumer;
        try {
            field = KafkaConsumer.class.getDeclaredField("delegate");
            field.setAccessible(true);
        } catch (NoSuchFieldException e) {
            field = null;
        }
        if (field == null) {
            return Optional.of(Boolean.valueOf(getAutoCommitEnabledFromCoordinator(kafkaConsumer.getClass(), kafkaConsumer)));
        }
        Consumer consumer2 = (Consumer) field.get(kafkaConsumer);
        Objects.requireNonNull(consumer2, "Consumer delegate must not be null");
        if ("org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer".equals(consumer2.getClass().getName())) {
            return Optional.of(Boolean.valueOf(getAutoCommitEnabledFromCoordinator(consumer2.getClass(), consumer2)));
        }
        if (!"org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer".equals(consumer2.getClass().getName())) {
            log.warn("Encountered unknown consumer delegate {}", consumer.getClass());
            return Optional.empty();
        }
        Field declaredField = consumer2.getClass().getDeclaredField("autoCommitEnabled");
        declaredField.setAccessible(true);
        return Optional.of(Boolean.valueOf(((Boolean) declaredField.get(consumer2)).booleanValue()));
    }

    private static <T extends Consumer, U extends Consumer<?, ?>> boolean getAutoCommitEnabledFromCoordinator(Class<T> cls, U u) throws IllegalAccessException, NoSuchFieldException {
        Field declaredField = cls.getDeclaredField("coordinator");
        declaredField.setAccessible(true);
        ConsumerCoordinator consumerCoordinator = (ConsumerCoordinator) declaredField.get(u);
        Objects.requireNonNull(consumerCoordinator, "Consumer coordinator must not be null. Ensure that group.id is configured for this consumer.");
        Field declaredField2 = consumerCoordinator.getClass().getDeclaredField("autoCommitEnabled");
        declaredField2.setAccessible(true);
        return ((Boolean) declaredField2.get(consumerCoordinator)).booleanValue();
    }

    @Override // io.confluent.parallelconsumer.internal.DrainingCloseable, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        closeDontDrainFirst();
    }

    @Override // io.confluent.parallelconsumer.internal.DrainingCloseable
    public void close(Duration duration, DrainingCloseable.DrainingMode drainingMode) {
        this.shutdownTimeout = duration;
        close(drainingMode);
    }

    @Override // io.confluent.parallelconsumer.internal.DrainingCloseable
    public void close(DrainingCloseable.DrainingMode drainingMode) {
        if (this.state != State.CLOSED) {
            log.info("Signaling to close...");
            switch (AnonymousClass1.$SwitchMap$io$confluent$parallelconsumer$internal$DrainingCloseable$DrainingMode[drainingMode.ordinal()]) {
                case 1:
                    log.info("Will wait for all in flight to complete before");
                    transitionToDraining();
                    waitForClose(this.drainTimeout.plus(this.shutdownTimeout).plus(GRACE_PERIOD_FOR_OVERALL_SHUTDOWN));
                    break;
                case DynamicLoadFactor.DEFAULT_INITIAL_LOADING_FACTOR /* 2 */:
                    log.info("Not waiting for remaining queued to complete, will finish in flight, then close...");
                    transitionToClosing();
                    waitForClose(this.shutdownTimeout.plus(GRACE_PERIOD_FOR_OVERALL_SHUTDOWN));
                    break;
            }
        } else {
            log.info("Already closed, checking end state..");
        }
        if (this.controlThreadFuture.isPresent()) {
            log.debug("Checking for control thread exception...");
            this.controlThreadFuture.get().get(this.shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        log.info("Close complete.");
    }

    private void waitForClose(Duration duration) throws TimeoutException, ExecutionException {
        Future<Boolean> future;
        log.info("Waiting on closed state...");
        while (!this.state.equals(State.CLOSED)) {
            try {
                future = this.controlThreadFuture.get();
                log.debug("Blocking on control future, for duration {} seconds", Long.valueOf(BackportUtils.toSeconds(duration)));
            } catch (InterruptedException e) {
                log.trace("Interrupted", e);
            } catch (ExecutionException | TimeoutException e2) {
                log.error("Execution or timeout exception while waiting for the control thread to close cleanly (state was {}). Try increasing your time-out to allow the system to drain, or close without draining.", this.state, e2);
                throw e2;
            }
            if (!future.get(BackportUtils.toSeconds(duration), TimeUnit.SECONDS).booleanValue()) {
                throw new TimeoutException("Timeout waiting for system to close (" + duration + ")");
                break;
            }
            log.trace("Still waiting for system to close...");
        }
    }

    private void doClose(Duration duration) throws TimeoutException, ExecutionException, InterruptedException {
        try {
            try {
                innerDoClose(duration);
                deregisterMeters();
                this.pcMetrics.close();
                log.debug("Close complete.");
                this.state = State.CLOSED;
                if (getFailureCause() != null) {
                    log.error("PC closed due to error: {}", getFailureCause(), (Object) null);
                }
            } catch (Exception e) {
                log.error("exception during close", e);
                throw e;
            }
        } catch (Throwable th) {
            deregisterMeters();
            this.pcMetrics.close();
            log.debug("Close complete.");
            this.state = State.CLOSED;
            if (getFailureCause() != null) {
                log.error("PC closed due to error: {}", getFailureCause(), (Object) null);
            }
            throw th;
        }
    }

    private void innerDoClose(Duration duration) throws TimeoutException, ExecutionException, InterruptedException {
        log.debug("Starting close process (state: {})...", this.state);
        this.brokerPollSubsystem.drain();
        log.debug("Shutting down execution pool...");
        this.workerThreadPool.get().getQueue().clear();
        this.workerThreadPool.get().shutdown();
        if (this.workerThreadPool.get().getActiveCount() > 0) {
            log.info("Inflight work in execution pool: {}, letting to finish on shutdown with timeout: {}", Integer.valueOf(this.workerThreadPool.get().getActiveCount()), duration);
        }
        log.debug("Awaiting worker pool termination...");
        this.awaitingInflightProcessingCompletionOnShutdown.getAndSet(true);
        boolean z = true;
        while (z) {
            log.debug("Still awaiting completion of inflight work");
            try {
                z = false;
                if (!this.workerThreadPool.get().awaitTermination(BackportUtils.toSeconds(duration), TimeUnit.SECONDS)) {
                    log.warn("Thread execution pool termination await timeout ({})! Were any processing jobs dead locked (test latch locks?) or otherwise stuck? Forcing shutdown of workers.", duration);
                    this.workerThreadPool.get().shutdownNow();
                    this.workerThreadPool.get().awaitTermination(BackportUtils.toSeconds(Duration.ofSeconds(1L)), TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                log.error("InterruptedException", e);
                z = true;
            }
        }
        this.awaitingInflightProcessingCompletionOnShutdown.getAndSet(false);
        if (this.workerThreadPool.get().getActiveCount() > 0) {
            log.warn("Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: {}", Integer.valueOf(this.workerThreadPool.get().getActiveCount()));
        }
        log.debug("Worker pool terminated.");
        processWorkCompleteMailBox(Duration.ZERO);
        if (Thread.currentThread().isInterrupted()) {
            log.warn("control thread interrupted - may lead to issues with transactional commit lock acquisition");
        }
        try {
            commitOffsetsThatAreReady();
        } catch (Exception e2) {
            log.warn("failed to commit during close sequence", e2);
        }
        log.debug("Closing and waiting for broker poll system...");
        try {
            this.brokerPollSubsystem.closeAndWait();
        } catch (Exception e3) {
            log.warn("failed to close brokerPollSubsystem during close sequence", e3);
        }
        try {
            maybeCloseConsumer();
        } catch (Exception e4) {
            log.warn("failed to maybeCloseConsumer during close sequence", e4);
        }
        this.producerManager.ifPresent(producerManager -> {
            producerManager.close(duration);
        });
    }

    private void deregisterMeters() {
        this.pcMetrics.removeMetersByPrefixAndCommonTags(PCMetricsDef.USER_FUNCTION_EXECUTOR_PREFIX);
    }

    private void maybeCloseConsumer() {
        if (isResponsibleForCommits()) {
            this.consumer.close();
        }
    }

    private boolean isResponsibleForCommits() {
        return this.committer instanceof ProducerManager;
    }

    private boolean isRecordsAwaitingProcessing() {
        boolean isRecordsAwaitingProcessing = this.wm.isRecordsAwaitingProcessing();
        boolean areMyThreadsDone = areMyThreadsDone();
        log.trace("isRecordsAwaitingProcessing {} || threadsDone {}", Boolean.valueOf(isRecordsAwaitingProcessing), Boolean.valueOf(areMyThreadsDone));
        return isRecordsAwaitingProcessing || areMyThreadsDone;
    }

    private void transitionToDraining() {
        log.debug("Transitioning to draining...");
        this.state = State.DRAINING;
        notifySomethingToDo();
    }

    private void interruptControlThread() {
        if (this.blockableControlThread != null) {
            log.debug("Interrupting {} thread in case it's waiting for work", this.blockableControlThread.getName());
            this.blockableControlThread.interrupt();
        }
    }

    private boolean areMyThreadsDone() {
        if (BackportUtils.isEmpty(this.controlThreadFuture)) {
            return false;
        }
        return this.controlThreadFuture.get().isDone();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <R> void supervisorLoop(Function<PollContextInternal<K, V>, List<R>> function, java.util.function.Consumer<R> consumer) {
        ExecutorService newSingleThreadExecutor;
        if (this.state != State.UNUSED) {
            throw new IllegalStateException(StringUtils.msg("Invalid state - you cannot call the poll* or pollAndProduce* methods more than once (they are asynchronous) (current state is {})", this.state));
        }
        this.state = State.RUNNING;
        this.brokerPollSubsystem.start(this.options.getManagedExecutorService());
        try {
            newSingleThreadExecutor = (ExecutorService) InitialContext.doLookup(this.options.getManagedExecutorService());
        } catch (NamingException e) {
            log.debug("Using Java SE Thread", e);
            newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        }
        this.controlThreadFuture = Optional.of(newSingleThreadExecutor.submit(() -> {
            addInstanceMDC();
            log.info("Control loop starting up...");
            Thread currentThread = Thread.currentThread();
            currentThread.setName("pc-control");
            getMyId().ifPresent(str -> {
                currentThread.setName("pc-control-" + str);
            });
            this.blockableControlThread = currentThread;
            while (this.state != State.CLOSED) {
                log.debug("Control loop start");
                try {
                    controlLoop(function, consumer);
                } catch (InterruptedException e2) {
                    log.debug("Control loop interrupted, closing");
                    Thread.interrupted();
                    doClose(this.shutdownTimeout);
                } catch (Exception e3) {
                    if (Thread.interrupted()) {
                        log.debug("Thread interrupted flag cleared in control loop error handling");
                    }
                    log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: " + e3.getMessage(), e3);
                    this.failureReason = new RuntimeException("Error from poll control thread: " + e3.getMessage(), e3);
                    doClose(this.shutdownTimeout);
                    throw this.failureReason;
                }
            }
            log.info("Control loop ending clean (state:{})...", this.state);
            return true;
        }));
    }

    private void addInstanceMDC() {
        this.myId.ifPresent(str -> {
            MDC.put(MDC_INSTANCE_ID, str);
        });
    }

    protected <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> function, java.util.function.Consumer<R> consumer) throws TimeoutException, ExecutionException, InterruptedException {
        maybeWakeupPoller();
        boolean maybeAcquireCommitLock = maybeAcquireCommitLock();
        processWorkCompleteMailBox(maybeAcquireCommitLock ? Duration.ZERO : getTimeToBlockFor());
        if (maybeAcquireCommitLock) {
            commitOffsetsThatAreReady();
        }
        retrieveAndDistributeNewWork(function, consumer);
        log.trace("Loop: Running {} loop end plugin(s)", Integer.valueOf(this.controlLoopHooks.size()));
        this.controlLoopHooks.forEach((v0) -> {
            v0.run();
        });
        log.trace("Current state: {}", this.state);
        switch (AnonymousClass1.$SwitchMap$io$confluent$parallelconsumer$internal$State[this.state.ordinal()]) {
            case 1:
                drain();
                break;
            case DynamicLoadFactor.DEFAULT_INITIAL_LOADING_FACTOR /* 2 */:
                doClose(this.shutdownTimeout);
                break;
        }
        this.brokerPollSubsystem.supervise();
        try {
            Thread.sleep(Duration.ofMillis(1L).toMillis());
        } catch (InterruptedException e) {
            log.trace("Woke up", e);
        }
        if (log.isTraceEnabled()) {
            log.trace("End of control loop, waiting processing {}, remaining in partition queues: {}, out for processing: {}. In state: {}", new Object[]{Long.valueOf(this.wm.getNumberOfWorkQueuedInShardsAwaitingSelection()), Long.valueOf(this.wm.getNumberOfIncompleteOffsets()), Integer.valueOf(this.wm.getNumberRecordsOutForProcessing()), this.state});
        }
    }

    private void maybeWakeupPoller() {
        if (this.state == State.RUNNING && !this.wm.isSufficientlyLoaded() && this.brokerPollSubsystem.isPausedForThrottling()) {
            log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (mail: {} vs target: {})", Long.valueOf(this.wm.getNumberOfWorkQueuedInShardsAwaitingSelection()), Integer.valueOf(this.options.getTargetAmountOfRecordsInFlight()));
            this.brokerPollSubsystem.wakeupIfPaused();
        }
    }

    private boolean maybeAcquireCommitLock() throws TimeoutException, InterruptedException {
        boolean z = isTimeToCommitNow() && this.wm.isDirty() && !this.isRebalanceInProgress.get();
        if (z && this.options.isUsingTransactionCommitMode()) {
            log.debug("Acquiring commit lock pessimistically, before we try to collect offsets for committing");
            this.producerManager.get().preAcquireOffsetsToCommit();
        }
        return z;
    }

    private <R> int retrieveAndDistributeNewWork(Function<PollContextInternal<K, V>, List<R>> function, java.util.function.Consumer<R> consumer) {
        checkPipelinePressure();
        int i = 0;
        if (this.state == State.RUNNING || this.state == State.DRAINING) {
            int calculateQuantityToRequest = calculateQuantityToRequest();
            List<WorkContainer<K, V>> workIfAvailable = this.wm.getWorkIfAvailable(calculateQuantityToRequest);
            i = workIfAvailable.size();
            this.lastWorkRequestWasFulfilled = i >= calculateQuantityToRequest;
            log.trace("Loop: Submit to pool");
            submitWorkToPool(function, consumer, workIfAvailable);
        }
        this.queueStatsLimiter.performIfNotLimited(() -> {
            int numberOfUserFunctionsQueued = getNumberOfUserFunctionsQueued();
            log.debug("Stats: \n- pool active: {} queued:{} \n- queue size: {} target: {} loading factor: {}", new Object[]{Integer.valueOf(this.workerThreadPool.get().getActiveCount()), Integer.valueOf(numberOfUserFunctionsQueued), Integer.valueOf(numberOfUserFunctionsQueued), Integer.valueOf(getPoolLoadTarget()), Integer.valueOf(this.dynamicExtraLoadFactor.getCurrentFactor())});
        });
        return i;
    }

    protected <R> void submitWorkToPool(Function<PollContextInternal<K, V>, List<R>> function, java.util.function.Consumer<R> consumer, List<WorkContainer<K, V>> list) {
        if (this.state.equals(State.CLOSING) || this.state.equals(State.CLOSED)) {
            log.debug("Not submitting new work as Parallel Consumer is in {} state, incoming work: {}, Pool stats: {}", new Object[]{this.state, Integer.valueOf(list.size()), this.workerThreadPool.get()});
        }
        if (list.isEmpty()) {
            return;
        }
        log.debug("New work incoming: {}, Pool stats: {}", Integer.valueOf(list.size()), this.workerThreadPool.get());
        List<List<WorkContainer<K, V>>> makeBatches = makeBatches(list);
        if (log.isDebugEnabled()) {
            List list2 = (List) makeBatches.stream().map((v0) -> {
                return v0.size();
            }).sorted().collect(Collectors.toList());
            log.debug("Number batches: {}, smallest {}, sizes {}", new Object[]{Integer.valueOf(makeBatches.size()), list2.stream().findFirst().get(), list2});
            List list3 = (List) list2.stream().filter(num -> {
                return num.intValue() < this.options.getBatchSize().intValue();
            }).collect(Collectors.toList());
            if (list3.size() > 1) {
                log.warn("More than one batch isn't target size: {}. Input number of batches: {}", list3, Integer.valueOf(makeBatches.size()));
            }
        }
        Iterator<List<WorkContainer<K, V>>> it = makeBatches.iterator();
        while (it.hasNext()) {
            submitWorkToPoolInner(function, consumer, it.next());
        }
    }

    private <R> void submitWorkToPoolInner(Function<PollContextInternal<K, V>, List<R>> function, java.util.function.Consumer<R> consumer, List<WorkContainer<K, V>> list) {
        log.trace("Sending work ({}) to pool", list);
        Future<List<?>> submit = this.workerThreadPool.get().submit(() -> {
            addInstanceMDC();
            return runUserFunction(function, consumer, list);
        });
        Iterator<WorkContainer<K, V>> it = list.iterator();
        while (it.hasNext()) {
            it.next().setFuture(submit);
        }
    }

    private List<List<WorkContainer<K, V>>> makeBatches(List<WorkContainer<K, V>> list) {
        return partition(list, this.options.getBatchSize().intValue());
    }

    private static <T> List<List<T>> partition(Collection<T> collection, int i) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Iterator<T> it = collection.iterator();
        while (it.hasNext()) {
            arrayList2.add(it.next());
            if (arrayList2.size() == i) {
                arrayList.add(arrayList2);
                arrayList2 = new ArrayList();
            }
        }
        if (!arrayList2.isEmpty()) {
            arrayList.add(arrayList2);
        }
        if (log.isDebugEnabled()) {
            log.debug("sourceCollection.size() {}, batches: {}, batch sizes {}", new Object[]{Integer.valueOf(collection.size()), Integer.valueOf(arrayList.size()), arrayList.stream().map((v0) -> {
                return v0.size();
            }).collect(Collectors.toList())});
        }
        return arrayList;
    }

    protected int calculateQuantityToRequest() {
        int intValue;
        int targetOutForProcessing = getTargetOutForProcessing();
        int numberRecordsOutForProcessing = this.wm.getNumberRecordsOutForProcessing();
        int i = targetOutForProcessing - numberRecordsOutForProcessing;
        if (this.options.isUsingBatching() && (intValue = i % this.options.getBatchSize().intValue()) > 0) {
            i += targetOutForProcessing - intValue;
        }
        log.debug("Will try to get work - target: {}, current queue size: {}, requesting: {}, loading factor: {}", new Object[]{Integer.valueOf(targetOutForProcessing), Integer.valueOf(numberRecordsOutForProcessing), Integer.valueOf(i), Integer.valueOf(this.dynamicExtraLoadFactor.getCurrentFactor())});
        return i;
    }

    protected int getTargetOutForProcessing() {
        return getQueueTargetLoaded();
    }

    protected int getQueueTargetLoaded() {
        return getPoolLoadTarget() * this.dynamicExtraLoadFactor.getCurrentFactor();
    }

    protected void checkPipelinePressure() {
        if (log.isTraceEnabled()) {
            log.trace("Queue pressure check: (current size: {}, loaded target: {}, factor: {}) if (isPoolQueueLow() {} && lastWorkRequestWasFulfilled {}))", new Object[]{Integer.valueOf(getNumberOfUserFunctionsQueued()), Integer.valueOf(getQueueTargetLoaded()), Integer.valueOf(this.dynamicExtraLoadFactor.getCurrentFactor()), Boolean.valueOf(isPoolQueueLow()), Boolean.valueOf(this.lastWorkRequestWasFulfilled)});
        }
        if (isPoolQueueLow() && this.lastWorkRequestWasFulfilled) {
            if (this.dynamicExtraLoadFactor.maybeStepUp()) {
                log.debug("isPoolQueueLow(): Executor pool queue is not loaded with enough work (queue: {} vs target: {}), stepped up loading factor to {}", new Object[]{Integer.valueOf(getNumberOfUserFunctionsQueued()), Integer.valueOf(getPoolLoadTarget()), Integer.valueOf(this.dynamicExtraLoadFactor.getCurrentFactor())});
            } else if (this.dynamicExtraLoadFactor.isMaxReached()) {
                log.warn("isPoolQueueLow(): Max loading factor steps reached: {}/{}", Integer.valueOf(this.dynamicExtraLoadFactor.getCurrentFactor()), Integer.valueOf(this.dynamicExtraLoadFactor.getMaxFactor()));
            }
        }
    }

    private int getPoolLoadTarget() {
        return this.options.getTargetAmountOfRecordsInFlight();
    }

    private boolean isPoolQueueLow() {
        int numberOfUserFunctionsQueued = getNumberOfUserFunctionsQueued();
        int poolLoadTarget = getPoolLoadTarget();
        boolean z = numberOfUserFunctionsQueued <= poolLoadTarget;
        log.debug("isPoolQueueLow()? workAmountBelowTarget {} {} vs {};", new Object[]{Boolean.valueOf(z), Integer.valueOf(numberOfUserFunctionsQueued), Integer.valueOf(poolLoadTarget)});
        return z;
    }

    private void drain() {
        log.debug("Signaling to drain...");
        this.brokerPollSubsystem.drain();
        if (isRecordsAwaitingProcessing()) {
            log.debug("Records still waiting processing, won't transition to closing.");
        } else {
            transitionToClosing();
        }
    }

    private void transitionToClosing() {
        log.debug("Transitioning to closing...");
        if (this.state == State.UNUSED) {
            this.state = State.CLOSED;
        } else {
            this.state = State.CLOSING;
        }
        notifySomethingToDo();
    }

    protected void processWorkCompleteMailBox(Duration duration) {
        log.trace("Processing mailbox (might block waiting for results)...");
        ArrayDeque<ControllerEventMessage> arrayDeque = new ArrayDeque();
        if (duration.toMillis() > 0) {
            this.currentlyPollingWorkCompleteMailBox.getAndSet(true);
            if (log.isDebugEnabled()) {
                log.debug("Blocking poll on work until next scheduled offset commit attempt for {}. active threads: {}, queue: {}", new Object[]{duration, Integer.valueOf(this.workerThreadPool.get().getActiveCount()), Integer.valueOf(getNumberOfUserFunctionsQueued())});
            }
            log.trace("Blocking poll {}", duration);
            try {
                try {
                    ControllerEventMessage<K, V> poll = this.workMailBox.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
                    if (poll == null) {
                        log.debug("Mailbox results returned null, indicating timeToBlockFor elapsed (which was set as {})", duration);
                    } else {
                        log.debug("Work arrived in mailbox during blocking poll. (Timeout was set as {})", duration);
                        arrayDeque.add(poll);
                    }
                    this.currentlyPollingWorkCompleteMailBox.getAndSet(false);
                } catch (InterruptedException e) {
                    log.debug("Interrupted waiting on work results");
                    this.currentlyPollingWorkCompleteMailBox.getAndSet(false);
                }
                log.trace("Blocking poll finish");
            } catch (Throwable th) {
                this.currentlyPollingWorkCompleteMailBox.getAndSet(false);
                throw th;
            }
        }
        int size = this.workMailBox.size();
        log.trace("Draining {} more, got {} already...", Integer.valueOf(size), Integer.valueOf(arrayDeque.size()));
        this.workMailBox.drainTo(arrayDeque, size);
        log.trace("Processing drained work {}...", Integer.valueOf(arrayDeque.size()));
        for (ControllerEventMessage controllerEventMessage : arrayDeque) {
            if (controllerEventMessage.isNewConsumerRecords()) {
                this.wm.registerWork(controllerEventMessage.getConsumerRecords());
            } else {
                WorkContainer<K, V> workContainer = controllerEventMessage.getWorkContainer();
                MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, workContainer.toString());
                this.wm.handleFutureResult(workContainer);
                MDC.remove(MDC_WORK_CONTAINER_DESCRIPTOR);
            }
        }
    }

    private Duration getTimeToBlockFor() {
        if (!this.wm.isWorkInFlightMeetingTarget()) {
            Optional<Duration> lowestRetryTime = this.wm.getLowestRetryTime();
            if (lowestRetryTime.isPresent()) {
                Duration defaultMessageRetryDelay = this.options.getDefaultMessageRetryDelay();
                Duration duration = lowestRetryTime.get();
                Duration timeBetweenCommits = getTimeBetweenCommits();
                Duration duration2 = duration.toMillis() < defaultMessageRetryDelay.toMillis() ? defaultMessageRetryDelay : duration;
                Duration duration3 = timeBetweenCommits.toMillis() < duration2.toMillis() ? timeBetweenCommits : duration2;
                log.debug("Not enough work in flight, while work is waiting to be retried - so will only sleep until next retry time of {} (lowestScheduled = {})", duration3, duration);
                return duration3;
            }
        }
        Duration timeToNextCommitCheck = getTimeToNextCommitCheck();
        log.debug("Calculated next commit time in {}", timeToNextCommitCheck);
        return timeToNextCommitCheck;
    }

    private boolean isIdlingOrRunning() {
        return this.state == State.RUNNING || this.state == State.DRAINING || this.state == State.PAUSED;
    }

    protected boolean isTimeToCommitNow() {
        updateLastCommitCheckTime();
        boolean z = (this.lastCommitTime == null ? Duration.ofDays(1L) : Duration.between(this.lastCommitTime, Instant.now())).compareTo(getTimeBetweenCommits()) > 0;
        boolean isCommandedToCommit = isCommandedToCommit();
        boolean z2 = z || isCommandedToCommit;
        if (log.isDebugEnabled()) {
            log.debug("Should commit this cycle? shouldCommitNow? " + z2 + " : commitFrequencyOK? " + z + ", isCommandedToCommit? " + isCommandedToCommit);
        }
        return z2;
    }

    private int getNumberOfUserFunctionsQueued() {
        return this.workerThreadPool.get().getQueue().size();
    }

    private Duration getTimeToNextCommitCheck() {
        if (isIdlingOrRunning()) {
            return getTimeBetweenCommits().minus(getTimeSinceLastCheck());
        }
        log.debug("System not {} (state: {}), so don't wait to commit, only a small thread yield time", State.RUNNING, this.state);
        return Duration.ZERO;
    }

    private Duration getTimeSinceLastCheck() {
        return Duration.between(this.lastCommitCheckTime, this.clock.instant());
    }

    protected void commitOffsetsThatAreReady() throws TimeoutException, InterruptedException {
        log.trace("Synchronizing on commitCommand...");
        synchronized (this.commitCommand) {
            log.debug("Committing offsets that are ready...");
            this.committer.retrieveOffsetsAndCommit();
            clearCommitCommand();
            this.lastCommitTime = Instant.now();
        }
    }

    private void updateLastCommitCheckTime() {
        this.lastCommitCheckTime = Instant.now();
    }

    protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> function, java.util.function.Consumer<R> consumer, List<WorkContainer<K, V>> list) {
        if (log.isDebugEnabled()) {
            MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, list.get(0).offset() + "");
        }
        log.trace("Pool received: {}", list);
        Stream<WorkContainer<K, V>> stream = list.stream();
        WorkManager<K, V> workManager = this.wm;
        Objects.requireNonNull(workManager);
        Map map = (Map) stream.collect(Collectors.groupingBy(workManager::checkIfWorkIsStale));
        List<WorkContainer<K, V>> list2 = (List) map.getOrDefault(Boolean.TRUE, new ArrayList());
        List<WorkContainer<K, V>> list3 = (List) map.getOrDefault(Boolean.FALSE, new ArrayList());
        handleStaleWork(list2);
        PollContextInternal<K, V> pollContextInternal = new PollContextInternal<>(list3);
        try {
            try {
                if (list3.isEmpty()) {
                    List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> emptyList = Collections.emptyList();
                    cleanUpContext(pollContextInternal);
                    return emptyList;
                }
                ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunctionInternal = runUserFunctionInternal(function, pollContextInternal, consumer, list3);
                cleanUpContext(pollContextInternal);
                return runUserFunctionInternal;
            } catch (Exception e) {
                Throwable cause = e.getCause();
                String msg = StringUtils.msg("Exception caught in user function running stage, registering WC as failed, returning to mailbox. Context: {}", pollContextInternal, e);
                if (cause instanceof PCRetriableException) {
                    log.debug("Explicit " + PCRetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e);
                } else {
                    log.error(msg, e);
                }
                for (WorkContainer<K, V> workContainer : list) {
                    workContainer.onUserFunctionFailure(e);
                    addToMailbox(pollContextInternal, workContainer);
                }
                throw e;
            }
        } catch (Throwable th) {
            cleanUpContext(pollContextInternal);
            throw th;
        }
    }

    protected void handleStaleWork(List<WorkContainer<K, V>> list) {
        PollContextInternal<K, V> pollContextInternal = new PollContextInternal<>(list);
        try {
            if (!list.isEmpty()) {
                log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", list);
                list.forEach(workContainer -> {
                    addToMailbox(pollContextInternal, workContainer);
                });
            }
        } finally {
            cleanUpContext(pollContextInternal);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected <R> ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunctionInternal(Function<PollContextInternal<K, V>, List<R>> function, PollContextInternal<K, V> pollContextInternal, java.util.function.Consumer<R> consumer, List<WorkContainer<K, V>> list) {
        List<?> list2 = (List) this.userProcessingTimer.record(() -> {
            return (List) function.apply(pollContextInternal);
        });
        Iterator<WorkContainer<K, V>> it = list.iterator();
        while (it.hasNext()) {
            onUserFunctionSuccess(it.next(), list2);
        }
        ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> arrayList = new ArrayList<>();
        for (Object obj : list2) {
            log.trace("Running users call back...");
            consumer.accept(obj);
        }
        Iterator<WorkContainer<K, V>> it2 = list.iterator();
        while (it2.hasNext()) {
            addToMailBoxOnUserFunctionSuccess(pollContextInternal, it2.next(), list2);
        }
        log.trace("User function future registered");
        return arrayList;
    }

    private void cleanUpContext(PollContextInternal<K, V> pollContextInternal) {
        pollContextInternal.getProducingLock().ifPresent((v0) -> {
            v0.unlock();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addToMailBoxOnUserFunctionSuccess(PollContextInternal<K, V> pollContextInternal, WorkContainer<K, V> workContainer, List<?> list) {
        addToMailbox(pollContextInternal, workContainer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onUserFunctionSuccess(WorkContainer<K, V> workContainer, List<?> list) {
        log.trace("User function success");
        workContainer.onUserFunctionSuccess();
    }

    protected void addToMailbox(PollContextInternal<K, V> pollContextInternal, WorkContainer<K, V> workContainer) {
        log.trace("Adding {} {} to mailbox...", workContainer.isUserFunctionSucceeded() ? "succeeded" : "FAILED", workContainer);
        this.workMailBox.add(ControllerEventMessage.of(workContainer));
        workContainer.onPostAddToMailBox(pollContextInternal, this.producerManager);
    }

    public void registerWork(EpochAndRecordsMap<K, V> epochAndRecordsMap) {
        log.trace("Adding {} to mailbox...", epochAndRecordsMap);
        this.workMailBox.add(ControllerEventMessage.of(epochAndRecordsMap));
    }

    public void notifySomethingToDo() {
        if (!(!((Boolean) this.producerManager.map((v0) -> {
            return v0.isTransactionCommittingInProgress();
        }).orElse(false)).booleanValue()) || this.awaitingInflightProcessingCompletionOnShutdown.get()) {
            log.trace("Would have interrupted control thread, but TX in progress");
        } else {
            log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
            interruptControlThread();
        }
    }

    @Override // io.confluent.parallelconsumer.internal.DrainingCloseable
    public long workRemaining() {
        return this.wm.getNumberOfIncompleteOffsets();
    }

    public void addLoopEndCallBack(Runnable runnable) {
        this.controlLoopHooks.add(runnable);
    }

    public void setLongPollTimeout(Duration duration) {
        BrokerPollSystem.setLongPollTimeout(duration);
    }

    public void requestCommitAsap() {
        log.debug("Registering command to commit next chance");
        synchronized (this.commitCommand) {
            this.commitCommand.set(true);
        }
        notifySomethingToDo();
    }

    private boolean isTransactionCommittingInProgress() {
        return this.options.isUsingTransactionCommitMode() && ((Boolean) this.producerManager.map((v0) -> {
            return v0.isTransactionCommittingInProgress();
        }).orElse(false)).booleanValue();
    }

    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void pauseIfRunning() {
        if (this.state != State.RUNNING) {
            log.debug("Skipping transition of parallel consumer to state paused. Current state is {}.", this.state);
        } else {
            log.info("Transitioning parallel consumer to state paused.");
            this.state = State.PAUSED;
        }
    }

    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void resumeIfPaused() {
        if (this.state != State.PAUSED) {
            log.debug("Skipping transition of parallel consumer to state running. Current state is {}.", this.state);
            return;
        }
        log.info("Transitioning parallel consumer to state running.");
        this.state = State.RUNNING;
        notifySomethingToDo();
    }

    private boolean isCommandedToCommit() {
        boolean z;
        synchronized (this.commitCommand) {
            z = this.commitCommand.get();
        }
        return z;
    }

    private void clearCommitCommand() {
        synchronized (this.commitCommand) {
            if (this.commitCommand.get()) {
                log.debug("Command to commit asap received, clearing");
                this.commitCommand.set(false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ParallelConsumerOptions<K, V> getOptions() {
        return this.options;
    }

    void setClock(Clock clock) {
        this.clock = clock;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Optional<ProducerManager<K, V>> getProducerManager() {
        return this.producerManager;
    }

    protected Supplier<ThreadPoolExecutor> getWorkerThreadPool() {
        return this.workerThreadPool;
    }

    public WorkManager<K, V> getWm() {
        return this.wm;
    }

    protected BlockingQueue<ControllerEventMessage<K, V>> getWorkMailBox() {
        return this.workMailBox;
    }

    public void setState(State state) {
        this.state = state;
    }

    public int getNumberOfAssignedPartitions() {
        return this.numberOfAssignedPartitions;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PCModule<K, V> getModule() {
        return this.module;
    }

    public void setMyId(Optional<String> optional) {
        this.myId = optional;
    }

    public Optional<String> getMyId() {
        return this.myId;
    }
}
