package org.apache.inlong.manager.service.resource.queue.kafka;

import com.google.common.annotations.VisibleForTesting;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.class */
public class KafkaOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);

    @Autowired
    public DeserializeOperatorFactory deserializeOperatorFactory;

    public void createTopic(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo, String str) throws InterruptedException, ExecutionException {
        AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
        NewTopic newTopic = new NewTopic(str, inlongKafkaInfo.getNumPartitions().intValue(), inlongKafkaInfo.getReplicationFactor().shortValue());
        if (topicIsExists(kafkaClusterInfo, str)) {
            LOGGER.warn("kafka topic={} already exists", str);
            return;
        }
        CreateTopicsResult createTopics = adminClient.createTopics(Collections.singletonList(newTopic));
        Thread.sleep(500L);
        LOGGER.info("success to create kafka topic={}, with={} numPartitions", str, createTopics.numPartitions(str).get());
    }

    public void forceDeleteTopic(KafkaClusterInfo kafkaClusterInfo, String str) {
        LOGGER.info("success to delete topic={}, result: {}", str, KafkaUtils.getAdminClient(kafkaClusterInfo).deleteTopics(Collections.singletonList(str)).all());
    }

    public boolean topicIsExists(KafkaClusterInfo kafkaClusterInfo, String str) throws ExecutionException, InterruptedException {
        return ((Set) KafkaUtils.getAdminClient(kafkaClusterInfo).listTopics().names().get()).contains(str);
    }

    public List<BriefMQMessage> queryLatestMessage(KafkaClusterInfo kafkaClusterInfo, String str, String str2, Integer num, InlongStreamInfo inlongStreamInfo) {
        LOGGER.debug("begin to query message for topic {} in cluster: {}", str, kafkaClusterInfo);
        return getLatestMessage(new KafkaConsumer(getProperties(kafkaClusterInfo.getUrl(), str2)), str, num, inlongStreamInfo);
    }

    @VisibleForTesting
    public List<BriefMQMessage> getLatestMessage(Consumer<byte[], byte[]> consumer, String str, Integer num, InlongStreamInfo inlongStreamInfo) {
        ArrayList arrayList = new ArrayList();
        try {
            try {
                List list = (List) consumer.partitionsFor(str).stream().map(partitionInfo -> {
                    return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                }).collect(Collectors.toList());
                Map beginningOffsets = consumer.beginningOffsets(list);
                Map endOffsets = consumer.endOffsets(list);
                int ceil = (int) Math.ceil(num.intValue() / list.size());
                Map map = (Map) beginningOffsets.entrySet().stream().map(entry -> {
                    long longValue = ((Long) entry.getValue()).longValue();
                    long longValue2 = ((Long) endOffsets.getOrDefault(entry.getKey(), Long.valueOf(longValue))).longValue();
                    return Pair.of(entry.getKey(), Long.valueOf(longValue2 - longValue >= ((long) ceil) ? longValue2 - ceil : longValue));
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                }));
                consumer.assign(list);
                consumer.getClass();
                map.forEach((v1, v2) -> {
                    r1.seek(v1, v2);
                });
                Iterator it = consumer.poll(Duration.ofMillis(100L)).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    HashMap hashMap = new HashMap();
                    for (Header header : consumerRecord.headers()) {
                        hashMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
                    }
                    arrayList.addAll(this.deserializeOperatorFactory.getInstance(DataProxyMsgEncType.valueOf(Integer.parseInt((String) hashMap.getOrDefault("msgEnType", Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId()))))).decodeMsg(inlongStreamInfo, (byte[]) consumerRecord.value(), hashMap, 0));
                    if (arrayList.size() >= num.intValue()) {
                        break;
                    }
                }
                LOGGER.debug("success query messages for topic={}, size={}, returned size={}", new Object[]{str, Integer.valueOf(arrayList.size()), num});
                List<BriefMQMessage> subList = arrayList.subList(arrayList.size() > num.intValue() ? arrayList.size() - num.intValue() : 0, arrayList.size());
                for (int i = 0; i < subList.size(); i++) {
                    subList.get(i).setId(Integer.valueOf(i + 1));
                }
                return subList;
            } catch (Exception e) {
                LOGGER.error("decode msg error: ", e);
                throw new BusinessException("decode msg error: " + e.getMessage());
            }
        } finally {
            consumer.close();
        }
    }

    private static Properties getProperties(String str, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
        properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        properties.put("group.id", str2);
        return properties;
    }
}
