/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.kafka;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.resource.queue.kafka.KafkaUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Thin wrapper around the Kafka {@link AdminClient} for creating, deleting and
 * looking up topics on the cluster described by a {@link KafkaClusterInfo}.
 *
 * <p>Every method obtains its own admin client through
 * {@link KafkaUtils#getAdminClient(KafkaClusterInfo)} and closes it before returning.
 * NOTE(review): this assumes the helper builds a new client on each call; if it ever
 * hands out a shared/cached instance, the close must move to the owner of that instance.
 */
@Service
public class KafkaOperator {
    // Log under this class's own name so log lines are attributed correctly.
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOperator.class);

    /**
     * Creates a topic and blocks until the broker has answered the request.
     *
     * @param inlongKafkaInfo supplies the partition count and replication factor of the new topic
     * @param kafkaClusterInfo cluster on which the topic is created
     * @param topicName name of the topic to create
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the create request fails
     */
    public void createTopic(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo, String topicName) throws InterruptedException, ExecutionException {
        // try-with-resources: the admin client is closed even when the request fails.
        try (AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo)) {
            NewTopic topic = new NewTopic(topicName, inlongKafkaInfo.getNumPartitions().intValue(), inlongKafkaInfo.getReplicationFactor().shortValue());
            CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(topic));
            // NOTE(review): the get() below already blocks until the request completes, so this
            // fixed pause looks redundant; kept in case callers rely on the extra delay.
            Thread.sleep(500L);
            LOGGER.info("success to create kafka topic={}, with={} numPartitions", topicName, result.numPartitions(topicName).get());
        }
    }

    /**
     * Deletes a topic on a best-effort basis: a failed delete is logged, not thrown.
     *
     * @param kafkaClusterInfo cluster that holds the topic
     * @param topicName name of the topic to delete
     */
    public void forceDeleteTopic(KafkaClusterInfo kafkaClusterInfo, String topicName) {
        try (AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo)) {
            DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(topicName));
            // Wait for the broker's answer so that "success" is only logged when it is true.
            result.all().get();
            LOGGER.info("success to delete topic={}", topicName);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller; this method declares no checked exceptions.
            Thread.currentThread().interrupt();
            LOGGER.error("interrupted while deleting kafka topic={}", topicName, e);
        } catch (ExecutionException e) {
            LOGGER.error("failed to delete kafka topic={}", topicName, e);
        }
    }

    /**
     * Checks whether a topic name is among the names returned by the cluster's topic listing.
     *
     * @param kafkaClusterInfo cluster to query
     * @param topic topic name to look for
     * @return {@code true} if the listing contains {@code topic}
     * @throws ExecutionException if the list request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean topicIsExists(KafkaClusterInfo kafkaClusterInfo, String topic) throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo)) {
            Set<String> topicList = adminClient.listTopics().names().get();
            return topicList.contains(topic);
        }
    }
}

