/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.messaging.kafka.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.jvm.BRuntime;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.types.BPackage;
import org.ballerinalang.jvm.types.BTypes;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.jvm.values.api.BError;
import org.ballerinalang.jvm.values.api.BString;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.utils.AvroUtils;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.slf4j.Logger;

public class KafkaUtils {
    /** Private constructor: every member of this class is static, so it is never instantiated. */
    private KafkaUtils() {
    }

    /**
     * Builds the argument array handed to a listener service resource for one batch of polled records.
     *
     * <p>Every argument in the returned array is followed by a boolean flag. When the first attached
     * function of the service declares exactly two parameters, only the listener and the consumer
     * records are supplied; the partition-offset and group-id slots are {@code null} and flagged
     * {@code false}. Otherwise one partition offset is produced per record and the group id is
     * passed through.
     *
     * @param service  service whose first attached function decides the argument shape
     * @param listener listener object; its key/value deserializer types drive record conversion
     * @param records  records returned by the Kafka poll
     * @param groupId  consumer group id, forwarded only in the long form
     * @return eight-element array of (value, flag) pairs
     */
    public static Object[] getResourceParameters(ObjectValue service, ObjectValue listener, ConsumerRecords records, String groupId) {
        BArray consumerRecordsArray = BValueCreator.createArrayValue(new BArrayType(KafkaUtils.getConsumerRecord().getType()));
        String keyType = listener.getStringValue("keyDeserializerType");
        String valueType = listener.getStringValue("valueDeserializerType");
        boolean recordsOnly = service.getType().getAttachedFunctions()[0].getParameterType().length == 2;

        if (!recordsOnly) {
            BArray partitionOffsetsArray = BValueCreator.createArrayValue(new BArrayType(KafkaUtils.getPartitionOffsetRecord().getType()));
            for (Object element : records) {
                ConsumerRecord polled = (ConsumerRecord) element;
                MapValue<String, Object> converted = KafkaUtils.populateConsumerRecord(polled, keyType, valueType);
                MapValue<String, Object> location = KafkaUtils.populateTopicPartitionRecord(polled.topic(), polled.partition());
                MapValue<String, Object> position = KafkaUtils.populatePartitionOffsetRecord(location, polled.offset());
                consumerRecordsArray.append(converted);
                partitionOffsetsArray.append(position);
            }
            return new Object[]{listener, true, consumerRecordsArray, true, partitionOffsetsArray, true, groupId, true};
        }

        // Short form: the resource only wants the records themselves.
        for (Object element : records) {
            consumerRecordsArray.append(KafkaUtils.populateConsumerRecord((ConsumerRecord) element, keyType, valueType));
        }
        return new Object[]{listener, true, consumerRecordsArray, true, null, false, null, false};
    }

    /**
     * Translates a Ballerina consumer configuration record into Kafka consumer {@link Properties}.
     *
     * <p>String, integer and boolean settings are copied under their Kafka property names. A few
     * entries ({@code topics}, {@code pollingTimeoutInMillis}, {@code pollingIntervalInMillis},
     * {@code concurrentConsumers}, {@code decoupleProcessing}) are stored under their own record
     * field names rather than a dotted Kafka key. SSL, SASL and free-form additional properties are
     * applied last, each only when its section is present in {@code configurations}.
     *
     * @param configurations consumer configuration record
     * @return properties populated from {@code configurations}
     */
    public static Properties processKafkaConsumerConfig(MapValue<String, Object> configurations) {
        Properties properties = new Properties();

        // String-valued settings.
        KafkaUtils.addStringParamIfPresent("bootstrap.servers", configurations, properties, "bootstrapServers");
        KafkaUtils.addStringParamIfPresent("group.id", configurations, properties, "groupId");
        KafkaUtils.addStringParamIfPresent("auto.offset.reset", configurations, properties, "offsetReset");
        KafkaUtils.addStringParamIfPresent("partition.assignment.strategy", configurations, properties, "partitionAssignmentStrategy");
        KafkaUtils.addStringParamIfPresent("metrics.recording.level", configurations, properties, "metricsRecordingLevel");
        KafkaUtils.addStringParamIfPresent("metric.reporters", configurations, properties, "metricsReporterClasses");
        KafkaUtils.addStringParamIfPresent("client.id", configurations, properties, "clientId");
        KafkaUtils.addStringParamIfPresent("interceptor.classes", configurations, properties, "interceptorClasses");
        KafkaUtils.addStringParamIfPresent("isolation.level", configurations, properties, "isolationLevel");

        // Deserializers: built-in type names are mapped to classes, custom ones are passed through.
        KafkaUtils.addDeserializerConfigs("key.deserializer", configurations, properties, "keyDeserializerType");
        KafkaUtils.addDeserializerConfigs("value.deserializer", configurations, properties, "valueDeserializerType");
        KafkaUtils.addCustomDeserializer("keyDeserializer", "keyDeserializerType", properties, configurations);
        KafkaUtils.addCustomDeserializer("valueDeserializer", "valueDeserializerType", properties, configurations);
        KafkaUtils.addStringParamIfPresent("schema.registry.url", configurations, properties, "schemaRegistryUrl");
        KafkaUtils.addStringArrayParamIfPresent("topics", configurations, properties, "topics");

        // Integer-valued settings.
        KafkaUtils.addIntParamIfPresent("session.timeout.ms", configurations, properties, "sessionTimeoutInMillis");
        KafkaUtils.addIntParamIfPresent("heartbeat.interval.ms", configurations, properties, "heartBeatIntervalInMillis");
        KafkaUtils.addIntParamIfPresent("metadata.max.age.ms", configurations, properties, "metadataMaxAgeInMillis");
        KafkaUtils.addIntParamIfPresent("auto.commit.interval.ms", configurations, properties, "autoCommitIntervalInMillis");
        KafkaUtils.addIntParamIfPresent("max.partition.fetch.bytes", configurations, properties, "maxPartitionFetchBytes");
        KafkaUtils.addIntParamIfPresent("send.buffer.bytes", configurations, properties, "sendBuffer");
        KafkaUtils.addIntParamIfPresent("receive.buffer.bytes", configurations, properties, "receiveBuffer");
        KafkaUtils.addIntParamIfPresent("fetch.min.bytes", configurations, properties, "fetchMinBytes");
        KafkaUtils.addIntParamIfPresent("fetch.max.bytes", configurations, properties, "fetchMaxBytes");
        KafkaUtils.addIntParamIfPresent("fetch.max.wait.ms", configurations, properties, "fetchMaxWaitTimeInMillis");
        KafkaUtils.addIntParamIfPresent("reconnect.backoff.ms", configurations, properties, "reconnectBackoffTimeInMillis");
        KafkaUtils.addIntParamIfPresent("retry.backoff.ms", configurations, properties, "retryBackoffInMillis");
        KafkaUtils.addIntParamIfPresent("metrics.sample.window.ms", configurations, properties, "metricsSampleWindowInMillis");
        KafkaUtils.addIntParamIfPresent("metrics.num.samples", configurations, properties, "metricsNumSamples");
        KafkaUtils.addIntParamIfPresent("request.timeout.ms", configurations, properties, "requestTimeoutInMillis");
        KafkaUtils.addIntParamIfPresent("connections.max.idle.ms", configurations, properties, "connectionMaxIdleTimeInMillis");
        KafkaUtils.addIntParamIfPresent("max.poll.records", configurations, properties, "maxPollRecords");
        KafkaUtils.addIntParamIfPresent("max.poll.interval.ms", configurations, properties, "maxPollInterval");
        KafkaUtils.addIntParamIfPresent("reconnect.backoff.max.ms", configurations, properties, "reconnectBackoffTimeMaxInMillis");
        KafkaUtils.addIntParamIfPresent("default.api.timeout.ms", configurations, properties, "defaultApiTimeoutInMillis");
        KafkaUtils.addIntParamIfPresent("pollingTimeoutInMillis", configurations, properties, "pollingTimeoutInMillis");
        KafkaUtils.addIntParamIfPresent("pollingIntervalInMillis", configurations, properties, "pollingIntervalInMillis");
        KafkaUtils.addIntParamIfPresent("concurrentConsumers", configurations, properties, "concurrentConsumers");

        // Boolean settings are only written when they differ from the stated default.
        KafkaUtils.addBooleanParamIfPresent("enable.auto.commit", configurations, properties, "autoCommit", true);
        KafkaUtils.addBooleanParamIfPresent("check.crcs", configurations, properties, "checkCRCS", true);
        KafkaUtils.addBooleanParamIfPresent("exclude.internal.topics", configurations, properties, "excludeInternalTopics", true);
        KafkaUtils.addBooleanParamIfPresent("decoupleProcessing", configurations, properties, "decoupleProcessing", false);

        if (configurations.get("secureSocket") != null) {
            KafkaUtils.processSslProperties(configurations, properties);
        }
        // The deserializer *type* fields carry the "AVRO" marker; "keyDeserializer" and
        // "valueDeserializer" hold custom deserializer objects and can never equal a string.
        boolean avroKey = "AVRO".equals(configurations.get("keyDeserializerType"));
        boolean avroValue = "AVRO".equals(configurations.get("valueDeserializerType"));
        if (avroKey || avroValue) {
            properties.put("specific.avro.reader", (Object) false);
        }
        if (configurations.get("authenticationConfiguration") != null) {
            KafkaUtils.processSaslProperties(configurations, properties);
        }
        if (configurations.getMapValue("properties") != null) {
            KafkaUtils.processAdditionalProperties(configurations.getMapValue("properties"), properties);
        }
        return properties;
    }

    /**
     * Translates a Ballerina producer configuration record into Kafka producer {@link Properties}.
     *
     * <p>Settings are copied under their Kafka property names; SSL, SASL and free-form additional
     * properties are applied last, each only when its section is present in {@code configurations}.
     *
     * @param configurations producer configuration record
     * @return properties populated from {@code configurations}
     */
    public static Properties processKafkaProducerConfig(MapValue<String, Object> configurations) {
        Properties properties = new Properties();

        // String-valued settings.
        KafkaUtils.addStringParamIfPresent("bootstrap.servers", configurations, properties, "bootstrapServers");
        KafkaUtils.addStringParamIfPresent("acks", configurations, properties, "acks");
        KafkaUtils.addStringParamIfPresent("compression.type", configurations, properties, "compressionType");
        KafkaUtils.addStringParamIfPresent("client.id", configurations, properties, "clientId");
        KafkaUtils.addStringParamIfPresent("metrics.recording.level", configurations, properties, "metricsRecordingLevel");
        KafkaUtils.addStringParamIfPresent("metric.reporters", configurations, properties, "metricReporterClasses");
        KafkaUtils.addStringParamIfPresent("partitioner.class", configurations, properties, "partitionerClass");
        KafkaUtils.addStringParamIfPresent("interceptor.classes", configurations, properties, "interceptorClasses");
        KafkaUtils.addStringParamIfPresent("transactional.id", configurations, properties, "transactionalId");
        KafkaUtils.addStringParamIfPresent("schema.registry.url", configurations, properties, "schemaRegistryUrl");

        // Serializers: built-in type names first, then any custom serializer objects.
        KafkaUtils.addSerializerTypeConfigs("key.serializer", configurations, properties, "keySerializerType");
        KafkaUtils.addSerializerTypeConfigs("value.serializer", configurations, properties, "valueSerializerType");
        KafkaUtils.addCustomKeySerializer(properties, configurations);
        KafkaUtils.addCustomValueSerializer(properties, configurations);

        // Integer-valued settings.
        KafkaUtils.addIntParamIfPresent("buffer.memory", configurations, properties, "bufferMemory");
        KafkaUtils.addIntParamIfPresent("retries", configurations, properties, "retryCount");
        KafkaUtils.addIntParamIfPresent("batch.size", configurations, properties, "batchSize");
        KafkaUtils.addIntParamIfPresent("linger.ms", configurations, properties, "linger");
        KafkaUtils.addIntParamIfPresent("send.buffer.bytes", configurations, properties, "sendBuffer");
        KafkaUtils.addIntParamIfPresent("receive.buffer.bytes", configurations, properties, "receiveBuffer");
        KafkaUtils.addIntParamIfPresent("max.request.size", configurations, properties, "maxRequestSize");
        KafkaUtils.addIntParamIfPresent("reconnect.backoff.ms", configurations, properties, "reconnectBackoffTimeInMillis");
        KafkaUtils.addIntParamIfPresent("reconnect.backoff.max.ms", configurations, properties, "reconnectBackoffMaxTimeInMillis");
        KafkaUtils.addIntParamIfPresent("retry.backoff.ms", configurations, properties, "retryBackoffTimeInMillis");
        KafkaUtils.addIntParamIfPresent("max.block.ms", configurations, properties, "maxBlock");
        KafkaUtils.addIntParamIfPresent("request.timeout.ms", configurations, properties, "requestTimeoutInMillis");
        KafkaUtils.addIntParamIfPresent("metadata.max.age.ms", configurations, properties, "metadataMaxAgeInMillis");
        KafkaUtils.addIntParamIfPresent("metrics.sample.window.ms", configurations, properties, "metricsSampleWindowInMillis");
        KafkaUtils.addIntParamIfPresent("metrics.num.samples", configurations, properties, "metricsNumSamples");
        KafkaUtils.addIntParamIfPresent("max.in.flight.requests.per.connection", configurations, properties, "maxInFlightRequestsPerConnection");
        KafkaUtils.addIntParamIfPresent("connections.max.idle.ms", configurations, properties, "connectionsMaxIdleTimeInMillis");
        KafkaUtils.addIntParamIfPresent("transaction.timeout.ms", configurations, properties, "transactionTimeoutInMillis");

        // Idempotence is always written, whatever its value.
        KafkaUtils.addBooleanParamIfPresent("enable.idempotence", configurations, properties, "enableIdempotence");

        if (configurations.get("secureSocket") != null) {
            KafkaUtils.processSslProperties(configurations, properties);
        }
        if (configurations.get("authenticationConfiguration") != null) {
            KafkaUtils.processSaslProperties(configurations, properties);
        }
        if (configurations.getMapValue("properties") != null) {
            KafkaUtils.processAdditionalProperties(configurations.getMapValue("properties"), properties);
        }
        return properties;
    }

    /**
     * Copies SSL settings from the {@code secureSocket} section into {@code configParams}.
     *
     * <p>Key store, trust store and protocol values come from the matching sub-records of
     * {@code secureSocket}. The remaining SSL settings (provider, key password, cipher suites,
     * endpoint identification algorithm, secure random implementation) are read from the top-level
     * {@code configurations} map, not from {@code secureSocket}.
     *
     * @param configurations configuration record containing a {@code secureSocket} entry
     * @param configParams   properties to receive the SSL settings
     */
    private static void processSslProperties(MapValue<String, Object> configurations, Properties configParams) {
        MapValue secureSocket = (MapValue) configurations.get("secureSocket");
        MapValue<String, Object> keyStore = (MapValue<String, Object>) ((MapValue) secureSocket.get("keyStore"));
        MapValue<String, Object> trustStore = (MapValue<String, Object>) ((MapValue) secureSocket.get("trustStore"));
        MapValue<String, Object> protocol = (MapValue<String, Object>) ((MapValue) secureSocket.get("protocol"));

        KafkaUtils.addStringParamIfPresent("ssl.keystore.type", keyStore, configParams, "keyStoreType");
        KafkaUtils.addStringParamIfPresent("ssl.keystore.location", keyStore, configParams, "location");
        KafkaUtils.addStringParamIfPresent("ssl.keystore.password", keyStore, configParams, "password");
        KafkaUtils.addStringParamIfPresent("ssl.keymanager.algorithm", keyStore, configParams, "keyManagerAlgorithm");

        KafkaUtils.addStringParamIfPresent("ssl.truststore.type", trustStore, configParams, "trustStoreType");
        KafkaUtils.addStringParamIfPresent("ssl.truststore.location", trustStore, configParams, "location");
        KafkaUtils.addStringParamIfPresent("ssl.truststore.password", trustStore, configParams, "password");
        KafkaUtils.addStringParamIfPresent("ssl.trustmanager.algorithm", trustStore, configParams, "trustManagerAlgorithm");

        KafkaUtils.addStringParamIfPresent("security.protocol", protocol, configParams, "securityProtocol");
        KafkaUtils.addStringParamIfPresent("ssl.protocol", protocol, configParams, "sslProtocol");
        KafkaUtils.addStringParamIfPresent("ssl.enabled.protocols", protocol, configParams, "sslProtocolVersions");

        KafkaUtils.addStringParamIfPresent("ssl.provider", configurations, configParams, "sslProvider");
        KafkaUtils.addStringParamIfPresent("ssl.key.password", configurations, configParams, "sslKeyPassword");
        KafkaUtils.addStringParamIfPresent("ssl.cipher.suites", configurations, configParams, "sslCipherSuites");
        KafkaUtils.addStringParamIfPresent("ssl.endpoint.identification.algorithm", configurations, configParams, "sslEndpointIdentificationAlgorithm");
        KafkaUtils.addStringParamIfPresent("ssl.secure.random.implementation", configurations, configParams, "sslSecureRandomImplementation");
    }

    /**
     * Applies SASL settings from the {@code authenticationConfiguration} section.
     *
     * <p>Only the {@code PLAIN} mechanism is handled; any other mechanism leaves
     * {@code properties} untouched.
     *
     * @param configurations configuration record containing {@code authenticationConfiguration}
     * @param properties     properties to receive the SASL settings
     */
    private static void processSaslProperties(MapValue<String, Object> configurations, Properties properties) {
        MapValue authenticationConfig = configurations.getMapValue("authenticationConfiguration");
        String mechanism = authenticationConfig.getStringValue("mechanism");
        if (!"PLAIN".equals(mechanism)) {
            return;
        }
        String username = authenticationConfig.getStringValue("username");
        String password = authenticationConfig.getStringValue("password");
        // NOTE(review): credentials are embedded verbatim; a double quote in either value would
        // produce a malformed JAAS entry - confirm the inputs are constrained upstream.
        String jaasConfigValue = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + username
                + "\" password=\""
                + password
                + "\";";
        KafkaUtils.addStringParamIfPresent("sasl.mechanism", (MapValue<String, Object>) authenticationConfig, properties, "mechanism");
        KafkaUtils.addStringParamIfPresent("security.protocol", (MapValue<String, Object>) authenticationConfig, properties, "securityProtocol");
        properties.put("sasl.jaas.config", jaasConfigValue);
    }

    /**
     * Copies every entry of a free-form properties map into {@code kafkaProperties} as strings.
     *
     * @param propertiesMap   map whose keys are property names
     * @param kafkaProperties properties to receive the entries
     */
    private static void processAdditionalProperties(MapValue propertiesMap, Properties kafkaProperties) {
        for (Object rawName : propertiesMap.getKeys()) {
            String name = (String) rawName;
            String setting = propertiesMap.getStringValue(name);
            kafkaProperties.setProperty(name, setting);
        }
    }

    /**
     * Stores the serializer class name for {@code key} under {@code paramName} when the
     * configuration has a value for {@code key}.
     *
     * @param paramName    Kafka property name to write
     * @param configs      configuration record
     * @param configParams properties to update
     * @param key          record field holding the serializer type
     */
    private static void addSerializerTypeConfigs(String paramName, MapValue<String, Object> configs, Properties configParams, String key) {
        if (configs.get(key) == null) {
            return;
        }
        String serializerClass = KafkaUtils.getSerializerType(configs, key);
        configParams.put(paramName, serializerClass);
    }

    /**
     * Stores the deserializer class name for {@code key} under {@code paramName} when the
     * configuration has a value for {@code key}.
     *
     * @param paramName    Kafka property name to write
     * @param configs      configuration record
     * @param configParams properties to update
     * @param key          record field holding the deserializer type
     */
    private static void addDeserializerConfigs(String paramName, MapValue<String, Object> configs, Properties configParams, String key) {
        if (configs.get(key) == null) {
            return;
        }
        String deserializerClass = KafkaUtils.getDeserializerValue(configs, key);
        configParams.put(paramName, deserializerClass);
    }

    /**
     * Stores the user-supplied key serializer object under {@code "keySerializer"} when one is
     * configured and the key serializer type is {@code CUSTOM}.
     *
     * @param properties     properties to update
     * @param configurations producer configuration record
     */
    private static void addCustomKeySerializer(Properties properties, MapValue<String, Object> configurations) {
        Object customSerializer = configurations.get("keySerializer");
        String declaredType = configurations.getStringValue("keySerializerType");
        boolean isCustom = "CUSTOM".equals(declaredType);
        if (customSerializer == null || !isCustom) {
            return;
        }
        properties.put("keySerializer", customSerializer);
    }

    /**
     * Stores the user-supplied value serializer object under {@code "valueSerializer"} when one is
     * configured and the value serializer type is {@code CUSTOM}.
     *
     * @param properties     properties to update
     * @param configurations producer configuration record
     */
    private static void addCustomValueSerializer(Properties properties, MapValue<String, Object> configurations) {
        Object customSerializer = configurations.get("valueSerializer");
        String declaredType = configurations.getStringValue("valueSerializerType");
        boolean isCustom = "CUSTOM".equals(declaredType);
        if (customSerializer == null || !isCustom) {
            return;
        }
        properties.put("valueSerializer", customSerializer);
    }

    /**
     * Registers a user-supplied deserializer object when one is configured and its type field is
     * {@code CUSTOM}. The current runtime is also stored under {@code "ballerina.strand"}.
     *
     * @param configParam    record field (and property key) holding the deserializer object
     * @param typeConfig     record field holding the deserializer type
     * @param properties     properties to update
     * @param configurations consumer configuration record
     */
    private static void addCustomDeserializer(String configParam, String typeConfig, Properties properties, MapValue<String, Object> configurations) {
        Object customDeserializer = configurations.get(configParam);
        String declaredType = configurations.getStringValue(typeConfig);
        boolean isCustom = "CUSTOM".equals(declaredType);
        if (customDeserializer == null || !isCustom) {
            return;
        }
        properties.put(configParam, customDeserializer);
        properties.put("ballerina.strand", BRuntime.getCurrentRuntime());
    }

    /**
     * Maps the serializer type stored under {@code key} to a serializer class name.
     *
     * <p>Names that are not one of the known type markers are returned unchanged.
     *
     * @param configs configuration record
     * @param key     record field holding the serializer type
     * @return serializer class name, or the raw value when it is not a known marker
     */
    private static String getSerializerType(MapValue<String, Object> configs, String key) {
        String value = (String) configs.get(key);
        String serializerClass;
        switch (value) {
            case "BYTE_ARRAY":
                serializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
                break;
            case "STRING":
                serializerClass = "org.apache.kafka.common.serialization.StringSerializer";
                break;
            case "INT":
                serializerClass = "org.apache.kafka.common.serialization.LongSerializer";
                break;
            case "FLOAT":
                serializerClass = "org.apache.kafka.common.serialization.DoubleSerializer";
                break;
            case "AVRO":
                serializerClass = "io.confluent.kafka.serializers.KafkaAvroSerializer";
                break;
            case "CUSTOM":
                serializerClass = "org.ballerinalang.messaging.kafka.serdes.BallerinaKafkaSerializer";
                break;
            default:
                serializerClass = value;
                break;
        }
        return serializerClass;
    }

    /**
     * Maps the deserializer type stored under {@code key} to a deserializer class name.
     *
     * <p>Names that are not one of the known type markers are returned unchanged.
     *
     * @param configs configuration record
     * @param key     record field holding the deserializer type
     * @return deserializer class name, or the raw value when it is not a known marker
     */
    private static String getDeserializerValue(MapValue<String, Object> configs, String key) {
        String value = (String) configs.get(key);
        String deserializerClass;
        switch (value) {
            case "BYTE_ARRAY":
                deserializerClass = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
                break;
            case "STRING":
                deserializerClass = "org.apache.kafka.common.serialization.StringDeserializer";
                break;
            case "INT":
                deserializerClass = "org.apache.kafka.common.serialization.LongDeserializer";
                break;
            case "FLOAT":
                deserializerClass = "org.apache.kafka.common.serialization.DoubleDeserializer";
                break;
            case "AVRO":
                deserializerClass = "io.confluent.kafka.serializers.KafkaAvroDeserializer";
                break;
            case "CUSTOM":
                deserializerClass = "org.ballerinalang.messaging.kafka.serdes.BallerinaKafkaDeserializer";
                break;
            default:
                deserializerClass = value;
                break;
        }
        return deserializerClass;
    }

    /**
     * Copies a string setting into {@code configParams} when it is present and non-empty.
     *
     * @param paramName    Kafka property name to write
     * @param configs      configuration record to read from
     * @param configParams properties to update
     * @param key          record field to read
     */
    private static void addStringParamIfPresent(String paramName, MapValue<String, Object> configs, Properties configParams, String key) {
        Object raw = configs.get(key);
        if (raw == null) {
            return;
        }
        String text = (String) raw;
        if (!text.isEmpty()) {
            configParams.setProperty(paramName, text);
        }
    }

    /**
     * Stores the string array found under {@code key} as a {@code List<String>} in
     * {@code configParams}.
     *
     * <p>Despite the name, a list is always stored: a missing or non-string array results in an
     * empty list.
     *
     * @param paramName    property name to write
     * @param configs      configuration record to read from
     * @param configParams properties to update
     * @param key          record field holding the array
     */
    private static void addStringArrayParamIfPresent(String paramName, MapValue<String, Object> configs, Properties configParams, String key) {
        BArray source = (BArray) configs.get(key);
        configParams.put(paramName, KafkaUtils.getStringListFromStringBArray(source));
    }

    /**
     * Copies an integer setting into {@code configParams} when it is present.
     *
     * <p>The stored value is a {@code Long} narrowed with {@link Long#intValue()}, so values
     * outside the {@code int} range are truncated rather than rejected.
     *
     * @param paramName    Kafka property name to write
     * @param configs      configuration record to read from
     * @param configParams properties to update
     * @param key          record field to read
     */
    private static void addIntParamIfPresent(String paramName, MapValue<String, Object> configs, Properties configParams, String key) {
        Long configured = (Long) configs.get(key);
        if (configured == null) {
            return;
        }
        int narrowed = configured.intValue();
        configParams.put(paramName, (Object) narrowed);
    }

    /**
     * Copies a boolean setting into {@code configParams} when it is present and differs from
     * {@code defaultValue}.
     *
     * <p>An absent value is skipped instead of failing on unboxing, matching the behaviour of the
     * other {@code add...IfPresent} helpers.
     *
     * @param paramName    property name to write
     * @param configs      configuration record to read from
     * @param configParams properties to update
     * @param key          record field to read
     * @param defaultValue value for which nothing is written
     */
    private static void addBooleanParamIfPresent(String paramName, MapValue<String, Object> configs, Properties configParams, String key, boolean defaultValue) {
        Boolean configured = (Boolean) configs.get(key);
        if (configured == null) {
            // Nothing configured: leave the property unset.
            return;
        }
        boolean value = configured.booleanValue();
        if (value != defaultValue) {
            configParams.put(paramName, (Object) value);
        }
    }

    /**
     * Copies a boolean setting into {@code configParams} whenever it is present, whatever its
     * value.
     *
     * <p>An absent value is skipped instead of failing on unboxing.
     *
     * @param paramName    property name to write
     * @param configs      configuration record to read from
     * @param configParams properties to update
     * @param key          record field to read
     */
    private static void addBooleanParamIfPresent(String paramName, MapValue<String, Object> configs, Properties configParams, String key) {
        Boolean configured = (Boolean) configs.get(key);
        if (configured == null) {
            // Nothing configured: leave the property unset.
            return;
        }
        boolean value = configured.booleanValue();
        configParams.put(paramName, (Object) value);
    }

    /**
     * Converts an array of topic-partition records into Kafka {@link TopicPartition} objects.
     *
     * @param partitions array of records with {@code topic} and {@code partition} fields; may be
     *                   {@code null}
     * @param logger     logger used when a partition number does not fit in an {@code int}
     * @return list of topic partitions, empty when {@code partitions} is {@code null}
     */
    public static ArrayList<TopicPartition> getTopicPartitionList(BArray partitions, Logger logger) {
        ArrayList<TopicPartition> result = new ArrayList<>();
        if (partitions == null) {
            return result;
        }
        for (int index = 0; index < partitions.size(); index++) {
            MapValue entry = (MapValue) partitions.get((long) index);
            String topicName = (String) entry.get("topic");
            Long partitionNumber = (Long) entry.get("partition");
            int narrowed = KafkaUtils.getIntFromLong(partitionNumber, logger, "partition");
            result.add(new TopicPartition(topicName, narrowed));
        }
        return result;
    }

    /**
     * Copies the elements of a string array into a Java list.
     *
     * @param stringArray array to read; may be {@code null}
     * @return list of the array's strings, or an empty list when the array is {@code null} or its
     *         element type is not string
     */
    public static List<String> getStringListFromStringBArray(BArray stringArray) {
        List<String> collected = new ArrayList<>();
        if (stringArray == null) {
            return collected;
        }
        boolean holdsStrings = ((BArrayType) stringArray.getType()).getElementType().equals(BTypes.typeString);
        if (!holdsStrings) {
            return collected;
        }
        int length = stringArray.size();
        for (int index = 0; index < length; index++) {
            collected.add(stringArray.getString((long) index));
        }
        return collected;
    }

    /**
     * Creates a {@code TopicPartition} record from a topic name and partition number.
     *
     * @param topic     topic name
     * @param partition partition number
     * @return populated record
     */
    public static MapValue<String, Object> populateTopicPartitionRecord(String topic, int partition) {
        MapValue<String, Object> template = KafkaUtils.getTopicPartitionRecord();
        Object[] fieldValues = new Object[]{topic, partition};
        return BallerinaValues.createRecord(template, fieldValues);
    }

    /**
     * Creates a {@code PartitionOffset} record from a topic-partition record and an offset.
     *
     * @param topicPartition topic-partition record
     * @param offset         offset within the partition
     * @return populated record
     */
    public static MapValue<String, Object> populatePartitionOffsetRecord(MapValue<String, Object> topicPartition, long offset) {
        MapValue<String, Object> template = KafkaUtils.getPartitionOffsetRecord();
        Object[] fieldValues = new Object[]{topicPartition, offset};
        return BallerinaValues.createRecord(template, fieldValues);
    }

    /**
     * Converts a Kafka consumer record into a {@code ConsumerRecord} record value.
     *
     * <p>A {@code null} key is kept as {@code null}; the value is always passed through the type
     * conversion.
     *
     * @param record    Kafka record to convert
     * @param keyType   deserializer type of the key
     * @param valueType deserializer type of the value
     * @return record holding key, value, offset, partition, timestamp and topic, in that order
     */
    public static MapValue<String, Object> populateConsumerRecord(ConsumerRecord record, String keyType, String valueType) {
        Object rawKey = record.key();
        Object key = null;
        if (rawKey != null) {
            key = KafkaUtils.getBValues(rawKey, keyType);
        }
        Object value = KafkaUtils.getBValues(record.value(), valueType);
        MapValue<String, Object> template = KafkaUtils.getConsumerRecord();
        Object[] fieldValues = new Object[]{key, value, record.offset(), record.partition(), record.timestamp(), record.topic()};
        return BallerinaValues.createRecord(template, fieldValues);
    }

    /**
     * Checks that a deserialized key or value matches the configured type and converts it where
     * needed.
     *
     * <p>{@code BYTE_ARRAY} values are wrapped in an array value, {@code AVRO} values are handed to
     * {@link AvroUtils#handleAvroConsumer}, and {@code STRING}, {@code INT}, {@code FLOAT} and
     * {@code CUSTOM} values are returned as they are.
     *
     * @param value deserialized key or value
     * @param type  configured deserializer type
     * @return the value to expose in the consumer record
     * @throws BError with reason {@code {ballerina/kafka}ConsumerError} when the value does not
     *                match the configured type or the type is unknown
     */
    private static Object getBValues(Object value, String type) {
        // createKafkaError takes (message, reason): the reason is always the consumer error marker.
        final String reason = "{ballerina/kafka}ConsumerError";
        if ("BYTE_ARRAY".equals(type)) {
            if (value instanceof byte[]) {
                return BValueCreator.createArrayValue((byte[]) value);
            }
            throw KafkaUtils.createKafkaError("Invalid type - expected: byte[]", reason);
        }
        if ("STRING".equals(type)) {
            if (value instanceof String) {
                return value;
            }
            throw KafkaUtils.createKafkaError("Invalid type - expected: string", reason);
        }
        if ("INT".equals(type)) {
            if (value instanceof Long) {
                return value;
            }
            throw KafkaUtils.createKafkaError("Invalid type - expected: int", reason);
        }
        if ("FLOAT".equals(type)) {
            if (value instanceof Double) {
                return value;
            }
            throw KafkaUtils.createKafkaError("Invalid type - expected: float", reason);
        }
        if ("AVRO".equals(type)) {
            return AvroUtils.handleAvroConsumer(value);
        }
        if ("CUSTOM".equals(type)) {
            return value;
        }
        throw KafkaUtils.createKafkaError("Unexpected type found for consumer record", reason);
    }

    /**
     * Creates an empty {@code ConsumerRecord} record value.
     *
     * @return new record value
     */
    public static MapValue<String, Object> getConsumerRecord() {
        final String recordName = "ConsumerRecord";
        return KafkaUtils.createKafkaRecord(recordName);
    }

    /**
     * Creates an empty {@code AvroGenericRecord} record value.
     *
     * @return new record value
     */
    public static MapValue<String, Object> getAvroGenericRecord() {
        final String recordName = "AvroGenericRecord";
        return KafkaUtils.createKafkaRecord(recordName);
    }

    /**
     * Creates an empty {@code PartitionOffset} record value.
     *
     * @return new record value
     */
    public static MapValue<String, Object> getPartitionOffsetRecord() {
        final String recordName = "PartitionOffset";
        return KafkaUtils.createKafkaRecord(recordName);
    }

    /**
     * Creates an empty {@code TopicPartition} record value.
     *
     * @return new record value
     */
    public static MapValue<String, Object> getTopicPartitionRecord() {
        final String recordName = "TopicPartition";
        return KafkaUtils.createKafkaRecord(recordName);
    }

    /**
     * Creates a Kafka error with the given detail message and reason, and no cause.
     *
     * @param message text stored in the error's detail record
     * @param reason  error reason
     * @return the error value
     */
    public static BError createKafkaError(String message, String reason) {
        final BError noCause = null;
        return KafkaUtils.createKafkaError(message, reason, noCause);
    }

    /**
     * Creates a Kafka error with the given detail message, reason and cause.
     *
     * @param message text stored in the error's detail record
     * @param reason  error reason
     * @param cause   underlying error stored in the detail record; may be {@code null}
     * @return the error value
     */
    public static BError createKafkaError(String message, String reason, BError cause) {
        MapValue<String, Object> detailRecord = KafkaUtils.createKafkaDetailRecord(message, cause);
        BString errorReason = (BString) StringUtils.fromString(reason);
        return BValueCreator.createErrorValue(errorReason, detailRecord);
    }

    /**
     * Creates an error detail record carrying only a message.
     *
     * @param message detail message
     * @return detail record with a {@code null} cause
     */
    private static MapValue<String, Object> createKafkaDetailRecord(String message) {
        final BError noCause = null;
        return KafkaUtils.createKafkaDetailRecord(message, noCause);
    }

    /**
     * Creates an error detail record carrying a message and a cause.
     *
     * @param message detail message
     * @param cause   underlying error; may be {@code null}
     * @return populated {@code Detail} record
     */
    private static MapValue<String, Object> createKafkaDetailRecord(String message, BError cause) {
        MapValue<String, Object> template = KafkaUtils.createKafkaRecord("Detail");
        Object[] fieldValues = new Object[]{message, cause};
        return BallerinaValues.createRecord(template, fieldValues);
    }

    /**
     * Creates an empty record value of the named type from the Kafka protocol package.
     *
     * @param recordName name of the record type
     * @return new record value
     */
    public static MapValue<String, Object> createKafkaRecord(String recordName) {
        BPackage kafkaPackage = (BPackage) KafkaConstants.KAFKA_PROTOCOL_PACKAGE_ID;
        return BallerinaValues.createRecordValue(kafkaPackage, recordName);
    }

    /**
     * Converts a map of Kafka topic partitions to offsets into an array of {@code PartitionOffset}
     * records.
     *
     * @param offsetMap offsets keyed by topic partition
     * @return array with one record per map entry, empty for an empty map
     */
    public static BArray getPartitionOffsetArrayFromOffsetMap(Map<TopicPartition, Long> offsetMap) {
        BArray partitionOffsetArray = BValueCreator.createArrayValue(new BArrayType(KafkaUtils.getPartitionOffsetRecord().getType()));
        // Iterating an empty entry set is a no-op, so no explicit emptiness check is needed.
        for (Map.Entry<TopicPartition, Long> mapping : offsetMap.entrySet()) {
            TopicPartition location = mapping.getKey();
            Long position = mapping.getValue();
            MapValue<String, Object> locationRecord = KafkaUtils.populateTopicPartitionRecord(location.topic(), location.partition());
            partitionOffsetArray.append(KafkaUtils.populatePartitionOffsetRecord(locationRecord, position));
        }
        return partitionOffsetArray;
    }

    /**
     * Converts an array of {@code PartitionOffset} records into the map form used for committing
     * offsets.
     *
     * <p>The offset is carried as a {@code long} end to end; narrowing it to {@code int} would
     * corrupt offsets above {@link Integer#MAX_VALUE}.
     *
     * @param offsets array of partition-offset records
     * @return offsets and metadata keyed by topic partition
     */
    public static Map<TopicPartition, OffsetAndMetadata> getPartitionToMetadataMap(BArray offsets) {
        Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = new HashMap<>();
        for (int i = 0; i < offsets.size(); ++i) {
            MapValue offset = (MapValue) offsets.get((long) i);
            long offsetValue = offset.getIntValue("offset");
            TopicPartition topicPartition = KafkaUtils.createTopicPartitionFromPartitionOffset(offset);
            partitionToMetadataMap.put(topicPartition, new OffsetAndMetadata(offsetValue));
        }
        return partitionToMetadataMap;
    }

    /**
     * Extracts the Kafka {@link TopicPartition} from a {@code PartitionOffset} record.
     *
     * @param offset record whose {@code partition} field holds a topic-partition record
     * @return topic partition built from that nested record
     */
    public static TopicPartition createTopicPartitionFromPartitionOffset(MapValue offset) {
        MapValue location = (MapValue) offset.get("partition");
        String topicName = location.getStringValue("topic");
        int partitionNumber = location.getIntValue("partition").intValue();
        return new TopicPartition(topicName, partitionNumber);
    }

    /**
     * Converts a possibly-null {@code Long} value to an {@code Integer}.
     *
     * @param value  value to convert; must be a {@code Long} or {@code null}
     * @param name   setting name used in the warning when the value does not fit
     * @param logger logger receiving that warning
     * @return the converted value, or {@code null} when {@code value} is {@code null}
     */
    public static Integer getIntValue(Object value, String name, Logger logger) {
        Long boxed = KafkaUtils.getLongValue(value);
        if (boxed != null) {
            return KafkaUtils.getIntFromLong(boxed, logger, name);
        }
        return null;
    }

    /**
     * Narrows a {@code long} to an {@code int}, falling back to {@link Integer#MAX_VALUE} when the
     * value is outside the {@code int} range.
     *
     * <p>Values below {@link Integer#MIN_VALUE} are also replaced by {@link Integer#MAX_VALUE}.
     *
     * @param longValue value to narrow
     * @param logger    logger receiving a warning when the fallback is used
     * @param name      setting name used in the warning
     * @return the narrowed value, or {@link Integer#MAX_VALUE} when it does not fit
     */
    public static int getIntFromLong(long longValue, Logger logger, String name) {
        boolean fitsInInt = longValue >= Integer.MIN_VALUE && longValue <= Integer.MAX_VALUE;
        if (fitsInInt) {
            return (int) longValue;
        }
        logger.warn("The value set for {} needs to be less than {}. The {} value is set to {}", new Object[]{name, Integer.MAX_VALUE, name, Integer.MAX_VALUE});
        return Integer.MAX_VALUE;
    }

    /**
     * Casts a value to {@code Long}, passing {@code null} through.
     *
     * @param value a {@code Long} or {@code null}
     * @return the same value typed as {@code Long}
     */
    public static Long getLongValue(Object value) {
        return value == null ? null : (Long) value;
    }

    /**
     * Reads the {@code default.api.timeout.ms} setting from consumer properties.
     *
     * @param consumerProperties consumer properties to inspect
     * @return the configured timeout, or {@code -1} when the setting is absent
     */
    public static int getDefaultApiTimeout(Properties consumerProperties) {
        Object configured = consumerProperties.get("default.api.timeout.ms");
        if (configured == null) {
            return -1;
        }
        return (Integer) configured;
    }

    /**
     * Creates a Kafka producer from the given properties and attaches it, its configuration, the
     * bootstrap servers and the client id to {@code producerObject} as native data, then reports
     * the new producer to the metrics utility.
     *
     * @param producerProperties properties used to build the producer
     * @param producerObject     object that receives the native data
     */
    public static void createKafkaProducer(Properties producerProperties, ObjectValue producerObject) {
        KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);
        String bootstrapServers = producerProperties.getProperty("bootstrap.servers");
        String clientId = KafkaUtils.getClientIdFromProperties(producerProperties);

        producerObject.addNativeData("KafkaProducer", (Object) kafkaProducer);
        producerObject.addNativeData("KafkaProducerConfig", (Object) producerProperties);
        producerObject.addNativeData("bootstrap.servers", (Object) bootstrapServers);
        producerObject.addNativeData("client.id", (Object) clientId);
        KafkaMetricsUtil.reportNewProducer(producerObject);
    }

    /**
     * Reads the bootstrap servers string from a listener's {@code consumerConfig}.
     *
     * @param listener listener object holding a {@code consumerConfig} map
     * @return value of {@code bootstrapServers} in that map
     */
    public static String getBrokerNames(ObjectValue listener) {
        MapValue consumerConfig = listener.getMapValue("consumerConfig");
        Object servers = consumerConfig.get("bootstrapServers");
        return (String) servers;
    }

    /**
     * Joins topic names into a single comma-and-space separated string.
     *
     * @param topicsList topic names
     * @return joined names, empty for an empty list
     */
    public static String getTopicNamesString(List<String> topicsList) {
        final CharSequence separator = ", ";
        return String.join(separator, topicsList);
    }

    /**
     * Reads the {@code client.id} setting from the given properties.
     *
     * @param properties properties to inspect; may be {@code null}
     * @return the client id, or {@code "unknown"} when the properties or the setting are missing
     */
    public static String getClientIdFromProperties(Properties properties) {
        String clientId = properties == null ? null : properties.getProperty("client.id");
        return clientId == null ? "unknown" : clientId;
    }

    /**
     * Reads the {@code bootstrap.servers} native data from an object.
     *
     * @param object object to inspect; may be {@code null}
     * @return the stored value, or {@code "unknown"} when the object or the value is missing
     */
    public static String getBootstrapServers(ObjectValue object) {
        String bootstrapServers = object == null ? null : (String) object.getNativeData("bootstrap.servers");
        return bootstrapServers == null ? "unknown" : bootstrapServers;
    }

    /**
     * Reads the {@code client.id} native data from an object.
     *
     * @param object object to inspect; may be {@code null}
     * @return the stored value, or {@code "unknown"} when the object or the value is missing
     */
    public static String getClientId(ObjectValue object) {
        String clientId = object == null ? null : (String) object.getNativeData("client.id");
        return clientId == null ? "unknown" : clientId;
    }
}

