/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TimeStaticsDim;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.audit.AuditRunnable;
import org.apache.inlong.manager.service.audit.InlongAuditSourceOperator;
import org.apache.inlong.manager.service.audit.InlongAuditSourceOperatorFactory;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Lazy
@Service
public class AuditServiceImpl
implements AuditService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);
    private static final Gson GSON = new GsonBuilder().create();
    private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String HOUR_FORMAT = "yyyy-MM-dd HH";
    private static final String DAY_FORMAT = "yyyy-MM-dd";
    private static final DateTimeFormatter SECOND_DATE_FORMATTER = DateTimeFormat.forPattern((String)"yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter HOUR_DATE_FORMATTER = DateTimeFormat.forPattern((String)"yyyy-MM-dd HH");
    private static final DateTimeFormatter DAY_DATE_FORMATTER = DateTimeFormat.forPattern((String)"yyyy-MM-dd");
    private static final double DEFAULT_BOOST = 1.0;
    private static final boolean ADJUST_PURE_NEGATIVE = true;
    private static final int QUERY_FROM = 0;
    private static final int QUERY_SIZE = 0;
    private static final String SORT_ORDER = "ASC";
    private static final String TERM_FILED = "log_ts";
    private static final String AGGREGATIONS_COUNT = "count";
    private static final String AGGREGATIONS_DELAY = "delay";
    private static final String AGGREGATIONS = "aggregations";
    private static final String BUCKETS = "buckets";
    private static final String KEY = "key";
    private static final String VALUE = "value";
    private static final String INLONG_GROUP_ID = "inlong_group_id";
    private static final String INLONG_STREAM_ID = "inlong_stream_id";
    private static final String COUNT = "count";
    private static final String DELAY = "delay";
    private static final String TERMS = "terms";
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    private final Map<String, Map<Integer, AuditBaseEntity>> auditIndicatorMap = new ConcurrentHashMap<String, Map<Integer, AuditBaseEntity>>();
    private final Map<String, AuditBaseEntity> auditItemMap = new ConcurrentHashMap<String, AuditBaseEntity>();
    @Value(value="#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForAdmin;
    @Value(value="#{'${audit.user.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForUser;
    @Value(value="${audit.query.source}")
    private String auditQuerySource;
    @Value(value="${audit.query.url:http://127.0.0.1:10080}")
    private String auditQueryUrl;
    @Autowired
    private AuditBaseEntityMapper auditBaseMapper;
    @Autowired
    private AuditEntityMapper auditEntityMapper;
    @Autowired
    private ElasticsearchApi elasticsearchApi;
    @Autowired
    private StreamSinkEntityMapper sinkEntityMapper;
    @Autowired
    private StreamSourceEntityMapper sourceEntityMapper;
    @Autowired
    private ClickHouseConfig config;
    @Autowired
    private AuditSourceEntityMapper auditSourceMapper;
    @Autowired
    private InlongGroupEntityMapper inlongGroupMapper;
    @Autowired
    private InlongAuditSourceOperatorFactory auditSourceOperatorFactory;
    @Autowired
    private RestTemplate restTemplate;

    @PostConstruct
    public void initialize() {
        LOGGER.info("init audit base item cache map for {}", (Object)AuditServiceImpl.class.getSimpleName());
        try {
            this.refreshBaseItemCache();
        }
        catch (Throwable t) {
            LOGGER.error("initialize audit base item cache error", t);
        }
    }

    @Override
    public Boolean refreshBaseItemCache() {
        LOGGER.debug("start to reload audit base item info");
        try {
            List auditBaseEntities = this.auditBaseMapper.selectAll();
            for (AuditBaseEntity auditBaseEntity : auditBaseEntities) {
                this.auditItemMap.put(auditBaseEntity.getAuditId(), auditBaseEntity);
                String type = auditBaseEntity.getType();
                Map itemMap = this.auditIndicatorMap.computeIfAbsent(type, v -> new HashMap());
                itemMap.put(auditBaseEntity.getIndicatorType(), auditBaseEntity);
            }
        }
        catch (Throwable t) {
            LOGGER.error("failed to reload audit base item info", t);
            return false;
        }
        LOGGER.debug("success to reload audit base item info");
        return true;
    }

    @Override
    public Integer updateAuditSource(AuditSourceRequest request, String operator) {
        InlongAuditSourceOperator auditSourceOperator = this.auditSourceOperatorFactory.getInstance(request.getType());
        request.setUrl(auditSourceOperator.convertTo(request.getUrl()));
        String offlineUrl = request.getOfflineUrl();
        if (StringUtils.isNotBlank((CharSequence)offlineUrl)) {
            this.auditSourceMapper.offlineSourceByUrl(offlineUrl);
            LOGGER.info("success offline the audit source with url: {}", (Object)offlineUrl);
        }
        AuditSourceEntity entity = (AuditSourceEntity)CommonBeanUtils.copyProperties((Object)request, AuditSourceEntity::new);
        entity.setStatus(Integer.valueOf(1));
        entity.setCreator(operator);
        entity.setModifier(operator);
        this.auditSourceMapper.insert(entity);
        Integer id = entity.getId();
        LOGGER.info("success to insert audit source with id={}", (Object)id);
        this.config.updateRuntimeConfig();
        LOGGER.info("success to update audit source with id={}", (Object)id);
        return id;
    }

    @Override
    public AuditSourceResponse getAuditSource() {
        AuditSourceEntity entity = this.auditSourceMapper.selectOnlineSource();
        if (entity == null) {
            throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND);
        }
        LOGGER.debug("success to get audit source, id={}", (Object)entity.getId());
        return (AuditSourceResponse)CommonBeanUtils.copyProperties((Object)entity, AuditSourceResponse::new);
    }

    @Override
    public String getAuditId(String type, IndicatorType indicatorType) {
        if (StringUtils.isBlank((CharSequence)type)) {
            return null;
        }
        Map itemMap = this.auditIndicatorMap.computeIfAbsent(type, v -> new HashMap());
        AuditBaseEntity auditBaseEntity = (AuditBaseEntity)itemMap.get(indicatorType.getCode());
        if (auditBaseEntity != null) {
            return auditBaseEntity.getAuditId();
        }
        auditBaseEntity = this.auditBaseMapper.selectByTypeAndIndicatorType(type, Integer.valueOf(indicatorType.getCode()));
        Preconditions.expectNotNull((Object)auditBaseEntity, (ErrorCodeEnum)ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED, (String)String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), type));
        itemMap.put(auditBaseEntity.getIndicatorType(), auditBaseEntity);
        return auditBaseEntity.getAuditId();
    }

    @Override
    public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
        LOGGER.info("begin query audit list request={}", (Object)request);
        Preconditions.expectNotNull((Object)request, (String)"request is null");
        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        String sinkNodeType = null;
        String sourceNodeType = null;
        Integer sinkId = request.getSinkId();
        StreamSinkEntity sinkEntity = null;
        List sinkEntityList = this.sinkEntityMapper.selectByRelatedId(groupId, streamId);
        if (sinkId != null) {
            sinkEntity = this.sinkEntityMapper.selectByPrimaryKey(sinkId);
        } else if (CollectionUtils.isNotEmpty((Collection)sinkEntityList)) {
            sinkEntity = (StreamSinkEntity)sinkEntityList.get(0);
        }
        if (sinkEntity != null) {
            sinkNodeType = sinkEntity.getSinkType();
        }
        HashMap<String, String> auditIdMap = new HashMap<String, String>();
        if (StringUtils.isNotBlank((CharSequence)groupId)) {
            InlongGroupEntity groupEntity = this.inlongGroupMapper.selectByGroupId(groupId);
            List sourceEntityList = this.sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
            if (CollectionUtils.isNotEmpty((Collection)sourceEntityList)) {
                sourceNodeType = ((StreamSourceEntity)sourceEntityList.get(0)).getSourceType();
            }
            auditIdMap.put(this.getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS), sinkNodeType);
            if (CollectionUtils.isEmpty((Collection)request.getAuditIds())) {
                if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
                    auditIdMap.put(this.getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
                    request.setAuditIds(this.getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
                } else {
                    auditIdMap.put(this.getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS), sinkNodeType);
                    request.setAuditIds(this.getAuditIds(groupId, streamId, null, sinkNodeType));
                }
            }
        } else if (CollectionUtils.isEmpty((Collection)request.getAuditIds())) {
            throw new BusinessException("audits id is empty");
        }
        List<AuditVO> result = new ArrayList<AuditVO>();
        AuditQuerySource querySource = AuditQuerySource.valueOf((String)this.auditQuerySource);
        CountDownLatch latch = new CountDownLatch(request.getAuditIds().size());
        for (String auditId : request.getAuditIds()) {
            String auditName;
            AuditBaseEntity auditBaseEntity = this.auditItemMap.get(auditId);
            String string = auditName = auditBaseEntity != null ? auditBaseEntity.getName() : "";
            if (AuditQuerySource.MYSQL == querySource) {
                String format = "%Y-%m-%d %H:%i:00";
                DateTime endDate = DAY_DATE_FORMATTER.parseDateTime(request.getEndDate());
                String endDateStr = endDate.plusDays(1).toString(DAY_DATE_FORMATTER);
                List sumList = StringUtils.isNotBlank((CharSequence)request.getIp()) ? this.auditEntityMapper.sumByLogTsAndIp(request.getIp(), auditId, request.getStartDate(), endDateStr, format) : this.auditEntityMapper.sumByLogTs(groupId, streamId, auditId, request.getStartDate(), endDateStr, format);
                List auditSet = sumList.stream().map(s -> {
                    AuditInfo vo = new AuditInfo();
                    vo.setInlongGroupId((String)s.get("inlongGroupId"));
                    vo.setInlongStreamId((String)s.get("inlongStreamId"));
                    vo.setLogTs((String)s.get("logTs"));
                    vo.setCount(((BigDecimal)s.get("total")).longValue());
                    vo.setDelay(((BigDecimal)s.get("totalDelay")).longValue());
                    vo.setSize(((BigDecimal)s.get("totalSize")).longValue());
                    return vo;
                }).collect(Collectors.toList());
                result.add(new AuditVO(auditId, auditName, auditSet, (String)auditIdMap.getOrDefault(auditId, null)));
                continue;
            }
            if (AuditQuerySource.CLICKHOUSE != querySource) continue;
            this.executor.execute(new AuditRunnable(request, auditId, auditName, result, latch, this.restTemplate, this.auditQueryUrl, auditIdMap, false));
        }
        if (AuditQuerySource.CLICKHOUSE == querySource) {
            latch.await(30L, TimeUnit.SECONDS);
        } else {
            result = this.aggregateByTimeDim(result, request.getTimeStaticsDim());
        }
        LOGGER.info("success to query audit list for request={}", (Object)request);
        return result;
    }

    @Override
    public List<AuditVO> listAll(AuditRequest request) throws Exception {
        ArrayList<AuditVO> result = new ArrayList<AuditVO>();
        AuditQuerySource querySource = AuditQuerySource.valueOf((String)this.auditQuerySource);
        CountDownLatch latch = new CountDownLatch(request.getAuditIds().size());
        for (String auditId : request.getAuditIds()) {
            AuditBaseEntity auditBaseEntity = this.auditItemMap.get(auditId);
            String auditName = "";
            if (auditBaseEntity != null) {
                auditName = auditBaseEntity.getName();
            }
            if (AuditQuerySource.MYSQL == querySource) {
                DateTime endDate = SECOND_DATE_FORMATTER.parseDateTime(request.getEndDate());
                String endDateStr = endDate.plusDays(1).toString(SECOND_DATE_FORMATTER);
                List sumList = this.auditEntityMapper.sumGroupByIp(request.getInlongGroupId(), request.getInlongStreamId(), request.getIp(), auditId, request.getStartDate(), endDateStr);
                List auditSet = sumList.stream().map(s -> {
                    AuditInfo vo = new AuditInfo();
                    vo.setInlongGroupId((String)s.get("inlongGroupId"));
                    vo.setInlongStreamId((String)s.get("inlongStreamId"));
                    vo.setLogTs((String)s.get("logTs"));
                    vo.setIp((String)s.get("ip"));
                    vo.setCount(((BigDecimal)s.get("total")).longValue());
                    vo.setDelay(((BigDecimal)s.get("totalDelay")).longValue());
                    vo.setSize(((BigDecimal)s.get("totalSize")).longValue());
                    return vo;
                }).collect(Collectors.toList());
                result.add(new AuditVO(auditId, auditName, auditSet, null));
                continue;
            }
            if (AuditQuerySource.CLICKHOUSE != querySource) continue;
            this.executor.execute(new AuditRunnable(request, auditId, auditName, result, latch, this.restTemplate, this.auditQueryUrl, null, true));
        }
        if (AuditQuerySource.CLICKHOUSE == querySource) {
            latch.await(30L, TimeUnit.SECONDS);
        }
        return result;
    }

    @Override
    public List<AuditBaseResponse> getAuditBases() {
        List auditBaseEntityList = this.auditBaseMapper.selectAll();
        return CommonBeanUtils.copyListProperties((List)auditBaseEntityList, AuditBaseResponse::new);
    }

    private List<String> getAuditIds(String groupId, String streamId, String sourceNodeType, String sinkNodeType) {
        boolean dpReceivedNeeded;
        HashSet<String> auditSet;
        HashSet<String> hashSet = auditSet = LoginUserUtils.getLoginUser().getRoles().contains("TENANT_ADMIN") ? new HashSet<String>(this.auditIdListForAdmin) : new HashSet<String>(this.auditIdListForUser);
        if (sinkNodeType == null) {
            auditSet.add(this.getAuditId("DATAPROXY", IndicatorType.SEND_SUCCESS));
        } else {
            auditSet.add(this.getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS));
            InlongGroupEntity inlongGroup = this.inlongGroupMapper.selectByGroupId(groupId);
            if (InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
                auditSet.add(this.getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS));
            } else {
                auditSet.add(this.getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS));
            }
        }
        List sourceList = this.sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
        if ((CollectionUtils.isEmpty((Collection)sourceList) || sourceList.stream().allMatch(s -> "AUTO_PUSH".equals(s.getSourceType()))) && (dpReceivedNeeded = auditSet.contains(this.getAuditId("AGENT", IndicatorType.RECEIVED_SUCCESS)))) {
            auditSet.add(this.getAuditId("DATAPROXY", IndicatorType.RECEIVED_SUCCESS));
        }
        return new ArrayList<String>(auditSet);
    }

    private PreparedStatement getAuditCkStatementGroupByLogTs(Connection connection, String groupId, String streamId, String ip, String auditId, String startDate, String endDate) throws SQLException {
        String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
        String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
        if (StringUtils.isNotBlank((CharSequence)ip)) {
            return this.getAuditCkStatementByIp(connection, auditId, ip, startDate, endDate);
        }
        String subQuery = ((SQL)((SQL)((SQL)((SQL)((SQL)((SQL)((SQL)new SQL().SELECT_DISTINCT(new String[]{"ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay"})).FROM("audit_data")).WHERE("inlong_group_id = ?")).WHERE("inlong_stream_id = ?")).WHERE("audit_id = ?")).WHERE("log_ts >= ?")).WHERE("log_ts < ?")).toString();
        String sql = ((SQL)((SQL)((SQL)((SQL)new SQL().SELECT(new String[]{INLONG_GROUP_ID, INLONG_STREAM_ID, TERM_FILED, "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size"})).FROM("(" + subQuery + ") as sub")).GROUP_BY(new String[]{TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID})).ORDER_BY(TERM_FILED)).toString();
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setString(1, groupId);
        statement.setString(2, streamId);
        statement.setString(3, auditId);
        statement.setString(4, start);
        statement.setString(5, end);
        return statement;
    }

    private PreparedStatement getAuditCkStatementGroupByIp(Connection connection, String groupId, String streamId, String ip, String auditId, String startDate, String endDate) throws SQLException {
        if (StringUtils.isNotBlank((CharSequence)ip)) {
            return this.getAuditCkStatementByIpGroupByIp(connection, auditId, ip, startDate, endDate);
        }
        String subQuery = ((SQL)((SQL)((SQL)((SQL)((SQL)((SQL)((SQL)new SQL().SELECT_DISTINCT(new String[]{"ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay"})).FROM("audit_data")).WHERE("inlong_group_id = ?")).WHERE("inlong_stream_id = ?")).WHERE("audit_id = ?")).WHERE("log_ts >= ?")).WHERE("log_ts < ?")).toString();
        String sql = ((SQL)((SQL)((SQL)new SQL().SELECT(new String[]{INLONG_GROUP_ID, INLONG_STREAM_ID, "sum(count) as total", "ip", "sum(delay) as total_delay", "sum(size) as total_size"})).FROM("(" + subQuery + ") as sub")).GROUP_BY(new String[]{INLONG_GROUP_ID, INLONG_STREAM_ID, "ip"})).toString();
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setString(1, groupId);
        statement.setString(2, streamId);
        statement.setString(3, auditId);
        statement.setString(4, startDate);
        statement.setString(5, endDate);
        return statement;
    }

    private List<AuditVO> aggregateByTimeDim(List<AuditVO> auditVOList, TimeStaticsDim timeStaticsDim) {
        List<AuditVO> result;
        switch (timeStaticsDim) {
            case HOUR: {
                result = this.doAggregate(auditVOList, HOUR_FORMAT);
                break;
            }
            case DAY: {
                result = this.doAggregate(auditVOList, DAY_FORMAT);
                break;
            }
            default: {
                result = this.doAggregate(auditVOList, SECOND_FORMAT);
            }
        }
        return result;
    }

    private List<AuditVO> doAggregate(List<AuditVO> auditVOList, String format) {
        ArrayList<AuditVO> result = new ArrayList<AuditVO>();
        for (AuditVO auditVO : auditVOList) {
            AuditVO statInfo = new AuditVO();
            HashMap<String, AtomicLong> countMap = new HashMap<String, AtomicLong>();
            HashMap<String, AtomicLong> delayMap = new HashMap<String, AtomicLong>();
            HashMap<String, AtomicLong> sizeMap = new HashMap<String, AtomicLong>();
            statInfo.setAuditId(auditVO.getAuditId());
            statInfo.setAuditName(auditVO.getAuditName());
            statInfo.setNodeType(auditVO.getNodeType());
            for (AuditInfo auditInfo : auditVO.getAuditSet()) {
                String statKey = this.formatLogTime(auditInfo.getLogTs(), format);
                if (statKey == null) continue;
                countMap.computeIfAbsent(statKey, k -> new AtomicLong(0L)).addAndGet(auditInfo.getCount());
                delayMap.computeIfAbsent(statKey, k -> new AtomicLong(0L)).addAndGet(auditInfo.getDelay());
                sizeMap.computeIfAbsent(statKey, k -> new AtomicLong(0L)).addAndGet(auditInfo.getSize());
            }
            LinkedList<AuditInfo> auditInfoList = new LinkedList<AuditInfo>();
            for (Map.Entry entry : countMap.entrySet()) {
                AuditInfo auditInfoStat = new AuditInfo();
                auditInfoStat.setLogTs((String)entry.getKey());
                long count = ((AtomicLong)entry.getValue()).get();
                auditInfoStat.setCount(((AtomicLong)entry.getValue()).get());
                auditInfoStat.setDelay(count == 0L ? 0L : ((AtomicLong)delayMap.get(entry.getKey())).get() / count);
                auditInfoStat.setSize(count == 0L ? 0L : ((AtomicLong)sizeMap.get(entry.getKey())).get() / count);
                auditInfoList.add(auditInfoStat);
            }
            statInfo.setAuditSet(auditInfoList);
            result.add(statInfo);
        }
        return result;
    }

    private String formatLogTime(String dateString, String format) {
        String formatDateString = null;
        try {
            SimpleDateFormat formatter = new SimpleDateFormat(format);
            Date date = formatter.parse(dateString);
            formatDateString = formatter.format(date);
        }
        catch (Exception e) {
            LOGGER.error("format lot time exception", (Throwable)e);
        }
        return formatDateString;
    }

    private PreparedStatement getAuditCkStatementByIp(Connection connection, String auditId, String ip, String startDate, String endDate) throws SQLException {
        String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
        String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
        String subQuery = ((SQL)((SQL)((SQL)((SQL)((SQL)((SQL)new SQL().SELECT_DISTINCT(new String[]{"ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay"})).FROM("audit_data")).WHERE("ip = ?")).WHERE("audit_id = ?")).WHERE("log_ts >= ?")).WHERE("log_ts < ?")).toString();
        String sql = ((SQL)((SQL)((SQL)((SQL)new SQL().SELECT(new String[]{INLONG_GROUP_ID, INLONG_STREAM_ID, TERM_FILED, "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size"})).FROM("(" + subQuery + ") as sub")).GROUP_BY(new String[]{TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID})).ORDER_BY(TERM_FILED)).toString();
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setString(1, ip);
        statement.setString(2, auditId);
        statement.setString(3, start);
        statement.setString(4, end);
        return statement;
    }

    private PreparedStatement getAuditCkStatementByIpGroupByIp(Connection connection, String auditId, String ip, String startDate, String endDate) throws SQLException {
        String subQuery = ((SQL)((SQL)((SQL)((SQL)((SQL)((SQL)new SQL().SELECT_DISTINCT(new String[]{"ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay"})).FROM("audit_data")).WHERE("ip = ?")).WHERE("audit_id = ?")).WHERE("log_ts >= ?")).WHERE("log_ts < ?")).toString();
        String sql = ((SQL)((SQL)((SQL)new SQL().SELECT(new String[]{INLONG_GROUP_ID, INLONG_STREAM_ID, "ip", "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size"})).FROM("(" + subQuery + ") as sub")).GROUP_BY(new String[]{INLONG_GROUP_ID, INLONG_STREAM_ID, "ip"})).toString();
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setString(1, ip);
        statement.setString(2, auditId);
        statement.setString(3, startDate);
        statement.setString(4, endDate);
        return statement;
    }
}

