/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.messaging.kafka.nativeimpl.producer;

import java.util.Objects;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.NonBlockingCallback;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.ballerinalang.messaging.kafka.utils.TransactionUtils;

public class Send {
    protected static Object sendKafkaRecord(ProducerRecord record, ObjectValue producerObject) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, producerObject, record.topic());
        NonBlockingCallback callback = new NonBlockingCallback(strand);
        KafkaProducer producer = (KafkaProducer)producerObject.getNativeData("KafkaProducer");
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, producerObject);
            }
            producer.send(record, (metadata, e) -> {
                if (Objects.nonNull(e)) {
                    KafkaMetricsUtil.reportProducerError(producerObject, "publish");
                    callback.setReturnValues((Object)KafkaUtils.createKafkaError("Failed to send data to Kafka server: " + e.getMessage(), "{ballerina/kafka}ProducerError"));
                } else {
                    KafkaMetricsUtil.reportPublish(producerObject, record.topic(), record.value());
                    callback.setReturnValues(null);
                }
                callback.notifySuccess();
            });
        }
        catch (IllegalStateException | KafkaException e2) {
            KafkaMetricsUtil.reportProducerError(producerObject, "publish");
            callback.setReturnValues((Object)KafkaUtils.createKafkaError("Failed to send data to Kafka server: " + e2.getMessage(), "{ballerina/kafka}ProducerError"));
            callback.notifySuccess();
        }
        return null;
    }
}

