package org.ballerinalang.messaging.kafka.nativeimpl.consumer;

import java.io.PrintStream;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaObservabilityConstants;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ballerinalang/messaging/kafka/nativeimpl/consumer/BrokerConnection.class */
/**
 * Native implementations of the connection-level operations of a Kafka consumer object:
 * connecting to the broker, closing the connection, and pausing/resuming topic partitions.
 *
 * <p>Every public method returns {@code null} on success, or the error value produced by
 * {@link KafkaUtils#createKafkaError} on failure. The native {@link KafkaConsumer} is stored on the
 * consumer object under {@link KafkaConstants#NATIVE_CONSUMER}.
 *
 * <p>NOTE(review): {@link #close} leaves the native consumer attached to the object, so
 * {@link #connect} keeps reporting "already connected" afterwards. Confirm whether that is
 * intended before changing it; other native functions may rely on the entry being present.
 */
public class BrokerConnection {
    private static final Logger logger = LoggerFactory.getLogger(BrokerConnection.class);
    private static final PrintStream console = System.out;

    /** Reason reported when an operation finds no native consumer attached to the consumer object. */
    private static final String NOT_CONNECTED_REASON = "Kafka consumer is not connected to an external broker.";

    /**
     * Closes the native Kafka consumer attached to the given consumer object.
     *
     * <p>The timeout is chosen in this order: {@code j} if it converts to a value greater than
     * {@code -1}; otherwise the default API timeout taken from the stored consumer configuration if
     * that is greater than {@code -1}; otherwise the consumer is closed without an explicit timeout.
     * Timeouts are interpreted as milliseconds.
     *
     * @param objectValue the consumer object holding the native consumer and its configuration
     * @param j           the requested close timeout in milliseconds; values not greater than
     *                    {@code -1} mean "not specified"
     * @return {@code null} on success, or a Kafka error value if no consumer is attached or the
     *         close fails with a {@link KafkaException}
     */
    public static Object close(ObjectValue objectValue, long j) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), objectValue);
        KafkaConsumer<?, ?> kafkaConsumer = nativeConsumer(objectValue);
        if (kafkaConsumer == null) {
            // Without this guard the call below would fail with an opaque NullPointerException.
            KafkaMetricsUtil.reportConsumerError(objectValue, "close");
            return KafkaUtils.createKafkaError("Failed to close the connection from Kafka server: " + NOT_CONNECTED_REASON, KafkaConstants.CONSUMER_ERROR);
        }
        int defaultApiTimeout = KafkaUtils.getDefaultApiTimeout((Properties) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER_CONFIG));
        int requestedTimeout = KafkaUtils.getIntFromLong(j, logger, KafkaConstants.ALIAS_DURATION);
        try {
            if (requestedTimeout > -1) {
                closeWithDuration(kafkaConsumer, requestedTimeout);
            } else if (defaultApiTimeout > -1) {
                closeWithDuration(kafkaConsumer, defaultApiTimeout);
            } else {
                kafkaConsumer.close();
            }
            KafkaMetricsUtil.reportConsumerClose(objectValue);
            return null;
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, "close");
            return KafkaUtils.createKafkaError("Failed to close the connection from Kafka server: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR);
        }
    }

    /**
     * Creates a native Kafka consumer from the object's consumer configuration and attaches it,
     * together with the processed configuration, bootstrap servers and client id, to the object.
     *
     * <p>On success the configured bootstrap servers are also printed to standard output.
     *
     * @param objectValue the consumer object to connect
     * @return {@code null} on success, or a Kafka error value if a native consumer is already
     *         attached or the consumer cannot be created
     */
    public static Object connect(ObjectValue objectValue) {
        if (Objects.nonNull(objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER))) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_CONNECTION);
            return KafkaUtils.createKafkaError("Kafka consumer is already connected to external broker. Please close it before re-connecting the external broker again.", KafkaConstants.CONSUMER_ERROR);
        }
        MapValue consumerConfig = objectValue.getMapValue(KafkaConstants.CONSUMER_CONFIG_FIELD_NAME);
        Properties consumerProperties = KafkaUtils.processKafkaConsumerConfig(consumerConfig);
        try {
            // The consumer is created first so that nothing is attached if construction fails.
            objectValue.addNativeData(KafkaConstants.NATIVE_CONSUMER, new KafkaConsumer<>(consumerProperties));
            objectValue.addNativeData(KafkaConstants.NATIVE_CONSUMER_CONFIG, consumerProperties);
            objectValue.addNativeData(KafkaConstants.BOOTSTRAP_SERVERS, consumerProperties.getProperty(KafkaConstants.BOOTSTRAP_SERVERS));
            objectValue.addNativeData(KafkaConstants.CLIENT_ID, KafkaUtils.getClientIdFromProperties(consumerProperties));
            KafkaMetricsUtil.reportNewConsumer(objectValue);
            console.println(KafkaConstants.KAFKA_SERVERS + consumerConfig.get(KafkaConstants.CONSUMER_BOOTSTRAP_SERVERS_CONFIG));
            return null;
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_CONNECTION);
            return KafkaUtils.createKafkaError("Cannot connect to the kafka server: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR);
        }
    }

    /**
     * Pauses fetching from the given topic partitions on the attached native consumer.
     *
     * @param objectValue the consumer object holding the native consumer
     * @param bArray      the topic partitions to pause, converted via
     *                    {@link KafkaUtils#getTopicPartitionList}
     * @return {@code null} on success, or a Kafka error value if no consumer is attached or the
     *         consumer rejects the request
     */
    public static Object pause(ObjectValue objectValue, BArray bArray) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), objectValue);
        KafkaConsumer<?, ?> kafkaConsumer = nativeConsumer(objectValue);
        if (kafkaConsumer == null) {
            // Report a regular Kafka error instead of failing with a NullPointerException.
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_PAUSE);
            return KafkaUtils.createKafkaError("Failed to pause topic partitions for the consumer: " + NOT_CONNECTED_REASON, KafkaConstants.CONSUMER_ERROR);
        }
        try {
            kafkaConsumer.pause(KafkaUtils.getTopicPartitionList(bArray, logger));
            return null;
        } catch (IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_PAUSE);
            return KafkaUtils.createKafkaError("Failed to pause topic partitions for the consumer: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR);
        }
    }

    /**
     * Resumes fetching from the given topic partitions on the attached native consumer.
     *
     * @param objectValue the consumer object holding the native consumer
     * @param bArray      the topic partitions to resume, converted via
     *                    {@link KafkaUtils#getTopicPartitionList}
     * @return {@code null} on success, or a Kafka error value if no consumer is attached or the
     *         consumer rejects the request
     */
    public static Object resume(ObjectValue objectValue, BArray bArray) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), objectValue);
        KafkaConsumer<?, ?> kafkaConsumer = nativeConsumer(objectValue);
        if (kafkaConsumer == null) {
            // Report a regular Kafka error instead of failing with a NullPointerException.
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_RESUME);
            return KafkaUtils.createKafkaError("Failed to resume topic partitions for the consumer: " + NOT_CONNECTED_REASON, KafkaConstants.CONSUMER_ERROR);
        }
        try {
            kafkaConsumer.resume(KafkaUtils.getTopicPartitionList(bArray, logger));
            return null;
        } catch (IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_RESUME);
            return KafkaUtils.createKafkaError("Failed to resume topic partitions for the consumer: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR);
        }
    }

    /** Returns the native consumer attached to the object, or {@code null} if none is attached. */
    private static KafkaConsumer<?, ?> nativeConsumer(ObjectValue objectValue) {
        return (KafkaConsumer<?, ?>) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER);
    }

    /** Closes the consumer, waiting at most {@code j} milliseconds. */
    private static void closeWithDuration(KafkaConsumer<?, ?> kafkaConsumer, long j) {
        kafkaConsumer.close(Duration.ofMillis(j));
    }
}
