package org.ballerinalang.messaging.kafka.nativeimpl.consumer;

import java.time.Duration;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.jvm.values.connector.NonBlockingCallback;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaObservabilityConstants;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;

/* loaded from: input_file:org/ballerinalang/messaging/kafka/nativeimpl/consumer/Poll.class */
public class Poll {
    public static Object poll(ObjectValue objectValue, long j) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, objectValue);
        NonBlockingCallback nonBlockingCallback = new NonBlockingCallback(strand);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        String stringValue = objectValue.getStringValue(KafkaConstants.CONSUMER_KEY_DESERIALIZER_TYPE_CONFIG);
        String stringValue2 = objectValue.getStringValue(KafkaConstants.CONSUMER_VALUE_DESERIALIZER_TYPE_CONFIG);
        Duration ofMillis = Duration.ofMillis(j);
        BArray createArrayValue = BValueCreator.createArrayValue(new BArrayType(KafkaUtils.getConsumerRecord().getType()));
        try {
            ConsumerRecords poll = kafkaConsumer.poll(ofMillis);
            if (!poll.isEmpty()) {
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    MapValue<String, Object> populateConsumerRecord = KafkaUtils.populateConsumerRecord((ConsumerRecord) it.next(), stringValue, stringValue2);
                    createArrayValue.append(populateConsumerRecord);
                    KafkaMetricsUtil.reportConsume(objectValue, populateConsumerRecord.getStringValue(KafkaConstants.ALIAS_TOPIC), populateConsumerRecord.get(KafkaConstants.ALIAS_VALUE));
                }
            }
            nonBlockingCallback.setReturnValues(createArrayValue);
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_POLL);
            nonBlockingCallback.notifyFailure(KafkaUtils.createKafkaError("Failed to poll from the Kafka server: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR));
        }
        nonBlockingCallback.notifySuccess();
        return null;
    }
}
