package org.apache.kylin.common.persistence.transaction;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.constant.LogConstant;
import org.apache.kylin.common.exception.CommonErrorCode;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.logging.SetLogCategory;
import org.apache.kylin.common.persistence.AuditLog;
import org.apache.kylin.common.persistence.UnitMessages;
import org.apache.kylin.common.persistence.event.Event;
import org.apache.kylin.common.persistence.metadata.AuditLogStore;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.class */
public abstract class AbstractAuditLogReplayWorker {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractAuditLogReplayWorker.class);
    protected static final long STEP = 1000;
    protected final AuditLogStore auditLogStore;
    protected final KylinConfig config;
    protected volatile ScheduledExecutorService consumeExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("ReplayWorker"));
    protected final AtomicBoolean isStopped = new AtomicBoolean(false);
    protected final int replayWaitMaxRetryTimes;
    protected final long replayWaitMaxTimeoutMills;
    protected String modelUuid;

    /* loaded from: input_file:org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker$DatabaseNotAvailableException.class */
    protected static class DatabaseNotAvailableException extends KylinException {
        public DatabaseNotAvailableException(Exception exc) {
            super(CommonErrorCode.FAILED_CONNECT_META_DATABASE, exc);
        }
    }

    /* loaded from: input_file:org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker$EndReloadEvent.class */
    public static class EndReloadEvent {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker$FixedWindow.class */
    public static class FixedWindow {
        protected long start;
        protected long end;

        public FixedWindow(long j, long j2) {
            this.start = j;
            this.end = j2;
            Preconditions.checkState(j <= j2);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isEmpty() {
            return length() == 0;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public long length() {
            return this.end - this.start;
        }

        public String toString() {
            return String.format(Locale.ROOT, "(%s,%s]", Long.valueOf(this.start), Long.valueOf(this.end));
        }

        @Generated
        public long getStart() {
            return this.start;
        }

        @Generated
        public long getEnd() {
            return this.end;
        }
    }

    /* loaded from: input_file:org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker$SlideWindow.class */
    protected static class SlideWindow extends FixedWindow {
        private final long upperLimit;

        public SlideWindow(FixedWindow fixedWindow) {
            super(fixedWindow.getStart(), fixedWindow.getStart());
            this.upperLimit = fixedWindow.getEnd();
            Preconditions.checkState(this.end <= this.upperLimit);
        }

        boolean canForwardRight() {
            return this.upperLimit > this.end;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean forwardRightStep(long j) {
            if (!canForwardRight()) {
                return false;
            }
            this.end = Math.min(this.end + j, this.upperLimit);
            return true;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void syncRightStep() {
            this.start = this.end;
        }

        @Override // org.apache.kylin.common.persistence.transaction.AbstractAuditLogReplayWorker.FixedWindow
        public String toString() {
            return String.format(Locale.ROOT, "(%s,%s,%s]", Long.valueOf(this.start), Long.valueOf(this.end), Long.valueOf(this.upperLimit));
        }
    }

    /* loaded from: input_file:org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker$StartReloadEvent.class */
    public static class StartReloadEvent {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractAuditLogReplayWorker(KylinConfig kylinConfig, AuditLogStore auditLogStore) {
        this.config = kylinConfig;
        this.auditLogStore = auditLogStore;
        this.replayWaitMaxRetryTimes = kylinConfig.getReplayWaitMaxRetryTimes();
        this.replayWaitMaxTimeoutMills = kylinConfig.getReplayWaitMaxTimeout();
    }

    public abstract void startSchedule(long j, boolean z);

    public void catchup() {
        if (this.isStopped.get()) {
            return;
        }
        this.consumeExecutor.submit(() -> {
            catchupInternal(1);
        });
    }

    public void catchupFrom(long j) {
        updateOffset(j);
        catchup();
    }

    public void close(boolean z) {
        this.isStopped.set(true);
        if (z) {
            ExecutorServiceUtil.shutdownGracefully(this.consumeExecutor, 60);
        } else {
            ExecutorServiceUtil.forceShutdown(this.consumeExecutor);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void replayLogs(MessageSynchronization messageSynchronization, List<AuditLog> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        LinkedHashMap newLinkedHashMap = Maps.newLinkedHashMap();
        for (AuditLog auditLog : list) {
            if (this.modelUuid == null || this.modelUuid.equals(auditLog.getModelUuid())) {
                Event fromLog = Event.fromLog(auditLog);
                String unitId = auditLog.getUnitId();
                if (newLinkedHashMap.get(unitId) == null) {
                    UnitMessages unitMessages = new UnitMessages();
                    unitMessages.getMessages().add(fromLog);
                    newLinkedHashMap.put(unitId, unitMessages);
                } else {
                    ((UnitMessages) newLinkedHashMap.get(unitId)).getMessages().add(fromLog);
                }
            }
        }
        SetLogCategory setLogCategory = new SetLogCategory(LogConstant.METADATA_CATEGORY);
        Throwable th = null;
        try {
            try {
                for (UnitMessages unitMessages2 : newLinkedHashMap.values()) {
                    log.debug("replay {} event for project:{}", Integer.valueOf(unitMessages2.getMessages().size()), unitMessages2.getKey());
                    messageSynchronization.replay(unitMessages2);
                }
                if (setLogCategory != null) {
                    if (0 == 0) {
                        setLogCategory.close();
                        return;
                    }
                    try {
                        setLogCategory.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (setLogCategory != null) {
                if (th != null) {
                    try {
                        setLogCategory.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    setLogCategory.close();
                }
            }
            throw th4;
        }
    }

    public abstract long getLogOffset();

    public abstract void updateOffset(long j);

    protected abstract void catchupInternal(int i);

    protected abstract boolean hasCatch(long j);

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean logAllCommit(long j, long j2) {
        return this.auditLogStore.count(j, j2) == j2 - j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleReloadAll(Exception exc) {
        log.error("Critical exception happened, try to reload metadata ", exc);
        try {
            MessageSynchronization.getInstance(KylinConfig.getInstanceFromEnv()).replayAllMetadata(false);
        } catch (Throwable th) {
            log.error("reload all failed", th);
        }
        log.info("Reload finished");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void threadWait(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void waitForCatchup(long j, long j2) throws TimeoutException {
        long currentTimeMillis = System.currentTimeMillis() + (j2 * 1000);
        while (System.currentTimeMillis() < currentTimeMillis) {
            try {
                if (hasCatch(j)) {
                    return;
                } else {
                    Thread.sleep(50L);
                }
            } catch (Exception e) {
                log.info("Wait for catchup to {} failed", Long.valueOf(j), e);
                Thread.currentThread().interrupt();
            }
        }
        throw new TimeoutException(String.format(Locale.ROOT, "Cannot reach %s before %s, current is %s", Long.valueOf(j), Long.valueOf(currentTimeMillis), Long.valueOf(getLogOffset())));
    }

    public void reStartSchedule(long j) {
        if (!this.isStopped.get()) {
            log.info("replayer is running , don't need restart");
            return;
        }
        this.isStopped.set(false);
        this.consumeExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("ReplayWorker"));
        startSchedule(j, false);
    }

    @Generated
    public void setModelUuid(String str) {
        this.modelUuid = str;
    }
}
