/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.rebalance.tenant;

import com.google.common.collect.Sets;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceObserver;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.controller.helix.core.rebalance.tenant.ZkBasedTenantRebalanceObserver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TenantRebalancer} that rebalances every table whose server tenant equals the requested tenant.
 *
 * <p>A rebalance request is handled in two stages: each tenant table is first rebalanced in dry-run mode on the
 * calling thread, and, unless the request itself is a dry run or has downtime enabled, the real per-table
 * rebalances are then handed to the supplied {@link ExecutorService} and reported through a
 * {@link ZkBasedTenantRebalanceObserver}.
 */
public class DefaultTenantRebalancer implements TenantRebalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
    // Used to list tables, read table configs and run the per-table rebalances.
    PinotHelixResourceManager _pinotHelixResourceManager;
    // Runs the background workers that perform the non-dry-run rebalances.
    ExecutorService _executorService;

    /**
     * @param pinotHelixResourceManager resource manager used for table lookups and per-table rebalances
     * @param executorService executor on which the background rebalance workers are submitted
     */
    public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._executorService = executorService;
    }

    /**
     * Rebalances all tables of the tenant named in {@code config}.
     *
     * <p>Every tenant table is first rebalanced with a dry-run copy of {@code config}; a
     * {@link TableNotFoundException} is recorded as a {@code FAILED} result for that table. If {@code config} is a
     * dry run or has downtime enabled, those dry-run results are returned with a {@code null} tenant job id and
     * nothing else happens.
     *
     * <p>Otherwise {@code DONE} dry-run results are re-labelled {@code IN_PROGRESS}, and
     * {@code max(degreeOfParallelism, 1)} workers are submitted to the executor. Workers drain the parallel queue
     * concurrently; the last worker to finish then drains the sequential queue on its own and signals success to
     * the observer. Dimension tables are placed at the head of whichever queue they belong to. The method returns
     * without waiting for the workers.
     *
     * @param config tenant rebalance request
     * @return per-table results keyed by table name, together with the tenant job id ({@code null} when only the
     *     dry run was performed)
     */
    @Override
    public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
        Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
        Set<String> tables = this.getTenantTables(config.getTenantName());
        for (String table : tables) {
            try {
                RebalanceConfig dryRunConfig = RebalanceConfig.copy(config);
                dryRunConfig.setDryRun(true);
                rebalanceResult.put(table, this._pinotHelixResourceManager.rebalanceTable(table, dryRunConfig, this.createUniqueRebalanceJobIdentifier(), false));
            } catch (TableNotFoundException exception) {
                rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), null, null, null));
            }
        }
        if (config.isDryRun() || config.isDowntime()) {
            return new TenantRebalanceResult(null, rebalanceResult, config.isVerboseResult());
        }

        // The real rebalance runs in the background, so tables whose dry run succeeded are reported as in progress.
        for (Map.Entry<String, RebalanceResult> entry : rebalanceResult.entrySet()) {
            RebalanceResult result = entry.getValue();
            if (result.getStatus() != RebalanceResult.Status.DONE) {
                continue;
            }
            // NOTE(review): the description text below looks truncated ("... for the"); kept byte-for-byte.
            entry.setValue(new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS, "In progress, check controller task status for the", result.getInstanceAssignment(), result.getTierInstanceAssignment(), result.getSegmentAssignment()));
        }

        String tenantRebalanceJobId = this.createUniqueRebalanceJobIdentifier();
        ZkBasedTenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(), tables, this._pinotHelixResourceManager);
        observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
        // Filled on this thread before any worker is submitted; drained only by the single last worker.
        Deque<String> sequentialQueue = new LinkedList<>();
        // Drained concurrently by all workers.
        Deque<String> parallelQueue = new ConcurrentLinkedDeque<>();
        // At least one worker is required, otherwise nobody would drain the sequential queue.
        int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
        Set<String> dimTables = this.getDimensionalTables(config.getTenantName());
        AtomicInteger activeThreads = new AtomicInteger(parallelism);
        try {
            Set<String> parallelTables = selectParallelTables(config, tables, parallelism);
            for (String table : tables) {
                Deque<String> queue = parallelTables.contains(table) ? parallelQueue : sequentialQueue;
                if (dimTables.contains(table)) {
                    queue.addFirst(table);
                } else {
                    queue.addLast(table);
                }
            }
            for (int i = 0; i < parallelism; i++) {
                this._executorService.submit(() -> {
                    this.drainQueue(parallelQueue, config, rebalanceResult, observer);
                    // Only the worker that brings the counter to zero runs the sequential phase and completes the job.
                    if (activeThreads.decrementAndGet() == 0) {
                        this.drainQueue(sequentialQueue, config, rebalanceResult, observer);
                        observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName()));
                    }
                });
            }
        } catch (Exception exception) {
            observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", config.getTenantName(), exception.getMessage()));
        }
        return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, config.isVerboseResult());
    }

    /**
     * Picks the tenant tables that may be rebalanced concurrently.
     *
     * <p>With a parallelism of one nothing is parallel. Otherwise the candidates are the whitelist (or all tenant
     * tables when the whitelist is empty) minus the blacklist. The whitelist is restricted to {@code tenantTables}
     * because only those tables have a dry-run result (and hence a job id) to rebalance with; an unknown table
     * would otherwise fail inside a worker.
     */
    private static Set<String> selectParallelTables(TenantRebalanceConfig config, Set<String> tenantTables, int parallelism) {
        Set<String> candidates = new HashSet<>();
        if (parallelism <= 1) {
            return candidates;
        }
        if (!config.getParallelWhitelist().isEmpty()) {
            candidates.addAll(config.getParallelWhitelist());
            candidates.retainAll(tenantTables);
        } else {
            candidates.addAll(tenantTables);
        }
        if (config.getParallelBlacklist().isEmpty()) {
            return candidates;
        }
        return Sets.difference(candidates, config.getParallelBlacklist());
    }

    /**
     * Polls tables from the head of {@code queue} until it is empty and rebalances each one with a fresh
     * non-dry-run copy of {@code config}, reusing the job id produced by that table's dry run.
     *
     * <p>A failure on one table is logged and does not stop the loop: the task's {@code Future} is discarded by the
     * caller, so an escaping exception would be invisible and would keep the worker from finishing its phase.
     */
    private void drainQueue(Deque<String> queue, TenantRebalanceConfig config, Map<String, RebalanceResult> rebalanceResult, TenantRebalanceObserver observer) {
        String table;
        while ((table = queue.pollFirst()) != null) {
            try {
                RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
                rebalanceConfig.setDryRun(false);
                this.rebalanceTable(table, rebalanceConfig, rebalanceResult.get(table).getJobId(), observer);
            } catch (RuntimeException e) {
                LOGGER.error("Caught exception while rebalancing table: {}", table, e);
            }
        }
    }

    /** Returns the tables of the given server tenant whose table config marks them as dimension tables. */
    private Set<String> getDimensionalTables(String tenantName) {
        return this.collectTenantTables(tenantName, true);
    }

    /** Returns a random UUID string used as a rebalance job id. */
    private String createUniqueRebalanceJobIdentifier() {
        return UUID.randomUUID().toString();
    }

    /** Returns all tables whose table config names {@code tenantName} as the server tenant. */
    private Set<String> getTenantTables(String tenantName) {
        return this.collectTenantTables(tenantName, false);
    }

    /**
     * Scans all tables known to the resource manager and keeps those whose server tenant equals
     * {@code tenantName}; when {@code dimTablesOnly} is set, only dimension tables are kept. Tables whose config
     * cannot be read are logged and skipped.
     */
    private Set<String> collectTenantTables(String tenantName, boolean dimTablesOnly) {
        Set<String> matches = new HashSet<>();
        for (String table : this._pinotHelixResourceManager.getAllTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(table);
            if (tableConfig == null) {
                LOGGER.error("Unable to retrieve table config for table: {}", table);
                continue;
            }
            if (!tenantName.equals(tableConfig.getTenantConfig().getServer())) {
                continue;
            }
            if (dimTablesOnly && !tableConfig.isDimTable()) {
                continue;
            }
            matches.add(table);
        }
        return matches;
    }

    /**
     * Runs the real rebalance of one table and reports its outcome to {@code observer}: a started trigger first,
     * then a completed trigger when the result status is {@code DONE}, or an errored trigger carrying the result
     * description otherwise. Any throwable raised along the way is logged and reported as an errored trigger.
     *
     * @param tableName table to rebalance
     * @param config rebalance configuration to pass to the resource manager
     * @param rebalanceJobId job id to run the rebalance under
     * @param observer receiver of the per-table progress triggers
     */
    private void rebalanceTable(String tableName, RebalanceConfig config, String rebalanceJobId, TenantRebalanceObserver observer) {
        try {
            observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, rebalanceJobId);
            RebalanceResult result = this._pinotHelixResourceManager.rebalanceTable(tableName, config, rebalanceJobId, true);
            if (result.getStatus() == RebalanceResult.Status.DONE) {
                observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null);
            } else {
                observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName, result.getDescription());
            }
        } catch (Throwable t) {
            // Log the cause: the observer only receives a generic message, so it would otherwise be lost.
            LOGGER.error("Caught exception/error while rebalancing table: {}", tableName, t);
            observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName, String.format("Caught exception/error while rebalancing table: %s", tableName));
        }
    }
}

