package org.ballerinalang.kafka.nativeimpl.producer.action;

import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.kafka.util.KafkaConstants;
import org.ballerinalang.kafka.util.KafkaUtils;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.util.exceptions.BallerinaException;

@BallerinaFunction(orgName = KafkaConstants.ORG_NAME, packageName = KafkaConstants.PACKAGE_NAME, functionName = "init", receiver = @Receiver(type = TypeKind.OBJECT, structType = KafkaConstants.PRODUCER_STRUCT_NAME, structPackage = KafkaConstants.KAFKA_NATIVE_PACKAGE), args = {@Argument(name = "producerConfig", type = TypeKind.RECORD, structType = KafkaConstants.PRODUCER_CONFIG_STRUCT_NAME)})
/* loaded from: input_file:org/ballerinalang/kafka/nativeimpl/producer/action/Init.class */
public class Init implements NativeCallableUnit {
    public void execute(Context context, CallableUnitCallback callableUnitCallback) {
        BMap refArgument = context.getRefArgument(0);
        Properties processKafkaProducerConfig = KafkaUtils.processKafkaProducerConfig(context.getRefArgument(1));
        try {
            KafkaProducer kafkaProducer = new KafkaProducer(processKafkaProducerConfig);
            if (Objects.isNull(kafkaProducer)) {
                throw new BallerinaException("Kafka producer has not been initialized properly.");
            }
            if (processKafkaProducerConfig.get("transactional.id") != null) {
                kafkaProducer.initTransactions();
            }
            BMap bMap = refArgument.get("producerHolder");
            BMap<String, BValue> createKafkaPackageStruct = KafkaUtils.createKafkaPackageStruct(context, KafkaConstants.PRODUCER_STRUCT_NAME);
            createKafkaPackageStruct.addNativeData(KafkaConstants.NATIVE_PRODUCER, kafkaProducer);
            createKafkaPackageStruct.addNativeData(KafkaConstants.NATIVE_PRODUCER_CONFIG, processKafkaProducerConfig);
            bMap.put(new BString(KafkaConstants.NATIVE_PRODUCER), createKafkaPackageStruct);
            callableUnitCallback.notifySuccess();
        } catch (IllegalStateException | KafkaException e) {
            throw new BallerinaException("Failed to initialize the producer " + e.getMessage(), e, context);
        }
    }

    public boolean isBlocking() {
        return false;
    }
}
