package io.smallrye.reactive.messaging.kafka.fault;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.class */
/**
 * {@link KafkaFailureHandler} that republishes a failed record on a dead-letter topic.
 *
 * <p>Each failed record is wrapped in a {@link ProducerRecord} carrying the original key, payload and
 * headers plus a set of {@code dead-letter-*} headers describing the failure, and is pushed into an
 * in-memory processor wired to a dedicated {@link KafkaSink}.
 */
public class KafkaDeadLetterQueue implements KafkaFailureHandler {
    /** Header holding the class name of the failure passed to {@code handle}. */
    public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name";
    /** Header holding the class name of the failure's cause, when there is one. */
    public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name";
    /** Header holding the failure message (or its {@code toString()} when the message is null). */
    public static final String DEAD_LETTER_REASON = "dead-letter-reason";
    /** Header holding the message of the failure's cause, when there is one. */
    public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
    /** Header holding the topic the failed record was read from. */
    public static final String DEAD_LETTER_TOPIC = "dead-letter-topic";
    /** Header holding the offset of the failed record. */
    public static final String DEAD_LETTER_OFFSET = "dead-letter-offset";
    /** Header holding the partition of the failed record. */
    public static final String DEAD_LETTER_PARTITION = "dead-letter-partition";
    /** Suffix used for the nested channel configuration and for the name of the wired channel. */
    public static final String CHANNEL_DLQ_SUFFIX = "dead-letter-queue";

    private final String channel;
    private final KafkaSink dlqSink;
    private final UnicastProcessor<Message<?>> dlqSource;
    private final String topic;

    /**
     * CDI factory registered under the {@code dead-letter-queue} identifier. It derives a producer
     * configuration from the incoming channel and builds the sink used to publish dead letters.
     */
    @ApplicationScoped
    @Identifier("dead-letter-queue")
    public static class Factory implements KafkaFailureHandler.Factory {

        @Inject
        KafkaCDIEvents kafkaCDIEvents;

        @Inject
        @Any
        Instance<SerializationFailureHandler<?>> serializationFailureHandlers;

        @Inject
        @Any
        Instance<ClientCustomizer<Map<String, Object>>> configCustomizers;

        @Inject
        @Any
        Instance<ProducerInterceptor<?, ?>> producerInterceptors;

        @Inject
        Instance<Config> rootConfig;

        @Inject
        @Any
        Instance<Map<String, Object>> configurations;

        @Inject
        Instance<OpenTelemetry> openTelemetryInstance;

        @Inject
        Instance<SubscriberDecorator> subscriberDecorators;

        /**
         * Builds the dead-letter-queue handler for an incoming channel.
         *
         * @param config the configuration of the incoming channel
         * @param vertx the Vert.x instance (not used by this factory)
         * @param consumer the consumer whose configuration seeds the serializers and the client id
         * @param reportFailure failure callback (not used by this factory)
         * @return a handler publishing failed records through a newly created {@link KafkaSink}
         */
        @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler.Factory
        public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, Vertx vertx,
                KafkaConsumer<?, ?> consumer, BiConsumer<Throwable, Boolean> reportFailure) {
            Map<String, ?> consumerConfig = consumer.configuration();
            String keyDeserializer = (String) consumerConfig.get("key.deserializer");
            String valueDeserializer = (String) consumerConfig.get("value.deserializer");
            String consumerClientId = (String) consumerConfig.get("client.id");
            String channelName = config.getChannel();
            String defaultTopic = "dead-letter-topic-" + channelName;

            // Values supplied for the nested "dead-letter-queue" configuration of the channel.
            OverrideConnectorConfig overrides = new OverrideConnectorConfig(
                    "mp.messaging.incoming.",
                    this.rootConfig.get(),
                    KafkaConnector.CONNECTOR_NAME,
                    channelName,
                    CHANNEL_DLQ_SUFFIX,
                    Map.of(
                            "key.serializer", ignored -> KafkaDeadLetterQueue.getMirrorSerializer(keyDeserializer),
                            "value.serializer", ignored -> KafkaDeadLetterQueue.getMirrorSerializer(valueDeserializer),
                            "client.id", ignored -> config.getDeadLetterQueueProducerClientId()
                                    .orElse("kafka-dead-letter-topic-producer-" + consumerClientId),
                            "topic", ignored -> defaultTopic,
                            "key-serialization-failure-handler", ignored -> "dlq-serialization",
                            "value-serialization-failure-handler", ignored -> "dlq-serialization",
                            "interceptor.classes", ignored -> ""));
            KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(
                    ConfigHelper.retrieveChannelConfiguration(this.configurations, overrides));

            String deadLetterTopic = config.getDeadLetterQueueTopic().orElse(defaultTopic);
            KafkaLogging.log.deadLetterConfig(producerConfig.getTopic().orElse(null),
                    producerConfig.getKeySerializer(), producerConfig.getValueSerializer());

            UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
            KafkaSink sink = new KafkaSink(producerConfig, this.kafkaCDIEvents, this.openTelemetryInstance,
                    this.configCustomizers, this.serializationFailureHandlers, this.producerInterceptors);
            Wiring.wireOutgoingConnectorToUpstream(processor, sink.getSink(), this.subscriberDecorators,
                    producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);
            return new KafkaDeadLetterQueue(channelName, deadLetterTopic, sink, processor);
        }
    }

    /**
     * Creates the handler.
     *
     * @param channel the name of the incoming channel, used when logging nacked records
     * @param topic the dead-letter topic used when the nack metadata does not provide one
     * @param dlqSink the sink publishing the dead letters; closed by {@link #terminate()}
     * @param dlqSource the processor into which dead letters are pushed
     */
    public KafkaDeadLetterQueue(String channel, String topic, KafkaSink dlqSink,
            UnicastProcessor<Message<?>> dlqSource) {
        this.channel = channel;
        this.topic = topic;
        this.dlqSink = dlqSink;
        this.dlqSource = dlqSource;
    }

    /**
     * Derives a serializer class name from a deserializer class name by replacing every occurrence
     * of {@code "Deserializer"} with {@code "Serializer"}.
     *
     * @param deserializerClassName the deserializer class name, may be {@code null}
     * @return the derived name, or the {@link StringSerializer} class name when the input is {@code null}
     */
    public static String getMirrorSerializer(String deserializerClassName) {
        if (deserializerClassName == null) {
            return StringSerializer.class.getName();
        }
        return deserializerClassName.replace("Deserializer", "Serializer");
    }

    /** Returns the message of the throwable, falling back to {@code toString()} when it has none. */
    private static String getThrowableMessage(Throwable failure) {
        String message = failure.getMessage();
        return message != null ? message : failure.toString();
    }

    /**
     * Sends the failed record to the dead-letter topic.
     *
     * <p>Topic, key and partition default to the handler's topic, the record's key and no explicit
     * partition; each can be overridden by an {@link OutgoingKafkaRecordMetadata} found in
     * {@code metadata}. The headers of the original record and of the metadata are copied after the
     * {@code dead-letter-*} headers.
     *
     * @param incomingKafkaRecord the record that failed
     * @param th the failure
     * @param metadata optional nack metadata, may be {@code null}
     * @return a {@code Uni} completing once the dead letter was acknowledged and the original record
     *         was acked, or failing when the dead letter was nacked or the ack of the original
     *         record failed; the item is emitted through the record's message context
     */
    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord, Throwable th, Metadata metadata) {
        OutgoingKafkaRecordMetadata<?> outgoing = metadata != null
                ? metadata.get(OutgoingKafkaRecordMetadata.class).orElse(null)
                : null;

        String targetTopic = this.topic;
        Object key = incomingKafkaRecord.getKey();
        Integer partition = null;
        if (outgoing != null) {
            if (outgoing.getTopic() != null) {
                targetTopic = outgoing.getTopic();
            }
            if (outgoing.getKey() != null) {
                key = outgoing.getKey();
            }
            // A negative partition means "not set": let the producer pick one.
            if (outgoing.getPartition() >= 0) {
                partition = outgoing.getPartition();
            }
        }

        ProducerRecord<Object, V> deadLetter = new ProducerRecord<>(targetTopic, partition, key,
                incomingKafkaRecord.getPayload());
        addHeader(deadLetter, DEAD_LETTER_EXCEPTION_CLASS_NAME, th.getClass().getName());
        addHeader(deadLetter, DEAD_LETTER_REASON, getThrowableMessage(th));
        Throwable cause = th.getCause();
        if (cause != null) {
            addHeader(deadLetter, DEAD_LETTER_CAUSE_CLASS_NAME, cause.getClass().getName());
            addHeader(deadLetter, DEAD_LETTER_CAUSE, getThrowableMessage(cause));
        }
        addHeader(deadLetter, DEAD_LETTER_TOPIC, incomingKafkaRecord.getTopic());
        addHeader(deadLetter, DEAD_LETTER_PARTITION, Integer.toString(incomingKafkaRecord.getPartition()));
        addHeader(deadLetter, DEAD_LETTER_OFFSET, Long.toString(incomingKafkaRecord.getOffset()));

        incomingKafkaRecord.getHeaders().forEach(header -> deadLetter.headers().add(header));
        if (outgoing != null && outgoing.getHeaders() != null) {
            outgoing.getHeaders().forEach(header -> deadLetter.headers().add(header));
        }
        // Dropped after the copies above so that the marker is never forwarded to the dead-letter topic.
        deadLetter.headers().remove(DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ);

        KafkaLogging.log.messageNackedDeadLetter(this.channel, targetTopic);

        CompletableFuture<Void> outcome = new CompletableFuture<>();
        this.dlqSource.onNext(incomingKafkaRecord
                .withPayload(deadLetter)
                // whenComplete (rather than thenAccept) so that a failed ack of the original record
                // also terminates the returned Uni instead of leaving it pending forever.
                .withAck(() -> incomingKafkaRecord.ack().whenComplete((ignored, ackFailure) -> {
                    if (ackFailure != null) {
                        outcome.completeExceptionally(ackFailure);
                    } else {
                        outcome.complete(null);
                    }
                }))
                .withNack(nackReason -> {
                    outcome.completeExceptionally(nackReason);
                    return outcome;
                }));

        return Uni.createFrom().completionStage(outcome)
                .emitOn(incomingKafkaRecord::runOnMessageContext);
    }

    /**
     * Adds a header whose value is the UTF-8 encoding of the given string.
     *
     * @param producerRecord the record receiving the header
     * @param name the header name
     * @param value the header value, must not be {@code null}
     */
    void addHeader(ProducerRecord<?, ?> producerRecord, String name, String value) {
        producerRecord.headers().add(name, value.getBytes(StandardCharsets.UTF_8));
    }

    /** Closes the dead-letter sink via {@code closeQuietly()}. */
    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    public void terminate() {
        this.dlqSink.closeQuietly();
    }
}
