/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.pojo.sort.node.provider;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;

/**
 * Node provider for streams whose type is {@code "KAFKA"}.
 *
 * <p>Builds a {@link KafkaExtractNode} from a {@link KafkaSource} and a {@link KafkaLoadNode}
 * from a {@link KafkaSink}. Field, property and source-format parsing is delegated to helper
 * methods inherited from {@link ExtractNodeProvider} / {@link LoadNodeProvider}.
 */
public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider {

    /**
     * Tells whether this provider handles the given stream type.
     *
     * @param streamType the stream type name; may be {@code null}
     * @return {@code true} only when {@code streamType} equals {@code "KAFKA"}
     */
    @Override
    public Boolean accept(String streamType) {
        return "KAFKA".equals(streamType);
    }

    /**
     * Creates the Kafka extract node for the given stream node.
     *
     * @param streamNodeInfo the stream node; it is cast to {@link KafkaSource}, so any other
     *        runtime type results in a {@link ClassCastException}
     * @return a new {@link KafkaExtractNode} populated from the source's settings
     */
    @Override
    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
        KafkaSource kafkaSource = (KafkaSource) streamNodeInfo;
        String sourceName = kafkaSource.getSourceName();

        List<FieldInfo> fieldInfos = this.parseStreamFieldInfos(kafkaSource.getFieldList(), sourceName);
        Map<String, String> properties = this.parseProperties(kafkaSource.getProperties());
        Format format = this.parsingFormat(
                kafkaSource.getSerializationType(),
                kafkaSource.getWrapType(),
                kafkaSource.getDataSeparator(),
                kafkaSource.getIgnoreParseError());
        KafkaScanStartupMode startupMode = this.parseStartupMode(kafkaSource.getAutoOffsetReset());

        // The source name is passed for both of the first two constructor arguments
        // (presumably node id and node name -- confirm against KafkaExtractNode).
        // The fourth argument is intentionally left null.
        return new KafkaExtractNode(
                sourceName,
                sourceName,
                fieldInfos,
                null,
                properties,
                kafkaSource.getTopic(),
                kafkaSource.getBootstrapServers(),
                format,
                startupMode,
                kafkaSource.getPrimaryKey(),
                kafkaSource.getGroupId(),
                kafkaSource.getPartitionOffsets(),
                kafkaSource.getTimestampMillis());
    }

    /**
     * Creates the Kafka load node for the given stream node.
     *
     * @param nodeInfo the stream node; it is cast to {@link KafkaSink}, so any other runtime
     *        type results in a {@link ClassCastException}
     * @param constantFieldMap constant fields, forwarded to {@code parseSinkFields} when the
     *        field relations are built
     * @return a new {@link KafkaLoadNode} populated from the sink's settings
     * @throws IllegalArgumentException if the sink's serialization type maps to a data type
     *         that is not supported for Kafka
     */
    @Override
    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
        KafkaSink kafkaSink = (KafkaSink) nodeInfo;
        String sinkName = kafkaSink.getSinkName();

        Map<String, String> properties = this.parseProperties(kafkaSink.getProperties());
        List<FieldInfo> fieldInfos = this.parseSinkFieldInfos(kafkaSink.getSinkFieldList(), sinkName);
        List<FieldRelation> fieldRelations = this.parseSinkFields(kafkaSink.getSinkFieldList(), constantFieldMap);
        // The sink's partition number is used as the sink parallelism.
        Integer sinkParallelism = kafkaSink.getPartitionNum();
        Format format = this.parseFormat(kafkaSink.getSerializationType());

        // An empty list and null are passed for the fifth and sixth constructor arguments
        // (presumably filters and filter strategy -- confirm against KafkaLoadNode).
        return new KafkaLoadNode(
                sinkName,
                sinkName,
                fieldInfos,
                fieldRelations,
                Lists.newArrayList(),
                null,
                kafkaSink.getTopicName(),
                kafkaSink.getBootstrapServers(),
                format,
                sinkParallelism,
                properties,
                kafkaSink.getPrimaryKey());
    }

    /**
     * Maps the source's auto-offset-reset setting to a scan startup mode.
     *
     * @param autoOffsetReset the offset name, resolved through {@link KafkaOffset#forName}
     * @return the matching startup mode for {@code EARLIEST}, {@code SPECIFIC} and
     *         {@code TIMESTAMP_MILLIS}; {@link KafkaScanStartupMode#LATEST_OFFSET} for every
     *         other offset value
     */
    private KafkaScanStartupMode parseStartupMode(String autoOffsetReset) {
        KafkaOffset kafkaOffset = KafkaOffset.forName(autoOffsetReset);
        switch (kafkaOffset) {
            case EARLIEST:
                return KafkaScanStartupMode.EARLIEST_OFFSET;
            case SPECIFIC:
                return KafkaScanStartupMode.SPECIFIC_OFFSETS;
            case TIMESTAMP_MILLIS:
                return KafkaScanStartupMode.TIMESTAMP_MILLIS;
            default:
                return KafkaScanStartupMode.LATEST_OFFSET;
        }
    }

    /**
     * Creates the sink-side format for the given serialization type.
     *
     * @param serializationType the serialization type name, resolved through
     *        {@link DataTypeEnum#forType}
     * @return a new format instance for CSV, AVRO, JSON, CANAL, DEBEZIUM_JSON or RAW
     * @throws IllegalArgumentException if the resolved data type is none of the above
     */
    private Format parseFormat(String serializationType) {
        DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
        // Every branch yields the common Format type; the concrete format classes are siblings,
        // so the result must not be held in a variable of one specific format class.
        switch (dataType) {
            case CSV:
                return new CsvFormat();
            case AVRO:
                return new AvroFormat();
            case JSON:
                return new JsonFormat();
            case CANAL:
                return new CanalJsonFormat();
            case DEBEZIUM_JSON:
                return new DebeziumJsonFormat();
            case RAW:
                return new RawFormat();
            default:
                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
        }
    }
}

