package akka.kafka.internal;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.kafka.ConsumerFailed;
import akka.kafka.KafkaConsumerActor$Internal$Messages;
import akka.kafka.KafkaConsumerActor$Internal$RequestMessages;
import akka.kafka.internal.SubSourceLogic;
import akka.stream.SourceShape;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.package$;
import scala.runtime.BoxedUnit;

/* compiled from: SubSourceLogic.scala */
/* loaded from: input_file:akka/kafka/internal/SubSourceLogic$SubSourceStage$$anon$1.class */
public final class SubSourceLogic$SubSourceStage$$anon$1 extends GraphStageLogic implements PromiseControl {
    private final SourceShape<Msg> shape;
    private final KafkaConsumerActor$Internal$RequestMessages requestMessages;
    private boolean requested;
    private GraphStageLogic.StageActor self;
    private Iterator<ConsumerRecord<K, V>> buffer;
    private final Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise;
    private final Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise;
    private final AsyncCallback<ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback;
    private final /* synthetic */ SubSourceLogic.SubSourceStage $outer;

    @Override // akka.kafka.internal.PromiseControl
    public /* synthetic */ void akka$kafka$internal$PromiseControl$$super$setKeepGoing(boolean z) {
        super.setKeepGoing(z);
    }

    @Override // akka.kafka.internal.PromiseControl
    public void performStop() {
        performStop();
    }

    @Override // akka.kafka.internal.PromiseControl
    public boolean onStop() {
        boolean onStop;
        onStop = onStop();
        return onStop;
    }

    @Override // akka.kafka.internal.PromiseControl
    public boolean onShutdown() {
        boolean onShutdown;
        onShutdown = onShutdown();
        return onShutdown;
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> stop() {
        Future<Done> stop;
        stop = stop();
        return stop;
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> shutdown() {
        Future<Done> shutdown;
        shutdown = shutdown();
        return shutdown;
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> isShutdown() {
        Future<Done> isShutdown;
        isShutdown = isShutdown();
        return isShutdown;
    }

    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise() {
        return this.akka$kafka$internal$PromiseControl$$shutdownPromise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise() {
        return this.akka$kafka$internal$PromiseControl$$stopPromise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public AsyncCallback<ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback() {
        return this.akka$kafka$internal$PromiseControl$$controlCallback;
    }

    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise<Done> promise) {
        this.akka$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$stopPromise_$eq(Promise<Done> promise) {
        this.akka$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback<ControlOperation> asyncCallback) {
        this.akka$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    @Override // akka.kafka.internal.PromiseControl
    public SourceShape<Msg> shape() {
        return this.shape;
    }

    private KafkaConsumerActor$Internal$RequestMessages requestMessages() {
        return this.requestMessages;
    }

    private boolean requested() {
        return this.requested;
    }

    private void requested_$eq(boolean z) {
        this.requested = z;
    }

    private GraphStageLogic.StageActor self() {
        return this.self;
    }

    private void self_$eq(GraphStageLogic.StageActor stageActor) {
        this.self = stageActor;
    }

    private Iterator<ConsumerRecord<K, V>> buffer() {
        return this.buffer;
    }

    private void buffer_$eq(Iterator<ConsumerRecord<K, V>> iterator) {
        this.buffer = iterator;
    }

    public void preStart() {
        super.preStart();
        this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$$outer().subsourceStartedCB().invoke(new Tuple2(this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$tp, this));
        self_$eq(getStageActor(tuple2 -> {
            $anonfun$preStart$4(this, tuple2);
            return BoxedUnit.UNIT;
        }));
        self().watch(this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$consumer);
    }

    public void postStop() {
        onShutdown();
        super.postStop();
    }

    @Override // akka.kafka.internal.PromiseControl
    public void performShutdown() {
        completeStage();
    }

    public void akka$kafka$internal$SubSourceLogic$SubSourceStage$$anon$$pump() {
        while (isAvailable(this.$outer.out())) {
            if (!buffer().hasNext()) {
                if (requested()) {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
                requested_$eq(true);
                this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$consumer.tell(requestMessages(), self().ref());
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
            push(this.$outer.out(), this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$$outer().createMessage((ConsumerRecord) buffer().next()));
        }
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    public /* synthetic */ SubSourceLogic.SubSourceStage akka$kafka$internal$SubSourceLogic$SubSourceStage$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ void $anonfun$preStart$4(SubSourceLogic$SubSourceStage$$anon$1 subSourceLogic$SubSourceStage$$anon$1, Tuple2 tuple2) {
        if (tuple2 != null) {
            Object _2 = tuple2._2();
            if (_2 instanceof KafkaConsumerActor$Internal$Messages) {
                KafkaConsumerActor$Internal$Messages kafkaConsumerActor$Internal$Messages = (KafkaConsumerActor$Internal$Messages) _2;
                subSourceLogic$SubSourceStage$$anon$1.requested_$eq(false);
                if (subSourceLogic$SubSourceStage$$anon$1.buffer().hasNext()) {
                    subSourceLogic$SubSourceStage$$anon$1.buffer_$eq(subSourceLogic$SubSourceStage$$anon$1.buffer().$plus$plus(() -> {
                        return kafkaConsumerActor$Internal$Messages.messages();
                    }));
                } else {
                    subSourceLogic$SubSourceStage$$anon$1.buffer_$eq(kafkaConsumerActor$Internal$Messages.messages());
                }
                subSourceLogic$SubSourceStage$$anon$1.akka$kafka$internal$SubSourceLogic$SubSourceStage$$anon$$pump();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (tuple2 != null) {
            Object _22 = tuple2._2();
            if (_22 instanceof Terminated) {
                ActorRef actor = ((Terminated) _22).actor();
                ActorRef actorRef = subSourceLogic$SubSourceStage$$anon$1.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$consumer;
                if (actor != null ? actor.equals(actorRef) : actorRef == null) {
                    subSourceLogic$SubSourceStage$$anon$1.failStage(new ConsumerFailed());
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        throw new MatchError(tuple2);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SubSourceLogic$SubSourceStage$$anon$1(SubSourceLogic<K, V, Msg>.SubSourceStage subSourceStage) {
        super(subSourceStage.m44shape());
        if (subSourceStage == null) {
            throw null;
        }
        this.$outer = subSourceStage;
        PromiseControl.$init$(this);
        this.shape = subSourceStage.m44shape();
        this.requestMessages = new KafkaConsumerActor$Internal$RequestMessages(0, Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{subSourceStage.akka$kafka$internal$SubSourceLogic$SubSourceStage$$tp})));
        this.requested = false;
        this.buffer = package$.MODULE$.Iterator().empty();
        setHandler(subSourceStage.out(), new OutHandler(this) { // from class: akka.kafka.internal.SubSourceLogic$SubSourceStage$$anon$1$$anon$3
            private final /* synthetic */ SubSourceLogic$SubSourceStage$$anon$1 $outer;

            public void onPull() {
                this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$anon$$pump();
            }

            public void onDownstreamFinish() {
                this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$anon$$$outer().akka$kafka$internal$SubSourceLogic$SubSourceStage$$$outer().subsourceCancelledCB().invoke(this.$outer.akka$kafka$internal$SubSourceLogic$SubSourceStage$$anon$$$outer().akka$kafka$internal$SubSourceLogic$SubSourceStage$$tp);
                OutHandler.onDownstreamFinish$(this);
            }

            /* JADX WARN: Incorrect inner types in method signature: (Lakka/kafka/internal/SubSourceLogic<TK;TV;TMsg;>.SubSourceStage$$anon$1;)V */
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
