/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.messaging.kafka.impl;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.jvm.values.connector.Executor;
import org.ballerinalang.messaging.kafka.api.KafkaListener;
import org.ballerinalang.messaging.kafka.impl.KafkaPollCycleFutureListener;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaObserverContext;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaListener} implementation that hands polled Kafka records to the
 * {@code onMessage} resource of a Ballerina service object.
 *
 * <p>Each batch of records is submitted through {@link Executor#submit} on the scheduler taken
 * from the strand supplied at construction time. After the submission, consumption metrics are
 * reported through {@link KafkaMetricsUtil}.
 */
public class KafkaListenerImpl implements KafkaListener {
    private static final Logger logger = LoggerFactory.getLogger(KafkaListenerImpl.class);

    /** Native-data key under which the active consumer is stored on the listener object. */
    private static final String NATIVE_CONSUMER_KEY = "KafkaConsumer";
    /** Name of the service resource invoked for every batch of records. */
    private static final String ON_MESSAGE_RESOURCE = "onMessage";
    /** Error-type label passed to the metrics utility from {@link #onError(Throwable)}. */
    private static final String ERROR_TYPE_MESSAGE_RECEIVED = "message_received";
    /** Context label passed to {@link KafkaObserverContext} for this side of the connector. */
    private static final String OBSERVER_CONTEXT_CONSUMER = "consumer";
    /** Property key under which the observer context is handed to the executor. */
    private static final String OBSERVER_CONTEXT_PROPERTY = "__observer_context__";

    private final Scheduler scheduler;
    private final ObjectValue service;
    private final ObjectValue listener;
    private final ResponseCallback callback;

    /**
     * Creates a listener bound to the given service.
     *
     * @param strand   strand whose scheduler is used to submit resource invocations
     * @param listener Ballerina listener object; receives the consumer as native data
     * @param service  Ballerina service object whose {@code onMessage} resource is invoked
     */
    public KafkaListenerImpl(Strand strand, ObjectValue listener, ObjectValue service) {
        this.scheduler = strand.scheduler;
        this.listener = listener;
        this.service = service;
        this.callback = new ResponseCallback();
    }

    /**
     * Dispatches the records to the service using this listener's default callback.
     *
     * @param records       records returned by the poll
     * @param kafkaConsumer consumer that produced the records; stored on the listener object
     * @param groupId       group id forwarded to the resource-parameter builder
     */
    @Override
    public void onRecordsReceived(ConsumerRecords records, KafkaConsumer kafkaConsumer, String groupId) {
        this.listener.addNativeData(NATIVE_CONSUMER_KEY, kafkaConsumer);
        this.executeResource(this.callback, records, groupId);
        KafkaMetricsUtil.reportConsume(this.listener, records);
    }

    /**
     * Dispatches the records to the service, notifying the supplied poll-cycle listener
     * instead of the default callback.
     *
     * @param records       records returned by the poll
     * @param kafkaConsumer consumer that produced the records; stored on the listener object
     * @param groupId       group id forwarded to the resource-parameter builder
     * @param consumer      callback handed to the executor for this dispatch
     */
    @Override
    public void onRecordsReceived(ConsumerRecords records, KafkaConsumer kafkaConsumer, String groupId, KafkaPollCycleFutureListener consumer) {
        this.listener.addNativeData(NATIVE_CONSUMER_KEY, kafkaConsumer);
        this.executeResource(consumer, records, groupId);
        KafkaMetricsUtil.reportConsume(this.listener, records);
    }

    /**
     * Logs the failure and reports a consumer error metric.
     *
     * @param throwable the failure being reported to this listener
     */
    @Override
    public void onError(Throwable throwable) {
        // The trailing Throwable is treated by SLF4J as the exception to print, not as a
        // placeholder argument, so the rendered message matches the previous concatenation.
        logger.error("Kafka Ballerina server connector retrieved exception: {}", throwable.getMessage(), throwable);
        KafkaMetricsUtil.reportConsumerError(this.listener, ERROR_TYPE_MESSAGE_RECEIVED);
    }

    /**
     * Submits the {@code onMessage} resource of the service for the given records.
     *
     * <p>An observer context is attached as a property only when tracing is enabled;
     * otherwise {@code null} properties are passed.
     *
     * @param resourceCallback callback handed to the executor for this invocation
     * @param records          records passed on to the resource-parameter builder
     * @param groupId          group id passed on to the resource-parameter builder
     */
    private void executeResource(CallableUnitCallback resourceCallback, ConsumerRecords records, String groupId) {
        Map<String, Object> properties = ObserveUtils.isTracingEnabled()
                ? this.getNewObserverContextInProperties(this.listener)
                : null;
        Object[] resourceArgs = KafkaUtils.getResourceParameters(this.service, this.listener, records, groupId);
        Executor.submit(this.scheduler, this.service, ON_MESSAGE_RESOURCE, resourceCallback, properties, resourceArgs);
    }

    /**
     * Builds a property map holding a fresh {@link KafkaObserverContext} for the listener.
     *
     * @param listener listener object from which client id and bootstrap servers are read
     * @return a new mutable map containing only the observer-context entry
     */
    private Map<String, Object> getNewObserverContextInProperties(ObjectValue listener) {
        Map<String, Object> properties = new HashMap<>();
        KafkaObserverContext observerContext = new KafkaObserverContext(
                OBSERVER_CONTEXT_CONSUMER, KafkaUtils.getClientId(listener), KafkaUtils.getBootstrapServers(listener));
        properties.put(OBSERVER_CONTEXT_PROPERTY, observerContext);
        return properties;
    }

    /**
     * Default callback for dispatches that have no poll-cycle listener attached.
     * Success needs no action; failures are logged so they are not lost silently.
     */
    private static class ResponseCallback implements CallableUnitCallback {
        private ResponseCallback() {
        }

        @Override
        public void notifySuccess() {
            // Nothing to do: no caller is waiting on this callback.
        }

        @Override
        public void notifyFailure(ErrorValue error) {
            // Rendered through String.valueOf so the placeholder is always filled (and a null
            // error is tolerated) regardless of which logger overload the argument would select.
            logger.error("Kafka service resource '{}' reported a failure: {}", ON_MESSAGE_RESOURCE, String.valueOf(error));
        }
    }
}

