/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.minion;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.CronJobScheduleJob;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.TableTaskSchedulerUpdater;
import org.apache.pinot.controller.helix.core.minion.TaskTypeMetricsUpdater;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.ScheduleBuilder;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinotTaskManager
extends ControllerPeriodicTask<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
    // Name passed to the periodic-task superclass, and also the JobDataMap key under which this manager is
    // handed to each scheduled cron job.
    public static final String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
    // JobDataMap key under which the lead controller manager is handed to each scheduled cron job.
    public static final String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
    // Key inside a task type's config map whose value is read as that task type's cron expression.
    public static final String SCHEDULE_KEY = "schedule";
    // Property store path whose children are watched for table additions/removals.
    private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
    // Prefix prepended to a table name (with type) to form the property store path of its table config.
    private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
    // Format pattern (one %s for the task type) of the property store path watched for task metrics updates.
    private static final String TASK_QUEUE_PATH_PATTERN = "/TaskRebalancer/TaskQueue_%s/Context";
    private final PinotHelixTaskResourceManager _helixTaskResourceManager;
    private final ClusterInfoAccessor _clusterInfoAccessor;
    private final TaskGeneratorRegistry _taskGeneratorRegistry;
    // Table name (with type) -> (task type -> cron expression) as last applied to the cron scheduler.
    private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<String, Map<String, String>>();
    // Table name (with type) -> data-change listener subscribed on that table's config path.
    private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<String, TableTaskSchedulerUpdater>();
    // Task type -> data-change listener subscribed on that task type's task queue context path.
    private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap = new ConcurrentHashMap<String, TaskTypeMetricsUpdater>();
    // Task state -> count from the most recent reportMetrics() call; shared across task types and zeroed
    // at the start of each report so states that disappeared are published as 0.
    private final Map<TaskState, Integer> _taskStateToCountMap = new ConcurrentHashMap<TaskState, Integer>();
    // Quartz scheduler used for cron-based task scheduling; stays null when the scheduler is disabled in the
    // controller config or when creating/starting it failed.
    private Scheduler _scheduledExecutorService = null;

    public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        super(PINOT_TASK_MANAGER_KEY, controllerConf.getTaskManagerFrequencyInSeconds(), controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager, controllerMetrics);
        this._helixTaskResourceManager = helixTaskResourceManager;
        this._clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf);
        this._taskGeneratorRegistry = new TaskGeneratorRegistry(this._clusterInfoAccessor);
        if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
            try {
                this._scheduledExecutorService = new StdSchedulerFactory().getScheduler();
                this._scheduledExecutorService.start();
                LOGGER.info("Subscribe to tables change under PropertyStore path: {}", (Object)TABLE_CONFIG_PARENT_PATH);
                this._pinotHelixResourceManager.getPropertyStore().subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, (parentPath, currentChilds) -> {
                    HashSet tableToAdd = new HashSet(currentChilds);
                    tableToAdd.removeAll(this._tableTaskSchedulerUpdaterMap.keySet());
                    for (String tableWithType : tableToAdd) {
                        this.subscribeTableConfigChanges(tableWithType);
                    }
                    HashSet<String> tableToDelete = new HashSet<String>(this._tableTaskSchedulerUpdaterMap.keySet());
                    tableToDelete.removeAll(currentChilds);
                    if (!tableToDelete.isEmpty()) {
                        LOGGER.info("Found tables to clean up cron task scheduler: {}", tableToDelete);
                        for (String tableWithType : tableToDelete) {
                            this.cleanUpCronTaskSchedulerForTable(tableWithType);
                        }
                    }
                });
                for (String tableWithType : helixResourceManager.getAllTables()) {
                    this.subscribeTableConfigChanges(tableWithType);
                }
            }
            catch (SchedulerException e) {
                LOGGER.error("Unable to create a scheduler.", (Throwable)e);
                this._scheduledExecutorService = null;
            }
        }
    }

    /**
     * Builds the property store path of a table's config.
     *
     * @param tableWithType table name with type suffix
     * @return the table config path prefix followed by {@code tableWithType}
     */
    private String getPropertyStorePathForTable(String tableWithType) {
        final String tableConfigPath = TABLE_CONFIG_PATH_PREFIX + tableWithType;
        return tableConfigPath;
    }

    /**
     * Builds the property store path watched for a task type's task queue context.
     *
     * @param taskType task type substituted into the task queue path pattern
     * @return the formatted path
     */
    private String getPropertyStorePathForTaskQueue(String taskType) {
        final String taskQueueContextPath = String.format(TASK_QUEUE_PATH_PATTERN, taskType);
        return taskQueueContextPath;
    }

    /**
     * Tears down everything this manager holds for a table: the table config listener (if one is
     * registered), the table's cron jobs, and the listener bookkeeping entry.
     *
     * @param tableWithType table name with type suffix
     */
    public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) {
        LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType);
        final TableTaskSchedulerUpdater registeredListener = _tableTaskSchedulerUpdaterMap.get(tableWithType);
        if (registeredListener != null) {
            final String tableConfigPath = getPropertyStorePathForTable(tableWithType);
            _pinotHelixResourceManager.getPropertyStore().unsubscribeDataChanges(tableConfigPath, (IZkDataListener)registeredListener);
        }
        removeAllTasksFromCronExpressions(tableWithType);
        _tableTaskSchedulerUpdaterMap.remove(tableWithType);
    }

    /**
     * Deletes every Quartz job whose job name equals {@code tableWithType} (jobs are keyed as
     * name = table, group = task type), decrements the scheduled-job gauge for each one, and forgets the
     * table's cron expressions.
     *
     * <p>Does nothing when the scheduler is not available. If listing the job keys fails, the error is
     * logged and nothing is removed (including the cron expression bookkeeping).
     *
     * @param tableWithType table name with type suffix
     */
    private synchronized void removeAllTasksFromCronExpressions(String tableWithType) {
        if (_scheduledExecutorService == null) {
            return;
        }
        Set<JobKey> jobKeys;
        try {
            jobKeys = _scheduledExecutorService.getJobKeys(GroupMatcher.anyJobGroup());
        }
        catch (SchedulerException e) {
            LOGGER.error("Got exception when fetching all jobKeys", e);
            return;
        }
        for (JobKey jobKey : jobKeys) {
            if (!jobKey.getName().equals(tableWithType)) {
                continue;
            }
            try {
                _scheduledExecutorService.deleteJob(jobKey);
                // The job group carries the task type, see scheduleJob().
                _controllerMetrics.addValueToTableGauge(PinotTaskManager.getCronJobName(tableWithType, jobKey.getGroup()), ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
            }
            catch (SchedulerException e) {
                LOGGER.error("Got exception when deleting the scheduled job - {}", jobKey, e);
            }
        }
        _tableTaskTypeToCronExpressionMap.remove(tableWithType);
    }

    /**
     * Returns the name used to identify a table's cron job for a task type, in the form
     * {@code <tableWithType>.<taskType>}.
     *
     * @param tableWithType table name with type suffix
     * @param taskType task type
     * @return the two arguments joined by a single dot
     */
    public static String getCronJobName(String tableWithType, String taskType) {
        return tableWithType + "." + taskType;
    }

    /**
     * Registers a data-change listener on the table's config path (once per table) and then applies the
     * table's current cron configuration to the scheduler.
     *
     * <p>Any exception thrown while applying the cron configuration is logged and swallowed; the listener
     * stays registered.
     *
     * @param tableWithType table name with type suffix
     */
    public synchronized void subscribeTableConfigChanges(String tableWithType) {
        final boolean alreadySubscribed = _tableTaskSchedulerUpdaterMap.containsKey(tableWithType);
        if (alreadySubscribed) {
            return;
        }
        final TableTaskSchedulerUpdater configListener = new TableTaskSchedulerUpdater(tableWithType, this);
        final String tableConfigPath = getPropertyStorePathForTable(tableWithType);
        _pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(tableConfigPath, (IZkDataListener)configListener);
        _tableTaskSchedulerUpdaterMap.put(tableWithType, configListener);
        try {
            updateCronTaskScheduler(tableWithType);
        }
        catch (Exception e) {
            LOGGER.error("Failed to create cron task in scheduler for table: {}", tableWithType, e);
        }
    }

    /**
     * Re-reads the table config and brings the table's cron jobs in line with it.
     *
     * <p>When the table config, its task config, or the task type configs map is missing, all cron jobs of
     * the table are removed. Otherwise the task type to cron expression mapping is extracted and applied.
     * Does nothing when the scheduler is not available.
     *
     * @param tableWithType table name with type suffix
     */
    public synchronized void updateCronTaskScheduler(String tableWithType) {
        if (_scheduledExecutorService == null) {
            return;
        }
        LOGGER.info("Trying to update task schedule for table: {}", tableWithType);

        final TableConfig currentTableConfig = _pinotHelixResourceManager.getTableConfig(tableWithType);
        if (currentTableConfig == null) {
            LOGGER.info("tableConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
            removeAllTasksFromCronExpressions(tableWithType);
            return;
        }

        final TableTaskConfig currentTaskConfig = currentTableConfig.getTaskConfig();
        if (currentTaskConfig == null) {
            LOGGER.info("taskConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
            removeAllTasksFromCronExpressions(tableWithType);
            return;
        }

        final Map<String, Map<String, String>> configsByTaskType = currentTaskConfig.getTaskTypeConfigsMap();
        if (configsByTaskType == null) {
            LOGGER.info("taskTypeConfigsMap is null, trying to remove all the tasks for table {} if any", tableWithType);
            removeAllTasksFromCronExpressions(tableWithType);
            return;
        }

        final Map<String, String> cronByTaskType = getTaskToCronExpressionMap(configsByTaskType);
        LOGGER.info("Got taskToCronExpressionMap {} ", cronByTaskType);
        updateCronTaskScheduler(tableWithType, cronByTaskType);
    }

    /**
     * Reconciles the table's scheduled cron jobs with the desired task type to cron expression mapping.
     *
     * <ul>
     *   <li>Task types previously scheduled but absent from the new mapping have their job deleted.</li>
     *   <li>Task types present in both with a different cron expression (case-insensitive comparison) are
     *       rescheduled with a new trigger.</li>
     *   <li>Task types present only in the new mapping are scheduled. This also covers tables that already
     *       have other task types scheduled, which previously caused newly added task types to be skipped.</li>
     * </ul>
     *
     * <p>Scheduler failures are logged per task type and do not stop the remaining task types from being
     * processed. The new mapping is recorded as the table's current state at the end regardless of
     * individual failures. Does nothing when the scheduler is not available.
     *
     * @param tableWithType table name with type suffix
     * @param taskToCronExpressionMap desired task type to cron expression mapping; must not be null
     */
    private void updateCronTaskScheduler(String tableWithType, Map<String, String> taskToCronExpressionMap) {
        if (_scheduledExecutorService == null) {
            return;
        }
        Map<String, String> existingScheduledTasks = _tableTaskTypeToCronExpressionMap.get(tableWithType);

        // Pass 1: handle removals and cron expression changes of already scheduled task types.
        if (existingScheduledTasks != null) {
            for (Map.Entry<String, String> existingEntry : existingScheduledTasks.entrySet()) {
                String existingTaskType = existingEntry.getKey();
                String existingCronExpression = existingEntry.getValue();

                if (!taskToCronExpressionMap.containsKey(existingTaskType)) {
                    try {
                        _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, existingTaskType));
                        _controllerMetrics.addValueToTableGauge(PinotTaskManager.getCronJobName(tableWithType, existingTaskType), ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
                    }
                    catch (SchedulerException e) {
                        LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableWithType, existingTaskType, e);
                    }
                    continue;
                }

                String newCronExpression = taskToCronExpressionMap.get(existingTaskType);
                if (existingCronExpression == null) {
                    try {
                        scheduleJob(tableWithType, existingTaskType, newCronExpression);
                    }
                    catch (SchedulerException e) {
                        LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType, existingTaskType, newCronExpression, e);
                    }
                    continue;
                }

                if (existingCronExpression.equalsIgnoreCase(newCronExpression)) {
                    continue;
                }
                try {
                    TriggerKey triggerKey = TriggerKey.triggerKey(tableWithType, existingTaskType);
                    Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule(newCronExpression)).build();
                    _scheduledExecutorService.rescheduleJob(triggerKey, trigger);
                }
                catch (SchedulerException e) {
                    LOGGER.error("Failed to reschedule job for table {}, task type {}, cron expr {}", tableWithType, existingTaskType, newCronExpression, e);
                }
            }
        }

        // Pass 2: schedule task types that were not scheduled before. scheduleJob() itself skips jobs that
        // already exist in the scheduler, so this is safe to call for any not-yet-tracked task type.
        for (Map.Entry<String, String> desiredEntry : taskToCronExpressionMap.entrySet()) {
            String taskType = desiredEntry.getKey();
            if (existingScheduledTasks != null && existingScheduledTasks.containsKey(taskType)) {
                continue;
            }
            String cronExpr = desiredEntry.getValue();
            try {
                scheduleJob(tableWithType, taskType, cronExpr);
            }
            catch (SchedulerException e) {
                LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType, taskType, cronExpr, e);
            }
        }

        _tableTaskTypeToCronExpressionMap.put(tableWithType, taskToCronExpressionMap);
    }

    /**
     * Schedules a cron job for the given table and task type unless a job with the same key
     * (name = table, group = task type) already exists in the scheduler.
     *
     * <p>If the existence check itself fails, the failure is logged and the job is treated as not existing.
     * On successful scheduling the scheduled-job gauge for the table/task type is incremented.
     * Does nothing when the scheduler is not available.
     *
     * @param tableWithType table name with type suffix, used as job and trigger name
     * @param taskType task type, used as job and trigger group
     * @param cronExprStr cron expression for the trigger
     * @throws SchedulerException if the scheduler rejects the job; an unchecked exception raised while
     *         building the trigger (for example for an unparsable cron expression) is logged and rethrown
     *         as-is
     */
    private void scheduleJob(String tableWithType, String taskType, String cronExprStr) throws SchedulerException {
        if (_scheduledExecutorService == null) {
            return;
        }
        boolean exists = false;
        try {
            exists = _scheduledExecutorService.checkExists(JobKey.jobKey(tableWithType, taskType));
        }
        catch (SchedulerException e) {
            LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", tableWithType, taskType, e);
        }
        if (exists) {
            return;
        }
        LOGGER.info("Trying to schedule a job with cron expression: {} for table {}, task type: {}", cronExprStr, tableWithType, taskType);
        try {
            // Build the trigger inside the try block: parsing the cron expression happens here, so this is
            // the only place where the "failed to parse" handler below can actually observe a bad expression.
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableWithType, taskType)).withSchedule(CronScheduleBuilder.cronSchedule(cronExprStr)).build();
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put(PINOT_TASK_MANAGER_KEY, (Object)this);
            jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, (Object)_leadControllerManager);
            JobDetail jobDetail = JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap).build();
            _scheduledExecutorService.scheduleJob(jobDetail, trigger);
            _controllerMetrics.addValueToTableGauge(PinotTaskManager.getCronJobName(tableWithType, taskType), ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, 1L);
            LOGGER.info("Scheduled task for table: {}, task type: {}, next runtime: {}", tableWithType, taskType, trigger.getNextFireTime());
        }
        catch (Exception e) {
            LOGGER.error("Failed to parse Cron expression - " + cronExprStr, e);
            throw e;
        }
    }

    /**
     * Extracts the cron expression of every task type that defines one.
     *
     * <p>A task type is skipped when its config map is null, has no {@code SCHEDULE_KEY} entry, or maps
     * {@code SCHEDULE_KEY} to null.
     *
     * @param taskTypeConfigsMap task type to that task type's config map
     * @return a new map from task type to cron expression
     */
    private Map<String, String> getTaskToCronExpressionMap(Map<String, Map<String, String>> taskTypeConfigsMap) {
        Map<String, String> cronByTaskType = new HashMap<String, String>();
        for (Map.Entry<String, Map<String, String>> configEntry : taskTypeConfigsMap.entrySet()) {
            Map<String, String> singleTaskConfig = configEntry.getValue();
            if (singleTaskConfig == null) {
                continue;
            }
            if (!singleTaskConfig.containsKey(SCHEDULE_KEY)) {
                continue;
            }
            String cronExpression = singleTaskConfig.get(SCHEDULE_KEY);
            if (cronExpression != null) {
                cronByTaskType.put(configEntry.getKey(), cronExpression);
            }
        }
        return cronByTaskType;
    }

    /**
     * @return the cluster info accessor created by this manager's constructor
     */
    public ClusterInfoAccessor getClusterInfoAccessor() {
        return _clusterInfoAccessor;
    }

    /**
     * Adds a task generator to this manager's task generator registry.
     *
     * @param taskGenerator generator to register
     */
    public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
        _taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
    }

    /**
     * Schedules tasks for every table known to the resource manager, in non-leader mode.
     *
     * @return task type to the value returned by submitting that task type's tasks; the value is null when
     *         nothing was submitted or the task type is not registered
     */
    public synchronized Map<String, String> scheduleTasks() {
        final List<String> allTables = _pinotHelixResourceManager.getAllTables();
        return scheduleTasks(allTables, false);
    }

    /**
     * Groups the given tables by the task types configured on them and schedules each task type once with
     * all of its tables.
     *
     * <p>Tables without a table config, without a task config, or without a task type configs map are
     * skipped. For a task type with no registered generator a warning is logged and the result maps that
     * task type to null.
     *
     * @param tableNamesWithType tables (name with type suffix) to consider
     * @param isLeader forwarded to the per-task-type scheduling; when false the generator's non-leader
     *        clean-up is run after task generation
     * @return task type to the value returned by submitting that task type's tasks, or null when nothing was
     *         submitted for it
     */
    private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
        _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

        // Task type -> configs of all tables that have this task type configured.
        Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<String, List<TableConfig>>();
        for (String tableNameWithType : tableNamesWithType) {
            TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
            if (tableConfig == null || tableConfig.getTaskConfig() == null) {
                continue;
            }
            Map<String, Map<String, String>> taskTypeConfigsMap = tableConfig.getTaskConfig().getTaskTypeConfigsMap();
            // Same null tolerance as updateCronTaskScheduler(): a task config without a task type map
            // simply enables nothing.
            if (taskTypeConfigsMap == null) {
                continue;
            }
            for (String enabledTaskType : taskTypeConfigsMap.keySet()) {
                enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new ArrayList<TableConfig>()).add(tableConfig);
            }
        }

        Map<String, String> tasksScheduled = new HashMap<String, String>();
        for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
            String taskType = entry.getKey();
            List<TableConfig> enabledTableConfigs = entry.getValue();
            PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
            if (taskGenerator != null) {
                _helixTaskResourceManager.ensureTaskQueueExists(taskType);
                addTaskTypeMetricsUpdaterIfNeeded(taskType);
                tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader));
                continue;
            }
            List<String> enabledTables = new ArrayList<String>(enabledTableConfigs.size());
            for (TableConfig enabledTableConfig : enabledTableConfigs) {
                enabledTables.add(enabledTableConfig.getTableName());
            }
            LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
            tasksScheduled.put(taskType, null);
        }
        return tasksScheduled;
    }

    /**
     * Generates tasks for one task type from the given table configs and submits them if there are any.
     *
     * @param taskGenerator generator for the task type
     * @param enabledTableConfigs configs of the tables to generate tasks for
     * @param isLeader when false, the generator's non-leader clean-up is invoked right after generation
     * @return the value returned by the task resource manager's submit call, or null when no task was
     *         generated
     */
    @Nullable
    private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, boolean isLeader) {
        LOGGER.info("Trying to schedule task type: {}, with table config: {}, isLeader: {}", taskGenerator.getTaskType(), enabledTableConfigs, isLeader);
        final List<PinotTaskConfig> generatedConfigs = taskGenerator.generateTasks(enabledTableConfigs);
        if (!isLeader) {
            taskGenerator.nonLeaderCleanUp();
        }
        final String taskType = taskGenerator.getTaskType();
        final int taskCount = generatedConfigs.size();
        if (taskCount <= 0) {
            LOGGER.info("No task to schedule for task type: {}", taskType);
            return null;
        }
        LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", taskCount, taskType, generatedConfigs);
        _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, (long)taskCount);
        return _helixTaskResourceManager.submitTask(generatedConfigs, taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance());
    }

    /**
     * Schedules tasks of every task type configured on a single table, in non-leader mode.
     *
     * @param tableNameWithType table name with type suffix
     * @return task type to the value returned by submitting that task type's tasks
     */
    @Nullable
    public synchronized Map<String, String> scheduleTasks(String tableNameWithType) {
        final List<String> singleTable = Collections.singletonList(tableNameWithType);
        return scheduleTasks(singleTable, false);
    }

    /**
     * Schedules tasks of one task type across all tables that have it enabled, in non-leader mode.
     *
     * @param taskType task type to schedule
     * @return the value returned by submitting the generated tasks, or null when none were generated
     * @throws IllegalStateException if no generator is registered for {@code taskType}
     */
    @Nullable
    public synchronized String scheduleTask(String taskType) {
        final PinotTaskGenerator generator = _taskGeneratorRegistry.getTaskGenerator(taskType);
        Preconditions.checkState(generator != null, "Task type: %s is not registered", taskType);

        final List<TableConfig> tablesWithTaskEnabled = new ArrayList<TableConfig>();
        for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
            final TableConfig candidate = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
            if (candidate != null && candidate.getTaskConfig() != null && candidate.getTaskConfig().isTaskTypeEnabled(taskType)) {
                tablesWithTaskEnabled.add(candidate);
            }
        }

        _helixTaskResourceManager.ensureTaskQueueExists(taskType);
        addTaskTypeMetricsUpdaterIfNeeded(taskType);
        return scheduleTask(generator, tablesWithTaskEnabled, false);
    }

    /**
     * Schedules tasks of one task type for one table, in non-leader mode.
     *
     * @param taskType task type to schedule
     * @param tableNameWithType table name with type suffix
     * @return the value returned by submitting the generated tasks, or null when none were generated
     * @throws IllegalStateException if the task type is not registered, the table config cannot be found,
     *         or the table does not have the task type enabled
     */
    @Nullable
    public synchronized String scheduleTask(String taskType, String tableNameWithType) {
        final PinotTaskGenerator generator = _taskGeneratorRegistry.getTaskGenerator(taskType);
        Preconditions.checkState(generator != null, "Task type: %s is not registered", taskType);

        final TableConfig targetTableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
        Preconditions.checkState(targetTableConfig != null, "Failed to find table config for table: %s", tableNameWithType);

        final boolean taskTypeEnabled = targetTableConfig.getTaskConfig() != null && targetTableConfig.getTaskConfig().isTaskTypeEnabled(taskType);
        Preconditions.checkState(taskTypeEnabled, "Table: %s does not have task type: %s enabled", tableNameWithType, taskType);

        _helixTaskResourceManager.ensureTaskQueueExists(taskType);
        addTaskTypeMetricsUpdaterIfNeeded(taskType);
        return scheduleTask(generator, Collections.singletonList(targetTableConfig), false);
    }

    /**
     * Periodic-task hook: schedules tasks for the given tables in leader mode. The resulting
     * task type to submission value map is discarded.
     *
     * @param tableNamesWithType tables (name with type suffix) handed over by the periodic task framework
     */
    @Override
    protected void processTables(List<String> tableNamesWithType) {
        final boolean isLeader = true;
        scheduleTasks(tableNamesWithType, isLeader);
    }

    /**
     * Invokes the non-leader clean-up of every task generator in the registry.
     */
    public void cleanUpTask() {
        LOGGER.info("Cleaning up all task generators");
        for (String registeredTaskType : _taskGeneratorRegistry.getAllTaskTypes()) {
            final PinotTaskGenerator generator = _taskGeneratorRegistry.getTaskGenerator(registeredTaskType);
            generator.nonLeaderCleanUp();
        }
    }

    /**
     * @return the Quartz scheduler used for cron-based scheduling, or null when it is disabled or could not
     *         be created
     */
    public Scheduler getScheduler() {
        return _scheduledExecutorService;
    }

    /**
     * Publishes, for the given task type, one task-status gauge per task state with the number of tasks
     * currently in that state.
     *
     * <p>States recorded by earlier calls but absent from the current snapshot are published as 0.
     *
     * @param taskType task type whose task states are fetched and reported
     */
    public synchronized void reportMetrics(String taskType) {
        final Map<String, TaskState> statesByTask = _helixTaskResourceManager.getTaskStates(taskType);

        // Count the tasks per state in a scratch map first.
        final Map<TaskState, Integer> freshCounts = new HashMap<TaskState, Integer>();
        for (TaskState state : statesByTask.values()) {
            freshCounts.put(state, freshCounts.getOrDefault(state, 0) + 1);
        }

        // Reset every previously seen state to zero, then overlay the fresh counts.
        _taskStateToCountMap.replaceAll((state, previousCount) -> 0);
        _taskStateToCountMap.putAll(freshCounts);

        for (Map.Entry<TaskState, Integer> stateCount : _taskStateToCountMap.entrySet()) {
            final String gaugeKey = taskType + "." + stateCount.getKey();
            _controllerMetrics.setValueOfTableGauge(gaugeKey, ControllerGauge.TASK_STATUS, stateCount.getValue().longValue());
        }
    }

    /**
     * Subscribes a metrics-updating data-change listener on the task type's task queue context path, unless
     * one has already been registered for this task type.
     *
     * @param taskType task type to register the listener for
     */
    private synchronized void addTaskTypeMetricsUpdaterIfNeeded(String taskType) {
        if (_taskTypeMetricsUpdaterMap.containsKey(taskType)) {
            return;
        }
        final TaskTypeMetricsUpdater metricsListener = new TaskTypeMetricsUpdater(taskType, this);
        final String taskQueueContextPath = getPropertyStorePathForTaskQueue(taskType);
        _pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(taskQueueContextPath, (IZkDataListener)metricsListener);
        _taskTypeMetricsUpdaterMap.put(taskType, metricsListener);
    }
}

