package org.ballerinalang.kafka.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.bre.Context;
import org.ballerinalang.connector.api.BLangConnectorSPIUtil;
import org.ballerinalang.connector.api.BallerinaConnectorException;
import org.ballerinalang.connector.api.ParamDetail;
import org.ballerinalang.connector.api.Resource;
import org.ballerinalang.connector.api.Service;
import org.ballerinalang.model.types.BArrayType;
import org.ballerinalang.model.types.BObjectType;
import org.ballerinalang.model.types.BRecordType;
import org.ballerinalang.model.values.BByteArray;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BRefType;
import org.ballerinalang.model.values.BRefValueArray;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BStringArray;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.util.codegen.ProgramFile;
import org.ballerinalang.util.exceptions.BallerinaException;

/* loaded from: input_file:org/ballerinalang/kafka/util/KafkaUtils.class */
public class KafkaUtils {
    /**
     * Returns the single resource of a Kafka service after checking its parameter list.
     *
     * <p>The service must declare exactly one resource and that resource must take at least two
     * parameters. Parameters at index 0 and 1 are always validated; parameters at index 2 and 3
     * are validated only when they are declared.
     *
     * @param service the service whose resource is extracted
     * @return the only resource declared by {@code service}
     * @throws BallerinaException if the service declares no resource or more than one, if the
     *         resource takes fewer than two parameters, or if a validated parameter is rejected
     */
    public static Resource extractKafkaResource(Service service) throws BallerinaConnectorException {
        Resource[] declared = service.getResources();
        int resourceCount = declared.length;
        if (resourceCount == 0) {
            throw new BallerinaException("No resources found to handle the Kafka records in " + service.getName());
        }
        if (resourceCount > 1) {
            throw new BallerinaException("More than one resources found in Kafka service " + service.getName() + ". Kafka Service should only have one resource");
        }

        Resource handler = declared[0];
        List<ParamDetail> params = handler.getParamDetails();
        int paramCount = params.size();
        if (paramCount < 2) {
            throw new BallerinaException("Kafka resource signature does not comply with param standard sequence.");
        }

        // The two leading parameters are mandatory; the next two are optional.
        validateConsumerParam(params.get(0));
        validateRecordsParam(params.get(1));
        if (paramCount >= 3) {
            validateOffsetsParam(params.get(2));
        }
        if (paramCount >= 4) {
            validateGroupIDParam(params.get(3));
        }
        return handler;
    }

    /**
     * Checks the resource parameter declared at index 0.
     *
     * <p>The parameter is accepted when its type tag is 32 and the type's package path and name
     * equal {@code KafkaConstants.KAFKA_NATIVE_PACKAGE} and
     * {@code KafkaConstants.CONSUMER_STRUCT_NAME}.
     *
     * @param paramDetail the parameter declared at index 0
     * @throws BallerinaException if the parameter is not accepted
     */
    private static void validateConsumerParam(ParamDetail paramDetail) {
        final String failure = "Resource signature validation failed for param at index: 0.";
        // Tag 32 is the tag for which the type is handled as a BObjectType below.
        if (paramDetail.getVarType().getTag() != 32) {
            throw new BallerinaException(failure);
        }
        BObjectType objectType = (BObjectType) paramDetail.getVarType();
        boolean samePackage = objectType.getPackagePath().equals(KafkaConstants.KAFKA_NATIVE_PACKAGE);
        if (!(samePackage && objectType.getName().equals(KafkaConstants.CONSUMER_STRUCT_NAME))) {
            throw new BallerinaException(failure);
        }
    }

    /**
     * Checks the resource parameter declared at index 1.
     *
     * <p>The parameter is accepted when its type tag is 16, its element type tag is 33, and the
     * element type's package path and name equal {@code KafkaConstants.KAFKA_NATIVE_PACKAGE} and
     * {@code KafkaConstants.CONSUMER_RECORD_STRUCT_NAME}.
     *
     * @param paramDetail the parameter declared at index 1
     * @throws BallerinaException if the parameter is not accepted
     */
    private static void validateRecordsParam(ParamDetail paramDetail) {
        final String failure = "Resource signature validation failed for param at index: 1.";
        // Tag 16 is the tag for which the type is handled as a BArrayType below.
        if (paramDetail.getVarType().getTag() != 16) {
            throw new BallerinaException(failure);
        }
        BArrayType arrayType = (BArrayType) paramDetail.getVarType();
        // Tag 33 is the tag for which the element type is handled as a BRecordType below.
        if (arrayType.getElementType().getTag() != 33) {
            throw new BallerinaException(failure);
        }
        BRecordType recordType = (BRecordType) arrayType.getElementType();
        boolean samePackage = recordType.getPackagePath().equals(KafkaConstants.KAFKA_NATIVE_PACKAGE);
        if (!(samePackage && recordType.getName().equals(KafkaConstants.CONSUMER_RECORD_STRUCT_NAME))) {
            throw new BallerinaException(failure);
        }
    }

    /**
     * Checks the resource parameter declared at index 2.
     *
     * <p>The parameter is accepted when its type tag is 16, its element type tag is 33, and the
     * element type's package path and name equal {@code KafkaConstants.KAFKA_NATIVE_PACKAGE} and
     * {@code KafkaConstants.OFFSET_STRUCT_NAME}.
     *
     * @param paramDetail the parameter declared at index 2
     * @throws BallerinaException if the parameter is not accepted
     */
    private static void validateOffsetsParam(ParamDetail paramDetail) {
        final String failure = "Resource signature validation failed for param at index: 2.";
        // Tag 16 is the tag for which the type is handled as a BArrayType below.
        if (paramDetail.getVarType().getTag() != 16) {
            throw new BallerinaException(failure);
        }
        BArrayType arrayType = (BArrayType) paramDetail.getVarType();
        // Tag 33 is the tag for which the element type is handled as a BRecordType below.
        if (arrayType.getElementType().getTag() != 33) {
            throw new BallerinaException(failure);
        }
        BRecordType recordType = (BRecordType) arrayType.getElementType();
        boolean samePackage = recordType.getPackagePath().equals(KafkaConstants.KAFKA_NATIVE_PACKAGE);
        if (!(samePackage && recordType.getName().equals(KafkaConstants.OFFSET_STRUCT_NAME))) {
            throw new BallerinaException(failure);
        }
    }

    /**
     * Checks the resource parameter declared at index 3; it is accepted only when its type tag is 4.
     *
     * @param paramDetail the parameter declared at index 3
     * @throws BallerinaException if the parameter's type tag is anything other than 4
     */
    private static void validateGroupIDParam(ParamDetail paramDetail) {
        // NOTE(review): tag 4 is presumably the string type tag — confirm against the type-tag constants.
        boolean tagAccepted = paramDetail.getVarType().getTag() == 4;
        if (tagAccepted) {
            return;
        }
        throw new BallerinaException("Resource signature validation failed for param at index: 3.");
    }

    /**
     * Builds the argument array for a resource that receives the consumer and the polled records.
     *
     * <p>The returned array has one slot per declared resource parameter. Slot 0 receives the
     * consumer struct and slot 1 the record struct array; any further slots are left {@code null}.
     *
     * @param resource        the resource that will be invoked
     * @param consumerRecords the records to expose through slot 1
     * @param kafkaConsumer   the consumer attached to the struct in slot 0
     * @return the argument array, sized to the resource's parameter count
     */
    public static BValue[] getSignatureParameters(Resource resource, ConsumerRecords<byte[], byte[]> consumerRecords, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        int paramCount = resource.getParamDetails().size();
        BValue[] arguments = new BValue[paramCount];
        if (paramCount == 0) {
            return arguments;
        }
        arguments[0] = createConsumerStruct(resource, kafkaConsumer);
        if (paramCount > 1) {
            arguments[1] = createRecordStructArray(resource, consumerRecords);
        }
        return arguments;
    }

    /**
     * Converts the polled Kafka records into an array of consumer-record structs.
     *
     * <p>For each record a struct named {@code KafkaConstants.CONSUMER_RECORD_STRUCT_NAME} is
     * created and its {@code key}, {@code value}, {@code offset}, {@code partition},
     * {@code timestamp} and {@code topic} fields are populated. The {@code key} and {@code value}
     * fields are only set when the record actually carries them.
     *
     * @param resource        the resource whose program file is used to create the structs
     * @param consumerRecords the records to convert
     * @return an array holding one struct per record, typed with the consumer-record struct type
     */
    private static BRefValueArray createRecordStructArray(Resource resource, ConsumerRecords<byte[], byte[]> consumerRecords) {
        ProgramFile programFile = resource.getResourceInfo().getServiceInfo().getPackageInfo().getProgramFile();
        List<BRefType> recordStructs = new ArrayList<>();
        consumerRecords.forEach(consumerRecord -> {
            BMap recordStruct = BLangConnectorSPIUtil.createBStruct(programFile, KafkaConstants.KAFKA_NATIVE_PACKAGE, KafkaConstants.CONSUMER_RECORD_STRUCT_NAME, new Object[0]);
            byte[] key = consumerRecord.key();
            if (key != null) {
                recordStruct.put("key", new BByteArray(key));
            }
            // A Kafka record may have a null value (e.g. a tombstone on a compacted topic), so the
            // value is guarded exactly like the key and null is never handed to BByteArray.
            byte[] value = consumerRecord.value();
            if (value != null) {
                recordStruct.put("value", new BByteArray(value));
            }
            recordStruct.put("offset", new BInteger(consumerRecord.offset()));
            recordStruct.put("partition", new BInteger(consumerRecord.partition()));
            recordStruct.put("timestamp", new BInteger(consumerRecord.timestamp()));
            recordStruct.put("topic", new BString(consumerRecord.topic()));
            recordStructs.add(recordStruct);
        });
        // A throwaway struct supplies the element type, so the array is typed even when no
        // records were polled.
        BMap typeSample = BLangConnectorSPIUtil.createBStruct(programFile, KafkaConstants.KAFKA_NATIVE_PACKAGE, KafkaConstants.CONSUMER_RECORD_STRUCT_NAME, new Object[0]);
        return new BRefValueArray(recordStructs.toArray(new BRefType[0]), typeSample.getType());
    }

    /**
     * Creates a consumer struct that carries the given Kafka consumer as native data.
     *
     * @param resource      the resource whose program file is used to create the struct
     * @param kafkaConsumer the consumer stored under {@code KafkaConstants.NATIVE_CONSUMER}
     * @return the newly created consumer struct
     */
    private static BMap<String, BValue> createConsumerStruct(Resource resource, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        BMap<String, BValue> consumerStruct = BLangConnectorSPIUtil.createBStruct(
                resource.getResourceInfo().getServiceInfo().getPackageInfo().getProgramFile(),
                KafkaConstants.KAFKA_NATIVE_PACKAGE,
                KafkaConstants.CONSUMER_STRUCT_NAME,
                new Object[0]);
        consumerStruct.addNativeData(KafkaConstants.NATIVE_CONSUMER, kafkaConsumer);
        return consumerStruct;
    }

    /**
     * Builds an array of offset structs, one per topic partition present in the polled records.
     *
     * <p>For every partition the offset of the last record encountered during iteration is kept,
     * and the struct reports that offset plus one. Each offset struct holds a nested
     * topic-partition struct under {@code partition} and the computed value under {@code offset}.
     *
     * @param resource        the resource whose program file is used to create the structs
     * @param consumerRecords the records from which partitions and offsets are collected
     * @return an array holding one offset struct per partition, typed with the offset struct type
     */
    private static BRefValueArray createOffsetStructArray(Resource resource, ConsumerRecords<byte[], byte[]> consumerRecords) {
        // Later records overwrite earlier ones, leaving the last offset seen per partition.
        Map<TopicPartition, Long> lastOffsets = new HashMap<>();
        consumerRecords.forEach(record ->
                lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()));

        Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> seen : lastOffsets.entrySet()) {
            nextOffsets.put(seen.getKey(), new OffsetAndMetadata(seen.getValue() + 1));
        }

        ProgramFile programFile = resource.getResourceInfo().getServiceInfo().getPackageInfo().getProgramFile();
        ArrayList<BRefType> offsetStructs = new ArrayList<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> next : nextOffsets.entrySet()) {
            BMap offsetStruct = BLangConnectorSPIUtil.createBStruct(programFile, KafkaConstants.KAFKA_NATIVE_PACKAGE, KafkaConstants.OFFSET_STRUCT_NAME, new Object[0]);
            BMap partitionStruct = BLangConnectorSPIUtil.createBStruct(programFile, KafkaConstants.KAFKA_NATIVE_PACKAGE, KafkaConstants.TOPIC_PARTITION_STRUCT_NAME, new Object[0]);
            partitionStruct.put("topic", new BString(next.getKey().topic()));
            partitionStruct.put("partition", new BInteger(next.getKey().partition()));
            offsetStruct.put("partition", partitionStruct);
            offsetStruct.put("offset", new BInteger(next.getValue().offset()));
            offsetStructs.add(offsetStruct);
        }

        // A throwaway struct supplies the element type for the resulting array.
        BMap typeSample = BLangConnectorSPIUtil.createBStruct(programFile, KafkaConstants.KAFKA_NATIVE_PACKAGE, KafkaConstants.OFFSET_STRUCT_NAME, new Object[0]);
        return new BRefValueArray(offsetStructs.toArray(new BRefType[0]), typeSample.getType());
    }

    /**
     * Creates a consumer struct that carries the Kafka consumer as native data and a nested
     * consumer-config struct holding the group ID.
     *
     * @param resource      the resource whose program file is used to create the structs
     * @param kafkaConsumer the consumer stored under {@code KafkaConstants.NATIVE_CONSUMER}
     * @param str           the group ID written to the config struct under
     *                      {@code KafkaConstants.CONSUMER_GROUP_ID_CONFIG}
     * @return the consumer struct with its {@code config} field populated
     */
    private static BMap<String, BValue> createConsumerStruct(Resource resource, KafkaConsumer<byte[], byte[]> kafkaConsumer, String str) {
        ProgramFile programFile = resource.getResourceInfo().getServiceInfo().getPackageInfo().getProgramFile();

        BMap<String, BValue> consumerStruct = BLangConnectorSPIUtil.createBStruct(programFile, KafkaConstants.KAFKA_NATIVE_PACKAGE, KafkaConstants.CONSUMER_STRUCT_NAME, new Object[0]);
        consumerStruct.addNativeData(KafkaConstants.NATIVE_CONSUMER, kafkaConsumer);

        BMap configStruct = BLangConnectorSPIUtil.createBStruct(programFile, KafkaConstants.KAFKA_NATIVE_PACKAGE, KafkaConstants.CONSUMER_CONFIG_STRUCT_NAME, new Object[0]);
        configStruct.put(KafkaConstants.CONSUMER_GROUP_ID_CONFIG, new BString(str));

        consumerStruct.put("config", configStruct);
        return consumerStruct;
    }

    /**
     * Builds the argument array for a resource that may also receive offsets and the group ID.
     *
     * <p>The returned array has one slot per declared resource parameter. Slots are filled in
     * order, as far as the parameter count allows: the consumer struct, the record struct array,
     * the offset struct array, and the group ID (left {@code null} when {@code str} is
     * {@code null}). Slots beyond index 3 are left {@code null}.
     *
     * @param resource        the resource that will be invoked
     * @param consumerRecords the records used for slots 1 and 2
     * @param kafkaConsumer   the consumer attached to the struct in slot 0
     * @param str             the consumer group ID; may be {@code null}
     * @return the argument array, sized to the resource's parameter count
     */
    public static BValue[] getSignatureParameters(Resource resource, ConsumerRecords<byte[], byte[]> consumerRecords, KafkaConsumer<byte[], byte[]> kafkaConsumer, String str) {
        int paramCount = resource.getParamDetails().size();
        BValue[] arguments = new BValue[paramCount];
        if (paramCount > 0) {
            arguments[0] = createConsumerStruct(resource, kafkaConsumer, str);
        }
        if (paramCount > 1) {
            arguments[1] = createRecordStructArray(resource, consumerRecords);
        }
        if (paramCount > 2) {
            arguments[2] = createOffsetStructArray(resource, consumerRecords);
        }
        if (paramCount > 3) {
            arguments[3] = (str == null) ? null : new BString(str);
        }
        return arguments;
    }

    /**
     * Translates a consumer configuration map into consumer {@link Properties}.
     *
     * <p>Each supported entry of {@code bMap} is copied to its property key through the typed
     * {@code add*ParamIfPresent} helpers, and the default byte-array deserializers are added last.
     *
     * @param bMap the consumer configuration to read
     * @return the populated properties
     */
    public static Properties processKafkaConsumerConfig(BMap<String, BValue> bMap) {
        final BMap<String, BValue> config = bMap;
        final Properties props = new Properties();

        // String-valued settings.
        addStringParamIfPresent("bootstrap.servers", config, props, "bootstrapServers");
        addStringParamIfPresent("group.id", config, props, KafkaConstants.CONSUMER_GROUP_ID_CONFIG);
        addStringParamIfPresent("auto.offset.reset", config, props, KafkaConstants.CONSUMER_AUTO_OFFSET_RESET_CONFIG);
        addStringParamIfPresent("partition.assignment.strategy", config, props, KafkaConstants.CONSUMER_PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
        addStringParamIfPresent("metrics.recording.level", config, props, "metricsRecordingLevel");
        addStringParamIfPresent("metric.reporters", config, props, KafkaConstants.CONSUMER_METRIC_REPORTER_CLASSES_CONFIG);
        addStringParamIfPresent("client.id", config, props, KafkaConstants.CONSUMER_CLIENT_ID_CONFIG);
        addStringParamIfPresent("interceptor.classes", config, props, "interceptorClasses");
        addStringParamIfPresent("isolation.level", config, props, KafkaConstants.CONSUMER_ISOLATION_LEVEL_CONFIG);

        // String-array settings.
        addStringArrayParamIfPresent(KafkaConstants.ALIAS_TOPICS, config, props, KafkaConstants.ALIAS_TOPICS);
        addStringArrayParamIfPresent(KafkaConstants.PROPERTIES_ARRAY, config, props, KafkaConstants.PROPERTIES_ARRAY);

        // Integer settings.
        addIntParamIfPresent("session.timeout.ms", config, props, KafkaConstants.CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
        addIntParamIfPresent("heartbeat.interval.ms", config, props, KafkaConstants.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
        addIntParamIfPresent("metadata.max.age.ms", config, props, "metadataMaxAge");
        addIntParamIfPresent("auto.commit.interval.ms", config, props, KafkaConstants.CONSUMER_AUTO_COMMIT_INTERVAL_MS_CONFIG);
        addIntParamIfPresent("max.partition.fetch.bytes", config, props, KafkaConstants.CONSUMER_MAX_PARTITION_FETCH_BYTES_CONFIG);
        addIntParamIfPresent("send.buffer.bytes", config, props, "sendBuffer");
        addIntParamIfPresent("receive.buffer.bytes", config, props, "receiveBuffer");
        addIntParamIfPresent("fetch.min.bytes", config, props, KafkaConstants.CONSUMER_FETCH_MIN_BYTES_CONFIG);
        addIntParamIfPresent("fetch.max.bytes", config, props, KafkaConstants.CONSUMER_FETCH_MAX_BYTES_CONFIG);
        addIntParamIfPresent("fetch.max.wait.ms", config, props, KafkaConstants.CONSUMER_FETCH_MAX_WAIT_MS_CONFIG);
        addIntParamIfPresent("reconnect.backoff.ms", config, props, "reconnectBackoff");
        addIntParamIfPresent("retry.backoff.ms", config, props, "retryBackoff");
        addIntParamIfPresent("metrics.sample.window.ms", config, props, "metricsSampleWindow");
        addIntParamIfPresent("metrics.num.samples", config, props, "metricsNumSamples");
        addIntParamIfPresent("request.timeout.ms", config, props, "requestTimeout");
        addIntParamIfPresent("connections.max.idle.ms", config, props, KafkaConstants.CONSUMER_CONNECTIONS_MAX_IDLE_MS_CONFIG);
        addIntParamIfPresent("max.poll.records", config, props, KafkaConstants.CONSUMER_MAX_POLL_RECORDS_CONFIG);
        addIntParamIfPresent("max.poll.interval.ms", config, props, KafkaConstants.CONSUMER_MAX_POLL_INTERVAL_MS_CONFIG);
        addIntParamIfPresent("reconnect.backoff.max.ms", config, props, "reconnectBackoffMax");
        addIntParamIfPresent(KafkaConstants.ALIAS_POLLING_TIMEOUT, config, props, KafkaConstants.ALIAS_POLLING_TIMEOUT);
        addIntParamIfPresent(KafkaConstants.ALIAS_POLLING_INTERVAL, config, props, KafkaConstants.ALIAS_POLLING_INTERVAL);
        addIntParamIfPresent(KafkaConstants.ALIAS_CONCURRENT_CONSUMERS, config, props, KafkaConstants.ALIAS_CONCURRENT_CONSUMERS);

        // Boolean settings; the last argument is the default that is not written out.
        addBooleanParamIfPresent("enable.auto.commit", config, props, KafkaConstants.CONSUMER_ENABLE_AUTO_COMMIT_CONFIG, true);
        addBooleanParamIfPresent("check.crcs", config, props, KafkaConstants.CONSUMER_CHECK_CRCS_CONFIG, true);
        addBooleanParamIfPresent("exclude.internal.topics", config, props, KafkaConstants.CONSUMER_EXCLUDE_INTERNAL_TOPICS_CONFIG, true);
        addBooleanParamIfPresent(KafkaConstants.ALIAS_DECOUPLE_PROCESSING, config, props, KafkaConstants.ALIAS_DECOUPLE_PROCESSING, false);

        processDefaultConsumerProperties(props);
        return props;
    }

    /**
     * Translates a producer configuration map into producer {@link Properties}.
     *
     * <p>When {@code bMap} is {@code null} only the default byte-array serializers are set.
     * Otherwise each supported entry is copied to its property key through the typed
     * {@code add*ParamIfPresent} helpers before the default serializers are added.
     *
     * @param bMap the producer configuration to read; may be {@code null}
     * @return the populated properties
     */
    public static Properties processKafkaProducerConfig(BMap<String, BValue> bMap) {
        final Properties props = new Properties();
        if (bMap != null) {
            // String-valued settings.
            addStringParamIfPresent("bootstrap.servers", bMap, props, "bootstrapServers");
            addStringParamIfPresent(KafkaConstants.PRODUCER_ACKS_CONFIG, bMap, props, KafkaConstants.PRODUCER_ACKS_CONFIG);
            addStringParamIfPresent("compression.type", bMap, props, KafkaConstants.PRODUCER_COMPRESSION_TYPE_CONFIG);
            addStringParamIfPresent("client.id", bMap, props, KafkaConstants.PRODUCER_CLIENT_ID_CONFIG);
            addStringParamIfPresent("metrics.recording.level", bMap, props, "metricsRecordingLevel");
            addStringParamIfPresent("metric.reporters", bMap, props, KafkaConstants.PRODUCER_METRIC_REPORTER_CLASSES_CONFIG);
            addStringParamIfPresent("partitioner.class", bMap, props, KafkaConstants.PRODUCER_PARTITIONER_CLASS_CONFIG);
            addStringParamIfPresent("interceptor.classes", bMap, props, "interceptorClasses");
            addStringParamIfPresent("transactional.id", bMap, props, KafkaConstants.PRODUCER_TRANSACTIONAL_ID_CONFIG);

            // Integer settings.
            addIntParamIfPresent("buffer.memory", bMap, props, KafkaConstants.PRODUCER_BUFFER_MEMORY_CONFIG);
            addIntParamIfPresent("retries", bMap, props, KafkaConstants.PRODUCER_RETRIES_CONFIG);
            addIntParamIfPresent("batch.size", bMap, props, KafkaConstants.PRODUCER_BATCH_SIZE_CONFIG);
            addIntParamIfPresent("linger.ms", bMap, props, KafkaConstants.PRODUCER_LINGER_MS_CONFIG);
            addIntParamIfPresent("send.buffer.bytes", bMap, props, "sendBuffer");
            addIntParamIfPresent("receive.buffer.bytes", bMap, props, "receiveBuffer");
            addIntParamIfPresent("max.request.size", bMap, props, KafkaConstants.PRODUCER_MAX_REQUEST_SIZE_CONFIG);
            addIntParamIfPresent("reconnect.backoff.ms", bMap, props, "reconnectBackoff");
            addIntParamIfPresent("reconnect.backoff.max.ms", bMap, props, "reconnectBackoffMax");
            addIntParamIfPresent("retry.backoff.ms", bMap, props, "retryBackoff");
            addIntParamIfPresent("max.block.ms", bMap, props, KafkaConstants.PRODUCER_MAX_BLOCK_MS_CONFIG);
            addIntParamIfPresent("request.timeout.ms", bMap, props, "requestTimeout");
            addIntParamIfPresent("metadata.max.age.ms", bMap, props, "metadataMaxAge");
            addIntParamIfPresent("metrics.sample.window.ms", bMap, props, "metricsSampleWindow");
            addIntParamIfPresent("metrics.num.samples", bMap, props, "metricsNumSamples");
            addIntParamIfPresent("max.in.flight.requests.per.connection", bMap, props, KafkaConstants.PRODUCER_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
            addIntParamIfPresent("connections.max.idle.ms", bMap, props, KafkaConstants.PRODUCER_CONNECTIONS_MAX_IDLE_MS_CONFIG);
            addIntParamIfPresent("transaction.timeout.ms", bMap, props, KafkaConstants.PRODUCER_TRANSACTION_TIMEOUT_CONFIG);

            // Boolean settings; the last argument is the default that is not written out.
            addBooleanParamIfPresent("enable.idempotence", bMap, props, KafkaConstants.PRODUCER_ENABLE_IDEMPOTENCE_CONFIG, false);
        }
        // The serializers are applied whether or not a configuration was supplied.
        processDefaultProducerProperties(props);
        return props;
    }

    /**
     * Sets the key and value deserializer properties to Kafka's byte-array deserializer.
     *
     * @param properties the properties to update in place
     */
    public static void processDefaultConsumerProperties(Properties properties) {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        properties.setProperty("key.deserializer", byteArrayDeserializer);
        properties.setProperty("value.deserializer", byteArrayDeserializer);
    }

    /**
     * Sets the key and value serializer properties to Kafka's byte-array serializer.
     *
     * @param properties the properties to update in place
     */
    public static void processDefaultProducerProperties(Properties properties) {
        final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        properties.setProperty("key.serializer", byteArraySerializer);
        properties.setProperty("value.serializer", byteArraySerializer);
    }

    /**
     * Copies a string entry from the configuration map into the properties.
     *
     * <p>Nothing is written when the entry is missing, or when its value is {@code null} or the
     * empty string.
     *
     * @param str        the property key to write
     * @param bMap       the configuration map to read from
     * @param properties the properties to update
     * @param str2       the key of the entry in {@code bMap}
     */
    private static void addStringParamIfPresent(String str, BMap<String, BValue> bMap, Properties properties, String str2) {
        BValue configured = bMap.get(str2);
        if (configured == null) {
            return;
        }
        // NOTE(review): value() is called on the static BValue type as in the existing code;
        // confirm whether a downcast is needed against the BValue API.
        String text = configured.value();
        if (text == null || text.isEmpty()) {
            return;
        }
        properties.put(str, text);
    }

    /**
     * Copies a string-array entry from the configuration map into the properties as a list.
     *
     * <p>Nothing is written when the entry is missing or the array is empty. Otherwise the
     * elements are copied, in order, into a new {@link ArrayList} stored under {@code str}.
     *
     * @param str        the property key to write
     * @param bMap       the configuration map to read from
     * @param properties the properties to update
     * @param str2       the key of the entry in {@code bMap}
     */
    private static void addStringArrayParamIfPresent(String str, BMap<String, BValue> bMap, Properties properties, String str2) {
        BStringArray configured = (BStringArray) bMap.get(str2);
        if (configured == null || configured.size() == 0) {
            return;
        }
        ArrayList<String> values = new ArrayList<>();
        for (int index = 0; index < configured.size(); index++) {
            values.add(configured.get(index));
        }
        properties.put(str, values);
    }

    /**
     * Copies an integer entry from the configuration map into the properties.
     *
     * <p>Nothing is written when the entry is missing or when its value is {@code -1}, which is
     * treated as "not configured". Otherwise the value is narrowed to {@code int} and stored as an
     * {@link Integer}.
     *
     * @param str        the property key to write
     * @param bMap       the configuration map to read from
     * @param properties the properties to update
     * @param str2       the key of the entry in {@code bMap}
     */
    private static void addIntParamIfPresent(String str, BMap<String, BValue> bMap, Properties properties, String str2) {
        BValue configured = bMap.get(str2);
        // A missing entry is skipped, in line with addStringParamIfPresent, instead of failing
        // with a NullPointerException.
        if (configured == null) {
            return;
        }
        long raw = configured.intValue();
        if (raw != -1) {
            // Same narrowing as Long.intValue(), without the deprecated Long(long) constructor.
            properties.put(str, Integer.valueOf((int) raw));
        }
    }

    /**
     * Copies a boolean entry from the configuration map into the properties when it differs from
     * the given default.
     *
     * <p>Nothing is written when the entry is missing or when its value equals {@code z}.
     *
     * @param str        the property key to write
     * @param bMap       the configuration map to read from
     * @param properties the properties to update
     * @param str2       the key of the entry in {@code bMap}
     * @param z          the default value; an entry equal to it is not written
     */
    private static void addBooleanParamIfPresent(String str, BMap<String, BValue> bMap, Properties properties, String str2, boolean z) {
        BValue configured = bMap.get(str2);
        // A missing entry is skipped, in line with addStringParamIfPresent, instead of failing
        // with a NullPointerException.
        if (configured == null) {
            return;
        }
        boolean flag = configured.value().booleanValue();
        if (flag != z) {
            properties.put(str, Boolean.valueOf(flag));
        }
    }

    /**
     * Creates a struct of the named type from the Kafka native package.
     *
     * @param context the context whose program file is used to create the struct
     * @param str     the name of the struct type to instantiate
     * @return the newly created struct
     */
    public static BMap<String, BValue> createKafkaPackageStruct(Context context, String str) {
        // No initial field values are supplied.
        Object[] noFieldValues = new Object[0];
        return BLangConnectorSPIUtil.createBStruct(
                context.getProgramFile(),
                KafkaConstants.KAFKA_NATIVE_PACKAGE,
                str,
                noFieldValues);
    }

    /**
     * Converts an array of topic-partition structs into Kafka {@link TopicPartition} objects.
     *
     * <p>Each element's {@code topic} and {@code partition} fields are read to build one
     * {@code TopicPartition}. A {@code null} array yields an empty list.
     *
     * @param bRefValueArray the topic-partition structs to convert; may be {@code null}
     * @return the converted partitions, in the order of the input array
     */
    public static ArrayList<TopicPartition> getTopicPartitionList(BRefValueArray bRefValueArray) {
        ArrayList<TopicPartition> partitions = new ArrayList<>();
        if (bRefValueArray == null) {
            return partitions;
        }
        for (int index = 0; index < bRefValueArray.size(); index++) {
            BMap partitionStruct = (BMap) bRefValueArray.get(index);
            String topic = partitionStruct.get("topic").stringValue();
            int partition = partitionStruct.get("partition").value().intValue();
            partitions.add(new TopicPartition(topic, partition));
        }
        return partitions;
    }
}
