/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
import org.apache.inlong.manager.common.pojo.audit.AuditInfo;
import org.apache.inlong.manager.common.pojo.audit.AuditRequest;
import org.apache.inlong.manager.common.pojo.audit.AuditVO;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.es.ElasticsearchApi;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * {@link AuditService} implementation that reads per-timestamp audit counts either from
 * MySQL (through {@link AuditEntityMapper}) or from Elasticsearch (through
 * {@link ElasticsearchApi}), depending on the configured {@code audit.query.source}.
 */
@Service
public class AuditServiceImpl
implements AuditService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);

    /**
     * Time format handed to the mapper's {@code sumByLogTs} query.
     * NOTE(review): looks like a MySQL DATE_FORMAT pattern that buckets rows by minute — confirm
     * against the mapper SQL.
     */
    private static final String MYSQL_LOG_TS_FORMAT = "%Y-%m-%d %H:%i:00";

    /** Pattern of the request's {@code dt} field; built once instead of on every query. */
    private static final DateTimeFormatter DAY_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd");

    /**
     * Name of the {@link AuditQuerySource} constant to query.
     * NOTE(review): the placeholder declares no default, so the MYSQL initializer presumably only
     * applies when the field is not injected by Spring — confirm whether a default was intended.
     */
    @Value(value="${audit.query.source}")
    private String auditQuerySource = AuditQuerySource.MYSQL.name();
    @Autowired
    private AuditEntityMapper auditEntityMapper;
    @Autowired
    private ElasticsearchApi elasticsearchApi;

    /**
     * Queries the audit counts for every audit id named in the request.
     *
     * @param request query condition; must not be null. A null or empty audit id list yields an
     *        empty result.
     * @return one {@link AuditVO} per audit id for which data was collected; for Elasticsearch,
     *         ids whose index is missing or has no buckets are omitted
     * @throws IOException declared by the interface; may originate from the Elasticsearch calls
     * @throws IllegalArgumentException if the configured query source is not a valid
     *         {@link AuditQuerySource} name
     */
    @Override
    public List<AuditVO> listByCondition(AuditRequest request) throws IOException {
        LOGGER.info("begin query audit list request={}", request);
        Preconditions.checkNotNull(request, "request is null");
        List<AuditVO> result = new ArrayList<>();
        // Resolved before the emptiness check so a bad configuration always fails fast.
        AuditQuerySource querySource = AuditQuerySource.valueOf(this.auditQuerySource);
        if (CollectionUtils.isNotEmpty(request.getAuditIds())) {
            if (AuditQuerySource.MYSQL == querySource) {
                result.addAll(this.listFromMysql(request));
            } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
                result.addAll(this.listFromElasticsearch(request));
            }
        }
        LOGGER.info("success to query audit list for request={}", request);
        return result;
    }

    /** Sums the audit rows of day {@code dt} (up to the following day) for each audit id. */
    private List<AuditVO> listFromMysql(AuditRequest request) {
        // The end date does not depend on the audit id, so it is computed once.
        DateTime startDate = DAY_FORMATTER.parseDateTime(request.getDt());
        String endDate = startDate.plusDays(1).toString(DAY_FORMATTER);
        List<AuditVO> result = new ArrayList<>();
        for (String auditId : request.getAuditIds()) {
            List<Map<String, Object>> sumList = this.auditEntityMapper.sumByLogTs(
                    request.getInlongGroupId(), request.getInlongStreamId(), auditId,
                    request.getDt(), endDate, MYSQL_LOG_TS_FORMAT);
            List<AuditInfo> auditSet = sumList.stream()
                    .map(AuditServiceImpl::toAuditInfo)
                    .collect(Collectors.toList());
            result.add(new AuditVO(auditId, auditSet));
        }
        return result;
    }

    /** Converts one {@code sumByLogTs} row (keys {@code logTs} and {@code total}) to an {@link AuditInfo}. */
    private static AuditInfo toAuditInfo(Map<String, Object> row) {
        AuditInfo vo = new AuditInfo();
        vo.setLogTs((String) row.get("logTs"));
        // Cast to Number rather than BigDecimal so any numeric column type is accepted;
        // a null sum is reported as zero instead of failing the whole query.
        Number total = (Number) row.get("total");
        vo.setCount(total == null ? 0L : total.longValue());
        return vo;
    }

    /** Aggregates the {@code <yyyyMMdd>_<auditId>} index of each audit id by {@code log_ts}. */
    private List<AuditVO> listFromElasticsearch(AuditRequest request) throws IOException {
        List<AuditVO> result = new ArrayList<>();
        // Literal replacement: no regular expression is needed to strip the dashes.
        String indexPrefix = request.getDt().replace("-", "");
        for (String auditId : request.getAuditIds()) {
            String index = String.format("%s_%s", indexPrefix, auditId);
            if (!this.elasticsearchApi.indexExists(index)) {
                LOGGER.warn("Elasticsearch index={} not exists", index);
                continue;
            }
            SearchResponse response = this.elasticsearchApi.search(
                    this.toAuditSearchRequest(index, request.getInlongGroupId(), request.getInlongStreamId()));
            // Guard against a response that carries no aggregation section at all.
            if (response.getAggregations() == null) {
                continue;
            }
            List<?> aggregations = response.getAggregations().asList();
            if (CollectionUtils.isEmpty(aggregations)) {
                continue;
            }
            // The request defines a single top-level aggregation: the terms on log_ts.
            ParsedTerms terms = (ParsedTerms) aggregations.get(0);
            if (CollectionUtils.isEmpty(terms.getBuckets())) {
                continue;
            }
            List<AuditInfo> auditSet = terms.getBuckets().stream().map(bucket -> {
                AuditInfo vo = new AuditInfo();
                vo.setLogTs(bucket.getKeyAsString());
                // Each bucket carries one sub-aggregation: the sum of "count".
                ParsedSum sum = (ParsedSum) bucket.getAggregations().asList().get(0);
                vo.setCount((long) sum.getValue());
                return vo;
            }).collect(Collectors.toList());
            result.add(new AuditVO(auditId, auditSet));
        }
        return result;
    }

    /**
     * Builds the search request that groups the documents of one group/stream by {@code log_ts}
     * and sums their {@code count} field.
     *
     * @param index Elasticsearch index to search
     * @param groupId value matched against {@code inlong_group_id}
     * @param streamId value matched against {@code inlong_stream_id}
     * @return a request returning aggregations only (hit size is 0)
     */
    private SearchRequest toAuditSearchRequest(String index, String groupId, String streamId) {
        TermsAggregationBuilder aggrBuilder = AggregationBuilders.terms("log_ts")
                .field("log_ts")
                .size(Integer.MAX_VALUE)
                .subAggregation(AggregationBuilders.sum("count").field("count"));
        BoolQueryBuilder filterBuilder = new BoolQueryBuilder();
        filterBuilder.must(QueryBuilders.termQuery("inlong_group_id", groupId));
        filterBuilder.must(QueryBuilders.termQuery("inlong_stream_id", streamId));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.aggregation(aggrBuilder);
        sourceBuilder.query(filterBuilder);
        sourceBuilder.from(0);
        sourceBuilder.size(0);
        // NOTE(review): with size(0) no hits are returned, so this sort presumably does not
        // order the term buckets — confirm whether ordering the buckets by key was intended.
        sourceBuilder.sort("log_ts", SortOrder.ASC);
        return new SearchRequest(new String[]{index}, sourceBuilder);
    }
}

