package org.apache.inlong.manager.service.resource.queue.tubemq;

import com.google.common.base.Objects;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link QueueResourceOperator} implementation for inlong groups whose MQ type is {@code TUBEMQ}.
 *
 * <p>Group-level creation creates a topic and a consumer group through {@link TubeMQOperator}
 * and then records the consumer group through {@link InlongConsumeService}. Group deletion and
 * the stream-level operations are no-ops in this class.
 */
@Service
public class TubeMQResourceOperator implements QueueResourceOperator {

    private static final Logger log = LoggerFactory.getLogger(TubeMQResourceOperator.class);

    /** Identifier used both as the accepted MQ type and as the cluster type for lookups. */
    private static final String TUBEMQ = "TUBEMQ";

    @Autowired
    private InlongClusterService clusterService;

    @Autowired
    private InlongConsumeService consumeService;

    @Autowired
    private TubeMQOperator tubeMQOperator;

    /**
     * Tells whether this operator handles the given MQ type.
     *
     * @param str the MQ type to test; may be {@code null}
     * @return {@code true} only when {@code str} equals {@code "TUBEMQ"}
     */
    @Override
    public boolean accept(String str) {
        return TUBEMQ.equals(str);
    }

    /**
     * Creates the TubeMQ topic and consumer group for an inlong group and saves the
     * corresponding inlong consume record.
     *
     * <p>If the group status already equals {@link GroupStatus#CONFIG_SUCCESSFUL}, nothing is
     * created and the method returns immediately.
     *
     * @param inlongGroupInfo the inlong group to create resources for; must not be {@code null}
     * @param str the name of the operator performing the action; must not be {@code null}
     * @throws WorkflowListenerException if any step of the resource creation fails
     */
    @Override
    public void createQueueForGroup(InlongGroupInfo inlongGroupInfo, String str) {
        Preconditions.checkNotNull(inlongGroupInfo, "inlong group info cannot be null");
        Preconditions.checkNotNull(str, "operator cannot be null");

        String groupId = inlongGroupInfo.getInlongGroupId();
        log.info("begin to create tubemq resource for groupId={}", groupId);

        // The group was already configured successfully: do not re-create the topic and
        // consumer group. (Previously the skip was only logged and creation ran anyway.)
        if (Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), inlongGroupInfo.getStatus())) {
            log.info("skip to create tubemq resource as the status of groupId={} was successful", groupId);
            return;
        }

        try {
            // Look up the TubeMQ cluster through the cluster tag bound to the group.
            String clusterTag = inlongGroupInfo.getInlongClusterTag();
            TubeClusterInfo tubeCluster = this.clusterService.getOne(clusterTag, null, TUBEMQ);

            // 1. Create the topic; the group's MQ resource is used as the topic name.
            String topicName = inlongGroupInfo.getMqResource();
            this.tubeMQOperator.createTopic(tubeCluster, topicName, str);
            log.info("success to create tubemq topic for groupId={}", groupId);

            // 2. Create the consumer group, named "<clusterTag>_<topic>_consumer_group".
            String consumerGroup = clusterTag + "_" + topicName + "_consumer_group";
            this.tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumerGroup, str);
            log.info("success to create tubemq consumer group for groupId={}", groupId);

            // 3. Save the consume record for the consumer group that was just created.
            Object consumeId = this.consumeService.saveBySystem(inlongGroupInfo, topicName, consumerGroup);
            log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
                    consumeId, consumerGroup, groupId, topicName);

            log.info("success to create tubemq resource for groupId={}, cluster={}", groupId, tubeCluster);
        } catch (Exception e) {
            // The full stack trace is logged here because the rethrown exception only carries
            // the message. NOTE(review): chain the cause if WorkflowListenerException offers a
            // (String, Throwable) constructor - confirm against its declaration.
            log.error("failed to create tubemq resource for groupId={}", groupId, e);
            throw new WorkflowListenerException("failed to create tubemq resource: " + e.getMessage());
        }
    }

    /**
     * No-op: this operator performs no action when a group's queue is deleted.
     *
     * @param inlongGroupInfo the inlong group (unused)
     * @param str the name of the operator performing the action (unused)
     */
    @Override
    public void deleteQueueForGroup(InlongGroupInfo inlongGroupInfo, String str) {
        // intentionally empty
    }

    /**
     * No-op: this operator performs no stream-level queue creation.
     *
     * @param inlongGroupInfo the inlong group (unused)
     * @param inlongStreamInfo the inlong stream (unused)
     * @param str the name of the operator performing the action (unused)
     */
    @Override
    public void createQueueForStream(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, String str) {
        // intentionally empty
    }

    /**
     * No-op: this operator performs no stream-level queue deletion.
     *
     * @param inlongGroupInfo the inlong group (unused)
     * @param inlongStreamInfo the inlong stream (unused)
     * @param str the name of the operator performing the action (unused)
     */
    @Override
    public void deleteQueueForStream(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, String str) {
        // intentionally empty
    }
}
