package akka.kafka.internal;

import akka.Done;
import akka.Done$;
import akka.NotUsed;
import akka.NotUsed$;
import akka.kafka.internal.ConsumerStage;
import akka.kafka.scaladsl.Consumer;
import akka.stream.stage.AsyncCallback;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* JADX INFO: Add missing generic type declarations: [V, K] */
/* compiled from: ConsumerStage.scala */
/* loaded from: input_file:akka/kafka/internal/CommittableConsumerStage$$anon$1.class */
public final class CommittableConsumerStage$$anon$1<K, V> extends ConsumerStageLogic<K, V, Consumer.CommittableMessage<K, V>> implements ConsumerStage.Committer {
    private final Some<FiniteDuration> pollCommitTimeout;
    private long akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation;
    private boolean stopping;
    private final AsyncCallback<NotUsed> akka$kafka$internal$CommittableConsumerStage$$anon$$decrementConfirmation;
    private final AsyncCallback<Tuple2<ConsumerStage.CommittableOffsetBatchImpl, Promise<Done>>> commitBatchCallback;
    private final AsyncCallback<Tuple2<Consumer.PartitionOffset, Promise<Done>>> commitSingleCallback;

    /* JADX WARN: Incorrect inner types in field signature: Lakka/kafka/internal/CommittableConsumerStage<TK;TV;>.$anon$1$StopTimeout$; */
    private volatile CommittableConsumerStage$$anon$1$StopTimeout$ akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$module;
    private final /* synthetic */ CommittableConsumerStage $outer;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private CommittableConsumerStage$$anon$1$StopTimeout$ akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$module == null) {
                this.akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$module = new CommittableConsumerStage$$anon$1$StopTimeout$(this);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$module;
        }
    }

    /* JADX WARN: Incorrect inner types in method signature: ()Lakka/kafka/internal/CommittableConsumerStage<TK;TV;>.$anon$1$StopTimeout$; */
    public CommittableConsumerStage$$anon$1$StopTimeout$ akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout() {
        return this.akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$module == null ? akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$lzycompute() : this.akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout$module;
    }

    private Some<FiniteDuration> pollCommitTimeout() {
        return this.pollCommitTimeout;
    }

    public long akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation() {
        return this.akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation;
    }

    public void akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation_$eq(long j) {
        this.akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation = j;
    }

    private boolean stopping() {
        return this.stopping;
    }

    private void stopping_$eq(boolean z) {
        this.stopping = z;
    }

    public AsyncCallback<NotUsed> akka$kafka$internal$CommittableConsumerStage$$anon$$decrementConfirmation() {
        return this.akka$kafka$internal$CommittableConsumerStage$$anon$$decrementConfirmation;
    }

    private AsyncCallback<Tuple2<ConsumerStage.CommittableOffsetBatchImpl, Promise<Done>>> commitBatchCallback() {
        return this.commitBatchCallback;
    }

    private AsyncCallback<Tuple2<Consumer.PartitionOffset, Promise<Done>>> commitSingleCallback() {
        return this.commitSingleCallback;
    }

    public void preStart() {
        setKeepGoing(true);
    }

    @Override // akka.kafka.internal.ConsumerStageLogic
    public Option<FiniteDuration> pollTimeout() {
        return super.pollTimeout().orElse(new CommittableConsumerStage$$anon$1$$anonfun$pollTimeout$1(this));
    }

    @Override // akka.kafka.internal.ConsumerStageLogic
    public void poll() {
        super.poll();
        if (stopping() && !isClosed(this.$outer.out())) {
            if (logger().underlying().isDebugEnabled()) {
                logger().underlying().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Stop producing messages, commits in progress ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation())})));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            complete(this.$outer.out());
        }
        if (stopping() && akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation() == 0) {
            completeStage();
        }
    }

    @Override // akka.kafka.internal.ConsumerStageLogic
    public void pushMsg(ConsumerRecord<K, V> consumerRecord) {
        Consumer.CommittableMessage committableMessage = new Consumer.CommittableMessage(consumerRecord.key(), consumerRecord.value(), new ConsumerStage.CommittableOffsetImpl(new Consumer.PartitionOffset(new Consumer.ClientTopicPartition(this.$outer.akka$kafka$internal$CommittableConsumerStage$$clientId(), consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset()), this));
        if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace("Push element {}", new Object[]{committableMessage});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        push(this.$outer.out(), committableMessage);
    }

    @Override // akka.kafka.internal.ConsumerStage.Committer
    public Future<Done> commit(Consumer.PartitionOffset partitionOffset) {
        if (stoppedPromise().isCompleted()) {
            return Future$.MODULE$.failed(new IllegalStateException("ConsumerStage is stopped, offset commit not performed"));
        }
        Promise<Done> apply = Promise$.MODULE$.apply();
        scheduleCommitTimeout(apply);
        commitSingleCallback().invoke(new Tuple2(partitionOffset, apply));
        return apply.future();
    }

    @Override // akka.kafka.internal.ConsumerStage.Committer
    public Future<Done> commit(ConsumerStage.CommittableOffsetBatchImpl committableOffsetBatchImpl) {
        if (stoppedPromise().isCompleted()) {
            return Future$.MODULE$.failed(new IllegalStateException("ConsumerStage is stopped, offset commit not performed"));
        }
        Promise<Done> apply = Promise$.MODULE$.apply();
        scheduleCommitTimeout(apply);
        commitBatchCallback().invoke(new Tuple2(committableOffsetBatchImpl, apply));
        return apply.future();
    }

    private void scheduleCommitTimeout(final Promise<Done> promise) {
        try {
            promise.future().onComplete(new CommittableConsumerStage$$anon$1$$anonfun$scheduleCommitTimeout$1(this, materializer().scheduleOnce(this.$outer.akka$kafka$internal$CommittableConsumerStage$$settings.commitTimeout(), new Runnable(this, promise) { // from class: akka.kafka.internal.CommittableConsumerStage$$anon$1$$anon$3
                private final /* synthetic */ CommittableConsumerStage$$anon$1 $outer;
                private final Promise done$1;

                @Override // java.lang.Runnable
                public void run() {
                    this.done$1.tryFailure(new TimeoutException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Offset commit timed out after ", " ms"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(this.$outer.akka$kafka$internal$CommittableConsumerStage$$anon$$$outer().akka$kafka$internal$CommittableConsumerStage$$settings.commitTimeout().toMillis())}))));
                }

                /* JADX WARN: Incorrect inner types in method signature: (Lakka/kafka/internal/CommittableConsumerStage<TK;TV;>.$anon$1;)V */
                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                    this.done$1 = promise;
                }
            })), materializer().executionContext());
        } catch (Throwable th) {
            if (NonFatal$.MODULE$.unapply(th).isEmpty()) {
                throw th;
            }
            promise.tryFailure(new IllegalStateException("ConsumerStage is not started, offset commit not performed"));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    public void akka$kafka$internal$CommittableConsumerStage$$anon$$commitSingleInternal(Consumer.PartitionOffset partitionOffset, Promise<Done> promise) {
        commitAsync(Collections.singletonMap(new TopicPartition(partitionOffset.key().topic(), partitionOffset.key().partition()), new OffsetAndMetadata(partitionOffset.offset() + 1)), promise);
    }

    public void akka$kafka$internal$CommittableConsumerStage$$anon$$commitBatchInternal(ConsumerStage.CommittableOffsetBatchImpl committableOffsetBatchImpl, Promise<Done> promise) {
        Tuple2 partition = committableOffsetBatchImpl.offsets().partition(new CommittableConsumerStage$$anon$1$$anonfun$7(this));
        if (partition == null) {
            throw new MatchError(partition);
        }
        Tuple2 tuple2 = new Tuple2((Map) partition._1(), (Map) partition._2());
        Map map = (Map) tuple2._1();
        Map map2 = (Map) tuple2._2();
        Map map3 = (Map) map.map(new CommittableConsumerStage$$anon$1$$anonfun$8(this), Map$.MODULE$.canBuildFrom());
        if (map2.isEmpty()) {
            commitAsync((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map3).asJava(), promise);
            return;
        }
        Promise<Done> apply = Promise$.MODULE$.apply();
        commitAsync((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map3).asJava(), apply);
        ConsumerStage.CommittableOffsetBatchImpl committableOffsetBatchImpl2 = new ConsumerStage.CommittableOffsetBatchImpl(map2, committableOffsetBatchImpl.stages().$minus(this.$outer.akka$kafka$internal$CommittableConsumerStage$$clientId()));
        Future<Done> commit = ((ConsumerStage.Committer) ((Tuple2) committableOffsetBatchImpl2.stages().head())._2()).commit(committableOffsetBatchImpl2);
        promise.completeWith(apply.future().flatMap(new CommittableConsumerStage$$anon$1$$anonfun$akka$kafka$internal$CommittableConsumerStage$$anon$$commitBatchInternal$1(this, commit), materializer().executionContext()));
    }

    private void commitAsync(java.util.Map<TopicPartition, OffsetAndMetadata> map, final Promise<Done> promise) {
        akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation_$eq(akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation() + 1);
        if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Start commit {}. Commits in progress {}"})).s(Nil$.MODULE$), (Object[]) List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Object[]{map, BoxesRunTime.boxToLong(akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation()).toString()})).toArray(ClassTag$.MODULE$.Object()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        consumer().commitAsync(map, new OffsetCommitCallback(this, promise) { // from class: akka.kafka.internal.CommittableConsumerStage$$anon$1$$anon$4
            private final /* synthetic */ CommittableConsumerStage$$anon$1 $outer;
            private final Promise done$2;

            public void onComplete(java.util.Map<TopicPartition, OffsetAndMetadata> map2, Exception exc) {
                if (exc == null) {
                    this.done$2.success(Done$.MODULE$);
                } else {
                    this.done$2.failure(exc);
                }
                if (this.$outer.logger().underlying().isTraceEnabled()) {
                    this.$outer.logger().underlying().trace(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Commit completed {} - {}"})).s(Nil$.MODULE$), (Object[]) List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Object[]{map2, this.done$2.future().value()})).toArray(ClassTag$.MODULE$.Object()));
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
                this.$outer.akka$kafka$internal$CommittableConsumerStage$$anon$$decrementConfirmation().invoke(NotUsed$.MODULE$);
            }

            /* JADX WARN: Incorrect inner types in method signature: (Lakka/kafka/internal/CommittableConsumerStage<TK;TV;>.$anon$1;)V */
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.done$2 = promise;
            }
        });
        poll();
    }

    @Override // akka.kafka.internal.ConsumerStageLogic
    public void onTimer(Object obj) {
        if (!akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout().equals(obj)) {
            super.onTimer(obj);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Stop timeout, commits in progress ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation())})));
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
        completeStage();
        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
    }

    @Override // akka.kafka.internal.ConsumerStageLogic
    public void stopInternal() {
        if (stopping()) {
            return;
        }
        stopping_$eq(true);
        scheduleOnce(akka$kafka$internal$CommittableConsumerStage$$anon$$StopTimeout(), this.$outer.akka$kafka$internal$CommittableConsumerStage$$settings.stopTimeout());
        poll();
    }

    public /* synthetic */ CommittableConsumerStage akka$kafka$internal$CommittableConsumerStage$$anon$$$outer() {
        return this.$outer;
    }

    public final Option akka$kafka$internal$CommittableConsumerStage$$anon$$commit$1() {
        return akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation() > 0 ? pollCommitTimeout() : None$.MODULE$;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public CommittableConsumerStage$$anon$1(CommittableConsumerStage<K, V> committableConsumerStage) {
        super(committableConsumerStage.akka$kafka$internal$CommittableConsumerStage$$settings, (KafkaConsumer) committableConsumerStage.akka$kafka$internal$CommittableConsumerStage$$consumerProvider.apply(), committableConsumerStage.out(), committableConsumerStage.m3shape());
        if (committableConsumerStage == null) {
            throw null;
        }
        this.$outer = committableConsumerStage;
        this.pollCommitTimeout = new Some<>(committableConsumerStage.akka$kafka$internal$CommittableConsumerStage$$settings.pollCommitTimeout());
        this.akka$kafka$internal$CommittableConsumerStage$$anon$$awaitingConfirmation = 0L;
        this.stopping = false;
        this.akka$kafka$internal$CommittableConsumerStage$$anon$$decrementConfirmation = getAsyncCallback(new CommittableConsumerStage$$anon$1$$anonfun$4(this));
        this.commitBatchCallback = getAsyncCallback(new CommittableConsumerStage$$anon$1$$anonfun$5(this));
        this.commitSingleCallback = getAsyncCallback(new CommittableConsumerStage$$anon$1$$anonfun$6(this));
    }
}
