package org.ballerinalang.kafka.nativeimpl.consumer.action;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BLangVMErrors;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.kafka.util.KafkaConstants;
import org.ballerinalang.kafka.util.KafkaUtils;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BFunctionPointer;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BRefType;
import org.ballerinalang.model.values.BRefValueArray;
import org.ballerinalang.model.values.BStringArray;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.natives.annotations.ReturnType;
import org.ballerinalang.util.codegen.FunctionInfo;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.ballerinalang.util.program.BLangFunctions;

@BallerinaFunction(orgName = KafkaConstants.ORG_NAME, packageName = KafkaConstants.PACKAGE_NAME, functionName = "subscribeWithPartitionRebalance", receiver = @Receiver(type = TypeKind.OBJECT, structType = KafkaConstants.CONSUMER_STRUCT_NAME, structPackage = KafkaConstants.KAFKA_NATIVE_PACKAGE), args = {@Argument(name = KafkaConstants.ALIAS_TOPICS, type = TypeKind.ARRAY, elementType = TypeKind.STRING), @Argument(name = "onPartitionsRevoked", type = TypeKind.ANY), @Argument(name = "onPartitionsAssigned", type = TypeKind.ANY)}, returnType = {@ReturnType(type = TypeKind.RECORD)}, isPublic = true)
/* loaded from: input_file:org/ballerinalang/kafka/nativeimpl/consumer/action/SubscribeWithPartitionRebalance.class */
public class SubscribeWithPartitionRebalance implements NativeCallableUnit {

    /* loaded from: input_file:org/ballerinalang/kafka/nativeimpl/consumer/action/SubscribeWithPartitionRebalance$KafkaRebalanceListener.class */
    class KafkaRebalanceListener implements ConsumerRebalanceListener {
        private Context context;
        private FunctionInfo onPartitionsRevoked;
        private FunctionInfo onPartitionsAssigned;
        private BMap<String, BValue> consumerStruct;

        KafkaRebalanceListener(Context context, FunctionInfo functionInfo, FunctionInfo functionInfo2, BMap<String, BValue> bMap) {
            this.context = context;
            this.onPartitionsRevoked = functionInfo;
            this.onPartitionsAssigned = functionInfo2;
            this.consumerStruct = bMap;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            BLangFunctions.invokeCallable(this.onPartitionsRevoked, new BValue[]{this.consumerStruct, getPartitionsArray(collection)});
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            BLangFunctions.invokeCallable(this.onPartitionsAssigned, new BValue[]{this.consumerStruct, getPartitionsArray(collection)});
        }

        private BRefValueArray getPartitionsArray(Collection<TopicPartition> collection) {
            return new BRefValueArray((BRefType[]) KafkaUtils.createPartitionList(this.context, collection).toArray(new BRefType[0]), KafkaUtils.createKafkaPackageStruct(this.context, KafkaConstants.TOPIC_PARTITION_STRUCT_NAME).getType());
        }
    }

    public void execute(Context context, CallableUnitCallback callableUnitCallback) {
        BMap refArgument = context.getRefArgument(0);
        BStringArray refArgument2 = context.getRefArgument(1);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < refArgument2.size(); i++) {
            arrayList.add(refArgument2.get(i));
        }
        FunctionInfo functionInfo = null;
        FunctionInfo functionInfo2 = null;
        BValue refArgument3 = context.getRefArgument(2);
        BValue refArgument4 = context.getRefArgument(3);
        if (Objects.nonNull(refArgument3) && (refArgument3 instanceof BFunctionPointer)) {
            functionInfo = context.getRefArgument(2).value();
        } else {
            context.setReturnValues(new BValue[]{BLangVMErrors.createError(context, "The onPartitionsRevoked function is not provided.")});
        }
        if (Objects.nonNull(refArgument4) && (refArgument4 instanceof BFunctionPointer)) {
            functionInfo2 = context.getRefArgument(3).value();
        } else {
            context.setReturnValues(new BValue[]{BLangVMErrors.createError(context, "The onPartitionsAssigned function is not provided.")});
        }
        KafkaRebalanceListener kafkaRebalanceListener = new KafkaRebalanceListener(context, functionInfo, functionInfo2, refArgument);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) refArgument.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        if (Objects.isNull(kafkaConsumer)) {
            throw new BallerinaException("Kafka Consumer has not been initialized properly.");
        }
        try {
            kafkaConsumer.subscribe(arrayList, kafkaRebalanceListener);
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            context.setReturnValues(new BValue[]{BLangVMErrors.createError(context, e.getMessage())});
        }
    }

    public boolean isBlocking() {
        return true;
    }
}
