package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniMemoize;
import io.smallrye.mutiny.groups.UniOnItem;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.ConfigUtils;
import io.vertx.core.Context;
import jakarta.enterprise.inject.Instance;
import java.lang.annotation.Annotation;
import java.security.AccessController;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.class */
public class ReactiveKafkaProducer<K, V> implements KafkaProducer<K, V> {
    private final AtomicBoolean closed;
    private final String clientId;
    private final ProducerInterceptor<K, V> interceptor;
    private final Uni<Producer<K, V>> producerUni;
    private final AtomicReference<Producer<K, V>> producerRef;
    private final ExecutorService kafkaWorker;
    private final Map<String, Object> kafkaConfiguration;
    private final String channel;
    private final int closetimeout;
    private Consumer<Throwable> reportFailure;

    public ReactiveKafkaProducer(KafkaConnectorOutgoingConfiguration kafkaConnectorOutgoingConfiguration, Instance<ClientCustomizer<Map<String, Object>>> instance, Instance<SerializationFailureHandler<?>> instance2, Instance<ProducerInterceptor<?, ?>> instance3, Consumer<Throwable> consumer, BiConsumer<Producer<?, ?>, Map<String, Object>> biConsumer) {
        this(getKafkaProducerConfiguration(kafkaConnectorOutgoingConfiguration, instance), kafkaConnectorOutgoingConfiguration.getChannel(), kafkaConnectorOutgoingConfiguration.getCloseTimeout().intValue(), kafkaConnectorOutgoingConfiguration.getLazyClient().booleanValue(), getProducerInterceptorBean(kafkaConnectorOutgoingConfiguration, instance3), createSerializationFailureHandler(kafkaConnectorOutgoingConfiguration.getChannel(), kafkaConnectorOutgoingConfiguration.getKeySerializationFailureHandler().orElse(null), instance2), createSerializationFailureHandler(kafkaConnectorOutgoingConfiguration.getChannel(), kafkaConnectorOutgoingConfiguration.getValueSerializationFailureHandler().orElse(null), instance2), biConsumer);
        this.reportFailure = consumer;
    }

    public String getClientId() {
        return this.clientId;
    }

    public ReactiveKafkaProducer(Map<String, Object> map, String str, int i, boolean z, ProducerInterceptor<K, V> producerInterceptor, SerializationFailureHandler<K> serializationFailureHandler, SerializationFailureHandler<V> serializationFailureHandler2, BiConsumer<Producer<?, ?>, Map<String, Object>> biConsumer) {
        this.closed = new AtomicBoolean(true);
        this.producerRef = new AtomicReference<>();
        this.kafkaConfiguration = map;
        this.channel = str;
        this.closetimeout = i;
        this.clientId = map.get("client.id").toString();
        this.interceptor = producerInterceptor;
        String str2 = (String) map.get("key.serializer");
        String str3 = (String) map.get("value.serializer");
        if (str3 == null) {
            throw KafkaExceptions.ex.missingValueSerializer(this.channel, this.channel);
        }
        SerializerWrapper serializerWrapper = new SerializerWrapper(str2, true, serializationFailureHandler);
        SerializerWrapper serializerWrapper2 = new SerializerWrapper(str3, false, serializationFailureHandler2);
        serializerWrapper.configure(map, true);
        serializerWrapper2.configure(map, false);
        if (producerInterceptor != null) {
            producerInterceptor.configure(map);
        }
        this.kafkaWorker = Executors.newSingleThreadExecutor(KafkaSendingThread::new);
        UniMemoize memoize = Uni.createFrom().item(() -> {
            return this.producerRef.updateAndGet(producer -> {
                if (producer != null) {
                    return producer;
                }
                org.apache.kafka.clients.producer.KafkaProducer kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(map, serializerWrapper, serializerWrapper2);
                if (map.containsKey("transactional.id")) {
                    kafkaProducer.initTransactions();
                }
                biConsumer.accept(kafkaProducer, map);
                this.closed.set(false);
                return kafkaProducer;
            });
        }).onFailure().invoke(th -> {
            KafkaLogging.log.unableToInitializeProducer(str, th);
            if (this.reportFailure != null) {
                this.reportFailure.accept(th);
            }
        }).memoize();
        AtomicBoolean atomicBoolean = this.closed;
        Objects.requireNonNull(atomicBoolean);
        this.producerUni = memoize.until(atomicBoolean::get).runSubscriptionOn(this.kafkaWorker);
        if (z) {
            return;
        }
        this.producerUni.await().indefinitely();
    }

    private Uni<Producer<K, V>> withProducerOnSendingThread() {
        return this.producerUni;
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public <T> Uni<T> runOnSendingThread(Function<Producer<K, V>, T> function) {
        return withProducerOnSendingThread().map(function);
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<Void> runOnSendingThread(Consumer<Producer<K, V>> consumer) {
        return withProducerOnSendingThread().invoke(consumer).replaceWithVoid();
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return withProducerOnSendingThread().chain(producer -> {
            ProducerRecord<K, V> interceptOnSend = interceptOnSend(producerRecord);
            return Uni.createFrom().emitter(uniEmitter -> {
                producer.send(interceptOnSend, (recordMetadata, exc) -> {
                    interceptOnAcknowledge(interceptOnSend, recordMetadata, exc);
                    if (exc == null) {
                        uniEmitter.complete(recordMetadata);
                        return;
                    }
                    if (producerRecord.topic() != null) {
                        KafkaLogging.log.unableToWrite(this.channel, producerRecord.topic(), exc);
                    } else {
                        KafkaLogging.log.unableToWrite(this.channel, exc);
                    }
                    uniEmitter.fail(exc);
                });
            });
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<Void> flush() {
        return runOnSendingThread((v0) -> {
            v0.flush();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<List<PartitionInfo>> partitionsFor(String str) {
        return runOnSendingThread(producer -> {
            return producer.partitionsFor(str);
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<Void> initTransactions() {
        return runOnSendingThread((v0) -> {
            v0.initTransactions();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<Void> beginTransaction() {
        return runOnSendingThread((v0) -> {
            v0.beginTransaction();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<Void> commitTransaction() {
        return runOnSendingThread((v0) -> {
            v0.commitTransaction();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<Void> abortTransaction() {
        return runOnSendingThread((v0) -> {
            v0.abortTransaction();
        }).onItem().invoke(() -> {
            KafkaLogging.log.transactionAborted(this.clientId, this.channel);
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    @CheckReturnValue
    public Uni<Void> sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) {
        return runOnSendingThread(producer -> {
            producer.sendOffsetsToTransaction(map, consumerGroupMetadata);
        });
    }

    private static <K, V> ProducerInterceptor<K, V> getProducerInterceptorBean(KafkaConnectorOutgoingConfiguration kafkaConnectorOutgoingConfiguration, Instance<ProducerInterceptor<?, ?>> instance) {
        return (ProducerInterceptor) kafkaConnectorOutgoingConfiguration.getInterceptorBean().flatMap(str -> {
            return CDIUtils.getInstanceById(instance, str).stream().findFirst();
        }).orElse(null);
    }

    private static <T> SerializationFailureHandler<T> createSerializationFailureHandler(String str, String str2, Instance<SerializationFailureHandler<?>> instance) {
        if (str2 == null) {
            return null;
        }
        Instance select = instance.select(new Annotation[]{Identifier.Literal.of(str2)});
        if (select.isUnsatisfied()) {
            throw KafkaExceptions.ex.unableToFindSerializationFailureHandler(str2, str);
        }
        if (select.stream().count() > 1) {
            throw KafkaExceptions.ex.unableToFindSerializationFailureHandler(str2, str, (int) select.stream().count());
        }
        if (select.stream().count() == 1) {
            return (SerializationFailureHandler) select.get();
        }
        return null;
    }

    private static Map<String, Object> getKafkaProducerConfiguration(KafkaConnectorOutgoingConfiguration kafkaConnectorOutgoingConfiguration, Instance<ClientCustomizer<Map<String, Object>>> instance) {
        HashMap hashMap = new HashMap();
        JsonHelper.asJsonObject(kafkaConnectorOutgoingConfiguration.config()).forEach(entry -> {
            hashMap.put((String) entry.getKey(), entry.getValue().toString());
        });
        hashMap.put("acks", kafkaConnectorOutgoingConfiguration.getAcks());
        if (!hashMap.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", kafkaConnectorOutgoingConfiguration.getBootstrapServers());
            hashMap.put("bootstrap.servers", kafkaConnectorOutgoingConfiguration.getBootstrapServers());
        }
        if (!hashMap.containsKey("key.serializer")) {
            KafkaLogging.log.keySerializerOmitted();
            hashMap.put("key.serializer", kafkaConnectorOutgoingConfiguration.getKeySerializer());
        }
        hashMap.compute("client.id", (str, obj) -> {
            return obj == null ? kafkaConnectorOutgoingConfiguration.getClientIdPrefix().orElse("kafka-producer-") + kafkaConnectorOutgoingConfiguration.getChannel() : kafkaConnectorOutgoingConfiguration.getClientIdPrefix().orElse("") + obj;
        });
        if (!hashMap.containsKey("reconnect.backoff.max.ms")) {
            hashMap.put("reconnect.backoff.max.ms", "10000");
        }
        ConfigurationCleaner.cleanupProducerConfiguration(hashMap);
        return (Map) ConfigUtils.customize(kafkaConnectorOutgoingConfiguration.config(), instance, hashMap);
    }

    public String get(String str) {
        return (String) this.kafkaConfiguration.get(str);
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Producer<K, V> unwrap() {
        return this.producerRef.get();
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Map<String, ?> configuration() {
        return this.kafkaConfiguration;
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            int i = this.closetimeout;
            UniOnItem onItem = runOnSendingThread(producer -> {
                interceptClose();
                if (System.getSecurityManager() == null) {
                    producer.close(Duration.ofMillis(i));
                } else {
                    AccessController.doPrivileged(() -> {
                        producer.close(Duration.ofMillis(i));
                        return null;
                    });
                }
            }).onItem();
            ExecutorService executorService = this.kafkaWorker;
            Objects.requireNonNull(executorService);
            Uni invoke = onItem.invoke(executorService::shutdown);
            if (Context.isOnEventLoopThread()) {
                invoke.subscribeAsCompletionStage();
            } else {
                invoke.await().atMost(Duration.ofMillis(i * 2));
            }
        }
    }

    private ProducerRecord<K, V> interceptOnSend(ProducerRecord<K, V> producerRecord) {
        if (this.interceptor != null) {
            try {
                return this.interceptor.onSend(producerRecord);
            } catch (Throwable th) {
                KafkaLogging.log.interceptorOnSendError(this.channel, th);
            }
        }
        return producerRecord;
    }

    private void interceptOnAcknowledge(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exc) {
        RecordMetadata recordMetadataForFailure;
        if (this.interceptor != null) {
            if (exc == null) {
                recordMetadataForFailure = recordMetadata;
            } else {
                try {
                    recordMetadataForFailure = getRecordMetadataForFailure(producerRecord);
                } catch (Throwable th) {
                    KafkaLogging.log.interceptorOnAcknowledgeError(this.channel, th);
                    return;
                }
            }
            this.interceptor.onAcknowledgement(recordMetadataForFailure, exc);
        }
    }

    private static RecordMetadata getRecordMetadataForFailure(ProducerRecord<?, ?> producerRecord) {
        return new RecordMetadata(new TopicPartition(producerRecord.topic(), producerRecord.partition() != null ? producerRecord.partition().intValue() : -1), -1L, -1, -1L, -1, -1);
    }

    private void interceptClose() {
        if (this.interceptor != null) {
            try {
                this.interceptor.close();
            } catch (Throwable th) {
                KafkaLogging.log.interceptorCloseError(this.channel, th);
            }
        }
    }
}
