/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.thirdpart.sort;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.BusinessEntity;
import org.apache.inlong.manager.dao.entity.StorageHiveFieldEntity;
import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.thirdpart.sort.SortFieldFormatUtils;
import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
import org.apache.inlong.manager.workflow.model.WorkflowContext;
import org.apache.inlong.sort.ZkTools;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PushHiveConfigTaskListener
implements TaskEventListener {
    private static final Logger log = LoggerFactory.getLogger(PushHiveConfigTaskListener.class);
    private static final Map<String, String> PARTITION_TIME_FORMAT_MAP = new HashMap<String, String>();
    private static final Map<String, TimeUnit> PARTITION_TIME_UNIT_MAP = new HashMap<String, TimeUnit>();
    @Autowired
    private ClusterBean clusterBean;
    @Autowired
    private BusinessEntityMapper businessMapper;
    @Autowired
    private StorageHiveEntityMapper storageHiveMapper;
    @Autowired
    private StorageHiveFieldEntityMapper hiveFieldMapper;
    @Autowired
    private DataStreamService dataStreamService;

    public TaskEvent event() {
        return TaskEvent.COMPLETE;
    }

    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
        BusinessResourceWorkflowForm form;
        BusinessInfo businessInfo;
        String groupId;
        BusinessEntity business;
        if (log.isDebugEnabled()) {
            log.debug("begin push hive config to sort, context={}", (Object)context);
        }
        if ((business = this.businessMapper.selectByIdentifier(groupId = (businessInfo = (form = (BusinessResourceWorkflowForm)context.getProcessForm()).getBusinessInfo()).getInlongGroupId())) == null || EntityStatus.IS_DELETED.getCode().equals(business.getIsDeleted())) {
            log.warn("skip to push sort hive config for groupId={}, as biz not exists or has been deleted", (Object)groupId);
            return ListenerResult.success();
        }
        String streamId = form.getInlongStreamId();
        List hiveInfoList = this.storageHiveMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
        for (StorageHiveSortInfo hiveInfo : hiveInfoList) {
            Integer storageId = hiveInfo.getId();
            if (log.isDebugEnabled()) {
                log.debug("hive storage info: {}", (Object)hiveInfo);
            }
            DataFlowInfo dataFlowInfo = this.getDataFlowInfo(business, hiveInfo);
            if (log.isDebugEnabled()) {
                log.debug("try to push hive config to sort: {}", (Object)JsonUtils.toJson((Object)dataFlowInfo));
            }
            try {
                String zkUrl = this.clusterBean.getZkUrl();
                String zkRoot = this.clusterBean.getZkRoot();
                String sortClusterName = this.clusterBean.getAppName();
                ZkTools.updateDataFlowInfo((DataFlowInfo)dataFlowInfo, (String)sortClusterName, (long)storageId.intValue(), (String)zkUrl, (String)zkRoot);
                ZkTools.addDataFlowToCluster((String)sortClusterName, (long)storageId.intValue(), (String)zkUrl, (String)zkRoot);
            }
            catch (Exception e) {
                log.error("add or update data stream information to zk failed, storageId={} ", (Object)storageId, (Object)e);
                throw new WorkflowListenerException("push hive config to sort failed, reason: " + e.getMessage());
            }
        }
        return ListenerResult.success();
    }

    private DataFlowInfo getDataFlowInfo(BusinessEntity businessEntity, StorageHiveSortInfo hiveInfo) {
        String streamId;
        String groupId = hiveInfo.getInlongGroupId();
        List fieldList = this.hiveFieldMapper.selectHiveFields(groupId, streamId = hiveInfo.getInlongStreamId());
        if (fieldList == null || fieldList.size() == 0) {
            throw new WorkflowListenerException("no hive fields for groupId=" + groupId + ", streamId=" + streamId);
        }
        SourceInfo sourceInfo = this.getSourceInfo(businessEntity, hiveInfo, fieldList);
        HiveSinkInfo sinkInfo = this.getSinkInfo(hiveInfo, fieldList);
        return new DataFlowInfo((long)hiveInfo.getId().intValue(), sourceInfo, (SinkInfo)sinkInfo);
    }

    private HiveSinkInfo getSinkInfo(StorageHiveSortInfo hiveInfo, List<StorageHiveFieldEntity> fieldList) {
        if (hiveInfo.getJdbcUrl() == null) {
            throw new WorkflowListenerException("hive server url cannot be empty");
        }
        Character separator = Character.valueOf((char)Integer.parseInt(hiveInfo.getTargetSeparator()));
        String format = hiveInfo.getFileFormat();
        Object fileFormat = "OrcFile".equalsIgnoreCase(format) ? new HiveSinkInfo.OrcFileFormat(1000) : ("SequenceFile".equalsIgnoreCase(format) ? new HiveSinkInfo.SequenceFileFormat(separator, 100) : ("Parquet".equalsIgnoreCase(format) ? new HiveSinkInfo.ParquetFileFormat() : new HiveSinkInfo.TextFileFormat(separator)));
        ArrayList<Object> partitionList = new ArrayList<Object>();
        String primary = hiveInfo.getPrimaryPartition();
        if (StringUtils.isNotEmpty((CharSequence)primary)) {
            String unit = hiveInfo.getPartitionUnit();
            HiveSinkInfo.HiveTimePartitionInfo timePartitionInfo = new HiveSinkInfo.HiveTimePartitionInfo(primary, PARTITION_TIME_FORMAT_MAP.get(unit));
            partitionList.add(timePartitionInfo);
        }
        if (StringUtils.isNotEmpty((CharSequence)hiveInfo.getSecondaryPartition())) {
            partitionList.add(new HiveSinkInfo.HiveFieldPartitionInfo(hiveInfo.getSecondaryPartition()));
        }
        StringBuilder dataPathBuilder = new StringBuilder();
        String hdfsUrl = hiveInfo.getHdfsDefaultFs();
        String warehouseDir = hiveInfo.getWarehouseDir();
        if (hdfsUrl.endsWith("/")) {
            dataPathBuilder.append(hdfsUrl, 0, hdfsUrl.length() - 1);
        } else {
            dataPathBuilder.append(hdfsUrl);
        }
        if (warehouseDir.endsWith("/")) {
            dataPathBuilder.append(warehouseDir, 0, warehouseDir.length() - 1);
        } else {
            dataPathBuilder.append(warehouseDir);
        }
        String dataPath = dataPathBuilder.append("/").append(hiveInfo.getDbName()).append(".db/").append(hiveInfo.getTableName()).toString();
        List<FieldInfo> fieldInfoList = this.getSinkFields(fieldList, hiveInfo.getPrimaryPartition());
        return new HiveSinkInfo(fieldInfoList.toArray(new FieldInfo[0]), hiveInfo.getJdbcUrl(), hiveInfo.getDbName(), hiveInfo.getTableName(), hiveInfo.getUsername(), hiveInfo.getPassword(), dataPath, partitionList.toArray(new HiveSinkInfo.HivePartitionInfo[0]), (HiveSinkInfo.HiveFileFormat)fileFormat);
    }

    private SourceInfo getSourceInfo(BusinessEntity businessEntity, StorageHiveSortInfo info, List<StorageHiveFieldEntity> fieldList) {
        String dataType;
        TDMsgCsvDeserializationInfo deserializationInfo = null;
        boolean isDbType = "DB".equals(info.getDataSourceType());
        if (!isDbType && ("TEXT".equalsIgnoreCase(dataType = info.getDataType()) || "KEY-VALUE".equalsIgnoreCase(dataType))) {
            char separator = (char)Integer.parseInt(info.getSourceSeparator());
            deserializationInfo = new TDMsgCsvDeserializationInfo(info.getInlongStreamId(), separator);
        }
        TubeSourceInfo sourceInfo = null;
        List<FieldInfo> sourceFields = this.getSourceFields(fieldList, info.getPrimaryPartition());
        String middleWare = businessEntity.getMiddlewareType();
        if ("TUBE".equalsIgnoreCase(middleWare)) {
            String masterAddress = this.clusterBean.getTubeMaster();
            Preconditions.checkNotNull((Object)masterAddress, (String)"tube cluster address cannot be empty");
            String topic = businessEntity.getMqResourceObj();
            String consumerGroup = this.clusterBean.getAppName() + "_" + topic + "_consumer_group";
            sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup, (DeserializationInfo)deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
        } else if ("PULSAR".equalsIgnoreCase(middleWare)) {
            String tenant = this.clusterBean.getDefaultTenant();
            String namespace = businessEntity.getMqResourceObj();
            String pulsarTopic = info.getMqResourceObj();
            String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
            String adminUrl = this.clusterBean.getPulsarAdminUrl();
            String serviceUrl = this.clusterBean.getPulsarServiceUrl();
            String consumerGroup = this.clusterBean.getAppName() + "_" + pulsarTopic + "_consumer_group";
            sourceInfo = new PulsarSourceInfo(adminUrl, serviceUrl, fullTopicName, consumerGroup, (DeserializationInfo)deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
        }
        return sourceInfo;
    }

    private List<FieldInfo> getSinkFields(List<StorageHiveFieldEntity> fieldList, String partitionField) {
        boolean duplicate = false;
        ArrayList<FieldInfo> fieldInfoList = new ArrayList<FieldInfo>();
        for (StorageHiveFieldEntity field : fieldList) {
            String fieldName = field.getFieldName();
            if (fieldName.equals(partitionField)) {
                duplicate = true;
            }
            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
            FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
            fieldInfoList.add(fieldInfo);
        }
        if (!duplicate && StringUtils.isNotEmpty((CharSequence)partitionField)) {
            FieldInfo fieldInfo = new FieldInfo(partitionField, (FormatInfo)new TimestampFormatInfo("MILLIS"));
            fieldInfoList.add(0, fieldInfo);
        }
        return fieldInfoList;
    }

    private List<FieldInfo> getSourceFields(List<StorageHiveFieldEntity> fieldList, String partitionField) {
        ArrayList<FieldInfo> fieldInfoList = new ArrayList<FieldInfo>();
        for (StorageHiveFieldEntity field : fieldList) {
            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase());
            String fieldName = field.getSourceFieldName();
            FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
            fieldInfoList.add(fieldInfo);
        }
        return fieldInfoList;
    }

    public boolean async() {
        return false;
    }

    static {
        PARTITION_TIME_FORMAT_MAP.put("D", "yyyyMMdd");
        PARTITION_TIME_FORMAT_MAP.put("H", "yyyyMMddHH");
        PARTITION_TIME_FORMAT_MAP.put("I", "yyyyMMddHHmm");
        PARTITION_TIME_UNIT_MAP.put("D", TimeUnit.DAYS);
        PARTITION_TIME_UNIT_MAP.put("H", TimeUnit.HOURS);
        PARTITION_TIME_UNIT_MAP.put("I", TimeUnit.MINUTES);
    }
}

