package org.apache.inlong.manager.service.consume;

import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.kafka.ConsumeKafkaDTO;
import org.apache.inlong.manager.pojo.consume.kafka.ConsumeKafkaInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaTopicInfo;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/consume/ConsumeKafkaOperator.class */
public class ConsumeKafkaOperator extends AbstractConsumeOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeKafkaOperator.class);

    @Autowired
    private InlongGroupService groupService;

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public Boolean accept(String str) {
        return Boolean.valueOf(getMQType().equals(str));
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public String getMQType() {
        return "KAFKA";
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public void checkTopicInfo(InlongConsumeRequest inlongConsumeRequest) {
        String inlongGroupId = inlongConsumeRequest.getInlongGroupId();
        InlongKafkaTopicInfo topic = this.groupService.getTopic(inlongGroupId);
        Preconditions.checkNotNull(topic, "inlong group not exist: " + inlongGroupId);
        InlongKafkaTopicInfo inlongKafkaTopicInfo = topic;
        String topic2 = inlongConsumeRequest.getTopic();
        Preconditions.checkTrue(inlongKafkaTopicInfo.getTopics().contains(topic2), "Kafka topic not exist for " + topic2);
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public InlongConsumeInfo getFromEntity(InlongConsumeEntity inlongConsumeEntity) {
        Preconditions.checkNotNull(inlongConsumeEntity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
        ConsumeKafkaInfo consumeKafkaInfo = new ConsumeKafkaInfo();
        CommonBeanUtils.copyProperties(inlongConsumeEntity, consumeKafkaInfo);
        if (StringUtils.isNotBlank(inlongConsumeEntity.getExtParams())) {
            CommonBeanUtils.copyProperties(ConsumeKafkaDTO.getFromJson(inlongConsumeEntity.getExtParams()), consumeKafkaInfo);
        }
        return consumeKafkaInfo;
    }

    @Override // org.apache.inlong.manager.service.consume.AbstractConsumeOperator
    protected void setTargetEntity(InlongConsumeRequest inlongConsumeRequest, InlongConsumeEntity inlongConsumeEntity) {
        LOGGER.info("do nothing for inlong consume with Kafka");
    }
}
