/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.relocation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.helix.model.Message;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentRelocator
extends ControllerPeriodicTask<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentRelocator.class);
    private final ExecutorService _executorService;
    private final HttpClientConnectionManager _connectionManager;
    private final boolean _enableLocalTierMigration;
    private final int _serverAdminRequestTimeoutMs;
    private final long _externalViewCheckIntervalInMs;
    private final long _externalViewStabilizationTimeoutInMs;
    private final Set<String> _waitingTables;
    private final BlockingQueue<String> _waitingQueue;

    public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics, ExecutorService executorService, HttpClientConnectionManager connectionManager) {
        super(SegmentRelocator.class.getSimpleName(), config.getSegmentRelocatorFrequencyInSeconds(), config.getSegmentRelocatorInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        this._executorService = executorService;
        this._connectionManager = connectionManager;
        this._enableLocalTierMigration = config.enableSegmentRelocatorLocalTierMigration();
        this._serverAdminRequestTimeoutMs = config.getServerAdminRequestTimeoutSeconds() * 1000;
        long taskIntervalInMs = (long)config.getSegmentRelocatorFrequencyInSeconds() * 1000L;
        this._externalViewCheckIntervalInMs = Math.min(taskIntervalInMs, config.getSegmentRelocatorExternalViewCheckIntervalInMs());
        this._externalViewStabilizationTimeoutInMs = Math.min(taskIntervalInMs, config.getSegmentRelocatorExternalViewStabilizationTimeoutInMs());
        if (config.isSegmentRelocatorRebalanceTablesSequentially()) {
            this._waitingTables = ConcurrentHashMap.newKeySet();
            this._waitingQueue = new LinkedBlockingQueue<String>();
            this._executorService.submit(() -> {
                LOGGER.info("Rebalance tables sequentially");
                try {
                    while (true) {
                        this.rebalanceWaitingTable(this::rebalanceTable);
                    }
                }
                catch (InterruptedException e) {
                    LOGGER.warn("Got interrupted while rebalancing tables sequentially", (Throwable)e);
                    return;
                }
            });
        } else {
            this._waitingTables = null;
            this._waitingQueue = null;
        }
    }

    @Override
    protected void processTable(String tableNameWithType) {
        if (this._waitingTables == null) {
            LOGGER.debug("Rebalance table: {} immediately", (Object)tableNameWithType);
            this._executorService.submit(() -> this.rebalanceTable(tableNameWithType));
            return;
        }
        this.putTableToWait(tableNameWithType);
    }

    @VisibleForTesting
    void putTableToWait(String tableNameWithType) {
        if (this._waitingTables.add(tableNameWithType)) {
            this._waitingQueue.offer(tableNameWithType);
            LOGGER.debug("Table: {} is added in waiting queue, total waiting: {}", (Object)tableNameWithType, (Object)this._waitingTables.size());
            return;
        }
        LOGGER.debug("Table: {} is already in waiting queue", (Object)tableNameWithType);
    }

    @VisibleForTesting
    void rebalanceWaitingTable(Consumer<String> rebalancer) throws InterruptedException {
        LOGGER.debug("Getting next waiting table to rebalance");
        String nextTable = this._waitingQueue.take();
        try {
            rebalancer.accept(nextTable);
        }
        finally {
            this._waitingTables.remove(nextTable);
            LOGGER.debug("Rebalance done for table: {}, total waiting: {}", (Object)nextTable, (Object)this._waitingTables.size());
        }
    }

    @VisibleForTesting
    BlockingQueue<String> getWaitingQueue() {
        return this._waitingQueue;
    }

    private void rebalanceTable(String tableNameWithType) {
        TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableNameWithType);
        Preconditions.checkState((tableConfig != null ? 1 : 0) != 0, (String)"Failed to find table config for table: {}", (Object)tableNameWithType);
        boolean isRealtimeTable = TableNameBuilder.isRealtimeTableResource((String)tableNameWithType);
        if (isRealtimeTable && new StreamConfig(tableNameWithType, IngestionConfigUtils.getStreamConfigMap((TableConfig)tableConfig)).hasHighLevelConsumerType()) {
            return;
        }
        boolean relocate = false;
        if (TierConfigUtils.shouldRelocateToTiers((TableConfig)tableConfig)) {
            relocate = true;
            LOGGER.info("Relocating segments to tiers for table: {}", (Object)tableNameWithType);
        }
        if (isRealtimeTable && InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments((TableConfig)tableConfig)) {
            relocate = true;
            LOGGER.info("Relocating COMPLETED segments for table: {}", (Object)tableNameWithType);
        }
        if (!relocate) {
            LOGGER.debug("No need to relocate segments of table: {}", (Object)tableNameWithType);
            return;
        }
        BaseConfiguration rebalanceConfig = new BaseConfiguration();
        rebalanceConfig.addProperty("minReplicasToKeepUpForNoDowntime", (Object)-1);
        rebalanceConfig.addProperty("externalViewCheckIntervalInMs", (Object)this._externalViewCheckIntervalInMs);
        rebalanceConfig.addProperty("externalViewStabilizationTimeoutInMs", (Object)this._externalViewStabilizationTimeoutInMs);
        rebalanceConfig.addProperty("updateTargetTier", (Object)TierConfigUtils.shouldRelocateToTiers((TableConfig)tableConfig));
        rebalanceConfig.addProperty("jobId", (Object)TableRebalancer.createUniqueRebalanceJobIdentifier());
        try {
            RebalanceResult rebalance = this._pinotHelixResourceManager.rebalanceTable(tableNameWithType, (Configuration)rebalanceConfig, false);
            switch (rebalance.getStatus()) {
                case NO_OP: {
                    LOGGER.info("All segments are already relocated for table: {}", (Object)tableNameWithType);
                    this.migrateToTargetTier(tableNameWithType);
                    break;
                }
                case DONE: {
                    LOGGER.info("Finished relocating segments for table: {}", (Object)tableNameWithType);
                    this.migrateToTargetTier(tableNameWithType);
                    break;
                }
                default: {
                    LOGGER.error("Relocation failed for table: {}", (Object)tableNameWithType);
                    break;
                }
            }
        }
        catch (Throwable t) {
            LOGGER.error("Caught exception/error while rebalancing table: {}", (Object)tableNameWithType, (Object)t);
        }
    }

    private void migrateToTargetTier(String tableNameWithType) {
        if (!this._enableLocalTierMigration) {
            LOGGER.debug("Skipping migrating segments of table: {} to new tiers on hosting servers", (Object)tableNameWithType);
            return;
        }
        LOGGER.info("Migrating segments of table: {} to new tiers on hosting servers", (Object)tableNameWithType);
        try {
            TableTierReader.TableTierDetails tableTiers = new TableTierReader(this._executorService, this._connectionManager, this._pinotHelixResourceManager).getTableTierDetails(tableNameWithType, null, this._serverAdminRequestTimeoutMs, true);
            SegmentRelocator.triggerLocalTierMigration(tableNameWithType, tableTiers, this._pinotHelixResourceManager.getHelixZkManager().getMessagingService());
            LOGGER.info("Migrated segments of table: {} to new tiers on hosting servers", (Object)tableNameWithType);
        }
        catch (Exception e) {
            LOGGER.error("Failed to migrate segments of table: {} to new tiers on hosting servers", (Object)tableNameWithType, (Object)e);
        }
    }

    @VisibleForTesting
    static void triggerLocalTierMigration(String tableNameWithType, TableTierReader.TableTierDetails tableTiers, ClusterMessagingService messagingService) {
        Map<String, Map<String, String>> currentTiers = tableTiers.getSegmentCurrentTiers();
        Map<String, String> targetTiers = tableTiers.getSegmentTargetTiers();
        LOGGER.debug("Got segment current tiers: {} and target tiers: {}", currentTiers, targetTiers);
        HashMap<String, Set<String>> serverToSegmentsToMigrate = new HashMap<String, Set<String>>();
        for (Map.Entry<String, Map<String, String>> segmentTiers : currentTiers.entrySet()) {
            String segmentName = segmentTiers.getKey();
            Map<String, String> serverToCurrentTiers = segmentTiers.getValue();
            String targetTier = targetTiers.get(segmentName);
            for (Map.Entry<String, String> serverTier : serverToCurrentTiers.entrySet()) {
                String tier = serverTier.getValue();
                String server = serverTier.getKey();
                if (tier == null && targetTier == null || tier != null && tier.equals(targetTier)) {
                    LOGGER.debug("Segment: {} is already on the target tier: {} on server: {}", new Object[]{segmentName, TierConfigUtils.normalizeTierName((String)tier), server});
                    continue;
                }
                LOGGER.debug("Segment: {} needs to move from current tier: {} to target tier: {} on server: {}", new Object[]{segmentName, TierConfigUtils.normalizeTierName((String)tier), TierConfigUtils.normalizeTierName((String)targetTier), server});
                serverToSegmentsToMigrate.computeIfAbsent(server, s -> new HashSet()).add(segmentName);
            }
        }
        if (serverToSegmentsToMigrate.size() > 0) {
            LOGGER.info("Notify servers: {} to move segments to new tiers locally", serverToSegmentsToMigrate.keySet());
            SegmentRelocator.reloadSegmentsForLocalTierMigration(tableNameWithType, serverToSegmentsToMigrate, messagingService);
        } else {
            LOGGER.info("No server needs to move segments to new tiers locally");
        }
    }

    private static void reloadSegmentsForLocalTierMigration(String tableNameWithType, Map<String, Set<String>> serverToSegmentsToMigrate, ClusterMessagingService messagingService) {
        for (Map.Entry<String, Set<String>> entry : serverToSegmentsToMigrate.entrySet()) {
            String serverName = entry.getKey();
            Set<String> segmentNames = entry.getValue();
            Criteria recipientCriteria = new Criteria();
            recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
            recipientCriteria.setInstanceName(serverName);
            recipientCriteria.setResource(tableNameWithType);
            recipientCriteria.setSessionSpecific(true);
            SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, new ArrayList<String>(segmentNames), false);
            LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", new Object[]{serverName, segmentNames, tableNameWithType});
            int numMessagesSent = messagingService.send(recipientCriteria, (Message)segmentReloadMessage, null, -1);
            if (numMessagesSent > 0) {
                LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", (Object)serverName, (Object)tableNameWithType);
                continue;
            }
            LOGGER.warn("No SegmentReloadMessage sent to server: {} for table: {}", (Object)serverName, (Object)tableNameWithType);
        }
    }
}

