/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.messaging.kafka.nativeimpl.consumer;

import java.util.ArrayList;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static entry points that reposition the {@link KafkaConsumer} attached to a consumer object.
 *
 * <p>Every method records a trace for the invocation, fetches the consumer stored as native data
 * on the given object, and delegates to the matching {@code KafkaConsumer} seek call. Each method
 * returns {@code null} when the call completes, or the value built by
 * {@code KafkaUtils.createKafkaError} when the consumer throws one of the handled exceptions.
 */
public class Seek {
    private static final Logger logger = LoggerFactory.getLogger(Seek.class);

    /** Native-data key under which the {@link KafkaConsumer} is looked up on the consumer object. */
    private static final String NATIVE_CONSUMER_KEY = "KafkaConsumer";

    /** Key read from the partition-offset map to obtain the offset to seek to. */
    private static final String OFFSET_FIELD = "offset";

    /**
     * Second argument handed to {@code KafkaUtils.createKafkaError} for every failure in this
     * class (presumably the error reason identifier; confirm against {@code KafkaUtils}).
     */
    private static final String CONSUMER_ERROR = "{ballerina/kafka}ConsumerError";

    /**
     * Seeks the consumer to the offset given for a single topic partition.
     *
     * @param consumerObject  object holding the native {@link KafkaConsumer}
     * @param partitionOffset map from which the topic partition is built and the {@code "offset"}
     *                        entry is read
     * @return {@code null} on success, otherwise the error value created for the failure
     */
    public static Object seek(ObjectValue consumerObject, MapValue<String, Object> partitionOffset) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), consumerObject);
        KafkaConsumer<?, ?> consumer = nativeConsumer(consumerObject);
        TopicPartition target = KafkaUtils.createTopicPartitionFromPartitionOffset(partitionOffset);
        Long position = partitionOffset.getIntValue(OFFSET_FIELD);
        try {
            consumer.seek(target, position.longValue());
            return null;
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            return seekFailure(consumerObject, "seek", "Failed to seek the consumer: ", e);
        }
    }

    /**
     * Seeks the consumer to the beginning of the given topic partitions.
     *
     * @param consumerObject  object holding the native {@link KafkaConsumer}
     * @param topicPartitions array converted to a partition list via
     *                        {@code KafkaUtils.getTopicPartitionList}
     * @return {@code null} on success, otherwise the error value created for the failure
     */
    public static Object seekToBeginning(ObjectValue consumerObject, BArray topicPartitions) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), consumerObject);
        KafkaConsumer<?, ?> consumer = nativeConsumer(consumerObject);
        ArrayList<TopicPartition> targets = KafkaUtils.getTopicPartitionList(topicPartitions, logger);
        try {
            consumer.seekToBeginning(targets);
            return null;
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            return seekFailure(consumerObject, "seek_to_beginning",
                    "Failed to seek the consumer to the beginning: ", e);
        }
    }

    /**
     * Seeks the consumer to the end of the given topic partitions.
     *
     * @param consumerObject  object holding the native {@link KafkaConsumer}
     * @param topicPartitions array converted to a partition list via
     *                        {@code KafkaUtils.getTopicPartitionList}
     * @return {@code null} on success, otherwise the error value created for the failure
     */
    public static Object seekToEnd(ObjectValue consumerObject, BArray topicPartitions) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), consumerObject);
        KafkaConsumer<?, ?> consumer = nativeConsumer(consumerObject);
        ArrayList<TopicPartition> targets = KafkaUtils.getTopicPartitionList(topicPartitions, logger);
        try {
            consumer.seekToEnd(targets);
            return null;
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            return seekFailure(consumerObject, "seek_to_end",
                    "Failed to seek the consumer to the end: ", e);
        }
    }

    /** Fetches the {@link KafkaConsumer} stored as native data on the consumer object. */
    private static KafkaConsumer<?, ?> nativeConsumer(ObjectValue consumerObject) {
        return (KafkaConsumer<?, ?>) consumerObject.getNativeData(NATIVE_CONSUMER_KEY);
    }

    /**
     * Reports a consumer error metric for the given operation, then builds the error value whose
     * message is {@code messagePrefix} followed by the exception message.
     */
    private static Object seekFailure(ObjectValue consumerObject, String operation,
                                      String messagePrefix, RuntimeException cause) {
        // Metric is reported before the error value is built, matching the order callers observe.
        KafkaMetricsUtil.reportConsumerError(consumerObject, operation);
        return KafkaUtils.createKafkaError(messagePrefix + cause.getMessage(), CONSUMER_ERROR);
    }
}

