/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TimeStaticsDim;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
import org.apache.inlong.manager.service.user.LoginUserUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Lazy
@Service
public class AuditServiceImpl
implements AuditService {
    /** Class-scoped SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);
    /** Second-precision pattern; used to render the ClickHouse query's time-range bounds. */
    private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
    /** Pattern used to bucket log timestamps by hour when aggregating. */
    private static final String HOUR_FORMAT = "yyyy-MM-dd HH";
    /** Pattern used to parse the request date and to bucket log timestamps by day. */
    private static final String DAY_FORMAT = "yyyy-MM-dd";
    /** Cache of "sent" audit base items (isSent flag equal to 1), keyed by item type. */
    private final Map<String, AuditBaseEntity> auditSentItemMap = new ConcurrentHashMap<String, AuditBaseEntity>();
    /** Cache of all other ("received") audit base items, keyed by item type. */
    private final Map<String, AuditBaseEntity> auditReceivedItemMap = new ConcurrentHashMap<String, AuditBaseEntity>();
    /** Base audit IDs queried when the login user has the ADMIN role; comma-separated property, default "3,4,5,6". */
    @Value(value="#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForAdmin;
    /** Base audit IDs queried for every other user; comma-separated property, default "3,4,5,6". */
    @Value(value="#{'${audit.user.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForUser;
    /**
     * Name of the {@link AuditQuerySource} constant that selects the storage backend to query.
     * NOTE(review): the placeholder has no default, so the MYSQL initializer presumably only
     * applies when the field is not injected (e.g. in unit tests) - confirm.
     */
    @Value(value="${audit.query.source}")
    private String auditQuerySource = AuditQuerySource.MYSQL.name();
    /** Mapper used to load audit base items (type / isSent to audit ID). */
    @Autowired
    private AuditBaseEntityMapper auditBaseMapper;
    /** Mapper used to sum audit counts per log timestamp when the source is MySQL. */
    @Autowired
    private AuditEntityMapper auditEntityMapper;
    /** Client used to check index existence and run searches when the source is Elasticsearch. */
    @Autowired
    private ElasticsearchApi elasticsearchApi;
    /** Mapper used to look up the sinks of a group/stream. */
    @Autowired
    private StreamSinkEntityMapper sinkEntityMapper;
    /** Mapper used to look up the sources of a group/stream. */
    @Autowired
    private StreamSourceEntityMapper sourceEntityMapper;

    /**
     * Warms the audit base item caches once the bean has been constructed.
     *
     * <p>Any failure is logged and swallowed so that it never propagates out of this callback.
     */
    @PostConstruct
    public void initialize() {
        final String serviceName = AuditServiceImpl.class.getSimpleName();
        LOGGER.info("init audit base item cache map for {}", serviceName);
        try {
            this.refreshBaseItemCache();
        } catch (Throwable cause) {
            LOGGER.error("initialize audit base item cache error", cause);
        }
    }

    /**
     * Reloads every audit base item from the database into the two in-memory caches.
     *
     * <p>The fresh content is staged in local maps first, so a failure while loading leaves the
     * live caches untouched. Afterwards new/changed entries are published and entries that no
     * longer exist in the database (or whose isSent flag changed) are evicted.
     *
     * @return {@code true} when the caches were refreshed, {@code false} when loading failed
     *         (the error is logged, never thrown)
     */
    @Override
    @Transactional(rollbackFor={Exception.class})
    public Boolean refreshBaseItemCache() {
        LOGGER.debug("start to reload audit base item info");
        try {
            List<AuditBaseEntity> auditBaseEntities = this.auditBaseMapper.selectAll();

            // Stage into local maps; ConcurrentHashMap rejects null keys up front, before
            // the live caches have been modified.
            Map<String, AuditBaseEntity> freshSent = new ConcurrentHashMap<>();
            Map<String, AuditBaseEntity> freshReceived = new ConcurrentHashMap<>();
            for (AuditBaseEntity auditBaseEntity : auditBaseEntities) {
                String type = auditBaseEntity.getType();
                if (auditBaseEntity.getIsSent() == 1) {
                    freshSent.put(type, auditBaseEntity);
                } else {
                    freshReceived.put(type, auditBaseEntity);
                }
            }

            // Publish first, then evict: readers never observe an empty cache in between.
            this.auditSentItemMap.putAll(freshSent);
            this.auditSentItemMap.keySet().retainAll(freshSent.keySet());
            this.auditReceivedItemMap.putAll(freshReceived);
            this.auditReceivedItemMap.keySet().retainAll(freshReceived.keySet());
        } catch (Throwable t) {
            LOGGER.error("failed to reload audit base item info", t);
            return false;
        }
        LOGGER.debug("success to reload audit base item info");
        return true;
    }

    /**
     * Resolves the audit ID registered for a node type.
     *
     * <p>The matching in-memory cache is consulted first; on a miss the item is loaded from the
     * database and stored in that cache.
     *
     * @param type node type to resolve; a blank value yields {@code null}
     * @param isSent {@code true} to look up the "sent" item, {@code false} for the "received" one
     * @return the audit ID, or {@code null} when {@code type} is blank
     */
    @Override
    public String getAuditId(String type, boolean isSent) {
        if (StringUtils.isBlank(type)) {
            return null;
        }

        Map<String, AuditBaseEntity> cache = isSent ? this.auditSentItemMap : this.auditReceivedItemMap;
        AuditBaseEntity cached = cache.get(type);
        if (cached != null) {
            return cached.getAuditId();
        }

        // Cache miss: fall back to the database; the precondition check rejects unknown types.
        int sentFlag = isSent ? 1 : 0;
        AuditBaseEntity loaded = this.auditBaseMapper.selectByTypeAndIsSent(type, Integer.valueOf(sentFlag));
        Preconditions.expectNotNull(loaded, ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED,
                String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), type));
        cache.put(type, loaded);
        return loaded.getAuditId();
    }

    /**
     * Queries the audit data of one group/stream for the day given in the request.
     *
     * <p>The audit IDs to query are derived from the login user's role and the stream's sink and
     * sources, and are written back into {@code request}. Each audit ID is then queried from the
     * configured backend (MySQL, Elasticsearch or ClickHouse) and the per-timestamp counts are
     * finally aggregated according to the request's time dimension.
     *
     * @param request query condition; must not be {@code null}
     * @return one {@link AuditVO} per audit ID that produced a result
     * @throws Exception if the backend query fails
     */
    @Override
    public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
        LOGGER.info("begin query audit list request={}", request);
        Preconditions.expectNotNull(request, "request is null");

        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();

        // The first sink (if any) decides which sink node type is reported.
        List<StreamSinkEntity> sinkEntityList = this.sinkEntityMapper.selectByRelatedId(groupId, streamId);
        String sinkNodeType = null;
        if (CollectionUtils.isNotEmpty(sinkEntityList)) {
            sinkNodeType = sinkEntityList.get(0).getSinkType();
        }
        request.setAuditIds(this.getAuditIds(groupId, streamId, sinkNodeType));

        List<AuditVO> result = new ArrayList<>();
        AuditQuerySource querySource = AuditQuerySource.valueOf(this.auditQuerySource);
        for (String auditId : request.getAuditIds()) {
            List<AuditInfo> auditSet = null;
            if (AuditQuerySource.MYSQL == querySource) {
                auditSet = this.queryAuditFromMysql(groupId, streamId, auditId, request.getDt());
            } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
                auditSet = this.queryAuditFromElasticsearch(groupId, streamId, auditId, request.getDt());
            } else if (AuditQuerySource.CLICKHOUSE == querySource) {
                auditSet = this.queryAuditFromClickHouse(groupId, streamId, auditId, request.getDt());
            }
            if (auditSet == null) {
                // Nothing to report for this audit ID (missing index, no buckets, unknown source).
                continue;
            }
            // Only the sink's own "sent" audit ID is labelled with the sink node type.
            String nodeType = auditId.equals(this.getAuditId(sinkNodeType, true)) ? sinkNodeType : null;
            result.add(new AuditVO(auditId, auditSet, nodeType));
        }
        LOGGER.info("success to query audit list for request={}", request);
        return this.aggregateByTimeDim(result, request.getTimeStaticsDim());
    }

    /** Sums the audit counts per minute from MySQL for the day {@code dt} (yyyy-MM-dd). */
    private List<AuditInfo> queryAuditFromMysql(String groupId, String streamId, String auditId, String dt) {
        String format = "%Y-%m-%d %H:%i:00";
        DateTimeFormatter dayFormatter = DateTimeFormat.forPattern(DAY_FORMAT);
        String endDate = dayFormatter.parseDateTime(dt).plusDays(1).toString(dayFormatter);
        List<Map<String, Object>> sumList =
                this.auditEntityMapper.sumByLogTs(groupId, streamId, auditId, dt, endDate, format);
        return sumList.stream().map(row -> {
            AuditInfo vo = new AuditInfo();
            vo.setLogTs((String) row.get("logTs"));
            vo.setCount(((BigDecimal) row.get("total")).longValue());
            return vo;
        }).collect(Collectors.toList());
    }

    /**
     * Reads the per-timestamp sums from the daily Elasticsearch index of one audit ID.
     *
     * @return the audit points, or {@code null} when the index does not exist or has no buckets
     */
    private List<AuditInfo> queryAuditFromElasticsearch(String groupId, String streamId, String auditId, String dt)
            throws Exception {
        String index = String.format("%s_%s", dt.replace("-", ""), auditId);
        if (!this.elasticsearchApi.indexExists(index)) {
            LOGGER.warn("elasticsearch index={} not exists", index);
            return null;
        }
        SearchResponse response = this.elasticsearchApi.search(this.toAuditSearchRequest(index, groupId, streamId));
        List<?> aggregations = response.getAggregations().asList();
        if (CollectionUtils.isEmpty(aggregations)) {
            return null;
        }
        ParsedTerms terms = (ParsedTerms) aggregations.get(0);
        if (CollectionUtils.isEmpty(terms.getBuckets())) {
            return null;
        }
        return terms.getBuckets().stream().map(bucket -> {
            AuditInfo vo = new AuditInfo();
            vo.setLogTs(bucket.getKeyAsString());
            // The only sub-aggregation of each bucket is the sum over "count".
            vo.setCount((long) ((ParsedSum) bucket.getAggregations().asList().get(0)).getValue());
            return vo;
        }).collect(Collectors.toList());
    }

    /** Reads the per-timestamp sums from ClickHouse; all JDBC resources are closed on exit. */
    private List<AuditInfo> queryAuditFromClickHouse(String groupId, String streamId, String auditId, String dt)
            throws Exception {
        try (Connection connection = ClickHouseConfig.getCkConnection();
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(this.toAuditCkSql(groupId, streamId, auditId, dt))) {
            List<AuditInfo> auditSet = new ArrayList<>();
            while (resultSet.next()) {
                AuditInfo vo = new AuditInfo();
                vo.setLogTs(resultSet.getString("log_ts"));
                vo.setCount(resultSet.getLong("total"));
                auditSet.add(vo);
            }
            return auditSet;
        }
    }

    /**
     * Builds the list of audit IDs to query for one group/stream.
     *
     * <p>Starts from the role-specific base IDs (admin or regular user), adds the sink's
     * "received" audit ID (or DataProxy's "sent" ID when the stream has no sink), and - when the
     * stream has no source or only AUTO_PUSH sources and the Agent "received" ID is part of the
     * set - additionally the DataProxy "received" ID.
     *
     * @param groupId inlong group ID used to look up the stream's sources
     * @param streamId inlong stream ID used to look up the stream's sources
     * @param sinkNodeType type of the stream's sink, or {@code null} when there is none
     * @return the distinct audit IDs, in no particular order
     */
    private List<String> getAuditIds(String groupId, String streamId, String sinkNodeType) {
        boolean isAdmin = LoginUserUtils.getLoginUser().getRoles().contains("ADMIN");
        HashSet<String> auditSet = new HashSet<>(isAdmin ? this.auditIdListForAdmin : this.auditIdListForUser);

        if (sinkNodeType == null) {
            auditSet.add(this.getAuditId("DATAPROXY", true));
        } else {
            auditSet.add(this.getAuditId(sinkNodeType, false));
        }

        List<StreamSourceEntity> sourceList = this.sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
        boolean onlyAutoPush = CollectionUtils.isEmpty(sourceList)
                || sourceList.stream().allMatch(source -> "AUTO_PUSH".equals(source.getSourceType()));
        // Short-circuit keeps the AGENT lookup from running when it is not needed.
        if (onlyAutoPush && auditSet.contains(this.getAuditId("AGENT", false))) {
            auditSet.add(this.getAuditId("DATAPROXY", false));
        }
        return new ArrayList<>(auditSet);
    }

    /**
     * Builds the Elasticsearch request that sums {@code count} per {@code log_ts} for one
     * group/stream within the given index. No hits are requested (size 0); only the terms
     * aggregation with its sum sub-aggregation is returned.
     *
     * @param index name of the index to search
     * @param groupId value matched against {@code inlong_group_id}
     * @param streamId value matched against {@code inlong_stream_id}
     * @return the assembled search request
     */
    private SearchRequest toAuditSearchRequest(String index, String groupId, String streamId) {
        // Aggregation: one bucket per log_ts, each carrying the sum of "count".
        TermsAggregationBuilder sumPerLogTs = AggregationBuilders.terms("log_ts")
                .field("log_ts")
                .size(Integer.MAX_VALUE);
        sumPerLogTs.subAggregation(AggregationBuilders.sum("count").field("count"));

        // Filter: exact match on both the group and the stream.
        BoolQueryBuilder groupAndStream = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("inlong_group_id", groupId))
                .must(QueryBuilders.termQuery("inlong_stream_id", streamId));

        SearchSourceBuilder searchSource = new SearchSourceBuilder()
                .aggregation(sumPerLogTs)
                .query(groupAndStream)
                .from(0)
                .size(0)
                .sort("log_ts", SortOrder.ASC);
        return new SearchRequest(new String[]{index}, searchSource);
    }

    /**
     * Builds the ClickHouse query that sums {@code count} per {@code log_ts} for one
     * group/stream/audit ID within the day {@code dt}.
     *
     * <p>The identifiers are embedded as string literals, so they are escaped first to keep
     * request-supplied values from breaking out of the quotes.
     * NOTE(review): a {@code PreparedStatement} with bind parameters would be the more robust
     * fix, but requires changing the caller as well.
     *
     * @param groupId inlong group ID to filter on
     * @param streamId inlong stream ID to filter on
     * @param auditId audit ID to filter on
     * @param dt day to query, formatted as {@code yyyy-MM-dd}
     * @return the SQL text
     * @throws IllegalArgumentException if {@code dt} does not match {@code yyyy-MM-dd}
     */
    private String toAuditCkSql(String groupId, String streamId, String auditId, String dt) {
        DateTimeFormatter formatter = DateTimeFormat.forPattern(DAY_FORMAT);
        DateTime date = formatter.parseDateTime(dt);
        // Half-open range [dt 00:00:00, dt+1 00:00:00); both bounds are generated, not user text.
        String startDate = date.toString(SECOND_FORMAT);
        String endDate = date.plusDays(1).toString(SECOND_FORMAT);
        return new SQL()
                .SELECT("log_ts", "sum(count) as total")
                .FROM("audit_data")
                .WHERE(
                        "inlong_group_id = '" + escapeCkLiteral(groupId) + "'",
                        "inlong_stream_id = '" + escapeCkLiteral(streamId) + "'",
                        "audit_id = '" + escapeCkLiteral(auditId) + "'")
                .WHERE(
                        "log_ts >= '" + startDate + "'",
                        "log_ts < '" + endDate + "'")
                .GROUP_BY("log_ts")
                .ORDER_BY("log_ts")
                .toString();
    }

    /** Escapes backslashes and single quotes so the value is safe inside a quoted SQL literal. */
    private static String escapeCkLiteral(String value) {
        // String.valueOf keeps the previous rendering of a null value ("null").
        return String.valueOf(value).replace("\\", "\\\\").replace("'", "\\'");
    }

    /**
     * Re-buckets the audit points according to the requested time dimension.
     *
     * @param auditVOList audit results at their original granularity
     * @param timeStaticsDim requested dimension; HOUR and DAY trigger aggregation, every other
     *        value - including {@code null} - returns the list unchanged
     * @return the aggregated list, or {@code auditVOList} itself when no aggregation applies
     */
    private List<AuditVO> aggregateByTimeDim(List<AuditVO> auditVOList, TimeStaticsDim timeStaticsDim) {
        // A switch on a null enum throws NullPointerException; treat "not specified" as the default.
        if (timeStaticsDim == null) {
            return auditVOList;
        }
        switch (timeStaticsDim) {
            case HOUR:
                return this.doAggregate(auditVOList, HOUR_FORMAT);
            case DAY:
                return this.doAggregate(auditVOList, DAY_FORMAT);
            default:
                return auditVOList;
        }
    }

    /**
     * Sums the counts of every audit result per time bucket.
     *
     * <p>Each point's log timestamp is truncated with {@code format}; points whose timestamp
     * cannot be formatted are skipped. Buckets are emitted in ascending key order, which for the
     * date patterns used by this class is chronological order.
     *
     * @param auditVOList audit results to aggregate
     * @param format date pattern that defines the bucket, e.g. {@code yyyy-MM-dd HH}
     * @return a new list with one aggregated {@link AuditVO} per input element
     */
    private List<AuditVO> doAggregate(List<AuditVO> auditVOList, String format) {
        List<AuditVO> result = new ArrayList<>(auditVOList.size());
        for (AuditVO auditVO : auditVOList) {
            // Sorted map: deterministic, chronological output instead of hash order.
            Map<String, Long> countByBucket = new TreeMap<>();
            for (AuditInfo auditInfo : auditVO.getAuditSet()) {
                String statKey = this.formatLogTime(auditInfo.getLogTs(), format);
                if (statKey == null) {
                    continue;
                }
                countByBucket.merge(statKey, auditInfo.getCount(), Long::sum);
            }

            List<AuditInfo> auditInfoList = new ArrayList<>(countByBucket.size());
            for (Map.Entry<String, Long> entry : countByBucket.entrySet()) {
                AuditInfo auditInfoStat = new AuditInfo();
                auditInfoStat.setLogTs(entry.getKey());
                auditInfoStat.setCount(entry.getValue());
                auditInfoList.add(auditInfoStat);
            }

            AuditVO statInfo = new AuditVO();
            statInfo.setAuditId(auditVO.getAuditId());
            statInfo.setNodeType(auditVO.getNodeType());
            statInfo.setAuditSet(auditInfoList);
            result.add(statInfo);
        }
        return result;
    }

    /**
     * Truncates a log timestamp to the precision of {@code format} by parsing it with that
     * pattern and formatting the result with the same pattern.
     *
     * @param dateString timestamp text that starts with a value matching {@code format}
     * @param format date pattern, e.g. {@code yyyy-MM-dd HH}
     * @return the truncated timestamp, or {@code null} when parsing or formatting fails
     *         (the failure is logged)
     */
    private String formatLogTime(String dateString, String format) {
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            SimpleDateFormat formatter = new SimpleDateFormat(format);
            Date date = formatter.parse(dateString);
            return formatter.format(date);
        } catch (Exception e) {
            LOGGER.error("format log time exception, dateString={}, format={}", dateString, format, e);
            return null;
        }
    }
}

