package org.ballerinalang.kafka.nativeimpl.producer.action;

import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.kafka.transaction.KafkaTransactionContext;
import org.ballerinalang.kafka.util.KafkaConstants;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BByteArray;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.natives.annotations.ReturnType;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.ballerinalang.util.transactions.LocalTransactionInfo;

@BallerinaFunction(orgName = KafkaConstants.ORG_NAME, packageName = KafkaConstants.PACKAGE_NAME, functionName = "send", receiver = @Receiver(type = TypeKind.OBJECT, structType = KafkaConstants.PRODUCER_STRUCT_NAME, structPackage = KafkaConstants.KAFKA_NATIVE_PACKAGE), args = {@Argument(name = "value", type = TypeKind.BLOB), @Argument(name = "topic", type = TypeKind.STRING), @Argument(name = "key", type = TypeKind.UNION), @Argument(name = "partition", type = TypeKind.UNION), @Argument(name = "timestamp", type = TypeKind.UNION)}, returnType = {@ReturnType(type = TypeKind.NONE)})
/* loaded from: input_file:org/ballerinalang/kafka/nativeimpl/producer/action/Send.class */
public class Send implements NativeCallableUnit {
    public void execute(Context context, CallableUnitCallback callableUnitCallback) {
        BMap refArgument = context.getRefArgument(0);
        BMap bMap = refArgument.get("producerHolder").get(new BString(KafkaConstants.NATIVE_PRODUCER));
        KafkaProducer kafkaProducer = (KafkaProducer) bMap.getNativeData(KafkaConstants.NATIVE_PRODUCER);
        Properties properties = (Properties) bMap.getNativeData(KafkaConstants.NATIVE_PRODUCER_CONFIG);
        String stringArgument = context.getStringArgument(0);
        byte[] bytes = context.getRefArgument(1).getBytes();
        BByteArray nullableRefArgument = context.getNullableRefArgument(2);
        BInteger nullableRefArgument2 = context.getNullableRefArgument(3);
        BInteger nullableRefArgument3 = context.getNullableRefArgument(4);
        byte[] bytes2 = Objects.nonNull(nullableRefArgument) ? nullableRefArgument.getBytes() : null;
        Long value = Objects.nonNull(nullableRefArgument2) ? nullableRefArgument2.value() : null;
        Long value2 = Objects.nonNull(nullableRefArgument3) ? nullableRefArgument3.value() : null;
        ProducerRecord producerRecord = Objects.nonNull(value) ? new ProducerRecord(stringArgument, Integer.valueOf(value.intValue()), value2, bytes2, bytes) : new ProducerRecord(stringArgument, (Integer) null, value2, bytes2, bytes);
        if (Objects.isNull(kafkaProducer) || Objects.isNull(producerRecord)) {
            throw new BallerinaException("Kafka producer/record has not been initialized properly.");
        }
        try {
            if (Objects.nonNull(properties.get("transactional.id")) && context.isInTransaction()) {
                String stringValue = refArgument.get("connectorID").stringValue();
                LocalTransactionInfo localTransactionInfo = context.getLocalTransactionInfo();
                if (Objects.isNull(localTransactionInfo.getTransactionContext(stringValue))) {
                    localTransactionInfo.registerTransactionContext(stringValue, new KafkaTransactionContext(kafkaProducer));
                    kafkaProducer.beginTransaction();
                }
            }
            kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
                if (Objects.nonNull(exc)) {
                    throw new BallerinaException("Failed to send message. " + exc.getMessage(), exc, context);
                }
                callableUnitCallback.notifySuccess();
            });
        } catch (IllegalStateException | KafkaException e) {
            throw new BallerinaException("Failed to send message. " + e.getMessage(), e, context);
        }
    }

    public boolean isBlocking() {
        return false;
    }
}
