/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.base.Objects;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PulsarResourceOperator
implements QueueResourceOperator {
    private static final Logger log = LoggerFactory.getLogger(PulsarResourceOperator.class);
    public static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
    public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW = "%s_%s_consumer_group_realtime_review";
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private InlongStreamService streamService;
    @Autowired
    private StreamSinkService sinkService;
    @Autowired
    private InlongConsumeService consumeService;
    @Autowired
    private PulsarOperator pulsarOperator;

    @Override
    public boolean accept(String mqType) {
        return "PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType);
    }

    @Override
    public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
        Preconditions.expectNotNull((Object)groupInfo, (String)"inlong group info cannot be null");
        Preconditions.expectNotBlank((String)operator, (ErrorCodeEnum)ErrorCodeEnum.INVALID_PARAMETER, (String)"operator cannot be null");
        String groupId = groupInfo.getInlongGroupId();
        String clusterTag = groupInfo.getInlongClusterTag();
        log.info("begin to create pulsar resource for groupId={}, clusterTag={}", (Object)groupId, (Object)clusterTag);
        if (!(groupInfo instanceof InlongPulsarInfo)) {
            throw new BusinessException("the mqType must be PULSAR for inlongGroupId=" + groupId);
        }
        InlongPulsarInfo pulsarInfo = (InlongPulsarInfo)groupInfo;
        String tenant = pulsarInfo.getPulsarTenant();
        List<ClusterInfo> clusterInfos = this.clusterService.listByTagAndType(clusterTag, "PULSAR");
        for (ClusterInfo clusterInfo : clusterInfos) {
            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)clusterInfo;
            try {
                PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);
                Throwable throwable = null;
                try {
                    if (StringUtils.isBlank((CharSequence)tenant)) {
                        tenant = pulsarCluster.getPulsarTenant();
                    }
                    if (Objects.equal((Object)GroupStatus.CONFIG_SUCCESSFUL.getCode(), (Object)groupInfo.getStatus())) continue;
                    this.pulsarOperator.createTenant(pulsarAdmin, tenant);
                    String namespace = groupInfo.getMqResource();
                    this.pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
                    log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}", new Object[]{groupId, tenant, namespace, pulsarCluster});
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (pulsarAdmin == null) continue;
                    if (throwable != null) {
                        try {
                            pulsarAdmin.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    pulsarAdmin.close();
                }
            }
            catch (Exception e) {
                String msg = "failed to create pulsar resource for groupId=" + groupId;
                log.error(msg + ", cluster=" + pulsarCluster, (Throwable)e);
                throw new WorkflowListenerException(msg + ": " + e.getMessage());
            }
        }
        log.info("success to create pulsar resource for groupId={}, clusterTag={}", (Object)groupId, (Object)clusterTag);
    }

    @Override
    public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
        Preconditions.expectNotNull((Object)groupInfo, (String)"inlong group info cannot be null");
        String groupId = groupInfo.getInlongGroupId();
        String clusterTag = groupInfo.getInlongClusterTag();
        log.info("begin to delete pulsar resource for groupId={}, clusterTag={}", (Object)groupId, (Object)clusterTag);
        List<InlongStreamBriefInfo> streamInfos = this.streamService.getTopicList(groupId);
        if (CollectionUtils.isEmpty(streamInfos)) {
            log.warn("skip to delete pulsar resource as no streams for groupId={}", (Object)groupId);
            return;
        }
        List<ClusterInfo> clusterInfos = this.clusterService.listByTagAndType(clusterTag, "PULSAR");
        for (ClusterInfo clusterInfo : clusterInfos) {
            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)clusterInfo;
            try {
                for (InlongStreamBriefInfo streamInfo : streamInfos) {
                    this.deletePulsarTopic((InlongPulsarInfo)groupInfo, pulsarCluster, streamInfo.getMqResource());
                }
            }
            catch (Exception e) {
                String msg = "failed to delete pulsar resource for groupId=" + groupId;
                log.error(msg + ", cluster=" + pulsarCluster, (Throwable)e);
                throw new WorkflowListenerException(msg + ": " + e.getMessage());
            }
        }
        log.info("success to delete pulsar resource for groupId={}, clusterTag={}", (Object)groupId, (Object)clusterTag);
    }

    @Override
    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
        Preconditions.expectNotNull((Object)groupInfo, (String)"inlong group info cannot be null");
        Preconditions.expectNotNull((Object)streamInfo, (String)"inlong stream info cannot be null");
        Preconditions.expectNotBlank((String)operator, (ErrorCodeEnum)ErrorCodeEnum.INVALID_PARAMETER, (String)"operator cannot be null");
        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        String clusterTag = groupInfo.getInlongClusterTag();
        log.info("begin to create pulsar resource for groupId={}, streamId={}, clusterTag={}", new Object[]{groupId, streamId, clusterTag});
        List<ClusterInfo> clusterInfos = this.clusterService.listByTagAndType(clusterTag, "PULSAR");
        for (ClusterInfo clusterInfo : clusterInfos) {
            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)clusterInfo;
            try {
                String topicName = streamInfo.getMqResource();
                this.createTopic((InlongPulsarInfo)groupInfo, pulsarCluster, topicName);
                this.createSubscription((InlongPulsarInfo)groupInfo, pulsarCluster, topicName, streamId);
                log.info("success to create pulsar resource for groupId={}, streamId={}, topic={}, cluster={}", new Object[]{groupId, streamId, topicName, pulsarCluster});
            }
            catch (Exception e) {
                String msg = "failed to create pulsar resource for groupId=" + groupId + ", streamId=" + streamId;
                log.error(msg + ", cluster=" + pulsarCluster, (Throwable)e);
                throw new WorkflowListenerException(msg + ": " + e.getMessage());
            }
        }
        log.info("success to create pulsar resource for groupId={}, streamId={}", (Object)groupId, (Object)streamId);
    }

    @Override
    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
        Preconditions.expectNotNull((Object)groupInfo, (String)"inlong group info cannot be null");
        Preconditions.expectNotNull((Object)streamInfo, (String)"inlong stream info cannot be null");
        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        String clusterTag = groupInfo.getInlongClusterTag();
        log.info("begin to delete pulsar resource for groupId={}, streamId={}, clusterTag={}", new Object[]{groupId, streamId, clusterTag});
        List<ClusterInfo> clusterInfos = this.clusterService.listByTagAndType(clusterTag, "PULSAR");
        for (ClusterInfo clusterInfo : clusterInfos) {
            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)clusterInfo;
            try {
                this.deletePulsarTopic((InlongPulsarInfo)groupInfo, pulsarCluster, streamInfo.getMqResource());
                log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}", new Object[]{groupId, streamId, streamInfo.getMqResource(), pulsarCluster});
            }
            catch (Exception e) {
                String msg = "failed to delete pulsar topic for groupId=" + groupId + ", streamId=" + streamId;
                log.error(msg + ", cluster=" + pulsarCluster, (Throwable)e);
                throw new WorkflowListenerException(msg + ": " + e.getMessage());
            }
        }
        log.info("success to delete pulsar resource for groupId={}, streamId={}", (Object)groupId, (Object)streamId);
    }

    private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName) throws Exception {
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            String tenant = pulsarInfo.getPulsarTenant();
            if (StringUtils.isBlank((CharSequence)tenant)) {
                tenant = pulsarCluster.getPulsarTenant();
            }
            String namespace = pulsarInfo.getMqResource();
            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder().pulsarTenant(tenant).namespace(namespace).topicName(topicName).queueModule(pulsarInfo.getQueueModule()).numPartitions(Integer.valueOf(pulsarInfo.getPartitionNum())).build();
            this.pulsarOperator.createTopic(pulsarAdmin, topicInfo);
        }
    }

    private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName, String streamId) throws Exception {
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            String tenant = pulsarInfo.getPulsarTenant();
            if (StringUtils.isBlank((CharSequence)tenant)) {
                tenant = pulsarCluster.getPulsarTenant();
            }
            String namespace = pulsarInfo.getMqResource();
            String fullTopicName = tenant + "/" + namespace + "/" + topicName;
            boolean exist = this.pulsarOperator.topicExists(pulsarAdmin, tenant, namespace, topicName, "PARALLEL".equals(pulsarInfo.getQueueModule()));
            if (!exist) {
                String serviceUrl = pulsarCluster.getAdminUrl();
                log.error("topic={} not exists in {}", (Object)fullTopicName, (Object)serviceUrl);
                throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl);
            }
            String groupId = pulsarInfo.getInlongGroupId();
            List<StreamSink> streamSinks = this.sinkService.listSink(groupId, streamId);
            if (CollectionUtils.isEmpty(streamSinks)) {
                log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", (Object)groupId, (Object)streamId);
                return;
            }
            String clusterTag = pulsarInfo.getInlongClusterTag();
            for (StreamSink sink : streamSinks) {
                String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId());
                this.pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs);
                log.info("success to create subs={} for groupId={}, topic={}", new Object[]{subs, groupId, fullTopicName});
                Integer id = this.consumeService.saveBySystem((InlongGroupInfo)pulsarInfo, topicName, subs);
                log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", new Object[]{id, subs, groupId, topicName});
            }
        }
    }

    private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName) throws Exception {
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            String tenant = pulsarInfo.getPulsarTenant();
            if (StringUtils.isBlank((CharSequence)tenant)) {
                tenant = pulsarCluster.getPulsarTenant();
            }
            String namespace = pulsarInfo.getMqResource();
            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder().pulsarTenant(tenant).namespace(namespace).topicName(topicName).build();
            this.pulsarOperator.forceDeleteTopic(pulsarAdmin, topicInfo);
        }
    }

    @Override
    public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, Integer messageCount) throws PulsarClientException {
        String groupId = streamInfo.getInlongGroupId();
        InlongPulsarInfo inlongPulsarInfo = (InlongPulsarInfo)groupInfo;
        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)this.clusterService.getOne(groupInfo.getInlongClusterTag(), null, "PULSAR");
        ArrayList<BriefMQMessage> briefMQMessages = new ArrayList();
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            String tenant = inlongPulsarInfo.getPulsarTenant();
            if (StringUtils.isBlank((CharSequence)tenant)) {
                tenant = pulsarCluster.getPulsarTenant();
            }
            String namespace = groupInfo.getMqResource();
            String topicName = streamInfo.getMqResource();
            String fullTopicName = tenant + "/" + namespace + "/" + topicName;
            String clusterTag = inlongPulsarInfo.getInlongClusterTag();
            String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
            briefMQMessages = this.pulsarOperator.queryLatestMessage(pulsarAdmin, fullTopicName, subs, messageCount, streamInfo);
            Integer id = this.consumeService.saveBySystem(groupInfo, topicName, subs);
            log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", new Object[]{id, subs, groupId, topicName});
        }
        return briefMQMessages;
    }
}

