package org.ballerinalang.messaging.kafka.nativeimpl.producer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaObservabilityConstants;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.ballerinalang.messaging.kafka.utils.TransactionUtils;

/* loaded from: input_file:org/ballerinalang/messaging/kafka/nativeimpl/producer/ProducerActions.class */
public class ProducerActions {
    public static Object init(ObjectValue objectValue) {
        Properties processKafkaProducerConfig = KafkaUtils.processKafkaProducerConfig(objectValue.getMapValue(KafkaConstants.PRODUCER_CONFIG_FIELD_NAME));
        try {
            if (!Objects.nonNull(processKafkaProducerConfig.get("transactional.id"))) {
                KafkaUtils.createKafkaProducer(processKafkaProducerConfig, objectValue);
            } else {
                if (!((Boolean) processKafkaProducerConfig.get("enable.idempotence")).booleanValue()) {
                    throw new IllegalStateException("configuration enableIdempotence must be set to true to enable transactional producer");
                }
                KafkaUtils.createKafkaProducer(processKafkaProducerConfig, objectValue);
                objectValue.addNativeData(KafkaConstants.TRANSACTION_CONTEXT, TransactionUtils.createKafkaTransactionContext(objectValue));
            }
            return null;
        } catch (IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportProducerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_CONNECTION);
            return KafkaUtils.createKafkaError("Failed to initialize the producer: " + e.getMessage(), KafkaConstants.PRODUCER_ERROR);
        }
    }

    public static Object close(ObjectValue objectValue) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), objectValue);
        try {
            ((KafkaProducer) objectValue.getNativeData(KafkaConstants.NATIVE_PRODUCER)).close();
            KafkaMetricsUtil.reportProducerClose(objectValue);
            return null;
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportProducerError(objectValue, "close");
            return KafkaUtils.createKafkaError("Failed to close the Kafka producer: " + e.getMessage(), KafkaConstants.PRODUCER_ERROR);
        }
    }

    public static Object commitConsumer(ObjectValue objectValue, ObjectValue objectValue2) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, objectValue);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) objectValue2.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        KafkaProducer kafkaProducer = (KafkaProducer) objectValue.getNativeData(KafkaConstants.NATIVE_PRODUCER);
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
            hashMap.put(new TopicPartition(topicPartition.topic(), topicPartition.partition()), new OffsetAndMetadata(kafkaConsumer.position(topicPartition)));
        }
        String stringValue = objectValue2.getMapValue(KafkaConstants.CONSUMER_CONFIG_FIELD_NAME).getStringValue(KafkaConstants.CONSUMER_GROUP_ID_CONFIG);
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, objectValue);
            }
            kafkaProducer.sendOffsetsToTransaction(hashMap, stringValue);
            return null;
        } catch (IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportProducerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_COMMIT);
            return KafkaUtils.createKafkaError("Failed to commit consumer: " + e.getMessage(), KafkaConstants.PRODUCER_ERROR);
        }
    }

    public static Object commitConsumerOffsets(ObjectValue objectValue, BArray bArray, String str) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, objectValue);
        KafkaProducer kafkaProducer = (KafkaProducer) objectValue.getNativeData(KafkaConstants.NATIVE_PRODUCER);
        Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = KafkaUtils.getPartitionToMetadataMap(bArray);
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, objectValue);
            }
            kafkaProducer.sendOffsetsToTransaction(partitionToMetadataMap, str);
            return null;
        } catch (IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportProducerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_COMMIT);
            return KafkaUtils.createKafkaError("Failed to commit consumer offsets: " + e.getMessage(), KafkaConstants.PRODUCER_ERROR);
        }
    }

    public static Object flushRecords(ObjectValue objectValue) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, objectValue);
        KafkaProducer kafkaProducer = (KafkaProducer) objectValue.getNativeData(KafkaConstants.NATIVE_PRODUCER);
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, objectValue);
            }
            kafkaProducer.flush();
            return null;
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportProducerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_FLUSH);
            return KafkaUtils.createKafkaError("Failed to flush Kafka records: " + e.getMessage(), KafkaConstants.PRODUCER_ERROR);
        }
    }

    public static Object getTopicPartitions(ObjectValue objectValue, String str) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, objectValue, str);
        KafkaProducer kafkaProducer = (KafkaProducer) objectValue.getNativeData(KafkaConstants.NATIVE_PRODUCER);
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, objectValue);
            }
            List<PartitionInfo> partitionsFor = kafkaProducer.partitionsFor(str);
            BArray createArrayValue = BValueCreator.createArrayValue(new BArrayType(KafkaUtils.getTopicPartitionRecord().getType()));
            for (PartitionInfo partitionInfo : partitionsFor) {
                createArrayValue.append(KafkaUtils.populateTopicPartitionRecord(partitionInfo.topic(), partitionInfo.partition()));
            }
            return createArrayValue;
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportProducerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_TOPIC_PARTITIONS);
            return KafkaUtils.createKafkaError("Failed to fetch partitions from the producer " + e.getMessage(), KafkaConstants.PRODUCER_ERROR);
        }
    }
}
