package io.smallrye.reactive.messaging.pulsar.ack;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Objects;
import java.util.Optional;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;

/* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/ack/PulsarCumulativeAck.class */
/**
 * {@link PulsarAckHandler} that acknowledges incoming messages through the consumer's
 * {@code acknowledgeCumulativeAsync} operation.
 *
 * <p>When the message carries {@link PulsarTransactionMetadata}, the acknowledgement is issued
 * with that metadata's transaction; otherwise a plain cumulative acknowledgement is sent.
 */
public class PulsarCumulativeAck implements PulsarAckHandler {
    /** Identifier under which the {@link Factory} is declared. */
    public static final String STRATEGY_NAME = "cumulative";

    /** Consumer used to send the acknowledgements. */
    private final Consumer<?> consumer;

    /**
     * Factory producing {@link PulsarCumulativeAck} handlers, identified by {@link #STRATEGY_NAME}.
     */
    @ApplicationScoped
    @Identifier(PulsarCumulativeAck.STRATEGY_NAME)
    public static class Factory implements PulsarAckHandler.Factory {
        /**
         * Creates a handler bound to the given consumer.
         *
         * @param consumer the consumer the handler acknowledges on
         * @param pulsarConnectorIncomingConfiguration the channel configuration; not used by this strategy
         * @return a new handler wrapping {@code consumer}
         */
        @Override
        public PulsarCumulativeAck create(Consumer<?> consumer, PulsarConnectorIncomingConfiguration pulsarConnectorIncomingConfiguration) {
            // The covariant return type is enough: the compiler generates the bridge method
            // returning PulsarAckHandler, so it must not be declared by hand.
            return new PulsarCumulativeAck(consumer);
        }
    }

    /**
     * Creates a handler that acknowledges on the given consumer.
     *
     * @param consumer the consumer used to send acknowledgements
     */
    public PulsarCumulativeAck(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    /**
     * Releases the underlying message and cumulatively acknowledges its id.
     *
     * @param pulsarIncomingMessage the message to acknowledge; must not be {@code null}
     * @return a {@link Uni} wrapping the acknowledgement stage, emitting through the message's
     *         {@code runOnMessageContext}
     * @throws NullPointerException if {@code pulsarIncomingMessage} is {@code null}
     */
    @Override
    public Uni<Void> handle(PulsarIncomingMessage<?> pulsarIncomingMessage) {
        Objects.requireNonNull(pulsarIncomingMessage, "pulsarIncomingMessage");
        // Read the id before releasing the underlying message; only the id is needed afterwards.
        MessageId messageId = pulsarIncomingMessage.getMessageId();
        pulsarIncomingMessage.unwrap().release();
        Uni<Void> acknowledgement = Uni.createFrom().completionStage(() -> {
            Optional<PulsarTransactionMetadata> transactionMetadata = pulsarIncomingMessage.getMetadata(PulsarTransactionMetadata.class);
            if (transactionMetadata.isPresent()) {
                // Acknowledge with the transaction attached to the message.
                return consumer.acknowledgeCumulativeAsync(messageId, transactionMetadata.get().getTransaction());
            }
            return consumer.acknowledgeCumulativeAsync(messageId);
        });
        return acknowledgement.emitOn(pulsarIncomingMessage::runOnMessageContext);
    }
}
