/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sort.util;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSource;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSource;
import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSource;
import org.apache.inlong.manager.service.sort.util.FieldInfoUtils;
import org.apache.inlong.sort.protocol.constant.OracleConstant;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExtractNodeUtils {
    private static final Logger log = LoggerFactory.getLogger(ExtractNodeUtils.class);

    public static List<ExtractNode> createExtractNodes(List<StreamSource> sourceInfos) {
        if (CollectionUtils.isEmpty(sourceInfos)) {
            return Lists.newArrayList();
        }
        return sourceInfos.stream().map(ExtractNodeUtils::createExtractNode).collect(Collectors.toList());
    }

    public static ExtractNode createExtractNode(StreamSource sourceInfo) {
        SourceType sourceType = SourceType.forType((String)sourceInfo.getSourceType());
        switch (sourceType) {
            case BINLOG: {
                return ExtractNodeUtils.createExtractNode((MySQLBinlogSource)sourceInfo);
            }
            case KAFKA: {
                return ExtractNodeUtils.createExtractNode((KafkaSource)sourceInfo);
            }
            case PULSAR: {
                return ExtractNodeUtils.createExtractNode((PulsarSource)sourceInfo);
            }
            case POSTGRES: {
                return ExtractNodeUtils.createExtractNode((PostgresSource)sourceInfo);
            }
            case ORACLE: {
                return ExtractNodeUtils.createExtractNode((OracleSource)sourceInfo);
            }
            case SQLSERVER: {
                return ExtractNodeUtils.createExtractNode((SqlServerSource)sourceInfo);
            }
            case MONGODB: {
                return ExtractNodeUtils.createExtractNode((MongoDBSource)sourceInfo);
            }
        }
        throw new IllegalArgumentException(String.format("Unsupported sourceType=%s to create extractNode", sourceType));
    }

    public static MySqlExtractNode createExtractNode(MySQLBinlogSource binlogSource) {
        String id = binlogSource.getSourceName();
        String name = binlogSource.getSourceName();
        String database = binlogSource.getDatabaseWhiteList();
        String primaryKey = binlogSource.getPrimaryKey();
        String hostName = binlogSource.getHostname();
        String userName = binlogSource.getUser();
        String password = binlogSource.getPassword();
        Integer port = binlogSource.getPort();
        Integer serverId = null;
        if (binlogSource.getServerId() != null && binlogSource.getServerId() > 0) {
            serverId = binlogSource.getServerId();
        }
        String tables = binlogSource.getTableWhiteList();
        List tableNames = Splitter.on((String)",").splitToList((CharSequence)tables);
        List streamFields = binlogSource.getFieldList();
        List fieldInfos = streamFields.stream().map(streamField -> FieldInfoUtils.parseStreamFieldInfo(streamField, name)).collect(Collectors.toList());
        String serverTimeZone = binlogSource.getServerTimezone();
        boolean incrementalSnapshotEnabled = true;
        HashMap properties = Maps.newHashMap();
        if (binlogSource.isAllMigration()) {
            incrementalSnapshotEnabled = false;
            properties.put("migrate-all", "true");
        }
        properties.put("append-mode", "true");
        if (StringUtils.isEmpty((CharSequence)primaryKey)) {
            incrementalSnapshotEnabled = false;
            properties.put("scan.incremental.snapshot.enabled", "false");
        }
        return new MySqlExtractNode(id, name, fieldInfos, null, (Map)properties, primaryKey, tableNames, hostName, userName, password, database, port, serverId, Boolean.valueOf(incrementalSnapshotEnabled), serverTimeZone);
    }

    public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
        KafkaScanStartupMode startupMode;
        CsvFormat format;
        String id = kafkaSource.getSourceName();
        String name = kafkaSource.getSourceName();
        List streamFields = kafkaSource.getFieldList();
        List fieldInfos = streamFields.stream().map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name)).collect(Collectors.toList());
        String topic = kafkaSource.getTopic();
        String bootstrapServers = kafkaSource.getBootstrapServers();
        DataTypeEnum dataType = DataTypeEnum.forName((String)kafkaSource.getSerializationType());
        switch (dataType) {
            case CSV: {
                format = new CsvFormat();
                break;
            }
            case AVRO: {
                format = new AvroFormat();
                break;
            }
            case JSON: {
                format = new JsonFormat();
                break;
            }
            case CANAL: {
                format = new CanalJsonFormat();
                break;
            }
            case DEBEZIUM_JSON: {
                format = new DebeziumJsonFormat();
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
            }
        }
        KafkaOffset kafkaOffset = KafkaOffset.forName((String)kafkaSource.getAutoOffsetReset());
        switch (kafkaOffset) {
            case EARLIEST: {
                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
                break;
            }
            default: {
                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
            }
        }
        String primaryKey = kafkaSource.getPrimaryKey();
        String groupId = kafkaSource.getGroupId();
        return new KafkaExtractNode(id, name, fieldInfos, null, (Map)Maps.newHashMap(), topic, bootstrapServers, (Format)format, startupMode, primaryKey, groupId);
    }

    public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
        CsvFormat format;
        String id = pulsarSource.getSourceName();
        String name = pulsarSource.getSourceName();
        List streamFields = pulsarSource.getFieldList();
        List fieldInfos = streamFields.stream().map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name)).collect(Collectors.toList());
        String fullTopicName = pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
        DataTypeEnum dataType = DataTypeEnum.forName((String)pulsarSource.getSerializationType());
        switch (dataType) {
            case CSV: {
                format = new CsvFormat();
                break;
            }
            case AVRO: {
                format = new AvroFormat();
                break;
            }
            case JSON: {
                format = new JsonFormat();
                break;
            }
            case CANAL: {
                format = new CanalJsonFormat();
                break;
            }
            case DEBEZIUM_JSON: {
                format = new DebeziumJsonFormat();
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for pulsar source", dataType));
            }
        }
        if (pulsarSource.isInlongComponent()) {
            CsvFormat innerFormat = format;
            format = new InLongMsgFormat((Format)innerFormat, Boolean.valueOf(false));
        }
        PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName((String)pulsarSource.getScanStartupMode());
        String primaryKey = pulsarSource.getPrimaryKey();
        String serviceUrl = pulsarSource.getServiceUrl();
        String adminUrl = pulsarSource.getAdminUrl();
        return new PulsarExtractNode(id, name, fieldInfos, null, (Map)Maps.newHashMap(), fullTopicName, adminUrl, serviceUrl, (Format)format, startupMode.getValue(), primaryKey);
    }

    public static PostgresExtractNode createExtractNode(PostgresSource postgresSource) {
        List streamFields = postgresSource.getFieldList();
        String id = postgresSource.getSourceName();
        String name = postgresSource.getSourceName();
        List fields = streamFields.stream().map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name)).collect(Collectors.toList());
        return new PostgresExtractNode(id, name, fields, null, null, postgresSource.getPrimaryKey(), postgresSource.getTableNameList(), postgresSource.getHostname(), postgresSource.getUsername(), postgresSource.getPassword(), postgresSource.getDatabase(), postgresSource.getSchema(), Integer.valueOf(postgresSource.getPort()), postgresSource.getDecodingPluginName());
    }

    public static OracleExtractNode createExtractNode(OracleSource source) {
        String name = source.getSourceName();
        List streamFieldInfos = source.getFieldList();
        List fieldInfos = streamFieldInfos.stream().map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name)).collect(Collectors.toList());
        HashMap properties = Maps.newHashMap();
        OracleConstant.ScanStartUpMode scanStartupMode = StringUtils.isBlank((CharSequence)source.getScanStartupMode()) ? null : OracleConstant.ScanStartUpMode.forName((String)source.getScanStartupMode());
        return new OracleExtractNode(name, name, fieldInfos, null, (Map)properties, source.getPrimaryKey(), source.getHostname(), source.getUsername(), source.getPassword(), source.getDatabase(), source.getSchemaName(), source.getTableName(), source.getPort(), scanStartupMode);
    }

    public static SqlServerExtractNode createExtractNode(SqlServerSource source) {
        String name = source.getSourceName();
        List streamFields = source.getFieldList();
        List fieldInfos = streamFields.stream().map(fieldInfo -> FieldInfoUtils.parseStreamFieldInfo(fieldInfo, name)).collect(Collectors.toList());
        HashMap properties = Maps.newHashMap();
        return new SqlServerExtractNode(name, name, fieldInfos, null, (Map)properties, source.getPrimaryKey(), source.getHostname(), Integer.valueOf(source.getPort()), source.getUsername(), source.getPassword(), source.getDatabase(), source.getSchemaName(), source.getTableName(), source.getServerTimezone());
    }

    public static MongoExtractNode createExtractNode(MongoDBSource source) {
        String name = source.getSourceName();
        List streamFields = source.getFieldList();
        List fieldInfos = streamFields.stream().map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name)).collect(Collectors.toList());
        HashMap properties = Maps.newHashMap();
        return new MongoExtractNode(name, name, fieldInfos, null, (Map)properties, source.getPrimaryKey(), source.getCollection(), source.getHosts(), source.getUsername(), source.getPassword(), source.getDatabase());
    }
}

