/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.pojo.sort.node.provider;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.springframework.stereotype.Service;

@Service
public class PulsarProvider
implements ExtractNodeProvider {
    @Override
    public Boolean accept(String sourceType) {
        return "PULSAR".equals(sourceType);
    }

    @Override
    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
        PulsarSource pulsarSource = (PulsarSource)streamNodeInfo;
        List<FieldInfo> fieldInfos = this.parseStreamFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
        Map<String, String> properties = this.parseProperties(pulsarSource.getProperties());
        String fullTopicName = pulsarSource.getPulsarTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
        Format format = this.parsingFormat(pulsarSource.getSerializationType(), pulsarSource.getWrapType(), pulsarSource.getDataSeparator(), pulsarSource.getKvSeparator(), pulsarSource.getDataEscapeChar(), pulsarSource.getIgnoreParseError());
        PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName((String)pulsarSource.getScanStartupMode());
        String primaryKey = pulsarSource.getPrimaryKey();
        String serviceUrl = pulsarSource.getServiceUrl();
        String adminUrl = pulsarSource.getAdminUrl();
        String scanStartupSubStartOffset = StringUtils.isNotBlank((CharSequence)pulsarSource.getSubscription()) ? PulsarScanStartupMode.EARLIEST.getValue() : null;
        return new PulsarExtractNode(pulsarSource.getSourceName(), pulsarSource.getSourceName(), fieldInfos, null, properties, fullTopicName, adminUrl, serviceUrl, format, startupMode.getValue(), primaryKey, pulsarSource.getSubscription(), scanStartupSubStartOffset, pulsarSource.getClientAuthPluginClassName(), pulsarSource.getClientAuthParams());
    }

    @Override
    public List<StreamField> addStreamMetaFields(List<StreamField> streamFields) {
        List fieldNames = streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
            streamFields.add(0, new StreamField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1, MetaField.AUDIT_DATA_TIME.name()));
        }
        return streamFields;
    }

    @Override
    public List<FieldInfo> getMetaFields() {
        ArrayList<FieldInfo> fieldInfos = new ArrayList<FieldInfo>();
        fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), (FormatInfo)new LongFormatInfo()));
        return fieldInfos;
    }
}

