package org.apache.inlong.manager.service.core.impl;

import java.io.IOException;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
import org.apache.inlong.manager.common.enums.TimeStaticsDim;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * {@link AuditService} implementation that loads per-audit-id counters from either
 * MySQL (through {@link AuditEntityMapper}) or Elasticsearch (through
 * {@link ElasticsearchApi}) and optionally rolls them up by hour or by day.
 */
@Service
public class AuditServiceImpl implements AuditService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);

    /** {@link SimpleDateFormat} pattern used to truncate a log timestamp to the hour. */
    private static final String HOUR_FORMAT = "yyyy-MM-dd HH";

    /** Pattern used to truncate a log timestamp to the day and to parse the request date. */
    private static final String DAY_FORMAT = "yyyy-MM-dd";

    /**
     * Format argument handed to {@code sumByLogTs}.
     * NOTE(review): looks like a MySQL DATE_FORMAT pattern that truncates to the minute -
     * confirm against the mapper definition.
     */
    private static final String MYSQL_LOG_TS_FORMAT = "%Y-%m-%d %H:%i:00";

    // NOTE(review): the initializer only applies when the field is not injected; the
    // placeholder has no default, so a missing property presumably fails at startup - confirm.
    @Value("${audit.query.source}")
    private String auditQuerySource = AuditQuerySource.MYSQL.name();

    @Autowired
    private AuditEntityMapper auditEntityMapper;

    @Autowired
    private ElasticsearchApi elasticsearchApi;

    /**
     * Queries the audit counters for every audit id of the request from the configured
     * source and aggregates them by the requested time dimension.
     *
     * @param auditRequest the query condition; must not be null
     * @return one {@link AuditVO} per audit id that produced data (for Elasticsearch,
     *         audit ids without an index or without buckets are omitted)
     * @throws IOException declared by the interface; propagated from the Elasticsearch calls
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public List<AuditVO> listByCondition(AuditRequest auditRequest) throws IOException {
        LOGGER.info("begin query audit list request={}", auditRequest);
        Preconditions.checkNotNull(auditRequest, "request is null");

        String groupId = auditRequest.getInlongGroupId();
        String streamId = auditRequest.getInlongStreamId();
        AuditQuerySource querySource = AuditQuerySource.valueOf(this.auditQuerySource);

        List<AuditVO> result = new ArrayList<>();
        for (String auditId : auditRequest.getAuditIds()) {
            if (AuditQuerySource.MYSQL == querySource) {
                result.add(new AuditVO(auditId,
                        queryFromMysql(groupId, streamId, auditId, auditRequest.getDt())));
            } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
                List<AuditInfo> infos =
                        queryFromElasticsearch(groupId, streamId, auditId, auditRequest.getDt());
                // An audit id without any bucket is left out of the result entirely.
                if (!infos.isEmpty()) {
                    result.add(new AuditVO(auditId, infos));
                }
            }
        }

        LOGGER.info("success to query audit list for request={}", auditRequest);
        return aggregateByTimeDim(result, auditRequest.getTimeStaticsDim());
    }

    /**
     * Loads the summed counters of one audit id from MySQL for the day {@code dt}
     * (the range passed to the mapper is {@code dt} to {@code dt + 1 day}).
     */
    private List<AuditInfo> queryFromMysql(String groupId, String streamId, String auditId, String dt) {
        DateTimeFormatter dayFormatter = DateTimeFormat.forPattern(DAY_FORMAT);
        String nextDay = dayFormatter.parseDateTime(dt).plusDays(1).toString(dayFormatter);
        return this.auditEntityMapper
                .sumByLogTs(groupId, streamId, auditId, dt, nextDay, MYSQL_LOG_TS_FORMAT)
                .stream()
                .map(row -> {
                    AuditInfo info = new AuditInfo();
                    info.setLogTs((String) row.get("logTs"));
                    // Cast to Number rather than BigDecimal so any numeric SUM type is accepted.
                    info.setCount(((Number) row.get("total")).longValue());
                    return info;
                })
                .collect(Collectors.toList());
    }

    /**
     * Loads the summed counters of one audit id from the Elasticsearch index
     * {@code <dt without dashes>_<auditId>}.
     *
     * @return the per-timestamp counters, or an empty list when the index does not exist
     *         or the aggregation yields no buckets
     */
    private List<AuditInfo> queryFromElasticsearch(String groupId, String streamId, String auditId, String dt)
            throws IOException {
        String index = String.format("%s_%s", dt.replaceAll("-", ""), auditId);
        if (!this.elasticsearchApi.indexExists(index)) {
            LOGGER.warn("elasticsearch index={} not exists", index);
            return new ArrayList<>();
        }

        List<?> aggregations = this.elasticsearchApi
                .search(toAuditSearchRequest(index, groupId, streamId))
                .getAggregations()
                .asList();
        if (CollectionUtils.isEmpty(aggregations)) {
            return new ArrayList<>();
        }

        // The request defines a single top-level terms aggregation, so it is the first entry.
        ParsedTerms terms = (ParsedTerms) aggregations.get(0);
        if (CollectionUtils.isEmpty(terms.getBuckets())) {
            return new ArrayList<>();
        }
        return terms.getBuckets().stream()
                .map(bucket -> {
                    AuditInfo info = new AuditInfo();
                    info.setLogTs(bucket.getKeyAsString());
                    // The only sub-aggregation of each bucket is the "count" sum.
                    ParsedSum sum = (ParsedSum) bucket.getAggregations().asList().get(0);
                    info.setCount((long) sum.getValue());
                    return info;
                })
                .collect(Collectors.toList());
    }

    /**
     * Builds the search request that groups documents of the given group/stream by
     * {@code log_ts} and sums their {@code count} field.
     */
    private SearchRequest toAuditSearchRequest(String index, String groupId, String streamId) {
        TermsAggregationBuilder byLogTs = AggregationBuilders.terms("log_ts")
                .field("log_ts")
                .size(Integer.MAX_VALUE)
                .subAggregation(AggregationBuilders.sum("count").field("count"));

        BoolQueryBuilder filter = new BoolQueryBuilder();
        filter.must(QueryBuilders.termQuery("inlong_group_id", groupId));
        filter.must(QueryBuilders.termQuery("inlong_stream_id", streamId));

        SearchSourceBuilder source = new SearchSourceBuilder();
        source.aggregation(byLogTs);
        source.query(filter);
        source.from(0);
        source.size(0);
        source.sort("log_ts", SortOrder.ASC);
        return new SearchRequest(new String[]{index}, source);
    }

    /**
     * Rolls the counters up to the requested time dimension.
     *
     * @param list           the per-audit-id counters as queried
     * @param timeStaticsDim HOUR or DAY to aggregate; any other value, or null, returns
     *                       {@code list} unchanged
     */
    private List<AuditVO> aggregateByTimeDim(List<AuditVO> list, TimeStaticsDim timeStaticsDim) {
        if (timeStaticsDim == null) {
            // No dimension requested: behave like the pass-through default instead of throwing.
            return list;
        }
        switch (timeStaticsDim) {
            case HOUR:
                return doAggregate(list, HOUR_FORMAT);
            case DAY:
                return doAggregate(list, DAY_FORMAT);
            default:
                return list;
        }
    }

    /**
     * Sums the counters of every {@link AuditVO} per truncated timestamp.
     *
     * @param list    the counters to aggregate
     * @param pattern the {@link SimpleDateFormat} pattern that defines the bucket granularity
     * @return new {@link AuditVO} instances whose entries are ordered by ascending bucket key;
     *         entries whose timestamp cannot be parsed are skipped
     */
    private List<AuditVO> doAggregate(List<AuditVO> list, String pattern) {
        // SimpleDateFormat is not thread-safe, so it stays local to this call, but one
        // instance is enough for all records handled here.
        SimpleDateFormat formatter = new SimpleDateFormat(pattern);

        List<AuditVO> result = new ArrayList<>(list.size());
        for (AuditVO source : list) {
            // TreeMap keeps the buckets sorted; the key patterns sort chronologically as text.
            Map<String, Long> totals = new TreeMap<>();
            for (AuditInfo info : source.getAuditSet()) {
                String bucket = formatLogTime(formatter, info.getLogTs());
                if (bucket != null) {
                    totals.merge(bucket, info.getCount().longValue(), Long::sum);
                }
            }

            List<AuditInfo> merged = new ArrayList<>(totals.size());
            for (Map.Entry<String, Long> entry : totals.entrySet()) {
                AuditInfo info = new AuditInfo();
                info.setLogTs(entry.getKey());
                info.setCount(entry.getValue());
                merged.add(info);
            }

            AuditVO target = new AuditVO();
            target.setAuditId(source.getAuditId());
            target.setAuditSet(merged);
            result.add(target);
        }
        return result;
    }

    /**
     * Truncates {@code logTs} to the granularity of {@code formatter} by parsing it and
     * formatting it again with the same pattern.
     *
     * @return the truncated timestamp, or null when {@code logTs} is null or unparseable
     */
    private String formatLogTime(SimpleDateFormat formatter, String logTs) {
        if (logTs == null) {
            LOGGER.error("failed to format log time, logTs is null, pattern={}", formatter.toPattern());
            return null;
        }
        try {
            // parse(String) may stop before the end of the text, which drops the finer fields.
            return formatter.format(formatter.parse(logTs));
        } catch (ParseException e) {
            LOGGER.error("failed to format log time, logTs={}, pattern={}", logTs, formatter.toPattern(), e);
            return null;
        }
    }
}
