package org.ballerinalang.messaging.kafka.impl;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.ballerinalang.messaging.kafka.api.KafkaListener;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ballerinalang/messaging/kafka/impl/KafkaRecordConsumer.class */
/**
 * Polls a {@link KafkaConsumer} at a fixed rate on a dedicated single-thread scheduler and
 * hands every non-empty batch of records to a {@link KafkaListener}.
 *
 * <p>Lifecycle: construct (subscribes to the configured topics), call {@link #consume()} to
 * start polling, and call {@link #stopConsume()} to wake up and close the consumer and shut the
 * scheduler down.
 */
public class KafkaRecordConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordConsumer.class);

    /** Poll timeout used when {@code KafkaConstants.ALIAS_POLLING_TIMEOUT} is not configured. */
    private static final long DEFAULT_POLLING_TIMEOUT_MILLIS = 1000L;
    /** Poll period used when {@code KafkaConstants.ALIAS_POLLING_INTERVAL} is not configured. */
    private static final int DEFAULT_POLLING_INTERVAL_MILLIS = 1000;

    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final Duration pollingTimeout;
    private final int pollingInterval;
    private final boolean decoupleProcessing;
    private final String groupId;
    private final KafkaListener kafkaListener;
    private final String serviceId;
    private final int consumerId;
    // Written by the thread that calls consume() and read on the scheduler thread, hence volatile.
    private volatile ScheduledFuture<?> pollTaskFuture;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

    /**
     * Creates the record consumer and subscribes it to the topics stored under the
     * {@code "topics"} property.
     *
     * @param kafkaListener listener that receives polled records and polling errors
     * @param properties    consumer configuration; besides the values consumed by
     *                      {@link KafkaConsumer} itself, this constructor reads {@code "topics"}
     *                      (cast to {@code ArrayList}), {@code "group.id"},
     *                      {@code "enable.auto.commit"} and the {@link KafkaConstants} polling
     *                      timeout, polling interval and decouple-processing aliases
     * @param serviceId     identifier of the service this consumer is attached to; used in log
     *                      output and passed to the poll-cycle listener
     * @param consumerId    identifier of this consumer; used in log output
     * @param kafkaConsumer consumer to poll, or {@code null} to create a new one from
     *                      {@code properties}
     */
    public KafkaRecordConsumer(KafkaListener kafkaListener, Properties properties, String serviceId, int consumerId, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.serviceId = serviceId;
        this.consumerId = consumerId;
        this.kafkaConsumer = Objects.isNull(kafkaConsumer)
                ? new KafkaConsumer<byte[], byte[]>(properties)
                : kafkaConsumer;

        // The topic list is stored as an untyped property value, so the cast cannot be checked.
        @SuppressWarnings("unchecked")
        ArrayList<String> topics = (ArrayList<String>) properties.get("topics");
        this.kafkaConsumer.subscribe(topics);
        this.kafkaListener = kafkaListener;

        Object timeoutMillis = properties.get(KafkaConstants.ALIAS_POLLING_TIMEOUT);
        this.pollingTimeout = timeoutMillis != null
                ? Duration.ofMillis((Integer) timeoutMillis)
                : Duration.ofMillis(DEFAULT_POLLING_TIMEOUT_MILLIS);

        Object intervalMillis = properties.get(KafkaConstants.ALIAS_POLLING_INTERVAL);
        this.pollingInterval = intervalMillis != null
                ? (Integer) intervalMillis
                : DEFAULT_POLLING_INTERVAL_MILLIS;

        // Defaults to true, then follows "enable.auto.commit" when present; an explicit
        // decouple-processing setting, when present, takes precedence over both.
        boolean decouple = true;
        Object autoCommit = properties.get("enable.auto.commit");
        if (autoCommit != null) {
            decouple = (Boolean) autoCommit;
        }
        Object explicitDecouple = properties.get(KafkaConstants.ALIAS_DECOUPLE_PROCESSING);
        if (explicitDecouple != null) {
            decouple = (Boolean) explicitDecouple;
        }
        this.decoupleProcessing = decouple;

        this.groupId = (String) properties.get("group.id");
    }

    /**
     * Runs a single poll cycle: polls the consumer unless it has been closed and dispatches the
     * result. A {@link KafkaException}, {@link IllegalArgumentException} or
     * {@link IllegalStateException} is reported to the listener and cancels further polling.
     */
    private void poll() {
        try {
            ConsumerRecords<byte[], byte[]> consumerRecords = null;
            try {
                if (!this.closed.get()) {
                    consumerRecords = this.kafkaConsumer.poll(this.pollingTimeout);
                }
            } catch (WakeupException e) {
                // stopConsume() wakes the consumer up on purpose; only a wakeup that was not
                // requested through it is treated as an error.
                if (!this.closed.get()) {
                    throw e;
                }
            }
            if (consumerRecords == null) {
                // Nothing was polled (already closed, or woken up during shutdown). Returning
                // here also keeps the debug statement below from dereferencing null.
                return;
            }
            logger.debug("Kafka service {} attached to consumer {} has received {} records.",
                    this.serviceId, this.consumerId, consumerRecords.count());
            processRetrievedRecords(consumerRecords);
        } catch (KafkaException | IllegalArgumentException | IllegalStateException e) {
            this.kafkaListener.onError(e);
            this.pollTaskFuture.cancel(false);
        }
    }

    /**
     * Hands a polled batch to the listener; {@code null} and empty batches are ignored.
     *
     * <p>With decoupled processing the batch is dispatched and this method returns immediately.
     * Otherwise the calling (scheduler) thread blocks on a semaphore after dispatching.
     *
     * @param consumerRecords the batch returned by the last poll, possibly {@code null}
     */
    private void processRetrievedRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
        if (Objects.isNull(consumerRecords) || consumerRecords.isEmpty()) {
            return;
        }
        if (this.decoupleProcessing) {
            this.kafkaListener.onRecordsReceived(consumerRecords, this.kafkaConsumer, this.groupId);
            return;
        }
        // NOTE(review): presumably KafkaPollCycleFutureListener releases this semaphore once the
        // listener has finished with the batch - confirm against that class.
        Semaphore pollCycleDone = new Semaphore(0);
        this.kafkaListener.onRecordsReceived(consumerRecords, this.kafkaConsumer, this.groupId, new KafkaPollCycleFutureListener(pollCycleDone, this.serviceId));
        try {
            pollCycleDone.acquire();
        } catch (InterruptedException e) {
            this.kafkaListener.onError(e);
            this.pollTaskFuture.cancel(false);
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts polling: schedules a poll cycle immediately and then at a fixed rate of the
     * configured polling interval, in milliseconds.
     */
    public void consume() {
        this.pollTaskFuture = this.executorService.scheduleAtFixedRate(
                this::poll, 0L, this.pollingInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the identifier given to this consumer at construction time.
     *
     * @return the consumer id
     */
    public int getConsumerId() {
        return this.consumerId;
    }

    /**
     * Stops consuming: marks this instance closed, wakes up and closes the Kafka consumer, and
     * shuts the scheduler down. The scheduler is shut down even when closing the consumer throws.
     */
    public void stopConsume() {
        this.closed.set(true);
        this.kafkaConsumer.wakeup();
        try {
            // NOTE(review): close() is invoked from the caller's thread while a poll may still be
            // running on the scheduler thread. KafkaConsumer documents wakeup() as its only
            // thread-safe method - confirm whether the close should be moved to the poll thread.
            this.kafkaConsumer.close();
        } finally {
            this.executorService.shutdown();
        }
    }
}
