/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.thirdparty.hive;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
import org.apache.inlong.manager.common.pojo.query.DatabaseQueryBean;
import org.apache.inlong.manager.common.pojo.query.hive.HiveColumnQueryBean;
import org.apache.inlong.manager.common.pojo.query.hive.HiveTableQueryBean;
import org.apache.inlong.manager.common.pojo.sink.SinkForSortDTO;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkDTO;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.core.DataSourceService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.thirdparty.hive.IHiveTableOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Default {@link IHiveTableOperator} that creates the Hive database, table and
 * missing columns described by a list of sink configurations.
 *
 * <p>For every sink that is processed, the sink status is updated through
 * {@link StreamSinkService#updateStatus} to reflect success or failure.
 */
public class DefaultHiveTableOperator implements IHiveTableOperator {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHiveTableOperator.class);

    @Autowired
    private StreamSinkService sinkService;
    @Autowired
    private StreamSinkFieldEntityMapper hiveFieldMapper;
    @Autowired
    private DataSourceService<DatabaseQueryBean, HiveTableQueryBean> dataSourceService;

    /**
     * Creates the Hive resources for each sink configuration in the list.
     *
     * <p>A configuration is skipped when its status is already
     * {@code SINK_CONFIG_SUCCESSFUL} or when resource creation is disabled for it.
     * An empty or {@code null} list is a no-op.
     *
     * @param groupId    inlong group id, used for logging
     * @param configList sink configurations to process
     * @throws WorkflowException if creating the table for any configuration fails;
     *                           remaining configurations are then not processed
     */
    @Override
    public void createHiveResource(String groupId, List<SinkForSortDTO> configList) {
        if (CollectionUtils.isEmpty(configList)) {
            LOGGER.warn("no hive config, skip to create");
            return;
        }
        for (SinkForSortDTO config : configList) {
            if (EntityStatus.SINK_CONFIG_SUCCESSFUL.getCode().equals(config.getStatus())) {
                LOGGER.warn("hive [{}] already success, skip to create", config.getId());
                continue;
            }
            if (Constant.DISABLE_CREATE_RESOURCE.equals(config.getEnableCreateResource())) {
                LOGGER.warn("create table was disable, skip to create table for hive [{}]", config.getId());
                continue;
            }
            this.createTable(groupId, config);
        }
    }

    /**
     * Creates the database and table for one sink, or appends new columns when the
     * table already has columns, then records the outcome as the sink status.
     *
     * @param groupId inlong group id, used for logging
     * @param config  sink configuration whose ext params hold the Hive settings
     * @throws WorkflowException if any step fails; the sink status is set to
     *                           {@code SINK_CONFIG_FAILED} before the exception is thrown
     */
    private void createTable(String groupId, SinkForSortDTO config) {
        LOGGER.debug("begin create hive table for inlong group={}, config={}", groupId, config);

        HiveSinkDTO hiveInfo = HiveSinkDTO.getFromJson(config.getExtParams());
        HiveTableQueryBean tableBean = this.getTableQueryBean(config, hiveInfo);
        try {
            this.dataSourceService.createDb(tableBean);
            List<ColumnInfoBean> columns = this.dataSourceService.queryColumns(tableBean);
            if (columns.isEmpty()) {
                // No existing columns were found: create the table from scratch.
                this.dataSourceService.createTable(tableBean);
            } else {
                // Skip as many configured columns as already exist and add only the
                // remaining trailing ones.
                List<HiveColumnQueryBean> columnsSkipHistory = tableBean.getColumns().stream()
                        .skip(columns.size())
                        .collect(Collectors.toList());
                if (!columnsSkipHistory.isEmpty()) {
                    tableBean.setColumns(columnsSkipHistory);
                    this.dataSourceService.createColumn(tableBean);
                }
            }
            this.sinkService.updateStatus(config.getId(), EntityStatus.SINK_CONFIG_SUCCESSFUL.getCode(),
                    "create hive table success");
        } catch (Throwable e) {
            // The full stack trace is logged here; only the message is propagated.
            LOGGER.error("create hive table error, ", e);
            this.sinkService.updateStatus(config.getId(), EntityStatus.SINK_CONFIG_FAILED.getCode(),
                    e.getMessage());
            throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
        }
        LOGGER.info("success create hive table for data group [{}]", groupId);
    }

    /**
     * Builds the table query bean for a sink from its stored fields and Hive settings.
     *
     * <p>Regular columns come from the sink fields selected by group id and stream id;
     * partition fields from {@code hiveInfo} are appended after them as {@code string}
     * partition columns.
     *
     * @param config   sink configuration providing the group id and stream id
     * @param hiveInfo Hive settings (connection info, table name, separator, partitions)
     * @return the populated table query bean
     * @throws NumberFormatException if the data separator is set but is not an integer
     */
    protected HiveTableQueryBean getTableQueryBean(SinkForSortDTO config, HiveSinkDTO hiveInfo) {
        String groupId = config.getInlongGroupId();
        String streamId = config.getInlongStreamId();
        LOGGER.info("begin to get table query bean for groupId={}, streamId={}", groupId, streamId);

        List<StreamSinkFieldEntity> fieldEntities = this.hiveFieldMapper.selectFields(groupId, streamId);
        List<HiveColumnQueryBean> columnQueryBeans = new ArrayList<>();
        for (StreamSinkFieldEntity field : fieldEntities) {
            HiveColumnQueryBean columnBean = new HiveColumnQueryBean();
            columnBean.setColumnName(field.getFieldName());
            columnBean.setColumnType(field.getFieldType());
            columnBean.setColumnDesc(field.getFieldComment());
            columnQueryBeans.add(columnBean);
        }

        // Partition columns are appended after the regular columns and always typed "string".
        if (CollectionUtils.isNotEmpty(hiveInfo.getPartitionFieldList())) {
            for (StreamSinkFieldEntity field : hiveInfo.getPartitionFieldList()) {
                HiveColumnQueryBean columnBean = new HiveColumnQueryBean();
                columnBean.setColumnName(field.getFieldName());
                columnBean.setPartition(true);
                columnBean.setColumnType("string");
                columnQueryBeans.add(columnBean);
            }
        }

        HiveTableQueryBean queryBean = new HiveTableQueryBean();
        queryBean.setColumns(columnQueryBeans);
        if (hiveInfo.getDataSeparator() != null) {
            // The separator is stored as a numeric character code; convert it to the character.
            char ch = (char) Integer.parseInt(hiveInfo.getDataSeparator());
            queryBean.setFieldTerSymbol(String.valueOf(ch));
        }
        queryBean.setUsername(hiveInfo.getUsername());
        queryBean.setPassword(hiveInfo.getPassword());
        queryBean.setTableName(hiveInfo.getTableName());
        queryBean.setDbName(hiveInfo.getDbName());
        queryBean.setJdbcUrl(hiveInfo.getJdbcUrl());

        LOGGER.debug("success to get table query bean={}", queryBean);
        return queryBean;
    }
}

