/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.workflow.consumption.listener;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ConsumptionCompleteProcessListener
implements ProcessEventListener {
    private static final Logger log = LoggerFactory.getLogger(ConsumptionCompleteProcessListener.class);
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    @Autowired
    private ConsumptionEntityMapper consumptionMapper;
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private PulsarOperator pulsarOperator;
    @Autowired
    private TubeMQOperator tubeMQOperator;

    public ProcessEvent event() {
        return ProcessEvent.COMPLETE;
    }

    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
        NewConsumptionProcessForm consumptionForm = (NewConsumptionProcessForm)context.getProcessForm();
        Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
        ConsumptionEntity entity = this.consumptionMapper.selectByPrimaryKey(consumptionId);
        if (entity == null) {
            throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
        }
        MQType mqType = MQType.forType((String)entity.getMqType());
        if (mqType == MQType.TUBE) {
            this.createTubeConsumerGroup(entity, context.getOperator());
            return ListenerResult.success((String)"Create Tube consumer group successful");
        }
        if (mqType != MQType.PULSAR && mqType != MQType.TDMQ_PULSAR) {
            throw new WorkflowListenerException("Unsupported MQ type " + mqType);
        }
        this.createPulsarSubscription(entity);
        this.updateConsumerInfo(consumptionId, entity.getConsumerGroup());
        return ListenerResult.success((String)"Create MQ consumer group successful");
    }

    private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
        ConsumptionEntity update = new ConsumptionEntity();
        update.setId(consumptionId);
        update.setStatus(Integer.valueOf(ConsumptionStatus.APPROVED.getStatus()));
        update.setConsumerGroup(consumerGroup);
        update.setModifyTime(new Date());
        this.consumptionMapper.updateByPrimaryKeySelective(update);
    }

    private void createPulsarSubscription(ConsumptionEntity entity) {
        String groupId = entity.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        Preconditions.checkNotNull((Object)groupEntity, (String)("inlong group not found for groupId=" + groupId));
        String mqResource = groupEntity.getMqResource();
        Preconditions.checkNotNull((Object)mqResource, (String)("mq resource cannot empty for groupId=" + groupId));
        String clusterTag = groupEntity.getInlongClusterTag();
        InlongClusterInfo clusterInfo = this.clusterService.getOne(clusterTag, null, "PULSAR");
        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)clusterInfo;
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            PulsarTopicBean topicMessage = new PulsarTopicBean();
            String tenant = pulsarCluster.getTenant();
            if (StringUtils.isEmpty((CharSequence)tenant)) {
                tenant = "public";
            }
            topicMessage.setTenant(tenant);
            topicMessage.setNamespace(mqResource);
            String consumerGroup = entity.getConsumerGroup();
            List<String> topics = Arrays.asList(entity.getTopic().split(","));
            this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
        }
        catch (Exception e) {
            log.error("create pulsar topic failed", (Throwable)e);
            throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " + e.getMessage());
        }
    }

    private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean, List<String> topics) {
        try {
            this.pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
        }
        catch (Exception e) {
            log.error("create pulsar consumer group failed", (Throwable)e);
            throw new WorkflowListenerException("failed to create pulsar consumer group");
        }
    }

    private void createTubeConsumerGroup(ConsumptionEntity entity, String operator) {
        String groupId = entity.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        Preconditions.checkNotNull((Object)groupEntity, (String)("inlong group not found for groupId=" + groupId));
        String mqResource = groupEntity.getMqResource();
        Preconditions.checkNotNull((Object)mqResource, (String)("mq resource cannot empty for groupId=" + groupId));
        String clusterTag = groupEntity.getInlongClusterTag();
        TubeClusterInfo clusterInfo = (TubeClusterInfo)this.clusterService.getOne(clusterTag, null, "TUBE");
        try {
            this.tubeMQOperator.createConsumerGroup(clusterInfo, entity.getTopic(), entity.getConsumerGroup(), operator);
        }
        catch (Exception e) {
            log.error("failed to create tube consumer group: ", (Throwable)e);
            throw new WorkflowListenerException("failed to create tube consumer group: " + e.getMessage());
        }
    }

    public boolean async() {
        return false;
    }
}

