/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.messaging.kafka.nativeimpl.consumer;

import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static entry points that manage the connection between a Ballerina Kafka
 * consumer object and its underlying {@link KafkaConsumer}.
 *
 * <p>The native {@link KafkaConsumer} and the {@link Properties} it was built
 * from are stored on the consumer object as native data (see
 * {@link #NATIVE_CONSUMER} and {@link #NATIVE_CONSUMER_CONFIG}). Every public
 * method returns {@code null} on success, or the value produced by
 * {@code KafkaUtils.createKafkaError(...)} when the operation fails.
 */
public class BrokerConnection {
    private static final Logger logger = LoggerFactory.getLogger(BrokerConnection.class);
    private static final PrintStream console = System.out;

    /** Native-data key under which the {@link KafkaConsumer} instance is stored. */
    private static final String NATIVE_CONSUMER = "KafkaConsumer";
    /** Native-data key under which the consumer {@link Properties} are stored. */
    private static final String NATIVE_CONSUMER_CONFIG = "KafkaConsumerConfig";
    /** Error type identifier passed to {@code KafkaUtils.createKafkaError}. */
    private static final String CONSUMER_ERROR = "{ballerina/kafka}ConsumerError";

    /**
     * Closes the Kafka consumer attached to {@code consumerObject}.
     *
     * <p>The timeout is chosen in this order: the caller-supplied
     * {@code duration} (after conversion by {@code KafkaUtils.getIntFromLong})
     * if it is greater than {@code -1}; otherwise the default API timeout read
     * from the stored consumer properties if that is greater than {@code -1};
     * otherwise the consumer is closed without an explicit timeout.
     *
     * <p>NOTE(review): the {@code "KafkaConsumer"} native-data entry is not
     * cleared here, so a later {@link #connect(ObjectValue)} on the same object
     * will still take the "already connected" branch - confirm this is intended.
     *
     * @param consumerObject the consumer object holding the native consumer
     * @param duration       requested close timeout; presumably milliseconds,
     *                       since it is handed to {@link Duration#ofMillis(long)}
     * @return {@code null} on success, or a Kafka error value if closing throws
     *         a {@link KafkaException}
     */
    public static Object close(ObjectValue consumerObject, long duration) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), consumerObject);
        KafkaConsumer<?, ?> kafkaConsumer = (KafkaConsumer<?, ?>) consumerObject.getNativeData(NATIVE_CONSUMER);
        Properties consumerProperties = (Properties) consumerObject.getNativeData(NATIVE_CONSUMER_CONFIG);
        int defaultApiTimeout = KafkaUtils.getDefaultApiTimeout(consumerProperties);
        int apiTimeout = KafkaUtils.getIntFromLong(duration, logger, "duration");
        try {
            if (apiTimeout > -1) {
                BrokerConnection.closeWithDuration(kafkaConsumer, apiTimeout);
            } else if (defaultApiTimeout > -1) {
                BrokerConnection.closeWithDuration(kafkaConsumer, defaultApiTimeout);
            } else {
                kafkaConsumer.close();
            }
            KafkaMetricsUtil.reportConsumerClose(consumerObject);
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(consumerObject, "close");
            return KafkaUtils.createKafkaError(
                    "Failed to close the connection from Kafka server: " + e.getMessage(), CONSUMER_ERROR);
        }
        return null;
    }

    /**
     * Creates a new {@link KafkaConsumer} from the object's
     * {@code consumerConfig} map and attaches it to {@code consumerObject}.
     *
     * <p>On success the consumer, its properties, the {@code bootstrap.servers}
     * property and the client id are stored as native data, a new-consumer
     * metric is reported, and the configured bootstrap servers are printed to
     * standard output.
     *
     * @param consumerObject the consumer object to connect
     * @return {@code null} on success, or a Kafka error value if a consumer is
     *         already attached or construction throws a {@link KafkaException}
     */
    public static Object connect(ObjectValue consumerObject) {
        // Refuse to overwrite an existing native consumer.
        if (Objects.nonNull(consumerObject.getNativeData(NATIVE_CONSUMER))) {
            KafkaMetricsUtil.reportConsumerError(consumerObject, "connection");
            return KafkaUtils.createKafkaError("Kafka consumer is already connected to external broker. Please close it before re-connecting the external broker again.", CONSUMER_ERROR);
        }
        MapValue configs = consumerObject.getMapValue("consumerConfig");
        Properties consumerProperties = KafkaUtils.processKafkaConsumerConfig((MapValue<String, Object>) configs);
        try {
            KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
            consumerObject.addNativeData(NATIVE_CONSUMER, (Object) kafkaConsumer);
            consumerObject.addNativeData(NATIVE_CONSUMER_CONFIG, (Object) consumerProperties);
            consumerObject.addNativeData("bootstrap.servers", (Object) consumerProperties.getProperty("bootstrap.servers"));
            consumerObject.addNativeData("client.id", (Object) KafkaUtils.getClientIdFromProperties(consumerProperties));
            KafkaMetricsUtil.reportNewConsumer(consumerObject);
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(consumerObject, "connection");
            return KafkaUtils.createKafkaError("Cannot connect to the kafka server: " + e.getMessage(), CONSUMER_ERROR);
        }
        console.println("[ballerina/kafka] kafka servers: " + configs.get((Object) "bootstrapServers"));
        return null;
    }

    /**
     * Pauses fetching from the given topic partitions.
     *
     * @param consumerObject  the consumer object holding the native consumer
     * @param topicPartitions the partitions to pause, converted via
     *                        {@code KafkaUtils.getTopicPartitionList}
     * @return {@code null} on success, or a Kafka error value if the consumer
     *         throws an {@link IllegalStateException} or {@link KafkaException}
     */
    public static Object pause(ObjectValue consumerObject, BArray topicPartitions) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), consumerObject);
        KafkaConsumer<?, ?> kafkaConsumer = (KafkaConsumer<?, ?>) consumerObject.getNativeData(NATIVE_CONSUMER);
        ArrayList<TopicPartition> partitionList = KafkaUtils.getTopicPartitionList(topicPartitions, logger);
        try {
            kafkaConsumer.pause(partitionList);
        } catch (IllegalStateException | KafkaException e) {
            // Tag the failure with this operation's own name, as close/connect/resume do.
            KafkaMetricsUtil.reportConsumerError(consumerObject, "pause");
            return KafkaUtils.createKafkaError(
                    "Failed to pause topic partitions for the consumer: " + e.getMessage(), CONSUMER_ERROR);
        }
        return null;
    }

    /**
     * Resumes fetching from the given topic partitions.
     *
     * @param consumerObject  the consumer object holding the native consumer
     * @param topicPartitions the partitions to resume, converted via
     *                        {@code KafkaUtils.getTopicPartitionList}
     * @return {@code null} on success, or a Kafka error value if the consumer
     *         throws an {@link IllegalStateException} or {@link KafkaException}
     */
    public static Object resume(ObjectValue consumerObject, BArray topicPartitions) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), consumerObject);
        KafkaConsumer<?, ?> kafkaConsumer = (KafkaConsumer<?, ?>) consumerObject.getNativeData(NATIVE_CONSUMER);
        ArrayList<TopicPartition> partitionList = KafkaUtils.getTopicPartitionList(topicPartitions, logger);
        try {
            kafkaConsumer.resume(partitionList);
        } catch (IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(consumerObject, "resume");
            return KafkaUtils.createKafkaError(
                    "Failed to resume topic partitions for the consumer: " + e.getMessage(), CONSUMER_ERROR);
        }
        return null;
    }

    /**
     * Closes {@code kafkaConsumer}, passing {@code timeout} milliseconds as the
     * close timeout.
     *
     * @param kafkaConsumer the consumer to close
     * @param timeout       timeout in milliseconds
     */
    private static void closeWithDuration(KafkaConsumer<?, ?> kafkaConsumer, long timeout) {
        kafkaConsumer.close(Duration.ofMillis(timeout));
    }
}

