/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.pojo.sort.util;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.pojo.source.mongodb.MongoDBSource;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.OracleConstant;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that translate {@link StreamSource} definitions into the
 * matching {@link ExtractNode} implementations.
 *
 * <p>Every {@code createExtractNode} overload passes the source name as the
 * first two constructor arguments of the node and {@code null} as the fourth
 * argument (presumably node id, node name and an unused watermark field;
 * confirm against the node constructors).
 */
public class ExtractNodeUtils {
    private static final Logger log = LoggerFactory.getLogger(ExtractNodeUtils.class);

    /**
     * Creates one extract node for each of the given stream sources.
     *
     * @param sourceInfos stream sources to convert; may be {@code null} or empty
     * @return the created nodes in the encounter order of {@code sourceInfos},
     *         or a new empty list when {@code sourceInfos} is null or empty
     * @throws IllegalArgumentException if a source has an unsupported source type
     */
    public static List<ExtractNode> createExtractNodes(List<StreamSource> sourceInfos) {
        if (CollectionUtils.isEmpty(sourceInfos)) {
            return Lists.newArrayList();
        }
        return sourceInfos.stream().map(ExtractNodeUtils::createExtractNode).collect(Collectors.toList());
    }

    /**
     * Dispatches on {@link StreamSource#getSourceType()} and delegates to the
     * type-specific overload after casting the source to its concrete class.
     *
     * @param sourceInfo stream source to convert
     * @return the extract node built by the matching overload
     * @throws IllegalArgumentException if the source type is {@code null} or not one of
     *         MYSQL_BINLOG, KAFKA, PULSAR, POSTGRESQL, ORACLE, SQLSERVER, MONGODB, TUBEMQ
     */
    public static ExtractNode createExtractNode(StreamSource sourceInfo) {
        String sourceType = sourceInfo.getSourceType();
        // Switching on a null String throws NullPointerException, so a missing type is
        // routed to the same "unsupported" error as an unknown one.
        if (sourceType != null) {
            switch (sourceType) {
                case "MYSQL_BINLOG":
                    return ExtractNodeUtils.createExtractNode((MySQLBinlogSource) sourceInfo);
                case "KAFKA":
                    return ExtractNodeUtils.createExtractNode((KafkaSource) sourceInfo);
                case "PULSAR":
                    return ExtractNodeUtils.createExtractNode((PulsarSource) sourceInfo);
                case "POSTGRESQL":
                    return ExtractNodeUtils.createExtractNode((PostgreSQLSource) sourceInfo);
                case "ORACLE":
                    return ExtractNodeUtils.createExtractNode((OracleSource) sourceInfo);
                case "SQLSERVER":
                    return ExtractNodeUtils.createExtractNode((SQLServerSource) sourceInfo);
                case "MONGODB":
                    return ExtractNodeUtils.createExtractNode((MongoDBSource) sourceInfo);
                case "TUBEMQ":
                    return ExtractNodeUtils.createExtractNode((TubeMQSource) sourceInfo);
                default:
                    break;
            }
        }
        throw new IllegalArgumentException(
                String.format("Unsupported sourceType=%s to create extractNode", sourceType));
    }

    /**
     * Builds a MySQL extract node from a binlog source.
     *
     * <p>The table white list is split on commas into the table name list. The
     * server id is forwarded only when it is set and positive, otherwise
     * {@code null} is passed. Incremental snapshot is enabled unless the source
     * migrates all tables (which also adds the {@code migrate-all=true} property)
     * or has no primary key (which also adds
     * {@code scan.incremental.snapshot.enabled=false}).
     *
     * @param binlogSource MySQL binlog source definition
     * @return the configured MySQL extract node
     */
    public static MySqlExtractNode createExtractNode(MySQLBinlogSource binlogSource) {
        String sourceName = binlogSource.getSourceName();
        String primaryKey = binlogSource.getPrimaryKey();

        Integer configuredServerId = binlogSource.getServerId();
        Integer serverId = configuredServerId != null && configuredServerId > 0 ? configuredServerId : null;

        List<String> tableNames = Splitter.on(",").splitToList(binlogSource.getTableWhiteList());
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(binlogSource.getFieldList(), sourceName);
        Map<String, String> properties = ExtractNodeUtils.parseProperties(binlogSource.getProperties());

        boolean incrementalSnapshotEnabled = true;
        if (binlogSource.isAllMigration()) {
            incrementalSnapshotEnabled = false;
            properties.put("migrate-all", "true");
        }
        if (StringUtils.isEmpty(primaryKey)) {
            incrementalSnapshotEnabled = false;
            properties.put("scan.incremental.snapshot.enabled", "false");
        }

        return new MySqlExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                primaryKey,
                tableNames,
                binlogSource.getHostname(),
                binlogSource.getUser(),
                binlogSource.getPassword(),
                binlogSource.getDatabaseWhiteList(),
                binlogSource.getPort(),
                serverId,
                incrementalSnapshotEnabled,
                binlogSource.getServerTimezone());
    }

    /**
     * Builds a Kafka extract node.
     *
     * <p>The serialization type selects the format (CSV, AVRO, JSON, CANAL or
     * DEBEZIUM_JSON). The auto-offset-reset setting selects the startup mode:
     * EARLIEST and SPECIFIC map to their counterparts, every other value maps to
     * {@link KafkaScanStartupMode#LATEST_OFFSET}.
     *
     * @param kafkaSource Kafka source definition
     * @return the configured Kafka extract node
     * @throws IllegalArgumentException if the serialization type is not supported for Kafka
     */
    public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
        String sourceName = kafkaSource.getSourceName();
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(kafkaSource.getFieldList(), sourceName);

        DataTypeEnum dataType = DataTypeEnum.forName(kafkaSource.getSerializationType());
        // Declared as the common Format type: the branches create unrelated implementations.
        Format format;
        switch (dataType) {
            case CSV:
                format = new CsvFormat();
                break;
            case AVRO:
                format = new AvroFormat();
                break;
            case JSON:
                format = new JsonFormat();
                break;
            case CANAL:
                format = new CanalJsonFormat();
                break;
            case DEBEZIUM_JSON:
                format = new DebeziumJsonFormat();
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("Unsupported dataType=%s for kafka source", dataType));
        }

        KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
        KafkaScanStartupMode startupMode;
        switch (kafkaOffset) {
            case EARLIEST:
                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
                break;
            case SPECIFIC:
                startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
                break;
            default:
                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
                break;
        }

        Map<String, String> properties = ExtractNodeUtils.parseProperties(kafkaSource.getProperties());
        return new KafkaExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                kafkaSource.getTopic(),
                kafkaSource.getBootstrapServers(),
                format,
                startupMode,
                kafkaSource.getPrimaryKey(),
                kafkaSource.getGroupId(),
                kafkaSource.getPartitionOffsets());
    }

    /**
     * Builds a Pulsar extract node.
     *
     * <p>The topic passed to the node is {@code tenant/namespace/topic}. The
     * serialization type selects the format (CSV, AVRO, JSON, CANAL,
     * DEBEZIUM_JSON or RAW); for CSV the data separator is parsed as an integer
     * ASCII code and resolved through {@link DataSeparator}. When the source is
     * an InLong component, the chosen format is wrapped in an
     * {@link InLongMsgFormat}.
     *
     * @param pulsarSource Pulsar source definition
     * @return the configured Pulsar extract node
     * @throws IllegalArgumentException if the serialization type is not supported for Pulsar
     */
    public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
        String sourceName = pulsarSource.getSourceName();
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(pulsarSource.getFieldList(), sourceName);
        String fullTopicName =
                pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();

        DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
        // Declared as the common Format type: the branches create unrelated implementations.
        Format format;
        switch (dataType) {
            case CSV:
                int separatorCode = Integer.parseInt(pulsarSource.getDataSeparator());
                format = new CsvFormat(DataSeparator.forAscii(separatorCode).getSeparator());
                break;
            case AVRO:
                format = new AvroFormat();
                break;
            case JSON:
                format = new JsonFormat();
                break;
            case CANAL:
                format = new CanalJsonFormat();
                break;
            case DEBEZIUM_JSON:
                format = new DebeziumJsonFormat();
                break;
            case RAW:
                format = new RawFormat();
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("Unsupported dataType=%s for pulsar source", dataType));
        }
        if (pulsarSource.isInlongComponent()) {
            Format innerFormat = format;
            format = new InLongMsgFormat(innerFormat, false);
        }

        PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
        Map<String, String> properties = ExtractNodeUtils.parseProperties(pulsarSource.getProperties());
        return new PulsarExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                fullTopicName,
                pulsarSource.getAdminUrl(),
                pulsarSource.getServiceUrl(),
                format,
                startupMode.getValue(),
                pulsarSource.getPrimaryKey());
    }

    /**
     * Builds a PostgreSQL extract node from the connection, table and decoding
     * plugin settings of the source.
     *
     * @param postgreSQLSource PostgreSQL source definition
     * @return the configured PostgreSQL extract node
     */
    public static PostgresExtractNode createExtractNode(PostgreSQLSource postgreSQLSource) {
        String sourceName = postgreSQLSource.getSourceName();
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(postgreSQLSource.getFieldList(), sourceName);
        Map<String, String> properties = ExtractNodeUtils.parseProperties(postgreSQLSource.getProperties());
        return new PostgresExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                postgreSQLSource.getPrimaryKey(),
                postgreSQLSource.getTableNameList(),
                postgreSQLSource.getHostname(),
                postgreSQLSource.getUsername(),
                postgreSQLSource.getPassword(),
                postgreSQLSource.getDatabase(),
                postgreSQLSource.getSchema(),
                postgreSQLSource.getPort(),
                postgreSQLSource.getDecodingPluginName());
    }

    /**
     * Builds an Oracle extract node. A blank scan startup mode is passed on as
     * {@code null}; otherwise it is resolved through
     * {@link OracleConstant.ScanStartUpMode#forName}.
     *
     * @param source Oracle source definition
     * @return the configured Oracle extract node
     */
    public static OracleExtractNode createExtractNode(OracleSource source) {
        String sourceName = source.getSourceName();
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(source.getFieldList(), sourceName);

        OracleConstant.ScanStartUpMode scanStartupMode = null;
        if (!StringUtils.isBlank(source.getScanStartupMode())) {
            scanStartupMode = OracleConstant.ScanStartUpMode.forName(source.getScanStartupMode());
        }

        Map<String, String> properties = ExtractNodeUtils.parseProperties(source.getProperties());
        return new OracleExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                source.getPrimaryKey(),
                source.getHostname(),
                source.getUsername(),
                source.getPassword(),
                source.getDatabase(),
                source.getSchemaName(),
                source.getTableName(),
                source.getPort(),
                scanStartupMode);
    }

    /**
     * Builds a SQL Server extract node from the connection, table and time zone
     * settings of the source.
     *
     * @param source SQL Server source definition
     * @return the configured SQL Server extract node
     */
    public static SqlServerExtractNode createExtractNode(SQLServerSource source) {
        String sourceName = source.getSourceName();
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(source.getFieldList(), sourceName);
        Map<String, String> properties = ExtractNodeUtils.parseProperties(source.getProperties());
        return new SqlServerExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                source.getPrimaryKey(),
                source.getHostname(),
                source.getPort(),
                source.getUsername(),
                source.getPassword(),
                source.getDatabase(),
                source.getSchemaName(),
                source.getTableName(),
                source.getServerTimezone());
    }

    /**
     * Builds a MongoDB extract node from the collection, hosts, credentials and
     * database of the source.
     *
     * @param source MongoDB source definition
     * @return the configured MongoDB extract node
     */
    public static MongoExtractNode createExtractNode(MongoDBSource source) {
        String sourceName = source.getSourceName();
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(source.getFieldList(), sourceName);
        Map<String, String> properties = ExtractNodeUtils.parseProperties(source.getProperties());
        return new MongoExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                source.getCollection(),
                source.getHosts(),
                source.getUsername(),
                source.getPassword(),
                source.getDatabase());
    }

    /**
     * Builds a TubeMQ extract node from the master RPC address, topic,
     * serialization type, group id, session key and tid of the source.
     *
     * @param source TubeMQ source definition
     * @return the configured TubeMQ extract node
     */
    public static TubeMQExtractNode createExtractNode(TubeMQSource source) {
        String sourceName = source.getSourceName();
        List<FieldInfo> fieldInfos = ExtractNodeUtils.parseFieldInfos(source.getFieldList(), sourceName);
        Map<String, String> properties = ExtractNodeUtils.parseProperties(source.getProperties());
        return new TubeMQExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                source.getMasterRpc(),
                source.getTopic(),
                source.getSerializationType(),
                source.getGroupId(),
                source.getSessionKey(),
                source.getTid());
    }

    /**
     * Converts stream fields to Sort field infos, skipping every field whose
     * field value is set (non-null).
     *
     * @param streamFields fields of the source; may be {@code null} or empty
     * @param nodeId id handed to {@link FieldInfoUtils#parseStreamFieldInfo} for each field
     * @return the converted field infos, or a new empty list when there are no fields
     */
    private static List<FieldInfo> parseFieldInfos(List<StreamField> streamFields, String nodeId) {
        // A source without a field list yields no fields instead of a NullPointerException.
        if (CollectionUtils.isEmpty(streamFields)) {
            return Lists.newArrayList();
        }
        return streamFields.stream()
                .filter(field -> field.getFieldValue() == null)
                .map(field -> FieldInfoUtils.parseStreamFieldInfo(field, nodeId))
                .collect(Collectors.toList());
    }

    /**
     * Copies the given properties into a new mutable map, converting every value
     * with {@code toString()}. Entries with a {@code null} value are dropped.
     *
     * @param properties raw source properties; may be {@code null}
     * @return a new {@link HashMap} that callers may modify; empty when
     *         {@code properties} is null or empty
     */
    private static Map<String, String> parseProperties(Map<String, Object> properties) {
        // An explicit HashMap guarantees mutability, which the MySQL overload relies on
        // when it adds extra options; Collectors.toMap gives no such guarantee.
        Map<String, String> result = new HashMap<>();
        if (properties == null) {
            return result;
        }
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            Object value = entry.getValue();
            if (value != null) {
                result.put(entry.getKey(), value.toString());
            }
        }
        return result;
    }
}

