package org.ballerinalang.messaging.kafka.nativeimpl.consumer;

import java.time.Duration;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.values.ArrayValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.NonBlockingCallback;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
 * Native implementation of the {@code poll} function declared on the Kafka consumer object
 * (see the {@link BallerinaFunction} binding below).
 *
 * <p>The outcome of the poll is not returned directly from the Java method; it is handed to the
 * calling strand through a {@link NonBlockingCallback}.
 */
@BallerinaFunction(
        orgName = KafkaConstants.ORG_NAME,
        packageName = KafkaConstants.KAFKA_PACKAGE_NAME,
        functionName = "poll",
        receiver = @Receiver(
                type = TypeKind.OBJECT,
                structType = KafkaConstants.CONSUMER_STRUCT_NAME,
                structPackage = KafkaConstants.KAFKA_PROTOCOL_PACKAGE),
        isPublic = true)
public class Poll {
    /**
     * Polls the Kafka consumer attached to {@code objectValue} once and publishes the result on
     * the strand's callback.
     *
     * <p>On success the callback's return value is an array holding one populated consumer record
     * per record returned by the poll, in iteration order. If the poll (or record conversion)
     * throws {@link IllegalArgumentException}, {@link IllegalStateException} or
     * {@link KafkaException}, the return value is instead a Kafka error of kind
     * {@link KafkaConstants#CONSUMER_ERROR} carrying the exception message. In both cases the
     * callback is then notified via {@code notifySuccess()}.
     *
     * @param strand      strand the callback is bound to
     * @param objectValue consumer object whose native data under
     *                    {@link KafkaConstants#NATIVE_CONSUMER} is the {@link KafkaConsumer}
     * @param j           poll timeout, interpreted as milliseconds
     * @return always {@code null}; the actual result is delivered through the callback
     */
    public static Object poll(Strand strand, ObjectValue objectValue, long j) {
        NonBlockingCallback callback = new NonBlockingCallback(strand);
        KafkaConsumer consumer = (KafkaConsumer) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        Duration timeout = Duration.ofMillis(j);
        ArrayValue records = new ArrayValue(new BArrayType(KafkaUtils.getConsumerRecord().getType()));
        try {
            int nextIndex = 0;
            // The consumer is held as a raw type, so each polled element arrives as Object
            // and is cast back to ConsumerRecord before conversion.
            for (Object polled : consumer.poll(timeout)) {
                records.add(nextIndex++, KafkaUtils.populateConsumerRecord((ConsumerRecord) polled));
            }
            callback.setReturnValues(records);
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            // Failures are reported to the caller as an error value rather than rethrown.
            callback.setReturnValues(
                    KafkaUtils.createKafkaError(
                            "Failed to poll from the Kafka server: " + e.getMessage(),
                            KafkaConstants.CONSUMER_ERROR));
        }
        // Reached after both the success and the handled-error path.
        callback.notifySuccess();
        return null;
    }
}
