package org.ballerinalang.messaging.kafka.impl;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.jvm.values.connector.Executor;
import org.ballerinalang.messaging.kafka.api.KafkaListener;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaObservabilityConstants;
import org.ballerinalang.messaging.kafka.observability.KafkaObserverContext;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ballerinalang/messaging/kafka/impl/KafkaListenerImpl.class */
/**
 * {@link KafkaListener} implementation that hands received records to the
 * {@code KafkaConstants.KAFKA_RESOURCE_ON_MESSAGE} resource of a service object.
 *
 * <p>Each batch of records is submitted to the {@link Scheduler} taken from the strand
 * supplied at construction time, and consumption metrics are reported through
 * {@link KafkaMetricsUtil} once the submission has been made.
 */
public class KafkaListenerImpl implements KafkaListener {
    private static final Logger logger = LoggerFactory.getLogger(KafkaListenerImpl.class);

    /** Property key under which the observer context is passed to the executor. */
    private static final String OBSERVER_CONTEXT_KEY = "__observer_context__";

    private final Scheduler scheduler;
    private final ObjectValue service;
    private final ObjectValue listener;

    /** Callback used when the caller does not supply its own poll-cycle listener. */
    private final ResponseCallback callback = new ResponseCallback();

    /**
     * Default completion callback for resource invocations. Success needs no action;
     * failures are logged so that they do not disappear silently.
     */
    public static class ResponseCallback implements CallableUnitCallback {
        private ResponseCallback() {
        }

        @Override
        public void notifySuccess() {
            // Nothing to do when the resource completes normally.
        }

        @Override
        public void notifyFailure(ErrorValue errorValue) {
            // Previously the error was dropped without a trace; surface it in the log.
            logger.error("Kafka service resource execution failed", errorValue);
        }
    }

    /**
     * Creates a listener bound to the given listener and service objects.
     *
     * @param strand   strand whose scheduler is used to run the service resource
     * @param listener listener object; receives the native consumer and is used for
     *                 metrics and observer-context lookups
     * @param service  service object whose resource is invoked for received records
     */
    public KafkaListenerImpl(Strand strand, ObjectValue listener, ObjectValue service) {
        this.scheduler = strand.scheduler;
        this.listener = listener;
        this.service = service;
    }

    /**
     * Stores the consumer on the listener object as native data, submits the records to
     * the service resource using the default callback, then reports the consume metric.
     *
     * @param consumerRecords records handed over by the caller
     * @param kafkaConsumer   consumer to expose under {@code KafkaConstants.NATIVE_CONSUMER}
     * @param str             forwarded unchanged to {@code KafkaUtils.getResourceParameters}
     */
    @Override // org.ballerinalang.messaging.kafka.api.KafkaListener
    public void onRecordsReceived(ConsumerRecords consumerRecords, KafkaConsumer kafkaConsumer, String str) {
        this.listener.addNativeData(KafkaConstants.NATIVE_CONSUMER, kafkaConsumer);
        executeResource(this.listener, this.callback, consumerRecords, str);
        KafkaMetricsUtil.reportConsume(this.listener, consumerRecords);
    }

    /**
     * Same as the three-argument overload, except that the supplied poll-cycle listener is
     * passed to the executor as the completion callback instead of the default one.
     *
     * @param consumerRecords              records handed over by the caller
     * @param kafkaConsumer                consumer to expose under {@code KafkaConstants.NATIVE_CONSUMER}
     * @param str                          forwarded unchanged to {@code KafkaUtils.getResourceParameters}
     * @param kafkaPollCycleFutureListener callback passed to the executor for this invocation
     */
    @Override // org.ballerinalang.messaging.kafka.api.KafkaListener
    public void onRecordsReceived(ConsumerRecords consumerRecords, KafkaConsumer kafkaConsumer, String str, KafkaPollCycleFutureListener kafkaPollCycleFutureListener) {
        this.listener.addNativeData(KafkaConstants.NATIVE_CONSUMER, kafkaConsumer);
        executeResource(this.listener, kafkaPollCycleFutureListener, consumerRecords, str);
        KafkaMetricsUtil.reportConsume(this.listener, consumerRecords);
    }

    /**
     * Logs the given throwable and reports a consumer error metric of type
     * {@code KafkaObservabilityConstants.ERROR_TYPE_MSG_RECEIVED}.
     *
     * @param th the throwable reported by the caller
     */
    @Override // org.ballerinalang.messaging.kafka.api.KafkaListener
    public void onError(Throwable th) {
        logger.error("Kafka Ballerina server connector retrieved exception: " + th.getMessage(), th);
        KafkaMetricsUtil.reportConsumerError(this.listener, KafkaObservabilityConstants.ERROR_TYPE_MSG_RECEIVED);
    }

    /**
     * Submits the service resource to the scheduler. When tracing is enabled a fresh
     * observer context is attached as the invocation properties; otherwise the
     * properties are {@code null}.
     *
     * @param observedListener listener object used to build the observer context
     * @param resourceCallback callback passed to the executor for this invocation
     * @param consumerRecords  records passed on as resource parameters
     * @param str              forwarded unchanged to {@code KafkaUtils.getResourceParameters}
     */
    private void executeResource(ObjectValue observedListener, CallableUnitCallback resourceCallback, ConsumerRecords consumerRecords, String str) {
        // Build the properties before the resource parameters, matching the original evaluation order.
        Map<String, Object> properties = ObserveUtils.isTracingEnabled()
                ? getNewObserverContextInProperties(observedListener)
                : null;
        Executor.submit(this.scheduler, this.service, KafkaConstants.KAFKA_RESOURCE_ON_MESSAGE, resourceCallback, properties, KafkaUtils.getResourceParameters(this.service, this.listener, consumerRecords, str));
    }

    /**
     * Builds a property map holding a new consumer-side {@link KafkaObserverContext}
     * created from the client id and bootstrap servers looked up on the given object.
     *
     * @param objectValue object passed to {@code KafkaUtils.getClientId} and
     *                    {@code KafkaUtils.getBootstrapServers}
     * @return a new mutable map with a single observer-context entry
     */
    private Map<String, Object> getNewObserverContextInProperties(ObjectValue objectValue) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(OBSERVER_CONTEXT_KEY, new KafkaObserverContext(KafkaObservabilityConstants.CONTEXT_CONSUMER, KafkaUtils.getClientId(objectValue), KafkaUtils.getBootstrapServers(objectValue)));
        return properties;
    }
}
