/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.CommittableBatch;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.KafkaSchedulers;
import reactor.kafka.receiver.internals.SeekablePartition;
import reactor.kafka.sender.TransactionManager;

public class DefaultKafkaReceiver<K, V>
implements KafkaReceiver<K, V>,
ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger((String)DefaultKafkaReceiver.class.getName());
    private static final Set<String> DELEGATE_METHODS = new HashSet<String>(Arrays.asList("assignment", "subscription", "seek", "seekToBeginning", "seekToEnd", "position", "committed", "metrics", "partitionsFor", "listTopics", "paused", "pause", "resume", "offsetsForTimes", "beginningOffsets", "endOffsets"));
    private final ConsumerFactory consumerFactory;
    private final ReceiverOptions<K, V> receiverOptions;
    private final List<Flux<? extends Event<?>>> fluxList = new ArrayList();
    private final List<Disposable> subscribeDisposables = new ArrayList<Disposable>();
    private final AtomicLong requestsPending = new AtomicLong();
    private final AtomicBoolean needsHeartbeat = new AtomicBoolean();
    private final AtomicInteger consecutiveCommitFailures = new AtomicInteger();
    private final KafkaSchedulers.EventScheduler eventScheduler;
    private final AtomicBoolean isActive = new AtomicBoolean();
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final AtomicBoolean awaitingTransaction = new AtomicBoolean();
    private AckMode ackMode;
    private AtmostOnceOffsets atmostOnceOffsets;
    private EmitterProcessor<Event<?>> eventEmitter;
    private FluxSink<Event<?>> eventSubmission;
    private EmitterProcessor<ConsumerRecords<K, V>> recordEmitter;
    private FluxSink<ConsumerRecords<K, V>> recordSubmission;
    private InitEvent initEvent;
    private PollEvent pollEvent;
    private CommitEvent commitEvent;
    private Flux<Event<?>> eventFlux;
    private Flux<ConsumerRecords<K, V>> consumerFlux;
    private Consumer<K, V> consumer;
    private Consumer<K, V> consumerProxy;
    Scheduler scheduler;

    public DefaultKafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions<K, V> receiverOptions) {
        this.consumerFactory = consumerFactory;
        this.receiverOptions = receiverOptions.toImmutable();
        this.eventScheduler = KafkaSchedulers.newEvent(receiverOptions.groupId());
    }

    @Override
    public Flux<ReceiverRecord<K, V>> receive() {
        this.ackMode = AckMode.MANUAL_ACK;
        Flux flux = this.createConsumerFlux().concatMap(Flux::fromIterable, Integer.MAX_VALUE);
        return this.withDoOnRequest(flux).map(r -> {
            TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
            CommittableOffset committableOffset = new CommittableOffset(topicPartition, r.offset());
            return new ReceiverRecord(r, committableOffset);
        });
    }

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
        this.ackMode = AckMode.AUTO_ACK;
        Flux<ConsumerRecords<K, V>> flux = this.withDoOnRequest(this.createConsumerFlux());
        return flux.map(consumerRecords -> Flux.fromIterable((Iterable)consumerRecords).doAfterTerminate(() -> {
            for (ConsumerRecord r : consumerRecords) {
                new CommittableOffset(r).acknowledge();
            }
        }));
    }

    @Override
    public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
        this.ackMode = AckMode.ATMOST_ONCE;
        this.atmostOnceOffsets = new AtmostOnceOffsets();
        return this.createConsumerFlux().concatMap(cr -> Flux.fromIterable((Iterable)cr).concatMap(r -> {
            long offset = r.offset();
            TopicPartition partition = new TopicPartition(r.topic(), r.partition());
            long committedOffset = this.atmostOnceOffsets.committedOffset(partition);
            this.atmostOnceOffsets.onDispatch(partition, offset);
            long commitAheadSize = this.receiverOptions.atmostOnceCommitAheadSize();
            CommittableOffset committable = new CommittableOffset(partition, offset + commitAheadSize);
            if (offset >= committedOffset) {
                return committable.commit().then(Mono.just((Object)r)).publishOn(this.scheduler);
            }
            if (committedOffset - offset >= commitAheadSize / 2L) {
                committable.commit().subscribe();
            }
            return Mono.just((Object)r);
        }, Integer.MAX_VALUE), Integer.MAX_VALUE).transform(this::withDoOnRequest);
    }

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
        this.ackMode = AckMode.EXACTLY_ONCE;
        Flux<ConsumerRecords<K, V>> flux = this.withDoOnRequest(this.createConsumerFlux());
        return flux.map(consumerRecords -> transactionManager.begin().then(Mono.fromCallable(() -> this.awaitingTransaction.getAndSet(true))).thenMany(this.transactionalRecords(transactionManager, (ConsumerRecords<K, V>)consumerRecords))).publishOn(transactionManager.scheduler());
    }

    private Flux<ConsumerRecord<K, V>> transactionalRecords(TransactionManager transactionManager, ConsumerRecords<K, V> records) {
        if (records.isEmpty()) {
            return Flux.empty();
        }
        CommittableBatch offsetBatch = new CommittableBatch();
        for (ConsumerRecord r : records) {
            offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());
        }
        return Flux.fromIterable(records).concatWith(transactionManager.sendOffsets(offsetBatch.getAndClearOffsets().offsets(), this.receiverOptions.groupId())).doAfterTerminate(() -> this.awaitingTransaction.set(false));
    }

    @Override
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return Mono.create((T monoSink) -> {
            CustomEvent event = new CustomEvent(function, monoSink);
            this.emit(event);
        });
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.debug("onPartitionsAssigned {}", partitions);
        if (!partitions.isEmpty()) {
            for (java.util.function.Consumer<Collection<ReceiverPartition>> onAssign : this.receiverOptions.assignListeners()) {
                onAssign.accept(this.toSeekable(partitions));
            }
        }
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("onPartitionsRevoked {}", partitions);
        if (!partitions.isEmpty()) {
            if (this.ackMode != AckMode.ATMOST_ONCE) {
                this.commitEvent.runIfRequired(true);
            }
            for (java.util.function.Consumer<Collection<ReceiverPartition>> onRevoke : this.receiverOptions.revokeListeners()) {
                onRevoke.accept(this.toSeekable(partitions));
            }
        }
    }

    private synchronized Flux<ConsumerRecords<K, V>> createConsumerFlux() {
        if (this.consumerFlux != null) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
        }
        java.util.function.Consumer<Flux<?>> kafkaSubscribeOrAssign = flux -> this.receiverOptions.subscriber(this).accept(this.consumer);
        this.initEvent = new InitEvent(kafkaSubscribeOrAssign);
        this.pollEvent = new PollEvent();
        this.commitEvent = new CommitEvent();
        this.recordEmitter = EmitterProcessor.create();
        this.recordSubmission = this.recordEmitter.sink();
        this.scheduler = KafkaSchedulers.fromWorker(Schedulers.parallel().createWorker());
        this.consumerFlux = this.recordEmitter.publishOn(this.scheduler).doOnSubscribe(s -> {
            try {
                this.start();
            }
            catch (Exception e) {
                log.error("Subscription to event flux failed", (Throwable)e);
                throw e;
            }
        }).doOnRequest(r -> {
            if (this.requestsPending.get() > 0L) {
                this.pollEvent.scheduleIfRequired();
            }
        }).doAfterTerminate(this::dispose).doOnCancel(this::dispose);
        return this.consumerFlux;
    }

    private <T> Flux<T> withDoOnRequest(Flux<T> consumerFlux) {
        return consumerFlux.doOnRequest(toAdd -> {
            long u;
            long r;
            do {
                if ((r = this.requestsPending.get()) != Long.MAX_VALUE) continue;
                this.pollEvent.scheduleIfRequired();
                return;
            } while (!this.requestsPending.compareAndSet(r, u = Operators.addCap((long)r, (long)toAdd)));
            if (u > 0L) {
                this.pollEvent.scheduleIfRequired();
            }
        });
    }

    Consumer<K, V> kafkaConsumer() {
        return this.consumer;
    }

    CommittableBatch committableBatch() {
        return this.commitEvent.commitBatch;
    }

    void close() {
        this.dispose(true);
    }

    private Collection<ReceiverPartition> toSeekable(Collection<TopicPartition> partitions) {
        ArrayList<ReceiverPartition> seekableList = new ArrayList<ReceiverPartition>(partitions.size());
        for (TopicPartition partition : partitions) {
            seekableList.add(new SeekablePartition(this.consumer, partition));
        }
        return seekableList;
    }

    private void start() {
        log.debug("start");
        if (!this.isActive.compareAndSet(false, true)) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
        }
        this.fluxList.clear();
        this.requestsPending.set(0L);
        this.consecutiveCommitFailures.set(0);
        this.awaitingTransaction.set(false);
        this.eventEmitter = EmitterProcessor.create();
        this.eventSubmission = this.eventEmitter.sink(FluxSink.OverflowStrategy.BUFFER);
        this.eventScheduler.start();
        Flux initFlux = Flux.just((Object)this.initEvent);
        this.fluxList.add((Flux<Event<?>>)this.eventEmitter);
        this.fluxList.add(initFlux);
        Duration commitInterval = this.receiverOptions.commitInterval();
        if (!(this.ackMode != AckMode.AUTO_ACK && this.ackMode != AckMode.MANUAL_ACK || commitInterval.isZero())) {
            Flux periodicCommitFlux = Flux.interval((Duration)this.receiverOptions.commitInterval()).onBackpressureLatest().map(i -> this.commitEvent.periodicEvent());
            this.fluxList.add(periodicCommitFlux);
        }
        this.eventFlux = Flux.merge(this.fluxList).publishOn((Scheduler)this.eventScheduler);
        this.subscribeDisposables.add(this.eventFlux.subscribe(event -> this.doEvent((Event<?>)event)));
    }

    private void fail(Throwable e) {
        log.error("Consumer flux exception", e);
        this.recordSubmission.error(e);
    }

    private void dispose() {
        boolean isEventsThread = this.eventScheduler.isCurrentThreadFromScheduler();
        boolean isEventsEmitterAvailable = !this.eventSubmission.isCancelled() && !this.eventEmitter.isTerminated() && !this.eventEmitter.isCancelled();
        this.dispose(!isEventsThread && isEventsEmitterAvailable);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void dispose(boolean async) {
        Object closeEvent;
        log.debug("dispose {}", (Object)this.isActive);
        if (!this.isActive.compareAndSet(true, false)) return;
        boolean isConsumerClosed = this.consumer == null;
        try {
            if (!isConsumerClosed) {
                this.consumer.wakeup();
                closeEvent = new CloseEvent(this.receiverOptions.closeTimeout());
                if (async) {
                    this.emit((Event<?>)closeEvent);
                    isConsumerClosed = ((CloseEvent)closeEvent).await();
                } else {
                    ((CloseEvent)closeEvent).run();
                }
            }
            this.fluxList.clear();
            this.eventScheduler.dispose();
        }
        catch (Exception e) {
            log.warn("Cancel exception: " + e);
            return;
        }
        this.scheduler.dispose();
        try {
            closeEvent = this.subscribeDisposables.iterator();
            while (closeEvent.hasNext()) {
                Disposable disposable = (Disposable)closeEvent.next();
                disposable.dispose();
            }
            return;
        }
        finally {
            this.fluxList.clear();
            this.eventScheduler.dispose();
            this.scheduler.dispose();
            try {
                for (Disposable disposable : this.subscribeDisposables) {
                    disposable.dispose();
                }
            }
            finally {
                int maxRetries = 10;
                for (int i = 0; i < maxRetries && !isConsumerClosed; ++i) {
                    try {
                        if (this.consumer != null) {
                            this.consumer.close();
                        }
                        isConsumerClosed = true;
                        continue;
                    }
                    catch (Exception e) {
                        if (i != maxRetries - 1) continue;
                        log.warn("Consumer could not be closed", (Throwable)e);
                    }
                }
                this.consumerFlux = null;
                this.consumerProxy = null;
                this.atmostOnceOffsets = null;
                this.isClosed.set(true);
            }
        }
    }

    private void doEvent(Event<?> event) {
        log.trace("doEvent {}", (Object)event.eventType);
        try {
            event.run();
        }
        catch (Exception e) {
            this.fail(e);
        }
    }

    private void emit(Event<?> event) {
        this.eventSubmission.next(event);
    }

    private static class AtmostOnceOffsets {
        private final Map<TopicPartition, Long> committedOffsets = new ConcurrentHashMap<TopicPartition, Long>();
        private final Map<TopicPartition, Long> dispatchedOffsets = new ConcurrentHashMap<TopicPartition, Long>();

        AtmostOnceOffsets() {
        }

        void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                this.committedOffsets.put(entry.getKey(), entry.getValue().offset());
            }
        }

        void onDispatch(TopicPartition topicPartition, long offset) {
            this.dispatchedOffsets.put(topicPartition, offset);
        }

        long committedOffset(TopicPartition topicPartition) {
            Long offset = this.committedOffsets.get(topicPartition);
            return offset == null ? -1L : offset;
        }

        boolean undoCommitAhead(CommittableBatch committableBatch) {
            boolean undoRequired = false;
            for (Map.Entry<TopicPartition, Long> entry : this.committedOffsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                long offsetToCommit = this.dispatchedOffsets.get(entry.getKey()) + 1L;
                if (entry.getValue() <= offsetToCommit) continue;
                committableBatch.updateOffset(topicPartition, offsetToCommit);
                undoRequired = true;
            }
            return undoRequired;
        }
    }

    class CommittableOffset
    implements ReceiverOffset {
        private final TopicPartition topicPartition;
        private final long commitOffset;
        private final AtomicBoolean acknowledged;

        public CommittableOffset(ConsumerRecord<K, V> record) {
            this(new TopicPartition(record.topic(), record.partition()), record.offset());
        }

        public CommittableOffset(TopicPartition topicPartition, long nextOffset) {
            this.topicPartition = topicPartition;
            this.commitOffset = nextOffset;
            this.acknowledged = new AtomicBoolean(false);
        }

        @Override
        public Mono<Void> commit() {
            if (this.maybeUpdateOffset() > 0) {
                return this.scheduleCommit();
            }
            return Mono.empty();
        }

        @Override
        public void acknowledge() {
            int commitBatchSize = DefaultKafkaReceiver.this.receiverOptions.commitBatchSize();
            long uncommittedCount = this.maybeUpdateOffset();
            if (commitBatchSize > 0 && uncommittedCount >= (long)commitBatchSize) {
                this.scheduleIfRequired();
            }
        }

        @Override
        public TopicPartition topicPartition() {
            return this.topicPartition;
        }

        @Override
        public long offset() {
            return this.commitOffset;
        }

        private int maybeUpdateOffset() {
            if (this.acknowledged.compareAndSet(false, true)) {
                return DefaultKafkaReceiver.this.commitEvent.commitBatch.updateOffset(this.topicPartition, this.commitOffset);
            }
            return DefaultKafkaReceiver.this.commitEvent.commitBatch.batchSize();
        }

        private Mono<Void> scheduleCommit() {
            return Mono.create(emitter -> {
                DefaultKafkaReceiver.this.commitEvent.commitBatch.addCallbackEmitter((MonoSink<Void>)emitter);
                this.scheduleIfRequired();
            });
        }

        private void scheduleIfRequired() {
            DefaultKafkaReceiver.this.commitEvent.scheduleIfRequired();
        }

        public String toString() {
            return this.topicPartition + "@" + this.commitOffset;
        }
    }

    private class CloseEvent
    extends Event<ConsumerRecords<K, V>> {
        private final long closeEndTimeNanos;
        private Semaphore semaphore;

        CloseEvent(Duration timeout) {
            super(EventType.CLOSE);
            this.semaphore = new Semaphore(0);
            this.closeEndTimeNanos = System.nanoTime() + timeout.toNanos();
        }

        @Override
        public void run() {
            try {
                if (DefaultKafkaReceiver.this.consumer != null) {
                    Collection<TopicPartition> manualAssignment = DefaultKafkaReceiver.this.receiverOptions.assignment();
                    if (manualAssignment != null && !manualAssignment.isEmpty()) {
                        DefaultKafkaReceiver.this.onPartitionsRevoked(manualAssignment);
                    }
                    int attempts = 3;
                    for (int i = 0; i < attempts; ++i) {
                        try {
                            long timeoutNanos;
                            boolean forceCommit = true;
                            if (DefaultKafkaReceiver.this.ackMode == AckMode.ATMOST_ONCE) {
                                forceCommit = DefaultKafkaReceiver.this.atmostOnceOffsets.undoCommitAhead(DefaultKafkaReceiver.this.committableBatch());
                            }
                            if (DefaultKafkaReceiver.this.ackMode != AckMode.EXACTLY_ONCE) {
                                DefaultKafkaReceiver.this.commitEvent.runIfRequired(forceCommit);
                                DefaultKafkaReceiver.this.commitEvent.waitFor(this.closeEndTimeNanos);
                            }
                            if ((timeoutNanos = this.closeEndTimeNanos - System.nanoTime()) < 0L) {
                                timeoutNanos = 0L;
                            }
                            DefaultKafkaReceiver.this.consumer.close(timeoutNanos, TimeUnit.NANOSECONDS);
                            break;
                        }
                        catch (WakeupException e) {
                            if (i != attempts - 1) continue;
                            throw e;
                        }
                    }
                }
                this.semaphore.release();
            }
            catch (Exception e) {
                log.error("Unexpected exception during close", (Throwable)e);
                DefaultKafkaReceiver.this.fail(e);
            }
        }

        boolean await(long timeoutNanos) throws InterruptedException {
            return this.semaphore.tryAcquire(timeoutNanos, TimeUnit.NANOSECONDS);
        }

        boolean await() {
            long remainingNanos;
            boolean closed = false;
            while (!closed && (remainingNanos = this.closeEndTimeNanos - System.nanoTime()) > 0L) {
                try {
                    closed = this.await(remainingNanos);
                }
                catch (InterruptedException interruptedException) {}
            }
            return closed;
        }
    }

    private class CustomEvent<T>
    extends Event<Void> {
        private final Function<Consumer<K, V>, ? extends T> function;
        private MonoSink<T> monoSink;

        CustomEvent(Function<Consumer<K, V>, ? extends T> function, MonoSink<T> monoSink) {
            super(EventType.CUSTOM);
            this.function = function;
            this.monoSink = monoSink;
        }

        @Override
        public void run() {
            if (DefaultKafkaReceiver.this.isActive.get()) {
                try {
                    T ret = this.function.apply(this.consumerProxy());
                    this.monoSink.success(ret);
                }
                catch (Throwable e) {
                    this.monoSink.error(e);
                }
            }
        }

        private Consumer<K, V> consumerProxy() {
            if (DefaultKafkaReceiver.this.consumerProxy == null) {
                Class[] interfaces = new Class[]{Consumer.class};
                InvocationHandler handler = (proxy, method, args) -> {
                    if (DELEGATE_METHODS.contains(method.getName())) {
                        try {
                            return method.invoke((Object)DefaultKafkaReceiver.this.consumer, args);
                        }
                        catch (InvocationTargetException e) {
                            throw e.getCause();
                        }
                    }
                    throw new UnsupportedOperationException("Method is not supported: " + method);
                };
                DefaultKafkaReceiver.this.consumerProxy = (Consumer)Proxy.newProxyInstance(Consumer.class.getClassLoader(), interfaces, handler);
            }
            return DefaultKafkaReceiver.this.consumerProxy;
        }
    }

    class CommitEvent
    extends Event<Map<TopicPartition, OffsetAndMetadata>> {
        private final CommittableBatch commitBatch;
        private final AtomicBoolean isPending;
        private final AtomicInteger inProgress;

        CommitEvent() {
            super(EventType.COMMIT);
            this.isPending = new AtomicBoolean();
            this.inProgress = new AtomicInteger();
            this.commitBatch = new CommittableBatch();
        }

        @Override
        public void run() {
            block11: {
                if (!this.isPending.compareAndSet(true, false)) {
                    return;
                }
                CommittableBatch.CommitArgs commitArgs = this.commitBatch.getAndClearOffsets();
                try {
                    if (commitArgs == null) break block11;
                    if (!commitArgs.offsets().isEmpty()) {
                        this.inProgress.incrementAndGet();
                        switch (DefaultKafkaReceiver.this.ackMode) {
                            case ATMOST_ONCE: {
                                try {
                                    DefaultKafkaReceiver.this.consumer.commitSync(commitArgs.offsets());
                                    this.handleSuccess(commitArgs, commitArgs.offsets());
                                    DefaultKafkaReceiver.this.atmostOnceOffsets.onCommit(commitArgs.offsets());
                                }
                                catch (Exception e) {
                                    this.handleFailure(commitArgs, e);
                                }
                                this.inProgress.decrementAndGet();
                                break;
                            }
                            case EXACTLY_ONCE: {
                                break;
                            }
                            default: {
                                DefaultKafkaReceiver.this.consumer.commitAsync(commitArgs.offsets(), (offsets, exception) -> {
                                    this.inProgress.decrementAndGet();
                                    if (exception == null) {
                                        this.handleSuccess(commitArgs, offsets);
                                    } else {
                                        this.handleFailure(commitArgs, exception);
                                    }
                                });
                                DefaultKafkaReceiver.this.pollEvent.scheduleIfRequired();
                            }
                        }
                        if (DefaultKafkaReceiver.this.ackMode != AckMode.ATMOST_ONCE) {
                            // empty if block
                        }
                        break block11;
                    }
                    this.handleSuccess(commitArgs, commitArgs.offsets());
                }
                catch (Exception e) {
                    log.error("Unexpected exception", (Throwable)e);
                    this.inProgress.decrementAndGet();
                    this.handleFailure(commitArgs, e);
                }
            }
        }

        void runIfRequired(boolean force) {
            if (force) {
                this.isPending.set(true);
            }
            if (this.isPending.get()) {
                this.run();
            }
        }

        private void handleSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, OffsetAndMetadata> offsets) {
            if (!offsets.isEmpty()) {
                DefaultKafkaReceiver.this.consecutiveCommitFailures.set(0);
            }
            if (commitArgs.callbackEmitters() != null) {
                for (MonoSink<Void> emitter : commitArgs.callbackEmitters()) {
                    emitter.success();
                }
            }
        }

        private void handleFailure(CommittableBatch.CommitArgs commitArgs, Exception exception) {
            boolean mayRetry;
            log.warn("Commit failed", (Throwable)exception);
            boolean bl = mayRetry = this.isRetriableException(exception) && !DefaultKafkaReceiver.this.isClosed.get() && DefaultKafkaReceiver.this.consecutiveCommitFailures.incrementAndGet() < DefaultKafkaReceiver.this.receiverOptions.maxCommitAttempts();
            if (!mayRetry) {
                List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
                if (callbackEmitters != null && !callbackEmitters.isEmpty()) {
                    this.isPending.set(false);
                    this.commitBatch.restoreOffsets(commitArgs, false);
                    for (MonoSink<Void> emitter : callbackEmitters) {
                        emitter.error((Throwable)exception);
                    }
                } else {
                    DefaultKafkaReceiver.this.fail(exception);
                }
            } else {
                this.commitBatch.restoreOffsets(commitArgs, true);
                log.warn("Commit failed with exception" + exception + ", retries remaining " + (DefaultKafkaReceiver.this.receiverOptions.maxCommitAttempts() - DefaultKafkaReceiver.this.consecutiveCommitFailures.get()));
                this.isPending.set(true);
                DefaultKafkaReceiver.this.pollEvent.scheduleIfRequired();
            }
        }

        private CommitEvent periodicEvent() {
            this.isPending.set(true);
            return this;
        }

        private void scheduleIfRequired() {
            if (DefaultKafkaReceiver.this.isActive.get() && this.isPending.compareAndSet(false, true)) {
                DefaultKafkaReceiver.this.emit(this);
            }
        }

        private void waitFor(long endTimeNanos) {
            while (this.inProgress.get() > 0 && endTimeNanos - System.nanoTime() > 0L) {
                DefaultKafkaReceiver.this.consumer.poll(1L);
            }
        }

        protected boolean isRetriableException(Exception exception) {
            return exception instanceof RetriableCommitFailedException;
        }
    }

    private class PollEvent
    extends Event<ConsumerRecords<K, V>> {
        private AtomicInteger pendingCount;
        private final long pollTimeoutMs;
        private final AtomicBoolean partitionsPaused;

        PollEvent() {
            super(EventType.POLL);
            this.pendingCount = new AtomicInteger();
            this.partitionsPaused = new AtomicBoolean();
            this.pollTimeoutMs = DefaultKafkaReceiver.this.receiverOptions.pollTimeout().toMillis();
        }

        @Override
        public void run() {
            block10: {
                DefaultKafkaReceiver.this.needsHeartbeat.set(false);
                try {
                    if (DefaultKafkaReceiver.this.isActive.get()) {
                        ConsumerRecords records;
                        DefaultKafkaReceiver.this.commitEvent.runIfRequired(false);
                        this.pendingCount.decrementAndGet();
                        if (DefaultKafkaReceiver.this.requestsPending.get() > 0L && !DefaultKafkaReceiver.this.awaitingTransaction.get()) {
                            if (this.partitionsPaused.getAndSet(false)) {
                                DefaultKafkaReceiver.this.consumer.resume((Collection)DefaultKafkaReceiver.this.consumer.assignment());
                            }
                        } else if (!this.partitionsPaused.getAndSet(true)) {
                            DefaultKafkaReceiver.this.consumer.pause((Collection)DefaultKafkaReceiver.this.consumer.assignment());
                        }
                        if ((records = DefaultKafkaReceiver.this.consumer.poll(this.pollTimeoutMs)).count() > 0) {
                            DefaultKafkaReceiver.this.recordSubmission.next((Object)records);
                        }
                        if (DefaultKafkaReceiver.this.isActive.get()) {
                            int count;
                            int n = count = (DefaultKafkaReceiver.this.ackMode == AckMode.AUTO_ACK || DefaultKafkaReceiver.this.ackMode == AckMode.EXACTLY_ONCE) && records.count() > 0 ? 1 : records.count();
                            if (DefaultKafkaReceiver.this.requestsPending.get() == Long.MAX_VALUE || DefaultKafkaReceiver.this.requestsPending.addAndGet(0 - count) > 0L || DefaultKafkaReceiver.this.commitEvent.inProgress.get() > 0) {
                                this.scheduleIfRequired();
                            }
                        }
                    }
                }
                catch (Exception e) {
                    if (!DefaultKafkaReceiver.this.isActive.get()) break block10;
                    log.error("Unexpected exception", (Throwable)e);
                    DefaultKafkaReceiver.this.fail(e);
                }
            }
        }

        void scheduleIfRequired() {
            if (this.pendingCount.get() <= 0) {
                DefaultKafkaReceiver.this.emit(this);
                this.pendingCount.incrementAndGet();
            }
        }
    }

    private class InitEvent
    extends Event<ConsumerRecords<K, V>> {
        private final java.util.function.Consumer<Flux<?>> kafkaSubscribeOrAssign;

        InitEvent(java.util.function.Consumer<Flux<?>> kafkaSubscribeOrAssign) {
            super(EventType.INIT);
            this.kafkaSubscribeOrAssign = kafkaSubscribeOrAssign;
        }

        @Override
        public void run() {
            block2: {
                try {
                    DefaultKafkaReceiver.this.isActive.set(true);
                    DefaultKafkaReceiver.this.isClosed.set(false);
                    DefaultKafkaReceiver.this.consumer = DefaultKafkaReceiver.this.consumerFactory.createConsumer(DefaultKafkaReceiver.this.receiverOptions);
                    this.kafkaSubscribeOrAssign.accept(DefaultKafkaReceiver.this.consumerFlux);
                }
                catch (Exception e) {
                    if (!DefaultKafkaReceiver.this.isActive.get()) break block2;
                    log.error("Unexpected exception", (Throwable)e);
                    DefaultKafkaReceiver.this.fail(e);
                }
            }
        }
    }

    abstract class Event<R>
    implements Runnable {
        protected EventType eventType;

        Event(EventType eventType) {
            this.eventType = eventType;
        }

        public EventType eventType() {
            return this.eventType;
        }
    }

    static enum AckMode {
        AUTO_ACK,
        MANUAL_ACK,
        ATMOST_ONCE,
        EXACTLY_ONCE;

    }

    static enum EventType {
        INIT,
        POLL,
        COMMIT,
        CUSTOM,
        CLOSE;

    }
}

