/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class PulsarOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
    private static final int MAX_PARTITION = 1000;
    private static final int RETRY_TIMES = 3;
    private static final int DELAY_SECONDS = 5;
    @Autowired
    public DeserializeOperatorFactory deserializeOperatorFactory;
    @Autowired
    private ConversionHandle conversionHandle;
    @Autowired
    private RestTemplate restTemplate;

    public void createTenant(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
        LOGGER.info("begin to create pulsar tenant={}", (Object)tenant);
        Preconditions.expectNotBlank((String)tenant, (ErrorCodeEnum)ErrorCodeEnum.INVALID_PARAMETER, (String)"Tenant cannot be empty");
        try {
            List<String> clusters = PulsarUtils.getClusters(this.restTemplate, pulsarClusterInfo);
            boolean exists = this.tenantIsExists(pulsarClusterInfo, tenant);
            if (exists) {
                LOGGER.warn("pulsar tenant={} already exists, skip to create", (Object)tenant);
                return;
            }
            PulsarTenantInfo tenantInfo = new PulsarTenantInfo();
            tenantInfo.setAllowedClusters((Set)Sets.newHashSet(clusters));
            tenantInfo.setAdminRoles((Set)Sets.newHashSet());
            PulsarUtils.createTenant(this.restTemplate, pulsarClusterInfo, tenant, tenantInfo);
            LOGGER.info("success to create pulsar tenant={}", (Object)tenant);
        }
        catch (Exception e) {
            LOGGER.error("failed to create pulsar tenant=" + tenant, (Throwable)e);
            throw e;
        }
    }

    public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInfo pulsarInfo, String tenant, String namespace) throws Exception {
        Preconditions.expectNotBlank((String)tenant, (ErrorCodeEnum)ErrorCodeEnum.INVALID_PARAMETER, (String)"pulsar tenant cannot be empty during create namespace");
        Preconditions.expectNotBlank((String)namespace, (ErrorCodeEnum)ErrorCodeEnum.INVALID_PARAMETER, (String)"pulsar namespace cannot be empty during create namespace");
        String tenantNamespaceName = tenant + "/" + namespace;
        LOGGER.info("begin to create namespace={}", (Object)tenantNamespaceName);
        try {
            Integer retentionSize;
            Integer retentionTime;
            boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, tenantNamespaceName);
            if (isExists) {
                LOGGER.warn("namespace={} already exists, skip to create", (Object)tenantNamespaceName);
                return;
            }
            PulsarNamespacePolicies policies = new PulsarNamespacePolicies();
            Integer ttl = pulsarInfo.getTtl();
            if (ttl > 0) {
                policies.setMessageTtlInSeconds(this.conversionHandle.handleConversion(ttl, pulsarInfo.getTtlUnit().toLowerCase() + "_seconds").intValue());
            }
            if ((retentionTime = pulsarInfo.getRetentionTime()) > 0) {
                retentionTime = this.conversionHandle.handleConversion(retentionTime, pulsarInfo.getRetentionTimeUnit().toLowerCase() + "_minutes");
            }
            if ((retentionSize = pulsarInfo.getRetentionSize()) > 0) {
                retentionSize = this.conversionHandle.handleConversion(retentionSize, pulsarInfo.getRetentionSizeUnit().toLowerCase() + "_mb");
            }
            PulsarRetentionPolicies retentionPolicies = new PulsarRetentionPolicies(retentionTime.intValue(), (long)retentionSize.intValue());
            policies.setRetentionPolicies(retentionPolicies);
            PulsarPersistencePolicies persistencePolicies = new PulsarPersistencePolicies(pulsarInfo.getEnsemble().intValue(), pulsarInfo.getWriteQuorum().intValue(), pulsarInfo.getAckQuorum().intValue(), pulsarInfo.getMaxMarkDeleteRate().doubleValue());
            policies.setPersistence(persistencePolicies);
            PulsarUtils.createNamespace(this.restTemplate, pulsarClusterInfo, tenant, namespace, policies);
            LOGGER.info("success to create namespace={}", (Object)tenantNamespaceName);
        }
        catch (Exception e) {
            LOGGER.error("failed to create namespace=" + tenantNamespaceName, (Throwable)e);
            throw e;
        }
    }

    public void createTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception {
        Preconditions.expectNotNull((Object)topicInfo, (String)"pulsar topic info cannot be empty");
        String tenant = topicInfo.getPulsarTenant();
        String namespace = topicInfo.getNamespace();
        String topicName = topicInfo.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
        if (this.topicExists(pulsarClusterInfo, tenant, namespace, topicName, "PARALLEL".equals(topicInfo.getQueueModule()))) {
            LOGGER.warn("pulsar topic={} already exists in {}", (Object)fullTopicName, (Object)pulsarClusterInfo.getAdminUrl());
            return;
        }
        try {
            if ("SERIAL".equals(topicInfo.getQueueModule())) {
                PulsarUtils.createNonPartitionedTopic(this.restTemplate, pulsarClusterInfo, fullTopicName);
                String res = PulsarUtils.lookupTopic(this.restTemplate, pulsarClusterInfo, fullTopicName);
                LOGGER.info("success to create topic={}, lookup result is {}", (Object)fullTopicName, (Object)res);
            } else {
                List<String> clusters = PulsarUtils.getClusters(this.restTemplate, pulsarClusterInfo);
                Integer numPartitions = topicInfo.getNumPartitions();
                if (numPartitions < 0 || numPartitions >= 1000) {
                    List<String> brokers = PulsarUtils.getBrokers(this.restTemplate, pulsarClusterInfo);
                    numPartitions = brokers.size();
                }
                PulsarUtils.createPartitionedTopic(this.restTemplate, pulsarClusterInfo, fullTopicName, numPartitions);
                Map<String, String> res = PulsarUtils.lookupPartitionedTopic(this.restTemplate, pulsarClusterInfo, fullTopicName);
                if (res.size() != numPartitions.intValue()) {
                    for (int i = 0; i < 3 && res.size() != numPartitions.intValue(); ++i) {
                        res = PulsarUtils.lookupPartitionedTopic(this.restTemplate, pulsarClusterInfo, fullTopicName);
                        try {
                            Thread.sleep(500L);
                            continue;
                        }
                        catch (InterruptedException e) {
                            LOGGER.error("Thread has been interrupted");
                        }
                    }
                }
                if (numPartitions.intValue() != res.size()) {
                    throw new Exception("The number of partitions not equal to lookupPartitionedTopic");
                }
                LOGGER.info("success to create topic={}", (Object)fullTopicName);
            }
        }
        catch (Exception e) {
            LOGGER.error("failed to create topic=" + fullTopicName, (Throwable)e);
            throw e;
        }
    }

    public void forceDeleteTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception {
        Preconditions.expectNotNull((Object)topicInfo, (String)"pulsar topic info cannot be empty");
        String tenant = topicInfo.getPulsarTenant();
        String namespace = topicInfo.getNamespace();
        String topic = topicInfo.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topic;
        boolean isPartitioned = "PARALLEL".equals(topicInfo.getQueueModule());
        if (this.topicExists(pulsarClusterInfo, tenant, namespace, topic, isPartitioned)) {
            LOGGER.warn("pulsar topic={} already delete", (Object)fullTopicName);
            return;
        }
        try {
            PulsarUtils.forceDeleteTopic(this.restTemplate, pulsarClusterInfo, fullTopicName, isPartitioned);
            LOGGER.info("success to delete topic={}", (Object)fullTopicName);
        }
        catch (Exception e) {
            LOGGER.error("failed to delete topic=" + fullTopicName, (Throwable)e);
            throw e;
        }
    }

    public void createSubscription(PulsarClusterInfo pulsarClusterInfo, String fullTopicName, String queueModule, String subscription) throws Exception {
        LOGGER.info("begin to create pulsar subscription={} for topic={}", (Object)subscription, (Object)fullTopicName);
        try {
            boolean isExists = this.subscriptionExists(pulsarClusterInfo, fullTopicName, subscription, "PARALLEL".equals(queueModule));
            if (isExists) {
                LOGGER.warn("pulsar subscription={} already exists, skip to create", (Object)subscription);
                return;
            }
            PulsarUtils.createSubscription(this.restTemplate, pulsarClusterInfo, fullTopicName, subscription);
            LOGGER.info("success to create subscription={}", (Object)subscription);
        }
        catch (Exception e) {
            LOGGER.error("failed to create pulsar subscription=" + subscription, (Throwable)e);
            throw e;
        }
    }

    public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, String subscription, PulsarTopicInfo topicInfo, List<String> topicList) throws Exception {
        for (String topic : topicList) {
            topicInfo.setTopicName(topic);
            String fullTopicName = topicInfo.getPulsarTenant() + "/" + topicInfo.getNamespace() + "/" + topic;
            this.createSubscription(pulsarClusterInfo, fullTopicName, topicInfo.getQueueModule(), subscription);
        }
        LOGGER.info("success to create subscription={} for multiple topics={}", (Object)subscription, topicList);
    }

    private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
        List<String> tenants = PulsarUtils.getTenants(this.restTemplate, pulsarClusterInfo);
        return tenants.contains(tenant);
    }

    private boolean namespaceExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace) throws Exception {
        List<String> namespaces = PulsarUtils.getNamespaces(this.restTemplate, pulsarClusterInfo, tenant);
        return namespaces.contains(namespace);
    }

    public boolean topicExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace, String topicName, boolean isPartitioned) {
        if (StringUtils.isBlank((CharSequence)topicName)) {
            return true;
        }
        boolean topicExists = false;
        try {
            List<String> topics = isPartitioned ? PulsarUtils.getPartitionedTopics(this.restTemplate, pulsarClusterInfo, tenant, namespace) : PulsarUtils.getTopics(this.restTemplate, pulsarClusterInfo, tenant, namespace);
            for (String t : topics) {
                int suffixIndex;
                t = t.substring(t.lastIndexOf("/") + 1);
                if (!isPartitioned && (suffixIndex = t.lastIndexOf("-partition-")) > 0) {
                    t = t.substring(0, suffixIndex);
                }
                if (!topicName.equals(t)) continue;
                topicExists = true;
                break;
            }
        }
        catch (Exception pe) {
            LOGGER.error("check if the pulsar topic={} exists error, begin retry", (Object)topicName, (Object)pe);
            int count = 0;
            try {
                block5: while (!topicExists && ++count <= 3) {
                    LOGGER.info("check whether the pulsar topic={} exists error, try count={}", (Object)topicName, (Object)count);
                    Thread.sleep(5L);
                    List<String> topics = PulsarUtils.getPartitionedTopics(this.restTemplate, pulsarClusterInfo, tenant, namespace);
                    for (String t : topics) {
                        if (!topicName.equals(t = t.substring(t.lastIndexOf("/") + 1))) continue;
                        topicExists = true;
                        continue block5;
                    }
                }
            }
            catch (Exception e) {
                LOGGER.error("after retry, check if the pulsar topic={} exists still error", (Object)topicName, (Object)pe);
            }
        }
        return topicExists;
    }

    private boolean subscriptionExists(PulsarClusterInfo pulsarClusterInfo, String topic, String subscription, boolean isPartitioned) {
        int count = 0;
        while (++count <= 3) {
            try {
                LOGGER.info("check whether the subscription exists for topic={}, try count={}", (Object)topic, (Object)count);
                Thread.sleep(5L);
                if (isPartitioned) {
                    Map<String, String> topicMap = PulsarUtils.lookupPartitionedTopic(this.restTemplate, pulsarClusterInfo, topic);
                    if (topicMap.isEmpty()) {
                        LOGGER.error("result of lookups topic={} is empty, continue retry", (Object)topic);
                        continue;
                    }
                } else {
                    String lookupTopic = PulsarUtils.lookupTopic(this.restTemplate, pulsarClusterInfo, topic);
                    if (StringUtils.isBlank((CharSequence)lookupTopic)) {
                        LOGGER.error("result of lookups topic={} is empty, continue retry", (Object)topic);
                        continue;
                    }
                }
                List<String> subscriptionList = PulsarUtils.getSubscriptions(this.restTemplate, pulsarClusterInfo, topic);
                return subscriptionList.contains(subscription);
            }
            catch (Exception e) {
                LOGGER.error("check if the subscription exists for topic={} error, continue retry", (Object)topic, (Object)e);
                if (count != 3) continue;
                LOGGER.error("after {} times retry, still check subscription exception for topic {}", (Object)count, (Object)topic);
                throw new BusinessException("check if the subscription exists error: " + e.getMessage());
            }
        }
        return false;
    }

    public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterInfo, String topicFullName, String subName, Integer messageCount, InlongStreamInfo streamInfo, boolean serial) {
        LOGGER.info("begin to query message for topic {}, subName={}", (Object)topicFullName, (Object)subName);
        ArrayList<BriefMQMessage> messageList = new ArrayList<BriefMQMessage>();
        int partitionCount = this.getPartitionCount(pulsarClusterInfo, topicFullName);
        for (int messageIndex = 0; messageIndex < messageCount; ++messageIndex) {
            int currentPartitionNum = messageIndex % partitionCount;
            int messagePosition = messageIndex / partitionCount;
            String topicNameOfPartition = this.buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
            messageList.addAll(this.queryMessageFromPulsar(topicNameOfPartition, pulsarClusterInfo, messageIndex, streamInfo, messagePosition));
        }
        LOGGER.info("success query message by subs={} for topic={}", (Object)subName, (Object)topicFullName);
        return messageList;
    }

    private int getPartitionCount(PulsarClusterInfo pulsarClusterInfo, String topicFullName) {
        PulsarTopicMetadata pulsarTopicMetadata;
        try {
            pulsarTopicMetadata = PulsarUtils.getPartitionedTopicMetadata(this.restTemplate, pulsarClusterInfo, topicFullName);
        }
        catch (Exception e) {
            String errMsg = "get pulsar partition error ";
            LOGGER.error(errMsg, (Throwable)e);
            throw new BusinessException(errMsg + e.getMessage());
        }
        return pulsarTopicMetadata.getPartitions() > 0 ? pulsarTopicMetadata.getPartitions() : 1;
    }

    private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarClusterInfo pulsarClusterInfo, int index, InlongStreamInfo streamInfo, int messagePosition) {
        ArrayList<BriefMQMessage> briefMQMessages = new ArrayList<BriefMQMessage>();
        try {
            ResponseEntity<byte[]> httpResponse = PulsarUtils.examineMessage(this.restTemplate, pulsarClusterInfo, topicPartition, "latest", messagePosition);
            PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
            HashMap<String, String> headers = messageInfo.getProperties();
            if (headers == null) {
                headers = new HashMap<String, String>();
            }
            MessageWrapType messageWrapType = MessageWrapType.forType((String)streamInfo.getWrapType());
            if (headers.get("msgEnType") != null) {
                messageWrapType = MessageWrapType.valueOf((int)Integer.parseInt((String)headers.get("msgEnType")));
            }
            DeserializeOperator deserializeOperator = this.deserializeOperatorFactory.getInstance(messageWrapType);
            briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, messageInfo.getBody(), headers, index));
        }
        catch (Exception e) {
            LOGGER.warn("query message from pulsar error for groupId = {}, streamId = {}", new Object[]{streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId(), e});
        }
        return briefMQMessages;
    }

    public void resetCursor(PulsarClusterInfo pulsarClusterInfo, String topicFullName, String subName, Long resetTime) {
        try {
            PulsarUtils.resetCursor(this.restTemplate, pulsarClusterInfo, topicFullName, subName, resetTime);
        }
        catch (Exception e) {
            LOGGER.error("failed reset cursor consumer:", (Throwable)e);
            throw new BusinessException("failed reset cursor consumer:" + e.getMessage());
        }
    }

    private String buildTopicNameOfPartition(String topicName, int partition, boolean serial) {
        if (serial) {
            return topicName;
        }
        return topicName + "-partition-" + partition;
    }
}

