package org.ballerinalang.kafka.nativeimpl.consumer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.ballerinalang.kafka.api.KafkaListener;
import org.ballerinalang.kafka.util.KafkaConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ballerinalang/kafka/nativeimpl/consumer/KafkaRecordConsumer.class */
/**
 * Polls a Kafka subscription on a dedicated single-threaded scheduler and hands every
 * non-empty batch of records to a {@link KafkaListener}.
 *
 * <p>Threading: {@link KafkaConsumer} is not safe for multi-threaded access, so every call on
 * it except {@code wakeup()} is made from the scheduler thread owned by this class. That
 * includes the final {@code close()}, which {@link #stopConsume()} queues on that thread
 * instead of invoking directly.
 */
public class KafkaRecordConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordConsumer.class);

    /** Value passed to {@code KafkaConsumer.poll} when no polling timeout is configured. */
    private static final int DEFAULT_POLLING_TIMEOUT = 1000;
    /** Period, in milliseconds, between polls when no polling interval is configured. */
    private static final int DEFAULT_POLLING_INTERVAL = 1000;

    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final int pollingTimeout;
    private final int pollingInterval;
    private final boolean decoupleProcessing;
    private final String groupId;
    private final KafkaListener kafkaListener;
    private final String serviceId;
    private final int consumerId;
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

    /** Set once by {@link #stopConsume()}; makes stopping idempotent and silences the wakeup. */
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    /** Set when polling must not run again, even if the scheduled future is not yet visible. */
    private volatile boolean pollingHalted;
    /** Written by the thread calling {@link #consume()}, read by the scheduler thread. */
    private volatile ScheduledFuture<?> pollTaskFuture;

    /**
     * Creates the underlying Kafka consumer and subscribes it to the configured topics.
     *
     * <p>Optional settings read from {@code properties}: the polling timeout, the polling
     * interval, {@code enable.auto.commit} and the decouple-processing flag. When both of the
     * latter two are present the decouple-processing flag wins.
     *
     * @param kafkaListener receiver of polled records and polling errors
     * @param properties    Kafka consumer configuration plus the settings described above;
     *                      the topics entry must be a collection of topic names
     * @param str           identifier of the service this consumer belongs to
     * @param i             identifier of this consumer
     * @throws ClassCastException if a setting does not have the expected type
     */
    public KafkaRecordConsumer(KafkaListener kafkaListener, Properties properties, String str, int i) {
        // Read and type-check all settings first so a malformed value cannot leak an open consumer.
        @SuppressWarnings("unchecked")
        Collection<String> topics = (Collection<String>) properties.get(KafkaConstants.ALIAS_TOPICS);
        int timeout = intSetting(properties, KafkaConstants.ALIAS_POLLING_TIMEOUT, DEFAULT_POLLING_TIMEOUT);
        int interval = intSetting(properties, KafkaConstants.ALIAS_POLLING_INTERVAL, DEFAULT_POLLING_INTERVAL);
        boolean decouple = true;
        Object autoCommit = properties.get("enable.auto.commit");
        if (autoCommit != null) {
            decouple = (Boolean) autoCommit;
        }
        Object explicitDecouple = properties.get(KafkaConstants.ALIAS_DECOUPLE_PROCESSING);
        if (explicitDecouple != null) {
            decouple = (Boolean) explicitDecouple;
        }

        this.serviceId = str;
        this.consumerId = i;
        this.kafkaListener = kafkaListener;
        this.pollingTimeout = timeout;
        this.pollingInterval = interval;
        this.decoupleProcessing = decouple;
        this.groupId = (String) properties.get("group.id");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
        try {
            consumer.subscribe(topics);
        } catch (RuntimeException e) {
            // Do not leak the consumer's resources when the subscription is rejected.
            try {
                consumer.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.kafkaConsumer = consumer;
    }

    /** Returns the {@code Integer} stored under {@code key}, or {@code fallback} when absent. */
    private static int intSetting(Properties properties, Object key, int fallback) {
        Object value = properties.get(key);
        return value != null ? (Integer) value : fallback;
    }

    /**
     * Runs one poll cycle on the scheduler thread.
     *
     * <p>In decoupled mode the records are handed over and the method returns. Otherwise it
     * blocks on a fresh semaphore until a permit is released; presumably
     * {@code KafkaPollCycleFutureListener} does that once the listener has finished, but that
     * class is not visible here.
     */
    private void poll() {
        if (pollingHalted || stopRequested.get()) {
            return;
        }
        try {
            ConsumerRecords<byte[], byte[]> records = this.kafkaConsumer.poll(this.pollingTimeout);
            if (logger.isDebugEnabled()) {
                logger.debug("Kafka Consumer {} on service {} has retrieved {} records.",
                        this.consumerId, this.serviceId, records.count());
            }
            if (records.isEmpty()) {
                return;
            }
            if (this.decoupleProcessing) {
                this.kafkaListener.onRecordsReceived(records, this.kafkaConsumer);
            } else {
                Semaphore pollCycle = new Semaphore(0);
                this.kafkaListener.onRecordsReceived(records, this.kafkaConsumer,
                        new KafkaPollCycleFutureListener(pollCycle, this.serviceId), this.groupId);
                pollCycle.acquire();
            }
        } catch (InterruptedException e) {
            // Restore the flag so the executor can observe the interruption.
            Thread.currentThread().interrupt();
            handlePollFailure(e);
        } catch (KafkaException | IllegalArgumentException | IllegalStateException e) {
            handlePollFailure(e);
        }
    }

    /** Reports a poll failure to the listener and stops polling, unless a stop is in progress. */
    private void handlePollFailure(Exception failure) {
        if (stopRequested.get()) {
            // stopConsume() wakes the consumer up on purpose; that is not an error to surface.
            logger.debug("Kafka Consumer {} on service {} stopped polling during shutdown.",
                    this.consumerId, this.serviceId, failure);
            return;
        }
        this.kafkaListener.onError(failure);
        haltPolling();
    }

    /** Prevents any further poll cycle; an in-flight cycle is allowed to finish. */
    private void haltPolling() {
        this.pollingHalted = true;
        // The future may not be assigned yet if the very first poll fails immediately.
        ScheduledFuture<?> future = this.pollTaskFuture;
        if (future != null) {
            future.cancel(false);
        }
    }

    /** Closes the Kafka consumer; must only run on the scheduler thread. */
    private void closeConsumer() {
        try {
            this.kafkaConsumer.close();
        } catch (KafkaException | IllegalStateException e) {
            logger.error("Failed to close Kafka Consumer " + this.consumerId
                    + " on service " + this.serviceId, e);
        }
    }

    /**
     * Starts polling: the first poll runs immediately, later ones at a fixed rate of the
     * configured polling interval in milliseconds.
     */
    public void consume() {
        this.pollTaskFuture = this.executorService.scheduleAtFixedRate(
                this::poll, 0L, this.pollingInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the identifier given to this consumer at construction.
     *
     * @return the consumer id
     */
    public int getConsumerId() {
        return this.consumerId;
    }

    /**
     * Stops polling and releases the Kafka consumer. Calling it more than once is harmless.
     *
     * <p>The consumer is woken up so a blocking poll returns promptly, then closed on the
     * scheduler thread after any in-flight poll cycle has finished. The close is therefore
     * asynchronous: this method may return before the consumer is actually closed.
     */
    public void stopConsume() {
        if (!this.stopRequested.compareAndSet(false, true)) {
            return;
        }
        // wakeup() is the only KafkaConsumer method that is safe to call from another thread.
        this.kafkaConsumer.wakeup();
        haltPolling();
        // Queued before shutdown() so it still runs; the single worker thread serialises it
        // behind the poll cycle that may currently be executing.
        this.executorService.execute(this::closeConsumer);
        this.executorService.shutdown();
    }
}
