/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.rebalance;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkBasedTableRebalanceObserver
implements TableRebalanceObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class);
    private final String _tableNameWithType;
    private final String _rebalanceJobId;
    private final PinotHelixResourceManager _pinotHelixResourceManager;
    private final TableRebalanceProgressStats _tableRebalanceProgressStats;
    private final TableRebalanceContext _tableRebalanceContext;
    private long _lastUpdateTimeMs;
    private int _numUpdatesToZk;
    private boolean _isStopped = false;
    private RebalanceResult.Status _stopStatus;
    private final ControllerMetrics _controllerMetrics;

    public ZkBasedTableRebalanceObserver(String tableNameWithType, String rebalanceJobId, TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager pinotHelixResourceManager) {
        Preconditions.checkState((tableNameWithType != null ? 1 : 0) != 0, (Object)"Table name cannot be null");
        Preconditions.checkState((rebalanceJobId != null ? 1 : 0) != 0, (Object)"rebalanceId cannot be null");
        Preconditions.checkState((pinotHelixResourceManager != null ? 1 : 0) != 0, (Object)"PinotHelixManager cannot be null");
        this._tableNameWithType = tableNameWithType;
        this._rebalanceJobId = rebalanceJobId;
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._tableRebalanceProgressStats = new TableRebalanceProgressStats();
        this._tableRebalanceContext = tableRebalanceContext;
        this._numUpdatesToZk = 0;
        this._controllerMetrics = ControllerMetrics.get();
    }

    @Override
    public void onTrigger(TableRebalanceObserver.Trigger trigger, Map<String, Map<String, String>> currentState, Map<String, Map<String, String>> targetState) {
        boolean updatedStatsInZk = false;
        this._controllerMetrics.setValueOfTableGauge(this._tableNameWithType, (AbstractMetrics.Gauge)ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1L);
        switch (trigger) {
            case START_TRIGGER: {
                this.updateOnStart(currentState, targetState);
                this.trackStatsInZk();
                updatedStatsInZk = true;
                break;
            }
            case IDEAL_STATE_CHANGE_TRIGGER: {
                TableRebalanceProgressStats.RebalanceStateStats latest = ZkBasedTableRebalanceObserver.getDifferenceBetweenTableRebalanceStates(targetState, currentState);
                if (!TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getCurrentToTargetConvergence(), latest)) break;
                this._tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
                this.trackStatsInZk();
                updatedStatsInZk = true;
                break;
            }
            case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER: {
                TableRebalanceProgressStats.RebalanceStateStats latest = ZkBasedTableRebalanceObserver.getDifferenceBetweenTableRebalanceStates(targetState, currentState);
                if (!TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) break;
                this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
                this.trackStatsInZk();
                updatedStatsInZk = true;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unimplemented trigger: " + trigger);
            }
        }
        long heartbeatIntervalInMs = this._tableRebalanceContext.getConfig().getHeartbeatIntervalInMs();
        if (!updatedStatsInZk && System.currentTimeMillis() - this._lastUpdateTimeMs > heartbeatIntervalInMs) {
            LOGGER.debug("Update status of rebalance job: {} for table: {} after {}ms as heartbeat", new Object[]{this._rebalanceJobId, this._tableNameWithType, heartbeatIntervalInMs});
            this.trackStatsInZk();
        }
    }

    private void updateOnStart(Map<String, Map<String, String>> currentState, Map<String, Map<String, String>> targetState) {
        Preconditions.checkState((RebalanceResult.Status.IN_PROGRESS != this._tableRebalanceProgressStats.getStatus() ? 1 : 0) != 0, (Object)"Rebalance Observer onStart called multiple times");
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
        this._tableRebalanceProgressStats.setInitialToTargetStateConvergence(ZkBasedTableRebalanceObserver.getDifferenceBetweenTableRebalanceStates(targetState, currentState));
        this._tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
    }

    @Override
    public void onSuccess(String msg) {
        Preconditions.checkState((RebalanceResult.Status.DONE != this._tableRebalanceProgressStats.getStatus() ? 1 : 0) != 0, (Object)"Table Rebalance already completed");
        this._controllerMetrics.setValueOfTableGauge(this._tableNameWithType, (AbstractMetrics.Gauge)ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0L);
        long timeToFinishInSeconds = (System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
        this._tableRebalanceProgressStats.setCompletionStatusMsg(msg);
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE);
        TableRebalanceProgressStats.RebalanceStateStats stats = new TableRebalanceProgressStats.RebalanceStateStats();
        this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(stats);
        this._tableRebalanceProgressStats.setCurrentToTargetConvergence(stats);
        this.trackStatsInZk();
    }

    @Override
    public void onError(String errorMsg) {
        this._controllerMetrics.setValueOfTableGauge(this._tableNameWithType, (AbstractMetrics.Gauge)ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0L);
        long timeToFinishInSeconds = (System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED);
        this._tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg);
        this.trackStatsInZk();
    }

    @Override
    public boolean isStopped() {
        return this._isStopped;
    }

    @Override
    public RebalanceResult.Status getStopStatus() {
        return this._stopStatus;
    }

    public int getNumUpdatesToZk() {
        return this._numUpdatesToZk;
    }

    @VisibleForTesting
    TableRebalanceContext getTableRebalanceContext() {
        return this._tableRebalanceContext;
    }

    private void trackStatsInZk() {
        Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(this._tableNameWithType, this._rebalanceJobId, this._tableRebalanceProgressStats, this._tableRebalanceContext);
        this._pinotHelixResourceManager.addControllerJobToZK(this._rebalanceJobId, jobMetadata, ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> {
            TableRebalanceProgressStats prevStats;
            if (prevJobMetadata == null) {
                return true;
            }
            String prevStatsInStr = (String)prevJobMetadata.get("REBALANCE_PROGRESS_STATS");
            try {
                prevStats = (TableRebalanceProgressStats)JsonUtils.stringToObject((String)prevStatsInStr, TableRebalanceProgressStats.class);
            }
            catch (JsonProcessingException ignore) {
                return true;
            }
            if (prevStats == null || RebalanceResult.Status.IN_PROGRESS == prevStats.getStatus()) {
                return true;
            }
            this._isStopped = true;
            this._stopStatus = prevStats.getStatus();
            LOGGER.warn("Rebalance job: {} for table: {} has already stopped with status: {}", new Object[]{this._rebalanceJobId, this._tableNameWithType, this._stopStatus});
            return false;
        });
        ++this._numUpdatesToZk;
        this._lastUpdateTimeMs = System.currentTimeMillis();
        LOGGER.debug("Made {} ZK updates for rebalance job: {} of table: {}", new Object[]{this._numUpdatesToZk, this._rebalanceJobId, this._tableNameWithType});
    }

    @VisibleForTesting
    static Map<String, String> createJobMetadata(String tableNameWithType, String jobId, TableRebalanceProgressStats tableRebalanceProgressStats, TableRebalanceContext tableRebalanceContext) {
        HashMap<String, String> jobMetadata = new HashMap<String, String>();
        jobMetadata.put("tableName", tableNameWithType);
        jobMetadata.put("jobId", jobId);
        jobMetadata.put("submissionTimeMs", Long.toString(System.currentTimeMillis()));
        jobMetadata.put("jobType", ControllerJobType.TABLE_REBALANCE.name());
        try {
            jobMetadata.put("REBALANCE_PROGRESS_STATS", JsonUtils.objectToString((Object)tableRebalanceProgressStats));
        }
        catch (JsonProcessingException e) {
            LOGGER.error("Error serialising stats for rebalance job: {} of table: {} to keep in ZK", new Object[]{jobId, tableNameWithType, e});
        }
        try {
            jobMetadata.put("REBALANCE_CONTEXT", JsonUtils.objectToString((Object)tableRebalanceContext));
        }
        catch (JsonProcessingException e) {
            LOGGER.error("Error serialising retry configs for rebalance job: {} of table: {} to keep in ZK", new Object[]{jobId, tableNameWithType, e});
        }
        return jobMetadata;
    }

    public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetweenTableRebalanceStates(Map<String, Map<String, String>> targetState, Map<String, Map<String, String>> sourceState) {
        TableRebalanceProgressStats.RebalanceStateStats rebalanceStats = new TableRebalanceProgressStats.RebalanceStateStats();
        for (Map.Entry<String, Map<String, String>> entry : targetState.entrySet()) {
            String segmentName = entry.getKey();
            Map<String, String> sourceInstanceStateMap = sourceState.get(segmentName);
            if (sourceInstanceStateMap == null) {
                ++rebalanceStats._segmentsMissing;
                ++rebalanceStats._segmentsToRebalance;
                continue;
            }
            Map<String, String> targetStateInstanceStateMap = entry.getValue();
            boolean hasSegmentConverged = true;
            for (Map.Entry<String, String> instanceStateEntry : targetStateInstanceStateMap.entrySet()) {
                String instanceName;
                String sourceInstanceState;
                String targetStateInstanceState = instanceStateEntry.getValue();
                if (targetStateInstanceState.equals("OFFLINE") || targetStateInstanceState.equals(sourceInstanceState = sourceInstanceStateMap.get(instanceName = instanceStateEntry.getKey()))) continue;
                ++rebalanceStats._replicasToRebalance;
                hasSegmentConverged = false;
            }
            if (hasSegmentConverged) continue;
            ++rebalanceStats._segmentsToRebalance;
        }
        int totalSegments = targetState.size();
        rebalanceStats._percentSegmentsToRebalance = totalSegments == 0 ? 0.0 : (double)rebalanceStats._segmentsToRebalance / (double)totalSegments * 100.0;
        return rebalanceStats;
    }
}

