/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.messaging.kafka.nativeimpl.producer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.messaging.kafka.impl.KafkaTransactionContext;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.ballerinalang.messaging.kafka.utils.TransactionUtils;

/**
 * Static entry points backing the actions of a Kafka producer object.
 *
 * <p>Every method returns {@code null} on success (or a result value where documented) and an
 * error value built by {@code KafkaUtils.createKafkaError} with the reason
 * {@code {ballerina/kafka}ProducerError} on failure. Failures are also reported through
 * {@code KafkaMetricsUtil.reportProducerError}.
 */
public class ProducerActions {
    /** Native-data key under which the {@link KafkaProducer} is looked up on the producer object. */
    private static final String NATIVE_PRODUCER = "KafkaProducer";
    /** Native-data key under which the {@link KafkaConsumer} is looked up on the consumer object. */
    private static final String NATIVE_CONSUMER = "KafkaConsumer";
    /** Native-data key under which the transaction context is stored on the producer object. */
    private static final String TRANSACTION_CONTEXT = "TransactionInitiated";
    /** Error reason used for every error returned from this class. */
    private static final String PRODUCER_ERROR = "{ballerina/kafka}ProducerError";
    private static final String TRANSACTIONAL_ID = "transactional.id";
    private static final String ENABLE_IDEMPOTENCE = "enable.idempotence";

    /**
     * Creates the Kafka producer for the given producer object from its {@code producerConfig} field.
     *
     * <p>When {@code transactional.id} is configured, {@code enable.idempotence} must be enabled;
     * in that case a transaction context is also created and attached to the producer object.
     *
     * @param producerObject the producer object holding the configuration
     * @return {@code null} on success, otherwise a producer error value
     */
    public static Object init(ObjectValue producerObject) {
        @SuppressWarnings("unchecked") // the config record is keyed by strings; same cast as before
        MapValue<String, Object> configs = (MapValue<String, Object>) producerObject.getMapValue("producerConfig");
        Properties producerProperties = KafkaUtils.processKafkaProducerConfig(configs);
        try {
            boolean transactional = Objects.nonNull(producerProperties.get(TRANSACTIONAL_ID));
            if (transactional && !isIdempotenceEnabled(producerProperties)) {
                throw new IllegalStateException("configuration enableIdempotence must be set to true to enable transactional producer");
            }
            KafkaUtils.createKafkaProducer(producerProperties, producerObject);
            if (transactional) {
                KafkaTransactionContext transactionContext = TransactionUtils.createKafkaTransactionContext(producerObject);
                producerObject.addNativeData(TRANSACTION_CONTEXT, transactionContext);
            }
        } catch (IllegalStateException | KafkaException e) {
            return producerError(producerObject, "connection", "Failed to initialize the producer: " + e.getMessage());
        }
        return null;
    }

    /**
     * Closes the Kafka producer attached to the given producer object.
     *
     * @param producerObject the producer object whose native producer is closed
     * @return {@code null} on success, otherwise a producer error value
     */
    public static Object close(ObjectValue producerObject) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), producerObject);
        KafkaProducer<?, ?> kafkaProducer = nativeProducer(producerObject);
        try {
            kafkaProducer.close();
            KafkaMetricsUtil.reportProducerClose(producerObject);
        } catch (KafkaException e) {
            return producerError(producerObject, "close", "Failed to close the Kafka producer: " + e.getMessage());
        }
        return null;
    }

    /**
     * Sends the current positions of all partitions assigned to the given consumer to the
     * producer's transaction, using the {@code groupId} from the consumer's {@code consumerConfig}.
     *
     * @param producerObject the producer object performing the commit
     * @param consumer the consumer object whose assigned partitions and positions are committed
     * @return {@code null} on success, otherwise a producer error value
     */
    public static Object commitConsumer(ObjectValue producerObject, ObjectValue consumer) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, producerObject);
        KafkaConsumer<?, ?> kafkaConsumer = (KafkaConsumer<?, ?>) consumer.getNativeData(NATIVE_CONSUMER);
        KafkaProducer<?, ?> kafkaProducer = nativeProducer(producerObject);
        @SuppressWarnings("unchecked") // the config record is keyed by strings
        MapValue<String, Object> consumerConfig = (MapValue<String, Object>) consumer.getMapValue("consumerConfig");
        String groupId = consumerConfig.getStringValue("groupId");
        try {
            // Querying the consumer happens inside the try so that consumer-side failures are
            // reported as a producer error instead of escaping this method.
            Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = new HashMap<>();
            Set<TopicPartition> topicPartitions = kafkaConsumer.assignment();
            for (TopicPartition topicPartition : topicPartitions) {
                long position = kafkaConsumer.position(topicPartition);
                partitionToMetadataMap.put(topicPartition, new OffsetAndMetadata(position));
            }
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, producerObject);
            }
            kafkaProducer.sendOffsetsToTransaction(partitionToMetadataMap, groupId);
        } catch (IllegalStateException | KafkaException e) {
            return producerError(producerObject, "commit", "Failed to commit consumer: " + e.getMessage());
        }
        return null;
    }

    /**
     * Sends the given offsets to the producer's transaction on behalf of a consumer group.
     *
     * @param producerObject the producer object performing the commit
     * @param offsets the offsets to commit, converted by {@code KafkaUtils.getPartitionToMetadataMap}
     * @param groupId the consumer group id the offsets belong to
     * @return {@code null} on success, otherwise a producer error value
     */
    public static Object commitConsumerOffsets(ObjectValue producerObject, BArray offsets, String groupId) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, producerObject);
        KafkaProducer<?, ?> kafkaProducer = nativeProducer(producerObject);
        Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = KafkaUtils.getPartitionToMetadataMap(offsets);
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, producerObject);
            }
            kafkaProducer.sendOffsetsToTransaction(partitionToMetadataMap, groupId);
        } catch (IllegalStateException | KafkaException e) {
            return producerError(producerObject, "commit", "Failed to commit consumer offsets: " + e.getMessage());
        }
        return null;
    }

    /**
     * Flushes the records buffered by the Kafka producer.
     *
     * @param producerObject the producer object whose native producer is flushed
     * @return {@code null} on success, otherwise a producer error value
     */
    public static Object flushRecords(ObjectValue producerObject) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, producerObject);
        KafkaProducer<?, ?> kafkaProducer = nativeProducer(producerObject);
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, producerObject);
            }
            kafkaProducer.flush();
        } catch (IllegalStateException | KafkaException e) {
            // IllegalStateException is handled like in the commit methods, which guard the same
            // transaction-handling call.
            return producerError(producerObject, "flush", "Failed to flush Kafka records: " + e.getMessage());
        }
        return null;
    }

    /**
     * Retrieves the partitions of the given topic as seen by the Kafka producer.
     *
     * @param producerObject the producer object used for the lookup
     * @param topic the topic whose partitions are requested
     * @return an array of topic-partition records on success, otherwise a producer error value
     */
    public static Object getTopicPartitions(ObjectValue producerObject, String topic) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, producerObject, topic);
        KafkaProducer<?, ?> kafkaProducer = nativeProducer(producerObject);
        try {
            if (strand.isInTransaction()) {
                TransactionUtils.handleTransactions(strand, producerObject);
            }
            List<PartitionInfo> partitionInfoList = kafkaProducer.partitionsFor(topic);
            BArray topicPartitionArray = BValueCreator.createArrayValue(new BArrayType(KafkaUtils.getTopicPartitionRecord().getType()));
            for (PartitionInfo info : partitionInfoList) {
                MapValue<String, Object> partition = KafkaUtils.populateTopicPartitionRecord(info.topic(), info.partition());
                topicPartitionArray.append(partition);
            }
            return topicPartitionArray;
        } catch (IllegalStateException | KafkaException e) {
            // IllegalStateException is handled like in the commit methods, which guard the same
            // transaction-handling call.
            return producerError(producerObject, "topic_partitions", "Failed to fetch partitions from the producer " + e.getMessage());
        }
    }

    /** Returns the native Kafka producer stored on the given producer object. */
    private static KafkaProducer<?, ?> nativeProducer(ObjectValue producerObject) {
        return (KafkaProducer<?, ?>) producerObject.getNativeData(NATIVE_PRODUCER);
    }

    /**
     * Tells whether {@code enable.idempotence} is switched on. A missing value counts as disabled,
     * and both {@link Boolean} and string representations are accepted, so the check never throws.
     */
    private static boolean isIdempotenceEnabled(Properties producerProperties) {
        Object idempotence = producerProperties.get(ENABLE_IDEMPOTENCE);
        return Boolean.parseBoolean(String.valueOf(idempotence));
    }

    /** Reports a producer error metric of the given type and builds the matching error value. */
    private static Object producerError(ObjectValue producerObject, String errorType, String message) {
        KafkaMetricsUtil.reportProducerError(producerObject, errorType);
        return KafkaUtils.createKafkaError(message, PRODUCER_ERROR);
    }
}

