/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.ibatis.jdbc.SQL;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
import org.apache.inlong.manager.common.enums.TimeStaticsDim;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
import org.apache.inlong.manager.service.user.LoginUserUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * {@link AuditService} implementation that reads per-audit-id counters for one
 * inlong group/stream and one day from the configured backend (MySQL,
 * Elasticsearch or ClickHouse), optionally re-bucketing them by hour or day.
 */
@Service
public class AuditServiceImpl implements AuditService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);

    private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String HOUR_FORMAT = "yyyy-MM-dd HH";
    private static final String DAY_FORMAT = "yyyy-MM-dd";

    /** Parser/printer for the request's {@code dt} value; Joda formatters are immutable, so it is shared. */
    private static final DateTimeFormatter DAY_FORMATTER = DateTimeFormat.forPattern(DAY_FORMAT);

    /** Time format handed to {@code AuditEntityMapper.sumByLogTs}; seconds are fixed to {@code 00}. */
    private static final String MYSQL_LOG_TS_FORMAT = "%Y-%m-%d %H:%i:00";

    /** The only audit id whose result is tagged with the node type of the stream's first sink. */
    private static final String NODE_TYPE_AUDIT_ID = "8";

    @Value("#{'${audit.admin.ids:3,4,5,6,7,8}'.split(',')}")
    private List<String> auditIdListForAdmin;

    @Value("#{'${audit.user.ids:3,4,5,6,7,8}'.split(',')}")
    private List<String> auditIdListForUser;

    // The placeholder now carries the MYSQL default itself: without it, a missing property
    // cannot be resolved and the field initializer below never gets a chance to apply.
    @Value("${audit.query.source:MYSQL}")
    private String auditQuerySource = AuditQuerySource.MYSQL.name();

    @Autowired
    private AuditEntityMapper auditEntityMapper;

    @Autowired
    private ElasticsearchApi elasticsearchApi;

    @Autowired
    private StreamSinkEntityMapper sinkEntityMapper;

    @Autowired
    private StreamSourceEntityMapper sourceEntityMapper;

    /**
     * Queries the audit counters for the group, stream and day named in the request.
     *
     * <p>As a side effect the request's audit ids are overwritten with the ids that
     * were actually queried.
     *
     * @param request query condition; must not be null
     * @return one {@link AuditVO} per audit id that produced a result, aggregated by the
     *         request's time dimension
     * @throws Exception if the request is null or the backend query fails
     */
    @Override
    public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
        LOGGER.info("begin query audit list request={}", request);
        Preconditions.checkNotNull(request, "request is null");

        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        List<String> auditIds = this.getAuditIds(groupId, streamId);
        request.setAuditIds(auditIds);

        // Only the first sink of the stream decides the reported node type.
        List<StreamSinkEntity> sinkEntityList = this.sinkEntityMapper.selectByRelatedId(groupId, streamId);
        String sinkNodeType = null;
        if (CollectionUtils.isNotEmpty(sinkEntityList)) {
            sinkNodeType = sinkEntityList.get(0).getSinkType();
        }

        AuditQuerySource querySource = AuditQuerySource.valueOf(this.auditQuerySource);
        List<AuditVO> result = new ArrayList<>();
        for (String auditId : auditIds) {
            List<AuditInfo> auditSet;
            if (AuditQuerySource.MYSQL == querySource) {
                auditSet = this.queryFromMysql(groupId, streamId, auditId, request.getDt());
            } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
                auditSet = this.queryFromElasticsearch(groupId, streamId, auditId, request.getDt());
            } else if (AuditQuerySource.CLICKHOUSE == querySource) {
                auditSet = this.queryFromClickHouse(groupId, streamId, auditId, request.getDt());
            } else {
                auditSet = null;
            }
            if (auditSet == null) {
                // Unsupported source, or Elasticsearch had nothing for this audit id.
                continue;
            }
            String nodeType = NODE_TYPE_AUDIT_ID.equals(auditId) ? sinkNodeType : null;
            result.add(new AuditVO(auditId, auditSet, nodeType));
        }

        LOGGER.info("success to query audit list for request={}", request);
        return this.aggregateByTimeDim(result, request.getTimeStaticsDim());
    }

    /**
     * Reads the counters of one audit id from MySQL through {@code AuditEntityMapper.sumByLogTs},
     * covering {@code dt} up to the following day.
     *
     * @return the rows mapped to {@link AuditInfo}; never null
     */
    private List<AuditInfo> queryFromMysql(String groupId, String streamId, String auditId, String dt) {
        String endDate = DAY_FORMATTER.parseDateTime(dt).plusDays(1).toString(DAY_FORMATTER);
        List<Map<String, Object>> sumList =
                this.auditEntityMapper.sumByLogTs(groupId, streamId, auditId, dt, endDate, MYSQL_LOG_TS_FORMAT);
        return sumList.stream().map(row -> {
            AuditInfo vo = new AuditInfo();
            vo.setLogTs((String) row.get("logTs"));
            vo.setCount(((BigDecimal) row.get("total")).longValue());
            return vo;
        }).collect(Collectors.toList());
    }

    /**
     * Reads the counters of one audit id from the Elasticsearch index {@code <yyyyMMdd>_<auditId>}.
     *
     * @return the buckets mapped to {@link AuditInfo}, or {@code null} when the index does not
     *         exist or the response carries no buckets (the caller then skips this audit id)
     */
    private List<AuditInfo> queryFromElasticsearch(String groupId, String streamId, String auditId, String dt)
            throws Exception {
        String index = String.format("%s_%s", dt.replace("-", ""), auditId);
        if (!this.elasticsearchApi.indexExists(index)) {
            LOGGER.warn("elasticsearch index={} not exists", index);
            return null;
        }
        SearchResponse response = this.elasticsearchApi.search(this.toAuditSearchRequest(index, groupId, streamId));
        List<?> aggregations = response.getAggregations().asList();
        if (CollectionUtils.isEmpty(aggregations)) {
            return null;
        }
        // The request built by toAuditSearchRequest has a single top-level terms aggregation.
        ParsedTerms terms = (ParsedTerms) aggregations.get(0);
        if (CollectionUtils.isEmpty(terms.getBuckets())) {
            return null;
        }
        return terms.getBuckets().stream().map(bucket -> {
            AuditInfo vo = new AuditInfo();
            vo.setLogTs(bucket.getKeyAsString());
            // Each bucket holds exactly one sub-aggregation: the sum of "count".
            vo.setCount((long) ((ParsedSum) bucket.getAggregations().asList().get(0)).getValue());
            return vo;
        }).collect(Collectors.toList());
    }

    /**
     * Reads the counters of one audit id from the ClickHouse {@code audit_data} table for the
     * half-open interval {@code [dt 00:00:00, dt+1 00:00:00)}.
     *
     * <p>All caller-supplied values are bound as statement parameters instead of being spliced
     * into the SQL text, and every JDBC resource is closed by try-with-resources so a failure
     * reaches the caller rather than being dropped.
     *
     * @return the rows mapped to {@link AuditInfo}; never null
     */
    private List<AuditInfo> queryFromClickHouse(String groupId, String streamId, String auditId, String dt)
            throws Exception {
        DateTime day = DAY_FORMATTER.parseDateTime(dt);
        String startDate = day.toString(SECOND_FORMAT);
        String endDate = day.plusDays(1).toString(SECOND_FORMAT);

        try (Connection connection = ClickHouseConfig.getCkConnection();
                PreparedStatement statement = connection.prepareStatement(toAuditCkSql())) {
            statement.setString(1, groupId);
            statement.setString(2, streamId);
            statement.setString(3, auditId);
            statement.setString(4, startDate);
            statement.setString(5, endDate);
            try (ResultSet resultSet = statement.executeQuery()) {
                List<AuditInfo> auditSet = new ArrayList<>();
                while (resultSet.next()) {
                    AuditInfo vo = new AuditInfo();
                    vo.setLogTs(resultSet.getString("log_ts"));
                    vo.setCount(resultSet.getLong("total"));
                    auditSet.add(vo);
                }
                return auditSet;
            }
        }
    }

    /**
     * Resolves the audit ids to query for the current login user and the given stream.
     *
     * <p>The configured id list is copied first: it is an injected field shared by all
     * requests, so adding to it directly would leak one stream's extra ids into every
     * later request.
     *
     * @return a new, caller-owned list of audit ids
     */
    private List<String> getAuditIds(String groupId, String streamId) {
        List<String> configuredIds = LoginUserUtils.getLoginUser().getRoles().contains("ADMIN")
                ? this.auditIdListForAdmin
                : this.auditIdListForUser;
        List<String> auditIds = new ArrayList<>(configuredIds);

        // A missing source list, an empty one, and one made only of AUTO_PUSH sources are
        // treated alike (allMatch is true for an empty stream).
        boolean onlyAutoPush = Optional
                .ofNullable(this.sourceEntityMapper.selectByRelatedId(groupId, streamId, null))
                .map(sources -> sources.stream().allMatch(s -> "AUTO_PUSH".equals(s.getSourceType())))
                .orElse(true);
        if (onlyAutoPush && auditIds.contains("3") && !auditIds.contains("5")) {
            // NOTE(review): presumably "5" is the DataProxy-received audit id — confirm.
            auditIds.add("5");
        }

        boolean hasNoSink = this.sinkEntityMapper.selectCount(groupId, streamId) == 0;
        if (hasNoSink && auditIds.contains("8") && !auditIds.contains("6")) {
            // NOTE(review): presumably "6" is the DataProxy-sent audit id — confirm.
            auditIds.add("6");
        }
        return auditIds;
    }

    /**
     * Builds the Elasticsearch request that groups the documents of one group/stream by
     * {@code log_ts} and sums {@code count} inside every group. No hits are requested
     * (size 0); only the aggregation is read by the caller.
     */
    private SearchRequest toAuditSearchRequest(String index, String groupId, String streamId) {
        TermsAggregationBuilder sumByLogTs = AggregationBuilders.terms("log_ts")
                .field("log_ts")
                .size(Integer.MAX_VALUE)
                .subAggregation(AggregationBuilders.sum("count").field("count"));

        BoolQueryBuilder filter = new BoolQueryBuilder();
        filter.must(QueryBuilders.termQuery("inlong_group_id", groupId));
        filter.must(QueryBuilders.termQuery("inlong_stream_id", streamId));

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.aggregation(sumByLogTs);
        sourceBuilder.query(filter);
        sourceBuilder.from(0);
        sourceBuilder.size(0);
        sourceBuilder.sort("log_ts", SortOrder.ASC);
        return new SearchRequest(new String[]{index}, sourceBuilder);
    }

    /**
     * Builds the parameterized ClickHouse query. Placeholders, in order: group id, stream id,
     * audit id, inclusive start time, exclusive end time.
     */
    private static String toAuditCkSql() {
        return new SQL()
                .SELECT("log_ts", "sum(count) as total")
                .FROM("audit_data")
                .WHERE("inlong_group_id = ?", "inlong_stream_id = ?", "audit_id = ?")
                .WHERE("log_ts >= ?", "log_ts < ?")
                .GROUP_BY("log_ts")
                .ORDER_BY("log_ts")
                .toString();
    }

    /**
     * Re-buckets the results by hour or day; any other dimension, including null, returns
     * the input list unchanged.
     */
    private List<AuditVO> aggregateByTimeDim(List<AuditVO> auditVOList, TimeStaticsDim timeStaticsDim) {
        if (timeStaticsDim == null) {
            // Switching on a null enum would throw; treat it as "no aggregation requested".
            return auditVOList;
        }
        switch (timeStaticsDim) {
            case HOUR:
                return this.doAggregate(auditVOList, HOUR_FORMAT);
            case DAY:
                return this.doAggregate(auditVOList, DAY_FORMAT);
            default:
                return auditVOList;
        }
    }

    /**
     * Sums the counts of every {@link AuditVO} per time bucket, where the bucket key is the
     * log time truncated to {@code format}. Entries whose log time cannot be parsed are
     * skipped. Buckets are emitted in the order they are first seen.
     */
    private List<AuditVO> doAggregate(List<AuditVO> auditVOList, String format) {
        List<AuditVO> result = new ArrayList<>();
        for (AuditVO auditVO : auditVOList) {
            Map<String, Long> countByBucket = new LinkedHashMap<>();
            for (AuditInfo auditInfo : auditVO.getAuditSet()) {
                String statKey = this.formatLogTime(auditInfo.getLogTs(), format);
                if (statKey == null) {
                    continue;
                }
                countByBucket.merge(statKey, auditInfo.getCount(), Long::sum);
            }

            List<AuditInfo> auditInfoList = new ArrayList<>(countByBucket.size());
            for (Map.Entry<String, Long> entry : countByBucket.entrySet()) {
                AuditInfo auditInfoStat = new AuditInfo();
                auditInfoStat.setLogTs(entry.getKey());
                auditInfoStat.setCount(entry.getValue());
                auditInfoList.add(auditInfoStat);
            }

            AuditVO statInfo = new AuditVO();
            statInfo.setAuditId(auditVO.getAuditId());
            statInfo.setNodeType(auditVO.getNodeType());
            statInfo.setAuditSet(auditInfoList);
            result.add(statInfo);
        }
        return result;
    }

    /**
     * Truncates a log time to the precision of {@code format} by parsing it with that pattern
     * and printing it back.
     *
     * @return the truncated time, or {@code null} when {@code dateString} cannot be parsed
     */
    private String formatLogTime(String dateString, String format) {
        try {
            // SimpleDateFormat is not thread-safe, hence one instance per call.
            SimpleDateFormat formatter = new SimpleDateFormat(format);
            Date date = formatter.parse(dateString);
            return formatter.format(date);
        } catch (Exception e) {
            // Deliberately best-effort: a bad timestamp drops one entry, not the whole query.
            LOGGER.error("format lot time exception", e);
            return null;
        }
    }
}

