package org.apache.kylin.common.persistence.metadata;

import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.AuditLog;
import org.apache.kylin.common.persistence.UnitMessages;
import org.apache.kylin.common.persistence.event.ResourceCreateOrUpdateEvent;
import org.apache.kylin.common.persistence.event.ResourceDeleteEvent;
import org.apache.kylin.common.persistence.metadata.jdbc.AuditLogRowMapper;
import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
import org.apache.kylin.common.persistence.transaction.AbstractAuditLogReplayWorker;
import org.apache.kylin.common.persistence.transaction.AuditLogReplayWorker;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.base.Joiner;
import org.apache.kylin.guava30.shaded.common.base.Strings;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.io.ByteSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.security.core.context.SecurityContextHolder;

/* loaded from: input_file:org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.class */
public class JdbcAuditLogStore implements AuditLogStore {
    public static final String AUDIT_LOG_SUFFIX = "_audit_log_v2";
    static final String AUDIT_LOG_TABLE_ID = "id";
    static final String AUDIT_LOG_TABLE_KEY = "meta_key";
    static final String CREATE_TABLE = "create.auditlog.store.table";
    static final String META_INDEX_KEY_PREFIX = "create.auditlog.store.tableindex.";
    static final String SELECT_MAX_ID_SQL = "select max(id) from %s";
    static final String SELECT_MIN_ID_SQL = "select min(id) from %s";
    static final String SELECT_COUNT_ID_RANGE = "select count(id) from %s where id > %d and id <= %d";
    static final String DELETE_ID_LESSTHAN_SQL = "delete from %s where id < ?";
    static final String SELECT_COUNT_ID_ALL = "select count(id) from %s";
    static final String SELECT_MAX_ID_WITH_OFFSET = "select id from %s order by id limit 1 offset %s ";
    private final KylinConfig config;
    private final JdbcTemplate jdbcTemplate;
    private final String table;
    protected final AbstractAuditLogReplayWorker replayWorker;
    private String instance;
    private final DataSourceTransactionManager transactionManager;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcAuditLogStore.class);
    static final String META_KEY_META_MVCC_INDEX = "meta_key_meta_mvcc_index";
    static final String META_TS_INDEX = "meta_ts_index";
    static final String[] AUDIT_LOG_INDEX_NAMES = {META_KEY_META_MVCC_INDEX, META_TS_INDEX};
    static final String AUDIT_LOG_TABLE_CONTENT = "meta_content";
    static final String AUDIT_LOG_TABLE_TS = "meta_ts";
    static final String AUDIT_LOG_TABLE_MVCC = "meta_mvcc";
    static final String AUDIT_LOG_TABLE_UNIT = "unit_id";
    static final String AUDIT_LOG_MODEL_UUID = "model_uuid";
    static final String AUDIT_LOG_TABLE_OPERATOR = "operator";
    static final String AUDIT_LOG_TABLE_INSTANCE = "instance";
    static final String AUDIT_LOG_DIFF_FLAG = "diff_flag";
    static final String INSERT_SQL = "insert into %s (" + Joiner.on(",").join("meta_key", AUDIT_LOG_TABLE_CONTENT, new Object[]{AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, AUDIT_LOG_MODEL_UUID, AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, "project", AUDIT_LOG_DIFF_FLAG}) + ") values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    public static final String SELECT_TERM = "select ";
    static final String SELECT_BY_RANGE_SQL = SELECT_TERM + Joiner.on(",").join("id", "meta_key", new Object[]{AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, AUDIT_LOG_MODEL_UUID, AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, "project", AUDIT_LOG_DIFF_FLAG}) + " from %s where id > %d and id <= %d order by id";
    static final String SELECT_BY_ID_SQL = SELECT_TERM + Joiner.on(",").join("id", "meta_key", new Object[]{AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, AUDIT_LOG_MODEL_UUID, AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, "project", AUDIT_LOG_DIFF_FLAG}) + " from %s where id in(%s) order by id";
    static final String SELECT_BY_PROJECT_RANGE_SQL = SELECT_TERM + Joiner.on(",").join("id", "meta_key", new Object[]{AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, AUDIT_LOG_MODEL_UUID, AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, "project", AUDIT_LOG_DIFF_FLAG}) + " from %s where meta_key like '/%s/%%' and id > %d and id <= %d order by id";
    static final String SELECT_LIST_TERM = SELECT_TERM + Joiner.on(",").join("id", "meta_key", new Object[]{AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, AUDIT_LOG_MODEL_UUID, AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, "project", AUDIT_LOG_DIFF_FLAG});
    static final String SELECT_TS_RANGE = SELECT_LIST_TERM + " from %s where id < %d and meta_ts between %d and %d order by id desc limit %d";
    static final String SELECT_BY_META_KET_AND_MVCC = SELECT_LIST_TERM + " from %s where meta_key = '%s' and meta_mvcc = %s";

    public JdbcAuditLogStore(KylinConfig kylinConfig) throws Exception {
        this(kylinConfig, -1);
    }

    public JdbcAuditLogStore(KylinConfig kylinConfig, int i) throws Exception {
        this.config = kylinConfig;
        StorageURL metadataUrl = kylinConfig.getMetadataUrl();
        DataSource dataSource = JdbcDataSource.getDataSource(JdbcUtil.datasourceParameters(metadataUrl));
        this.transactionManager = new DataSourceTransactionManager(dataSource);
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.jdbcTemplate.setQueryTimeout(i);
        this.table = metadataUrl.getIdentifier() + AUDIT_LOG_SUFFIX;
        this.instance = AddressUtil.getLocalInstance();
        createIfNotExist();
        this.replayWorker = new AuditLogReplayWorker(kylinConfig, this);
    }

    public JdbcAuditLogStore(KylinConfig kylinConfig, JdbcTemplate jdbcTemplate, DataSourceTransactionManager dataSourceTransactionManager, String str) throws SQLException, IOException {
        this.config = kylinConfig;
        this.jdbcTemplate = jdbcTemplate;
        this.transactionManager = dataSourceTransactionManager;
        this.table = str;
        this.instance = AddressUtil.getLocalInstance();
        createIfNotExist();
        this.replayWorker = new AuditLogReplayWorker(kylinConfig, this);
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public void save(UnitMessages unitMessages) {
        String unitId = unitMessages.getUnitId();
        String str = (String) Optional.ofNullable(SecurityContextHolder.getContext().getAuthentication()).map((v0) -> {
            return v0.getName();
        }).orElse(null);
        JdbcUtil.Callback callback = null;
        if (this.config.isUnitOfWorkSimulationEnabled() && UnitOfWork.isAlreadyInTransaction() && UnitOfWork.get().getSleepMills() > 0) {
            callback = () -> {
                long sleepMills = UnitOfWork.get().getSleepMills();
                log.debug("audit log sleep {} ", Long.valueOf(sleepMills));
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepMills);
                    return null;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            };
        }
        JdbcUtil.withTransaction(this.transactionManager, () -> {
            return this.jdbcTemplate.batchUpdate(String.format(Locale.ROOT, INSERT_SQL, this.table), (List) unitMessages.getMessages().stream().map(event -> {
                if (!(event instanceof ResourceCreateOrUpdateEvent)) {
                    if (!(event instanceof ResourceDeleteEvent)) {
                        return null;
                    }
                    ResourceDeleteEvent resourceDeleteEvent = (ResourceDeleteEvent) event;
                    return new Object[]{resourceDeleteEvent.getResPath(), null, Long.valueOf(System.currentTimeMillis()), null, unitId, null, str, this.instance, resourceDeleteEvent.getKey(), false};
                }
                ResourceCreateOrUpdateEvent resourceCreateOrUpdateEvent = (ResourceCreateOrUpdateEvent) event;
                try {
                    Object[] objArr = new Object[10];
                    objArr[0] = resourceCreateOrUpdateEvent.getResPath();
                    objArr[1] = CompressionUtils.compress(ByteSource.wrap(resourceCreateOrUpdateEvent.getMetaContent()).read());
                    objArr[2] = resourceCreateOrUpdateEvent.getCreatedOrUpdated().getTs();
                    objArr[3] = Long.valueOf(resourceCreateOrUpdateEvent.getCreatedOrUpdated().getMvcc());
                    objArr[4] = unitId;
                    objArr[5] = resourceCreateOrUpdateEvent.getCreatedOrUpdated().getModelUuid();
                    objArr[6] = str;
                    objArr[7] = this.instance;
                    objArr[8] = resourceCreateOrUpdateEvent.getCreatedOrUpdated().getProject();
                    objArr[9] = Boolean.valueOf(resourceCreateOrUpdateEvent.getCreatedOrUpdated().getContentDiff() != null);
                    return objArr;
                } catch (IOException e) {
                    return null;
                }
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList()));
        }, 4, callback, -1);
    }

    public void batchInsert(List<AuditLog> list) {
        JdbcUtil.withTransaction(this.transactionManager, () -> {
            return this.jdbcTemplate.batchUpdate(String.format(Locale.ROOT, INSERT_SQL, this.table), (List) list.stream().map(auditLog -> {
                try {
                    return new Object[]{auditLog.getResPath(), CompressionUtils.compress(Objects.isNull(auditLog.getByteSource()) ? null : auditLog.getByteSource().read()), auditLog.getTimestamp(), auditLog.getMvcc(), auditLog.getUnitId(), auditLog.getModelUuid(), auditLog.getOperator(), auditLog.getInstance(), auditLog.getProject(), Boolean.valueOf(auditLog.isDiffFlag())};
                } catch (IOException e) {
                    return null;
                }
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList()));
        });
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public List<AuditLog> fetch(long j, long j2) {
        log.trace("fetch log from {} < id <= {}", Long.valueOf(j), Long.valueOf(j + j2));
        return this.jdbcTemplate.query(String.format(Locale.ROOT, SELECT_BY_RANGE_SQL, this.table, Long.valueOf(j), Long.valueOf(j + j2)), new AuditLogRowMapper());
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public List<AuditLog> fetch(List<Long> list) {
        if (CollectionUtils.isEmpty(list)) {
            return Lists.newArrayList();
        }
        log.trace("fetch log from {} =< id <= {},{}", new Object[]{list.get(0), list.get(list.size() - 1), Integer.valueOf(list.size())});
        return this.jdbcTemplate.query(String.format(Locale.ROOT, SELECT_BY_ID_SQL, this.table, (String) list.stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.joining(","))), new AuditLogRowMapper());
    }

    public List<AuditLog> fetch(String str, long j, long j2) {
        log.trace("fetch log from {} < id <= {}", Long.valueOf(j), Long.valueOf(j + j2));
        return this.jdbcTemplate.query(String.format(Locale.ROOT, SELECT_BY_PROJECT_RANGE_SQL, this.table, str, Long.valueOf(j), Long.valueOf(j + j2)), new AuditLogRowMapper());
    }

    public List<AuditLog> fetchRange(long j, long j2, long j3, int i) {
        log.trace("Fetch log from {} meta_ts between {} and {}, fromId: {}.", new Object[]{this.table, Long.valueOf(j2), Long.valueOf(j3), Long.valueOf(j)});
        return this.jdbcTemplate.query(String.format(Locale.ROOT, SELECT_TS_RANGE, this.table, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3), Integer.valueOf(i)), new AuditLogRowMapper());
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public long getMaxId() {
        return ((Long) Optional.ofNullable(this.jdbcTemplate.queryForObject(String.format(Locale.ROOT, SELECT_MAX_ID_SQL, this.table), Long.class)).orElse(0L)).longValue();
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public long getMinId() {
        return ((Long) Optional.ofNullable(this.jdbcTemplate.queryForObject(String.format(Locale.ROOT, SELECT_MIN_ID_SQL, this.table), Long.class)).orElse(0L)).longValue();
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public long count(long j, long j2) {
        return ((Long) Optional.ofNullable(getJdbcTemplate().queryForObject(String.format(Locale.ROOT, SELECT_COUNT_ID_RANGE, this.table, Long.valueOf(j), Long.valueOf(j2)), Long.class)).orElse(0L)).longValue();
    }

    public long countAll() {
        return ((Long) Optional.ofNullable(getJdbcTemplate().queryForObject(String.format(Locale.ROOT, SELECT_COUNT_ID_ALL, this.table), Long.class)).orElse(0L)).longValue();
    }

    public long getMaxIdWithOffset(long j) {
        return ((Long) Optional.ofNullable(this.jdbcTemplate.queryForObject(String.format(Locale.ROOT, SELECT_MAX_ID_WITH_OFFSET, this.table, Long.valueOf(j)), Long.class)).orElse(0L)).longValue();
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public void restore(long j) {
        if ((!this.config.isJobNode() && !this.config.isMetadataNode()) || this.config.isUTEnv()) {
            this.replayWorker.startSchedule(j, true);
        } else {
            log.info("current maxId is {}", Long.valueOf(j));
            this.replayWorker.startSchedule(j, false);
        }
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public void setInstance(String str) {
        this.instance = str;
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public AuditLog get(String str, long j) {
        return (AuditLog) JdbcUtil.withTransaction(this.transactionManager, () -> {
            List query = this.jdbcTemplate.query(String.format(Locale.ROOT, SELECT_BY_META_KET_AND_MVCC, this.table, str, Long.valueOf(j)), new AuditLogRowMapper());
            if (query.isEmpty()) {
                return null;
            }
            return (AuditLog) query.get(0);
        });
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public void rotate() {
        long metadataAuditLogMaxSize = this.config.getMetadataAuditLogMaxSize();
        int auditLogDeleteBatchSize = KylinConfig.getInstanceFromEnv().getAuditLogDeleteBatchSize();
        long countAll = countAll();
        if (countAll <= metadataAuditLogMaxSize) {
            log.info("Audit log size:[{}] is less than or equal to maximum limit:[{}], so skip it.", Long.valueOf(countAll), Long.valueOf(metadataAuditLogMaxSize));
            return;
        }
        long j = countAll - metadataAuditLogMaxSize;
        log.info("Total audit_logs rows [{}], need to delete [{}] rows", Long.valueOf(countAll), Long.valueOf(j));
        while (j > 0) {
            long currentTimeMillis = System.currentTimeMillis();
            long min = Math.min(j, auditLogDeleteBatchSize);
            log.info("delete audit_logs count: {}, cost: {}ms", Integer.valueOf(miniBatchRotate(getMaxIdWithOffset(min))), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            j -= min;
        }
    }

    public int miniBatchRotate(long j) {
        return ((Integer) JdbcUtil.withTransaction(this.transactionManager, () -> {
            return Integer.valueOf(this.jdbcTemplate.update(String.format(Locale.ROOT, DELETE_ID_LESSTHAN_SQL, this.table), new Object[]{Long.valueOf(j)}));
        })).intValue();
    }

    private Properties loadMedataProperties() throws IOException {
        String str = "metadata-jdbc-default.properties";
        if (((BasicDataSource) Objects.requireNonNull(getJdbcTemplate().getDataSource())).getDriverClassName().equals("org.postgresql.Driver")) {
            str = "metadata-jdbc-postgresql.properties";
        } else if (getJdbcTemplate().getDataSource().getDriverClassName().equals("com.mysql.jdbc.Driver")) {
            str = "metadata-jdbc-mysql.properties";
        }
        InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(str);
        Properties properties = new Properties();
        properties.load(resourceAsStream);
        return properties;
    }

    void createTableIfNotExist() throws SQLException, IOException {
        if (JdbcUtil.isTableExists(((DataSource) Objects.requireNonNull(this.jdbcTemplate.getDataSource())).getConnection(), this.table)) {
            return;
        }
        this.jdbcTemplate.execute(String.format(Locale.ROOT, loadMedataProperties().getProperty(CREATE_TABLE), this.table, "meta_key", AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, "project", AUDIT_LOG_DIFF_FLAG));
        log.info("Succeed to create table: {}", this.table);
    }

    void createIndexIfNotExist() {
        Arrays.stream(AUDIT_LOG_INDEX_NAMES).forEach(str -> {
            try {
                String str = this.table + "_" + str;
                if (JdbcUtil.isIndexExists(((DataSource) Objects.requireNonNull(this.jdbcTemplate.getDataSource())).getConnection(), this.table, str)) {
                    return;
                }
                String property = loadMedataProperties().getProperty(META_INDEX_KEY_PREFIX + str);
                if (Strings.isNullOrEmpty(property)) {
                    return;
                }
                this.jdbcTemplate.execute(String.format(Locale.ROOT, property, str, this.table));
                log.info("Succeed to create table {} index: {}", this.table, str);
            } catch (Exception e) {
                log.warn("Failed create index on table {}", this.table, e);
            }
        });
    }

    void createIfNotExist() throws SQLException, IOException {
        createTableIfNotExist();
        createIndexIfNotExist();
    }

    @VisibleForTesting
    public void forceClose() {
        this.replayWorker.close(true);
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    public long getLogOffset() {
        return this.replayWorker.getLogOffset();
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    @Generated
    public KylinConfig getConfig() {
        return this.config;
    }

    @Generated
    public JdbcTemplate getJdbcTemplate() {
        return this.jdbcTemplate;
    }

    @Generated
    public String getTable() {
        return this.table;
    }

    @Override // org.apache.kylin.common.persistence.metadata.AuditLogStore
    @Generated
    public AbstractAuditLogReplayWorker getReplayWorker() {
        return this.replayWorker;
    }

    @Generated
    public DataSourceTransactionManager getTransactionManager() {
        return this.transactionManager;
    }
}
