package org.ballerinalang.messaging.kafka.nativeimpl.consumer;

import java.io.PrintStream;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.values.FPValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BArray;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.jvm.values.connector.NonBlockingCallback;
import org.ballerinalang.messaging.kafka.observability.KafkaMetricsUtil;
import org.ballerinalang.messaging.kafka.observability.KafkaObservabilityConstants;
import org.ballerinalang.messaging.kafka.observability.KafkaTracingUtil;
import org.ballerinalang.messaging.kafka.utils.KafkaConstants;
import org.ballerinalang.messaging.kafka.utils.KafkaUtils;

/* loaded from: input_file:org/ballerinalang/messaging/kafka/nativeimpl/consumer/SubscriptionHandler.class */
public class SubscriptionHandler {
    private static final PrintStream console = System.out;

    /* loaded from: input_file:org/ballerinalang/messaging/kafka/nativeimpl/consumer/SubscriptionHandler$KafkaRebalanceListener.class */
    static class KafkaRebalanceListener implements ConsumerRebalanceListener {
        private Strand strand;
        private Scheduler scheduler;
        private FPValue onPartitionsRevoked;
        private FPValue onPartitionsAssigned;
        private ObjectValue consumer;

        KafkaRebalanceListener(Strand strand, Scheduler scheduler, FPValue fPValue, FPValue fPValue2, ObjectValue objectValue) {
            this.strand = strand;
            this.scheduler = scheduler;
            this.onPartitionsRevoked = fPValue;
            this.onPartitionsAssigned = fPValue2;
            this.consumer = objectValue;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            this.scheduler.schedule(new Object[]{null, this.consumer, true, getPartitionsArray(collection), true}, this.onPartitionsRevoked.getConsumer(), this.strand, (CallableUnitCallback) null);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            this.scheduler.schedule(new Object[]{null, this.consumer, true, getPartitionsArray(collection), true}, this.onPartitionsAssigned.getConsumer(), this.strand, (CallableUnitCallback) null);
        }

        private BArray getPartitionsArray(Collection<TopicPartition> collection) {
            BArray createArrayValue = BValueCreator.createArrayValue(new BArrayType(KafkaUtils.getTopicPartitionRecord().getType()));
            for (TopicPartition topicPartition : collection) {
                createArrayValue.append(KafkaUtils.populateTopicPartitionRecord(topicPartition.topic(), topicPartition.partition()));
            }
            return createArrayValue;
        }
    }

    public static Object subscribe(ObjectValue objectValue, BArray bArray) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), objectValue);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        List<String> stringListFromStringBArray = KafkaUtils.getStringListFromStringBArray(bArray);
        try {
            kafkaConsumer.subscribe(stringListFromStringBArray);
            KafkaMetricsUtil.reportBulkSubscription(objectValue, kafkaConsumer.subscription());
            console.println(KafkaConstants.SUBSCRIBED_TOPICS + KafkaUtils.getTopicNamesString(stringListFromStringBArray));
            return null;
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_SUBSCRIBE);
            return KafkaUtils.createKafkaError("Failed to subscribe to the provided topics: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR);
        }
    }

    public static Object subscribeToPattern(ObjectValue objectValue, String str) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), objectValue);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        try {
            kafkaConsumer.subscribe(Pattern.compile(str));
            KafkaMetricsUtil.reportBulkSubscription(objectValue, kafkaConsumer.subscription());
            return null;
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_SUBSCRIBE_PATTERN);
            return KafkaUtils.createKafkaError("Failed to unsubscribe from the topics: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR);
        }
    }

    public static Object subscribeWithPartitionRebalance(ObjectValue objectValue, BArray bArray, FPValue fPValue, FPValue fPValue2) {
        Strand strand = Scheduler.getStrand();
        KafkaTracingUtil.traceResourceInvocation(strand, objectValue);
        NonBlockingCallback nonBlockingCallback = new NonBlockingCallback(strand);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        try {
            kafkaConsumer.subscribe(KafkaUtils.getStringListFromStringBArray(bArray), new KafkaRebalanceListener(strand, strand.scheduler, fPValue, fPValue2, objectValue));
            KafkaMetricsUtil.reportBulkSubscription(objectValue, kafkaConsumer.subscription());
        } catch (IllegalArgumentException | IllegalStateException | KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_SUBSCRIBE_PARTITION_REBALANCE);
            nonBlockingCallback.notifyFailure(KafkaUtils.createKafkaError("Failed to subscribe the consumer: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR));
        }
        nonBlockingCallback.notifySuccess();
        return null;
    }

    public static Object unsubscribe(ObjectValue objectValue) {
        KafkaTracingUtil.traceResourceInvocation(Scheduler.getStrand(), objectValue);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) objectValue.getNativeData(KafkaConstants.NATIVE_CONSUMER);
        try {
            Set subscription = kafkaConsumer.subscription();
            kafkaConsumer.unsubscribe();
            KafkaMetricsUtil.reportBulkUnsubscription(objectValue, subscription);
            return null;
        } catch (KafkaException e) {
            KafkaMetricsUtil.reportConsumerError(objectValue, KafkaObservabilityConstants.ERROR_TYPE_UNSUBSCRIBE);
            return KafkaUtils.createKafkaError("Failed to unsubscribe the consumer: " + e.getMessage(), KafkaConstants.CONSUMER_ERROR);
        }
    }
}
