/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.rebalance;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebalanceChecker
extends ControllerPeriodicTask<Void> {
    // Class-wide SLF4J logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceChecker.class);
    // Multiplier for the exponential retry backoff.
    // NOTE(review): not referenced by name in the code visible here; getRetryDelayInMs uses the literal 2.0,
    // presumably because the constant was inlined at compile time - confirm before changing either value.
    private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;
    // Executor to which retryRebalanceTable submits the actual rebalance retry.
    private final ExecutorService _executorService;

    /**
     * Creates the checker as a controller periodic task named after this class.
     *
     * <p>The task frequency and initial delay are read from {@code config}
     * ({@code getRebalanceCheckerFrequencyInSeconds} / {@code getRebalanceCheckerInitialDelayInSeconds}).
     *
     * @param pinotHelixResourceManager resource manager handed to the periodic-task base class
     * @param leadControllerManager lead-controller manager handed to the periodic-task base class
     * @param config controller configuration supplying the schedule of this task
     * @param controllerMetrics metrics sink handed to the periodic-task base class
     * @param executorService executor used to run rebalance retries
     */
    public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics, ExecutorService executorService) {
        super(
            RebalanceChecker.class.getSimpleName(),
            config.getRebalanceCheckerFrequencyInSeconds(),
            config.getRebalanceCheckerInitialDelayInSeconds(),
            pinotHelixResourceManager,
            leadControllerManager,
            controllerMetrics);
        _executorService = executorService;
    }

    /**
     * Runs one round of the task over the given tables: attempts rebalance retries for them and
     * publishes how many tables were processed as the {@code PERIODIC_TASK_NUM_TABLES_PROCESSED}
     * global gauge, keyed by the task name.
     *
     * @param tableNamesWithType tables to check in this round
     * @param periodicTaskProperties task properties; not used by this implementation
     */
    @Override
    protected void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) {
        int totalTables = tableNamesWithType.size();
        LOGGER.info("Processing {} tables in task: {}", totalTables, _taskName);

        // A set is used because the job filter does a contains() lookup per job.
        Set<String> tableNameSet = new HashSet<>(tableNamesWithType);
        int processedTables = retryRebalanceTables(tableNameSet);

        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, _taskName, processedTables);
        LOGGER.info("Finish processing {}/{} tables in task: {}", processedTables, totalTables, _taskName);
    }

    /**
     * Loads all TABLE_REBALANCE jobs belonging to the given tables, groups them per table, and
     * attempts a retry for each table.
     *
     * <p>The method is {@code synchronized} on this instance, so two rounds cannot run concurrently.
     * A failure for one table is logged, counted in the {@code PERIODIC_TASK_ERROR} meter, and does
     * not stop the remaining tables from being processed.
     *
     * @param tableNamesWithType tables whose rebalance jobs should be considered
     * @return number of tables for which {@link #retryRebalanceTable} returned without throwing
     */
    private synchronized int retryRebalanceTables(Set<String> tableNamesWithType) {
        Map<String, Map<String, String>> allJobMetadataByJobId = _pinotHelixResourceManager.getAllJobs(
            Collections.singleton(ControllerJobType.TABLE_REBALANCE),
            jobMetadata -> tableNamesWithType.contains(jobMetadata.get("tableName")));

        // tableNameWithType -> (jobId -> jobMetadata). Fully typed: with a raw HashMap,
        // computeIfAbsent(...) returns Object and the chained put(...) does not compile.
        Map<String, Map<String, Map<String, String>>> tableJobMetadataMap = new HashMap<>();
        allJobMetadataByJobId.forEach((jobId, jobMetadata) -> {
            String tableNameWithType = jobMetadata.get("tableName");
            tableJobMetadataMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>()).put(jobId, jobMetadata);
        });

        int numTablesProcessed = 0;
        for (Map.Entry<String, Map<String, Map<String, String>>> entry : tableJobMetadataMap.entrySet()) {
            String tableNameWithType = entry.getKey();
            Map<String, Map<String, String>> allJobMetadata = entry.getValue();
            try {
                LOGGER.info("Start to retry rebalance for table: {} with {} rebalance jobs tracked", tableNameWithType, allJobMetadata.size());
                retryRebalanceTable(tableNameWithType, allJobMetadata);
                ++numTablesProcessed;
            } catch (Exception e) {
                // Best effort per table: record the failure and keep going with the other tables.
                LOGGER.error("Failed to retry rebalance for table: {}", tableNameWithType, e);
                _controllerMetrics.addMeteredTableValue(tableNameWithType + "." + _taskName, ControllerMeter.PERIODIC_TASK_ERROR, 1L);
            }
        }
        return numTablesProcessed;
    }

    /**
     * Decides whether the given table needs a rebalance retry and, if so, submits the retry to the
     * executor.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Collect candidate (failed/aborted/stuck) jobs; return if there are none.</li>
     *   <li>Pick the most recently started candidate that still has attempts left; return if none.</li>
     *   <li>Return if the backoff delay for that job has not elapsed yet.</li>
     *   <li>Mark in-progress jobs of the table as aborted, load the table config, and submit the
     *       retry to the executor.</li>
     * </ol>
     *
     * @param tableNameWithType table to check
     * @param allJobMetadata metadata of all tracked rebalance jobs of the table, keyed by job id
     * @throws IllegalStateException if the table config cannot be found
     * @throws Exception if the job metadata cannot be parsed
     */
    @VisibleForTesting
    void retryRebalanceTable(String tableNameWithType, Map<String, Map<String, String>> allJobMetadata) throws Exception {
        Map<String, Set<Pair<TableRebalanceContext, Long>>> candidateJobs = getCandidateJobs(tableNameWithType, allJobMetadata);
        if (candidateJobs.isEmpty()) {
            LOGGER.info("Found no failed rebalance jobs for table: {}. Skip retry", tableNameWithType);
            return;
        }
        _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.TABLE_REBALANCE_FAILURE_DETECTED, 1L);

        Pair<TableRebalanceContext, Long> jobContextAndStartTime = getLatestJob(candidateJobs);
        if (jobContextAndStartTime == null) {
            LOGGER.info("Rebalance has been retried too many times for table: {}. Skip retry", tableNameWithType);
            _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.TABLE_REBALANCE_RETRY_TOO_MANY_TIMES, 1L);
            return;
        }

        TableRebalanceContext jobCtx = jobContextAndStartTime.getLeft();
        String prevJobId = jobCtx.getJobId();
        RebalanceConfig rebalanceConfig = jobCtx.getConfig();
        long jobStartTimeMs = jobContextAndStartTime.getRight();
        long retryDelayMs = getRetryDelayInMs(rebalanceConfig.getRetryInitialDelayInMs(), jobCtx.getAttemptId());
        // Compare the elapsed time against the delay instead of computing start + delay: the delay
        // grows exponentially and can saturate near Long.MAX_VALUE, in which case the sum would
        // overflow to a negative value and the retry would wrongly fire immediately.
        if (System.currentTimeMillis() - jobStartTimeMs < retryDelayMs) {
            LOGGER.info("Delay retry for failed rebalance job: {} that started at: {}, by: {}ms", prevJobId, jobStartTimeMs, retryDelayMs);
            return;
        }

        abortExistingJobs(tableNameWithType, _pinotHelixResourceManager);
        // The table config is fetched here, before handing off to the executor, so a missing config
        // surfaces as an exception to the caller of this method.
        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
        _executorService.submit(() -> {
            try {
                retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx);
            } catch (Throwable t) {
                // The returned Future is not observed, so failures must be logged here or they are lost.
                LOGGER.error("Failed to retry rebalance for table: {} asynchronously", tableNameWithType, t);
            }
        });
    }

    /**
     * Starts the next rebalance attempt for a previously failed or stuck job.
     *
     * <p>Builds a retry context from the previous job's original job id and config with the attempt
     * id incremented by one, bumps the {@code TABLE_REBALANCE_RETRY} meter, and calls
     * {@code rebalanceTable} with a ZK-based observer bound to the new attempt's job id. The
     * resulting status is logged.
     *
     * @param tableNameWithType table to rebalance
     * @param tableConfig config of the table
     * @param jobCtx context of the previous job that is being retried
     */
    private void retryRebalanceTableWithContext(String tableNameWithType, TableConfig tableConfig, TableRebalanceContext jobCtx) {
        String previousJobId = jobCtx.getJobId();
        RebalanceConfig config = jobCtx.getConfig();

        String originalJobId = jobCtx.getOriginalJobId();
        int nextAttemptId = jobCtx.getAttemptId() + 1;
        TableRebalanceContext nextAttemptCtx = TableRebalanceContext.forRetry(originalJobId, config, nextAttemptId);
        String nextAttemptJobId = nextAttemptCtx.getJobId();

        LOGGER.info("Retry rebalance job: {} for table: {} with attempt job: {}", previousJobId, tableNameWithType, nextAttemptJobId);
        _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.TABLE_REBALANCE_RETRY, 1L);

        ZkBasedTableRebalanceObserver zkObserver =
            new ZkBasedTableRebalanceObserver(tableNameWithType, nextAttemptJobId, nextAttemptCtx, _pinotHelixResourceManager);
        RebalanceResult outcome =
            _pinotHelixResourceManager.rebalanceTable(tableNameWithType, tableConfig, nextAttemptJobId, config, zkObserver);
        LOGGER.info("New attempt: {} for table: {} is done with result status: {}", nextAttemptJobId, tableNameWithType, outcome.getStatus());
    }

    /**
     * Computes a randomized exponential-backoff delay for a retry.
     *
     * <p>The lower bound is {@code initDelayMs * 2^(attemptId - 1)} and the upper bound is twice the
     * lower bound; both are truncated to {@code long} and passed to {@code RandomUtils.nextLong}.
     *
     * @param initDelayMs base delay in milliseconds
     * @param attemptId attempt number of the job being retried
     * @return a random delay in milliseconds drawn between the two bounds
     */
    @VisibleForTesting
    static long getRetryDelayInMs(long initDelayMs, int attemptId) {
        final double scaleFactor = 2.0;
        int exponent = attemptId - 1;
        double lowerBoundMs = (double) initDelayMs * Math.pow(scaleFactor, exponent);
        double upperBoundMs = lowerBoundMs * scaleFactor;
        long startMs = (long) lowerBoundMs;
        long endMs = (long) upperBoundMs;
        return RandomUtils.nextLong(startMs, endMs);
    }

    /**
     * Marks every IN_PROGRESS rebalance job of the table as ABORTED in its stored progress stats.
     *
     * <p>This is best effort: a job whose stats cannot be parsed or rewritten is logged and left
     * unchanged. Jobs that carry no progress stats at all are skipped without being treated as an
     * error, matching how {@code getCandidateJobs} handles such jobs.
     *
     * @param tableNameWithType table whose jobs should be aborted
     * @param pinotHelixResourceManager resource manager used to update the job metadata
     */
    private static void abortExistingJobs(String tableNameWithType, PinotHelixResourceManager pinotHelixResourceManager) {
        boolean updated = pinotHelixResourceManager.updateJobsForTable(tableNameWithType, ControllerJobType.TABLE_REBALANCE, jobMetadata -> {
            String jobId = jobMetadata.get("jobId");
            try {
                String jobStatsInStr = jobMetadata.get("REBALANCE_PROGRESS_STATS");
                if (jobStatsInStr == null || jobStatsInStr.isEmpty()) {
                    // Nothing to abort; avoid feeding null/empty input to the JSON parser and
                    // logging a misleading ERROR with a stack trace on every run.
                    LOGGER.info("Skip aborting rebalance job: {} for table: {} as it has no job progress stats", jobId, tableNameWithType);
                    return;
                }
                TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
                if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
                    return;
                }
                LOGGER.info("Abort rebalance job: {} for table: {}", jobId, tableNameWithType);
                jobStats.setStatus(RebalanceResult.Status.ABORTED);
                jobMetadata.put("REBALANCE_PROGRESS_STATS", JsonUtils.objectToString(jobStats));
            } catch (Exception e) {
                // Deliberately not rethrown: one bad job must not prevent aborting the others.
                LOGGER.error("Failed to abort rebalance job: {} for table: {}", jobId, tableNameWithType, e);
            }
        });
        LOGGER.info("Tried to abort existing jobs at best effort and done: {}", updated);
    }

    /**
     * Picks the job attempt to retry from the candidates.
     *
     * <p>For each original job, the attempt with the greatest start time is considered, unless any
     * attempt of that job has an attempt id at or above the job's {@code maxAttempts} (taken from
     * the config of the first attempt returned by the set's iterator); such a job is skipped
     * entirely. Among the remaining jobs, the attempt with the greatest start time wins.
     *
     * @param candidateJobs candidate attempts keyed by original job id; each pair holds the
     *     attempt's context and its start time in milliseconds
     * @return the selected attempt, or {@code null} if every job has exhausted its attempts
     */
    @VisibleForTesting
    static Pair<TableRebalanceContext, Long> getLatestJob(Map<String, Set<Pair<TableRebalanceContext, Long>>> candidateJobs) {
        Pair<TableRebalanceContext, Long> selected = null;
        for (Map.Entry<String, Set<Pair<TableRebalanceContext, Long>>> jobEntry : candidateJobs.entrySet()) {
            Set<Pair<TableRebalanceContext, Long>> attempts = jobEntry.getValue();
            int maxAttempts = attempts.iterator().next().getLeft().getConfig().getMaxAttempts();

            boolean exhausted = false;
            Pair<TableRebalanceContext, Long> newestAttempt = null;
            for (Pair<TableRebalanceContext, Long> attempt : attempts) {
                if (attempt.getLeft().getAttemptId() >= maxAttempts) {
                    // A single attempt at the limit disqualifies the whole original job.
                    exhausted = true;
                    break;
                }
                if (newestAttempt == null || newestAttempt.getRight() < attempt.getRight()) {
                    newestAttempt = attempt;
                }
            }

            if (exhausted || newestAttempt == null) {
                LOGGER.info("Rebalance job: {} had exceeded maxAttempts: {}. Skip retry", jobEntry.getKey(), maxAttempts);
                continue;
            }
            if (selected == null || selected.getRight() < newestAttempt.getRight()) {
                selected = newestAttempt;
            }
        }
        return selected;
    }

    /**
     * Scans the tracked rebalance jobs of a table and returns those that are candidates for retry,
     * grouped by original job id.
     *
     * <p>Per job:
     * <ul>
     *   <li>Jobs without progress stats or without a job context are skipped.</li>
     *   <li>DONE / NO_OP jobs mark their original job as completed.</li>
     *   <li>FAILED / ABORTED jobs are added as candidates.</li>
     *   <li>CANCELLED jobs mark their original job as cancelled.</li>
     *   <li>Any other status is treated as running: if the job's {@code submissionTimeMs} value is
     *       within the configured heartbeat timeout, an empty map is returned immediately (nothing
     *       is retried for this table); otherwise the job is considered stuck and added as a
     *       candidate.</li>
     * </ul>
     * After the scan, an empty map is returned if the most recently started job has completed;
     * otherwise candidates whose original job was cancelled or completed are removed.
     *
     * @param tableNameWithType table the jobs belong to; used for logging only
     * @param allJobMetadata metadata of all tracked rebalance jobs of the table, keyed by job id
     * @return candidate attempts keyed by original job id; each pair holds the attempt's context and
     *     its start time in milliseconds. Empty if nothing should be retried.
     * @throws NumberFormatException if a job's {@code submissionTimeMs} is missing or not a number
     * @throws Exception if the progress stats or job context cannot be parsed
     */
    @VisibleForTesting
    static Map<String, Set<Pair<TableRebalanceContext, Long>>> getCandidateJobs(String tableNameWithType, Map<String, Map<String, String>> allJobMetadata) throws Exception {
        long nowMs = System.currentTimeMillis();
        Map<String, Set<Pair<TableRebalanceContext, Long>>> candidates = new HashMap<>();
        // (jobId, startTimeMs) of the most recently started job, and of the most recently started
        // job among those that completed.
        Pair<String, Long> latestStartedJob = null;
        Pair<String, Long> latestCompletedJob = null;
        // originalJobId -> id of the attempt that completed it.
        Map<String, String> completedOriginalJobs = new HashMap<>();
        Set<String> cancelledOriginalJobs = new HashSet<>();

        for (Map.Entry<String, Map<String, String>> entry : allJobMetadata.entrySet()) {
            String jobId = entry.getKey();
            Map<String, String> jobMetadata = entry.getValue();
            // Parsed before the skip checks below, so a malformed value fails even for skipped jobs.
            long statsUpdatedAt = Long.parseLong(jobMetadata.get("submissionTimeMs"));
            String jobStatsInStr = jobMetadata.get("REBALANCE_PROGRESS_STATS");
            if (StringUtils.isEmpty(jobStatsInStr)) {
                LOGGER.info("Skip rebalance job: {} as it has no job progress stats", jobId);
                continue;
            }
            String jobCtxInStr = jobMetadata.get("REBALANCE_CONTEXT");
            if (StringUtils.isEmpty(jobCtxInStr)) {
                LOGGER.info("Skip rebalance job: {} as it has no job context", jobId);
                continue;
            }
            TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
            TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, TableRebalanceContext.class);
            long jobStartTimeMs = jobStats.getStartTimeMs();
            if (latestStartedJob == null || latestStartedJob.getRight() < jobStartTimeMs) {
                latestStartedJob = Pair.of(jobId, jobStartTimeMs);
            }
            String originalJobId = jobCtx.getOriginalJobId();
            RebalanceResult.Status jobStatus = jobStats.getStatus();
            if (jobStatus == RebalanceResult.Status.DONE || jobStatus == RebalanceResult.Status.NO_OP) {
                LOGGER.info("Skip rebalance job: {} as it has completed with status: {}", jobId, jobStatus);
                completedOriginalJobs.put(originalJobId, jobId);
                if (latestCompletedJob == null || latestCompletedJob.getRight() < jobStartTimeMs) {
                    latestCompletedJob = Pair.of(jobId, jobStartTimeMs);
                }
                continue;
            }
            if (jobStatus == RebalanceResult.Status.FAILED || jobStatus == RebalanceResult.Status.ABORTED) {
                LOGGER.info("Found rebalance job: {} for original job: {} has been stopped with status: {}", jobId, originalJobId, jobStatus);
                candidates.computeIfAbsent(originalJobId, k -> new HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
                continue;
            }
            if (jobStatus == RebalanceResult.Status.CANCELLED) {
                LOGGER.info("Found cancelled rebalance job: {} for original job: {}", jobId, originalJobId);
                cancelledOriginalJobs.add(originalJobId);
                continue;
            }
            // Any remaining status is treated as a running job: it is either still alive or stuck.
            long heartbeatTimeoutMs = jobCtx.getConfig().getHeartbeatTimeoutInMs();
            if (nowMs - statsUpdatedAt < heartbeatTimeoutMs) {
                LOGGER.info("Rebalance job: {} is actively running with status updated at: {} within timeout: {}. Skip retry for table: {}", jobId, statsUpdatedAt, heartbeatTimeoutMs, tableNameWithType);
                return Collections.emptyMap();
            }
            LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId, originalJobId);
            candidates.computeIfAbsent(originalJobId, k -> new HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
        }

        // latestStartedJob is non-null whenever latestCompletedJob is: both are set in the same
        // loop iteration, after the skip checks.
        if (latestCompletedJob != null && latestCompletedJob.getLeft().equals(latestStartedJob.getLeft())) {
            LOGGER.info("Rebalance job: {} started most recently has already done. Skip retry for table: {}", latestCompletedJob.getLeft(), tableNameWithType);
            return Collections.emptyMap();
        }
        for (String cancelledOriginalJobId : cancelledOriginalJobs) {
            LOGGER.info("Skip original job: {} as it's cancelled", cancelledOriginalJobId);
            candidates.remove(cancelledOriginalJobId);
        }
        for (Map.Entry<String, String> completedEntry : completedOriginalJobs.entrySet()) {
            LOGGER.info("Skip original job: {} as it's completed by attempt: {}", completedEntry.getKey(), completedEntry.getValue());
            candidates.remove(completedEntry.getKey());
        }
        return candidates;
    }
}

