package io.smallrye.reactive.messaging.pulsar.fault;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.Consumer;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.class */
public class PulsarReconsumeLater implements PulsarFailureHandler {
    public static final String STRATEGY_NAME = "reconsume-later";
    private final Consumer<?> consumer;
    private final String channel;
    private final Duration defaultDelay;

    @ApplicationScoped
    @Identifier(PulsarReconsumeLater.STRATEGY_NAME)
    /* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater$Factory.class */
    public static class Factory implements PulsarFailureHandler.Factory {
        @Override // io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler.Factory
        public PulsarFailureHandler create(Consumer<?> consumer, PulsarConnectorIncomingConfiguration pulsarConnectorIncomingConfiguration, BiConsumer<Throwable, Boolean> biConsumer) {
            return new PulsarReconsumeLater(consumer, pulsarConnectorIncomingConfiguration.getChannel(), Duration.ofSeconds(pulsarConnectorIncomingConfiguration.getReconsumeLaterDelay().longValue()));
        }
    }

    public PulsarReconsumeLater(Consumer<?> consumer, String str, Duration duration) {
        this.consumer = consumer;
        this.channel = str;
        this.defaultDelay = duration;
    }

    @Override // io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler
    public Uni<Void> handle(PulsarIncomingMessage<?> pulsarIncomingMessage, Throwable th, Metadata metadata) {
        Optional flatMap = Optional.ofNullable(metadata).flatMap(metadata2 -> {
            return metadata2.get(PulsarReconsumeLaterMetadata.class);
        });
        Duration duration = (Duration) flatMap.map((v0) -> {
            return v0.getDelay();
        }).orElse(this.defaultDelay);
        Map map = (Map) flatMap.map((v0) -> {
            return v0.getCustomProperties();
        }).orElse(null);
        PulsarLogging.log.messageFailureDelayed(this.channel, duration.toSeconds(), th.getMessage());
        PulsarLogging.log.messageFailureFullCause(th);
        Uni completionStage = Uni.createFrom().completionStage(() -> {
            return this.consumer.reconsumeLaterAsync(pulsarIncomingMessage.unwrap(), map, duration.toSeconds(), TimeUnit.SECONDS);
        });
        Objects.requireNonNull(pulsarIncomingMessage);
        return completionStage.emitOn(pulsarIncomingMessage::runOnMessageContext).onItem().transformToUni(r4 -> {
            return Uni.createFrom().completionStage(pulsarIncomingMessage.ack());
        }).onFailure().recoverWithUni(th2 -> {
            return Uni.createFrom().completionStage(pulsarIncomingMessage.nack(th2));
        });
    }
}
