package org.ballerinalang.messaging.kafka.nativeimpl.producer;

import java.util.Objects;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.NonBlockingCallback;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaObservabilityConstants;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.ballerinalang.messaging.kafka.utils.TransactionUtils;

/* loaded from: input_file:org/ballerinalang/messaging/kafka/nativeimpl/producer/Send.class */
public class Send {
    /**
     * Hands a record to the native Kafka producer attached to {@code objectValue} and reports
     * the outcome through a {@link NonBlockingCallback} bound to the current strand.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>fetch the current strand from {@link Scheduler#getStrand()} and trace the invocation
     *       against the record's topic;</li>
     *   <li>look up the native producer stored under {@code KafkaConstants.NATIVE_PRODUCER};</li>
     *   <li>when the strand is in a transaction, call
     *       {@code TransactionUtils.handleTransactions} before sending;</li>
     *   <li>call {@code send} with a completion callback that records metrics and completes the
     *       non-blocking callback with either {@code null} or a Kafka error value.</li>
     * </ol>
     *
     * <p>An {@link IllegalStateException} or {@link KafkaException} thrown while preparing or
     * issuing the send is handled the same way as a failure passed to the completion callback.
     *
     * <p>Decompiler note: the access modifier of this method was changed from {@code protected}.
     *
     * @param producerRecord the record to send; its topic and value are also used for
     *                       tracing and metrics
     * @param objectValue    the producer object holding the native producer as native data
     * @return always {@code null}; the result is delivered via the non-blocking callback
     */
    public static Object sendKafkaRecord(ProducerRecord producerRecord, ObjectValue objectValue) {
        Strand currentStrand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(currentStrand, objectValue, producerRecord.topic());
        NonBlockingCallback completion = new NonBlockingCallback(currentStrand);
        KafkaProducer nativeProducer = (KafkaProducer) objectValue.getNativeData(KafkaConstants.NATIVE_PRODUCER);
        try {
            if (currentStrand.isInTransaction()) {
                TransactionUtils.handleTransactions(currentStrand, objectValue);
            }
            nativeProducer.send(producerRecord, (metadata, failure) -> {
                if (failure == null) {
                    // Successful publish: record the metric and complete with no error value.
                    KafkaMetricsUtil.reportPublish(objectValue, producerRecord.topic(), producerRecord.value());
                    completion.setReturnValues((Object) null);
                    completion.notifySuccess();
                    return;
                }
                completeWithSendError(objectValue, completion, failure);
            });
        } catch (IllegalStateException | KafkaException e) {
            completeWithSendError(objectValue, completion, e);
        }
        return null;
    }

    /**
     * Records a publish-error metric, sets a Kafka producer error as the callback's return
     * value, and then notifies the callback.
     *
     * <p>The error is delivered as a return value, which is why the callback is still
     * completed through {@code notifySuccess()}.
     *
     * @param producerObject the producer object the error metric is reported against
     * @param completion     the callback to complete
     * @param cause          the exception whose message is appended to the error text
     */
    private static void completeWithSendError(ObjectValue producerObject, NonBlockingCallback completion,
                                              Exception cause) {
        KafkaMetricsUtil.reportProducerError(producerObject, KafkaObservabilityConstants.ERROR_TYPE_PUBLISH);
        completion.setReturnValues(KafkaUtils.createKafkaError("Failed to send data to Kafka server: " + cause.getMessage(), KafkaConstants.PRODUCER_ERROR));
        completion.notifySuccess();
    }
}
