/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.tubemq;

import com.google.common.base.Objects;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link QueueResourceOperator} implementation for the TubeMQ message queue type.
 *
 * <p>For an inlong group it creates the TubeMQ topic named by the group's MQ resource; for an
 * inlong stream it creates one TubeMQ consumer group per stream sink and records each of them
 * through the {@link InlongConsumeService}. Deleting resources is currently a no-op.
 */
@Service
public class TubeMQQueueResourceOperator
implements QueueResourceOperator {
    private static final Logger log = LoggerFactory.getLogger(TubeMQQueueResourceOperator.class);

    /**
     * Format of the consumer group created for each sink; the arguments are, in order,
     * the inlong cluster tag, the topic name and the sink id.
     */
    public static final String TUBE_CONSUMER_GROUP = "%s_%s_%s_consumer_group";

    /** MQ / cluster type identifier handled by this operator. */
    private static final String MQ_TYPE_TUBEMQ = "TUBEMQ";

    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private InlongConsumeService consumeService;
    @Autowired
    private TubeMQOperator tubeMQOperator;
    @Autowired
    private StreamSinkService sinkService;

    /**
     * Tells whether this operator handles the given MQ type.
     *
     * @param mqType MQ type to test, may be {@code null}
     * @return {@code true} only when {@code mqType} equals {@code "TUBEMQ"}
     */
    @Override
    public boolean accept(String mqType) {
        return MQ_TYPE_TUBEMQ.equals(mqType);
    }

    /**
     * Creates the TubeMQ topic of the given inlong group.
     *
     * @param groupInfo inlong group whose MQ resource is used as the topic name, must not be null
     * @param operator name of the operator performing the action, must not be blank
     * @throws WorkflowListenerException if the cluster lookup or the topic creation fails
     */
    @Override
    public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
        Preconditions.expectNotNull(groupInfo, "inlong group info cannot be null");
        Preconditions.expectNotBlank(operator, ErrorCodeEnum.INVALID_PARAMETER, "operator cannot be null");

        String groupId = groupInfo.getInlongGroupId();
        log.info("begin to create tubemq resource for groupId={}", groupId);

        // NOTE(review): this branch only logs; there is no early return, so the topic creation
        // below still runs for groups that are already CONFIG_SUCCESSFUL. The control flow is
        // kept as-is here - confirm whether re-creating the topic is intended or a `return` is missing.
        if (Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
            log.info("skip to create tubemq resource as the status of groupId={} was successful", groupId);
        }

        try {
            TubeClusterInfo tubeCluster = getTubeCluster(groupInfo);
            String topicName = groupInfo.getMqResource();
            tubeMQOperator.createTopic(tubeCluster, topicName, operator);
            log.info("success to create tubemq topic for groupId={}, cluster={}", groupId, tubeCluster);
        } catch (Exception e) {
            // The full stack trace is logged here; only the message is propagated to the caller.
            log.error("failed to create tubemq resource for groupId={}", groupId, e);
            throw new WorkflowListenerException("failed to create tubemq resource: " + e.getMessage());
        }
    }

    /**
     * Does nothing: no TubeMQ resource is removed when a group is deleted.
     */
    @Override
    public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
    }

    /**
     * Creates one TubeMQ consumer group per sink of the given stream and saves a matching
     * inlong consume record for each of them. Returns without doing anything when the stream
     * has no sinks.
     *
     * @param groupInfo inlong group owning the stream, must not be null
     * @param streamInfo inlong stream whose sinks need consumer groups, must not be null
     * @param operator name of the operator performing the action
     */
    @Override
    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
        Preconditions.expectNotNull(groupInfo, "inlong group info cannot be null");
        Preconditions.expectNotNull(streamInfo, "inlong stream info cannot be null");

        String groupId = groupInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
        if (CollectionUtils.isEmpty(streamSinks)) {
            log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
            return;
        }

        // Cluster tag, cluster and topic depend only on the group, so resolve them once
        // instead of once per sink.
        String clusterTag = groupInfo.getInlongClusterTag();
        TubeClusterInfo tubeCluster = getTubeCluster(groupInfo);
        String topicName = groupInfo.getMqResource();

        for (StreamSink sink : streamSinks) {
            String consumeGroup = String.format(TUBE_CONSUMER_GROUP, clusterTag, topicName, sink.getId());
            tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
            log.info("success to create tubemq consumer group for groupId={}", groupId);

            Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
            log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
                    id, consumeGroup, groupId, topicName);
        }
    }

    /**
     * Does nothing: no TubeMQ resource is removed when a stream is deleted.
     */
    @Override
    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
    }

    /**
     * Queries the latest messages of the group's topic by delegating to
     * {@link TubeMQOperator#queryLastMessage}.
     *
     * @param groupInfo inlong group whose MQ resource is the topic to query, must not be null
     * @param streamInfo inlong stream passed through to the TubeMQ operator
     * @param messageCount number of messages requested, passed through to the TubeMQ operator
     * @return the messages returned by the TubeMQ operator
     */
    @Override
    public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, Integer messageCount) {
        Preconditions.expectNotNull(groupInfo, "inlong group info cannot be null");

        TubeClusterInfo tubeCluster = getTubeCluster(groupInfo);
        String topicName = groupInfo.getMqResource();
        return tubeMQOperator.queryLastMessage(tubeCluster, topicName, messageCount, streamInfo);
    }

    /**
     * Builds the consumer group name in the form
     * {@code <clusterTag>_<stream mqResource>_consumer_group}.
     *
     * <p>NOTE(review): this name has no sink id and is based on the stream-level MQ resource,
     * whereas {@link #createQueueForStream} registers groups formatted with
     * {@link #TUBE_CONSUMER_GROUP} (cluster tag, group-level MQ resource, sink id). The two
     * names therefore differ - confirm which one consumers are expected to use.
     *
     * @param groupInfo inlong group providing the cluster tag
     * @param streamEntity inlong stream providing the MQ resource used as the topic name
     * @param sinkEntity stream sink; not used by this implementation
     * @return the consumer group name
     */
    @Override
    public String getSortConsumeGroup(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity, StreamSinkEntity sinkEntity) {
        String topicName = streamEntity.getMqResource();
        return groupInfo.getInlongClusterTag() + "_" + topicName + "_consumer_group";
    }

    /**
     * Looks up the TubeMQ cluster registered under the cluster tag of the given group.
     */
    private TubeClusterInfo getTubeCluster(InlongGroupInfo groupInfo) {
        return (TubeClusterInfo) clusterService.getOne(groupInfo.getInlongClusterTag(), null, MQ_TYPE_TUBEMQ);
    }
}

