/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.messaging.kafka.impl;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.ballerinalang.messaging.kafka.api.KafkaListener;
import org.ballerinalang.messaging.kafka.impl.KafkaPollCycleFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordConsumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordConsumer.class);
    private KafkaConsumer kafkaConsumer;
    private Duration pollingTimeout = Duration.ofMillis(1000L);
    private int pollingInterval = 1000;
    private boolean decoupleProcessing = true;
    private String groupId;
    private KafkaListener kafkaListener;
    private String serviceId;
    private int consumerId;
    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    private ScheduledFuture pollTaskFuture;

    public KafkaRecordConsumer(KafkaListener kafkaListener, Properties configParams, String serviceId, int consumerId, KafkaConsumer kafkaConsumer) {
        this.serviceId = serviceId;
        this.consumerId = consumerId;
        this.kafkaConsumer = Objects.isNull(kafkaConsumer) ? new KafkaConsumer(configParams) : kafkaConsumer;
        ArrayList topics = (ArrayList)configParams.get("topics");
        this.kafkaConsumer.subscribe((Collection)topics);
        this.kafkaListener = kafkaListener;
        if (configParams.get("pollingTimeoutInMillis") != null) {
            this.pollingTimeout = Duration.ofMillis(((Integer)configParams.get("pollingTimeoutInMillis")).intValue());
        }
        if (configParams.get("pollingIntervalInMillis") != null) {
            this.pollingInterval = (Integer)configParams.get("pollingIntervalInMillis");
        }
        if (configParams.get("enable.auto.commit") != null) {
            this.decoupleProcessing = (Boolean)configParams.get("enable.auto.commit");
        }
        if (configParams.get("decoupleProcessing") != null) {
            this.decoupleProcessing = (Boolean)configParams.get("decoupleProcessing");
        }
        this.groupId = (String)configParams.get("group.id");
    }

    private void poll() {
        try {
            ConsumerRecords recordsRetrieved;
            block6: {
                recordsRetrieved = null;
                try {
                    if (!this.closed.get()) {
                        recordsRetrieved = this.kafkaConsumer.poll(this.pollingTimeout);
                    }
                }
                catch (WakeupException e) {
                    if (this.closed.get()) break block6;
                    throw e;
                }
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Kafka service " + this.serviceId + " attached to consumer " + this.consumerId + " has received " + recordsRetrieved.count() + " records.");
            }
            this.processRetrievedRecords(recordsRetrieved);
        }
        catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            this.kafkaListener.onError(e);
            this.pollTaskFuture.cancel(false);
        }
    }

    private void processRetrievedRecords(ConsumerRecords consumerRecords) {
        if (Objects.nonNull(consumerRecords) && !consumerRecords.isEmpty()) {
            if (this.decoupleProcessing) {
                this.kafkaListener.onRecordsReceived(consumerRecords, this.kafkaConsumer, this.groupId);
            } else {
                Semaphore sem = new Semaphore(0);
                KafkaPollCycleFutureListener pollCycleListener = new KafkaPollCycleFutureListener(sem, this.serviceId);
                this.kafkaListener.onRecordsReceived(consumerRecords, this.kafkaConsumer, this.groupId, pollCycleListener);
                try {
                    sem.acquire();
                }
                catch (InterruptedException e) {
                    this.kafkaListener.onError(e);
                    this.pollTaskFuture.cancel(false);
                }
            }
        }
    }

    public void consume() {
        Runnable pollingFunction = () -> this.poll();
        this.pollTaskFuture = this.executorService.scheduleAtFixedRate(pollingFunction, 0L, this.pollingInterval, TimeUnit.MILLISECONDS);
    }

    public int getConsumerId() {
        return this.consumerId;
    }

    public void stopConsume() {
        this.closed.set(true);
        this.kafkaConsumer.wakeup();
        this.kafkaConsumer.close();
        this.executorService.shutdown();
    }
}

