/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.sink.kafka;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Sink resource operator for Kafka sinks: makes sure the Kafka topic described by a sink's
 * extended parameters exists, creating it when necessary, and records the outcome on the sink.
 */
@Service
public class KafkaResourceOperator
implements SinkResourceOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaResourceOperator.class);

    /** Common prefix of the messages emitted when the requested topic is already present. */
    private static final String TOPIC_EXISTS_FORMAT = "kafka topic=%s already exist with partition num=%s";

    @Autowired
    private StreamSinkService sinkService;

    /**
     * Tells whether this operator handles the given sink type.
     *
     * @param sinkType sink type name, may be null
     * @return true only when the type is exactly {@code "KAFKA"}
     */
    @Override
    public Boolean accept(String sinkType) {
        return "KAFKA".equals(sinkType);
    }

    /**
     * Creates the Kafka topic configured for the sink unless it already exists with the requested
     * partition count, then marks the sink as CONFIG_SUCCESSFUL. On any failure the sink is marked
     * CONFIG_FAILED with the failure message and a {@link WorkflowException} is thrown.
     *
     * <p>The topic name and partition number are validated before any Kafka call is made; a
     * validation failure is raised by {@link Preconditions} and does not update the sink status.
     *
     * @param sinkInfo sink whose ext params hold the Kafka settings (topic, partitions, servers)
     * @throws WorkflowException if checking or creating the topic, or updating the status, fails
     */
    @Override
    public void createSinkResource(SinkInfo sinkInfo) {
        LOGGER.info("begin to create kafka topic for sinkId={}", sinkInfo.getId());
        KafkaSinkDTO kafkaInfo = KafkaSinkDTO.getFromJson(sinkInfo.getExtParams());
        String topicName = kafkaInfo.getTopicName();
        Integer partitionNum = kafkaInfo.getPartitionNum();
        Preconditions.expectNotBlank(topicName, ErrorCodeEnum.INVALID_PARAMETER, "topic name cannot be empty");
        Preconditions.expectNotNull(partitionNum, ErrorCodeEnum.INVALID_PARAMETER, "partition cannot be empty");

        try (Admin admin = this.getKafkaAdmin(kafkaInfo.getBootstrapServers())) {
            if (!this.isTopicExists(admin, topicName, partitionNum)) {
                // Replication factor is left empty, so the broker-side default applies.
                NewTopic newTopic = new NewTopic(topicName, Optional.of(partitionNum), Optional.empty());
                CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
                // Block until the broker confirms (or rejects) the creation.
                result.values().get(topicName).get();
            }
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(),
                    "create kafka topic success");
            LOGGER.info("success to create kafka topic [{}] for sinkInfo={}", topicName, sinkInfo);
        } catch (Throwable e) {
            try {
                LOGGER.error("create kafka topic error, ", e);
                this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), e.getMessage());
            } finally {
                // The blocking future calls may be interrupted; since the exception is converted
                // below, restore the flag so callers can still observe the interruption. This is
                // done after the status update so the update itself is not disturbed by it.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
            throw new WorkflowException("create kafka topic failed, reason: " + e.getMessage());
        }
    }

    /**
     * Checks whether the topic exists and, if so, that its partition count matches the request.
     *
     * @param admin Kafka admin client used for the lookups
     * @param topicName topic to look for
     * @param partitionNum partition count the caller expects
     * @return false when the topic is not listed; true when it exists with the expected partitions
     * @throws IllegalArgumentException if the topic exists with a different partition count
     * @throws Exception if listing or describing topics fails or the wait is interrupted
     */
    private boolean isTopicExists(Admin admin, String topicName, Integer partitionNum) throws Exception {
        ListTopicsResult listResult = admin.listTopics();
        if (!listResult.namesToListings().get().containsKey(topicName)) {
            LOGGER.info("kafka topic {} not existed", topicName);
            return false;
        }

        DescribeTopicsResult describeResult = admin.describeTopics(Collections.singletonList(topicName));
        TopicDescription desc = describeResult.values().get(topicName).get();
        int actualPartitions = desc.partitions().size();
        if (actualPartitions != partitionNum.intValue()) {
            String errMsg = String.format(TOPIC_EXISTS_FORMAT + ", but the requested partition num=%s",
                    topicName, actualPartitions, partitionNum);
            LOGGER.error(errMsg);
            throw new IllegalArgumentException(errMsg);
        }
        LOGGER.info(String.format(TOPIC_EXISTS_FORMAT + ", no need to create", topicName, partitionNum));
        return true;
    }

    /**
     * Builds a Kafka admin client for the given bootstrap servers. The caller owns the returned
     * client and is responsible for closing it.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} client property
     * @return a new admin client
     */
    private Admin getKafkaAdmin(String bootstrapServers) {
        // Properties rejects null values with a message-less NullPointerException, which would
        // surface as "reason: null"; fail with an explicit message instead.
        Preconditions.expectNotBlank(bootstrapServers, ErrorCodeEnum.INVALID_PARAMETER,
                "bootstrap servers cannot be empty");
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        return Admin.create(props);
    }
}

