package org.ballerinalang.kafka.nativeimpl.consumer.endpoint;

import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BLangVMErrors;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.connector.api.BLangConnectorSPIUtil;
import org.ballerinalang.connector.api.Service;
import org.ballerinalang.kafka.exception.KafkaConnectorException;
import org.ballerinalang.kafka.impl.KafkaListenerImpl;
import org.ballerinalang.kafka.impl.KafkaServerConnectorImpl;
import org.ballerinalang.kafka.util.KafkaConstants;
import org.ballerinalang.kafka.util.KafkaUtils;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

@BallerinaFunction(orgName = KafkaConstants.ORG_NAME, packageName = KafkaConstants.PACKAGE_NAME, functionName = "registerListener", receiver = @Receiver(type = TypeKind.OBJECT, structType = KafkaConstants.CONSUMER_STRUCT_NAME, structPackage = KafkaConstants.KAFKA_NATIVE_PACKAGE))
/* loaded from: input_file:org/ballerinalang/kafka/nativeimpl/consumer/endpoint/RegisterListener.class */
public class RegisterListener implements NativeCallableUnit {
    public void execute(Context context, CallableUnitCallback callableUnitCallback) {
        Service serviceRegistered = BLangConnectorSPIUtil.getServiceRegistered(context);
        try {
            BLangConnectorSPIUtil.getConnectorEndpointStruct(context).addNativeData(KafkaConstants.CONSUMER_SERVER_CONNECTOR_NAME, new KafkaServerConnectorImpl(serviceRegistered.getName(), KafkaUtils.processKafkaConsumerConfig(context.getRefArgument(0).get("consumerConfig")), new KafkaListenerImpl(KafkaUtils.extractKafkaResource(serviceRegistered))));
        } catch (KafkaConnectorException e) {
            context.setReturnValues(new BValue[]{BLangVMErrors.createError(context, "Unable to initialize server connector: " + e.getMessage())});
            return;
        } catch (Throwable th) {
            context.setReturnValues(new BValue[]{BLangVMErrors.createError(context, "Cannot register: " + th.getMessage())});
        }
        context.setReturnValues(new BValue[0]);
    }

    public boolean isBlocking() {
        return true;
    }
}
