package org.apache.inlong.manager.service.source.pulsar;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSourceDTO;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSourceRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;
import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.class */
public class PulsarSourceOperator extends AbstractSourceOperator {
    private static final String AUTH_CLASSNAME_KEY = "properties.auth-plugin-classname";
    private static final String AUTH_CLASSNAME_VALUE = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
    private static final String AUTH_PARAMS_KEY = "properties.auth-params";
    private static final String AUTH_PARAMS_VALUE = "token:%s";

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private InlongClusterService clusterService;

    @Autowired
    private StreamSinkEntityMapper sinkMapper;

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    public Boolean accept(String str) {
        return Boolean.valueOf("PULSAR".equals(str));
    }

    @Override // org.apache.inlong.manager.service.source.AbstractSourceOperator
    protected String getSourceType() {
        return "PULSAR";
    }

    @Override // org.apache.inlong.manager.service.source.AbstractSourceOperator
    protected void setTargetEntity(SourceRequest sourceRequest, StreamSourceEntity streamSourceEntity) {
        PulsarSourceRequest pulsarSourceRequest = (PulsarSourceRequest) sourceRequest;
        CommonBeanUtils.copyProperties(pulsarSourceRequest, streamSourceEntity, true);
        try {
            streamSourceEntity.setExtParams(this.objectMapper.writeValueAsString(PulsarSourceDTO.getFromRequest(pulsarSourceRequest, streamSourceEntity.getExtParams())));
        } catch (Exception e) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, String.format("serialize extParams of Pulsar SourceDTO failure: %s", e.getMessage()));
        }
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    public StreamSource getFromEntity(StreamSourceEntity streamSourceEntity) {
        PulsarSource pulsarSource = new PulsarSource();
        if (streamSourceEntity == null) {
            return pulsarSource;
        }
        PulsarSourceDTO fromJson = PulsarSourceDTO.getFromJson(streamSourceEntity.getExtParams());
        CommonBeanUtils.copyProperties(streamSourceEntity, pulsarSource, true);
        CommonBeanUtils.copyProperties(fromJson, pulsarSource, true);
        pulsarSource.setFieldList(super.getSourceFields(streamSourceEntity.getId()));
        return pulsarSource;
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo inlongGroupInfo, List<InlongStreamInfo> list, List<StreamSource> list2) {
        PulsarClusterInfo one = this.clusterService.getOne(inlongGroupInfo.getInlongClusterTag(), null, "PULSAR");
        String adminUrl = one.getAdminUrl();
        String url = one.getUrl();
        String pulsarTenant = ((InlongPulsarInfo) inlongGroupInfo).getPulsarTenant();
        if (StringUtils.isBlank(pulsarTenant)) {
            pulsarTenant = one.getPulsarTenant();
        }
        HashMap newHashMap = Maps.newHashMap();
        for (InlongStreamInfo inlongStreamInfo : list) {
            PulsarSource pulsarSource = new PulsarSource();
            String inlongStreamId = inlongStreamInfo.getInlongStreamId();
            pulsarSource.setSourceName(inlongStreamId);
            pulsarSource.setPulsarTenant(pulsarTenant);
            pulsarSource.setNamespace(inlongGroupInfo.getMqResource());
            pulsarSource.setTopic(inlongStreamInfo.getMqResource());
            pulsarSource.setAdminUrl(adminUrl);
            pulsarSource.setServiceUrl(url);
            pulsarSource.setInlongComponent(true);
            if (StringUtils.isNotBlank(inlongStreamInfo.getDataType())) {
                pulsarSource.setSerializationType(DataTypeEnum.forType(inlongStreamInfo.getDataType()).getType());
            }
            pulsarSource.setWrapType(inlongStreamInfo.getWrapType());
            pulsarSource.setIgnoreParseError(inlongStreamInfo.getIgnoreParseError().booleanValue());
            if (StringUtils.isNotBlank(one.getToken())) {
                Map properties = pulsarSource.getProperties();
                properties.putIfAbsent(AUTH_CLASSNAME_KEY, AUTH_CLASSNAME_VALUE);
                properties.putIfAbsent(AUTH_PARAMS_KEY, String.format(AUTH_PARAMS_VALUE, one.getToken()));
            }
            Iterator<StreamSource> it = list2.iterator();
            while (it.hasNext()) {
                KafkaSource kafkaSource = (StreamSource) it.next();
                if (Objects.equal(inlongStreamId, kafkaSource.getInlongStreamId())) {
                    List selectByRelatedId = this.sinkMapper.selectByRelatedId(inlongGroupInfo.getInlongGroupId(), inlongStreamId);
                    if (selectByRelatedId.size() == 1) {
                        pulsarSource.setSubscription(String.format(PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION, inlongGroupInfo.getInlongClusterTag(), pulsarSource.getTopic(), ((StreamSinkEntity) selectByRelatedId.get(0)).getId()));
                    }
                    pulsarSource.setSerializationType(getSerializationType(kafkaSource, inlongStreamInfo.getDataType()));
                    if ("KAFKA".equals(kafkaSource.getSourceType())) {
                        pulsarSource.setPrimaryKey(kafkaSource.getPrimaryKey());
                    }
                }
            }
            if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
                pulsarSource.setSerializationType(DataTypeEnum.CSV.getType());
            }
            if (DataTypeEnum.CSV.getType().equalsIgnoreCase(pulsarSource.getSerializationType())) {
                pulsarSource.setDataSeparator(inlongStreamInfo.getDataSeparator());
                if (StringUtils.isEmpty(pulsarSource.getDataSeparator())) {
                    pulsarSource.setDataSeparator(String.valueOf(44));
                }
            }
            if (StringUtils.isNotBlank(pulsarSource.getSubscription())) {
                pulsarSource.setScanStartupMode(PulsarScanStartupMode.EXTERNAL_SUBSCRIPTION.getValue());
            } else {
                pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
            }
            pulsarSource.setFieldList(inlongStreamInfo.getFieldList());
            ((List) newHashMap.computeIfAbsent(inlongStreamId, str -> {
                return Lists.newArrayList();
            })).add(pulsarSource);
        }
        return newHashMap;
    }
}
