/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.minion;

import com.google.common.base.Preconditions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.CronJobScheduleJob;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.TableTaskSchedulerUpdater;
import org.apache.pinot.controller.helix.core.minion.TaskTypeMetricsUpdater;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.ScheduleBuilder;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinotTaskManager
extends ControllerPeriodicTask<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
    public static final String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
    public static final String SKIP_LATE_CRON_SCHEDULE = "SkipLateCronSchedule";
    public static final String MAX_CRON_SCHEDULE_DELAY_IN_SECONDS = "MaxCronScheduleDelayInSeconds";
    public static final String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
    public static final String SCHEDULE_KEY = "schedule";
    private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
    private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
    private static final String TASK_QUEUE_PATH_PATTERN = "/TaskRebalancer/TaskQueue_%s/Context";
    private final PinotHelixTaskResourceManager _helixTaskResourceManager;
    private final ClusterInfoAccessor _clusterInfoAccessor;
    private final TaskGeneratorRegistry _taskGeneratorRegistry;
    private final Scheduler _scheduler;
    private final boolean _skipLateCronSchedule;
    private final int _maxCronScheduleDelayInSeconds;
    private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap;
    private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap;
    private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap;
    private final Map<TaskState, Integer> _taskStateToCountMap;
    private final ZkTableConfigChangeListener _zkTableConfigChangeListener;
    private final TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> _taskManagerStatusCache;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics, TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor, PoolingHttpClientConnectionManager connectionManager) {
        block7: {
            super(PINOT_TASK_MANAGER_KEY, controllerConf.getTaskManagerFrequencyInSeconds(), controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager, controllerMetrics);
            this._tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<String, Map<String, String>>();
            this._tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<String, TableTaskSchedulerUpdater>();
            this._taskTypeMetricsUpdaterMap = new ConcurrentHashMap<String, TaskTypeMetricsUpdater>();
            this._taskStateToCountMap = new ConcurrentHashMap<TaskState, Integer>();
            this._zkTableConfigChangeListener = new ZkTableConfigChangeListener();
            this._helixTaskResourceManager = helixTaskResourceManager;
            this._taskManagerStatusCache = taskManagerStatusCache;
            this._clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics, leadControllerManager, executor, connectionManager);
            this._taskGeneratorRegistry = new TaskGeneratorRegistry(this._clusterInfoAccessor);
            this._skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
            this._maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
            if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
                try {
                    this._scheduler = new StdSchedulerFactory().getScheduler();
                    this._scheduler.start();
                    ZkTableConfigChangeListener zkTableConfigChangeListener = this._zkTableConfigChangeListener;
                    synchronized (zkTableConfigChangeListener) {
                        LOGGER.info("Check and subscribe to tables change under PropertyStore path: {}", (Object)TABLE_CONFIG_PARENT_PATH);
                        this._pinotHelixResourceManager.getPropertyStore().subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, (IZkChildListener)this._zkTableConfigChangeListener);
                        List tables = this._pinotHelixResourceManager.getPropertyStore().getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
                        if (CollectionUtils.isNotEmpty((Collection)tables)) {
                            this.checkTableConfigChanges(tables);
                        }
                        break block7;
                    }
                }
                catch (SchedulerException e) {
                    throw new RuntimeException("Caught exception while setting up the scheduler", e);
                }
            }
            this._scheduler = null;
        }
    }

    public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName, Map<String, String> taskConfigs) throws Exception {
        if (taskName == null) {
            taskName = tableName + "_" + UUID.randomUUID();
            LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
        }
        String minionInstanceTag = taskConfigs.getOrDefault("minionInstanceTag", "minion_untagged");
        this._helixTaskResourceManager.ensureTaskQueueExists(taskType);
        this.addTaskTypeMetricsUpdaterIfNeeded(taskType);
        String parentTaskName = this._helixTaskResourceManager.getParentTaskName(taskType, (String)taskName);
        TaskState taskState = this._helixTaskResourceManager.getTaskState(parentTaskName);
        if (taskState != null) {
            throw new TaskAlreadyExistsException("Task [" + (String)taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
        }
        ArrayList<String> tableNameWithTypes = new ArrayList<String>();
        if (TableNameBuilder.getTableTypeFromTableName((String)tableName) == null) {
            String realtimeTableName;
            String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
            if (this._pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
                tableNameWithTypes.add(offlineTableName);
            }
            if (this._pinotHelixResourceManager.hasRealtimeTable(realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName))) {
                tableNameWithTypes.add(realtimeTableName);
            }
        } else if (this._pinotHelixResourceManager.hasTable(tableName)) {
            tableNameWithTypes.add(tableName);
        }
        if (tableNameWithTypes.isEmpty()) {
            throw new TableNotFoundException("'tableName' " + tableName + " is not found");
        }
        PinotTaskGenerator taskGenerator = this._taskGeneratorRegistry.getTaskGenerator(taskType);
        if (taskGenerator == null) {
            throw new UnknownTaskTypeException("Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName);
        }
        HashMap<String, String> responseMap = new HashMap<String, String>();
        for (String tableNameWithType : tableNameWithTypes) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableNameWithType);
            LOGGER.info("Trying to create tasks of type: {}, table: {}", (Object)taskType, (Object)tableNameWithType);
            List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, taskConfigs);
            if (pinotTaskConfigs.isEmpty()) {
                LOGGER.warn("No ad-hoc task generated for task type: {}", (Object)taskType);
                continue;
            }
            LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", (Object)taskType, pinotTaskConfigs);
            this._controllerMetrics.addMeteredTableValue(taskType, (AbstractMetrics.Meter)ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1L);
            responseMap.put(tableNameWithType, this._helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag, taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(), taskGenerator.getMaxAttemptsPerTask()));
        }
        if (responseMap.isEmpty()) {
            throw new NoTaskScheduledException("No task scheduled for 'tableName': " + tableName);
        }
        return responseMap;
    }

    private void checkTableConfigChanges(List<String> tableNamesWithType) {
        block5: {
            block4: {
                LOGGER.info("Checking task config changes in table configs");
                if (!this._tableTaskSchedulerUpdaterMap.isEmpty()) break block4;
                for (String tableNameWithType : tableNamesWithType) {
                    this.subscribeTableConfigChanges(tableNameWithType);
                }
                break block5;
            }
            HashSet<String> existingTables = new HashSet<String>(this._tableTaskSchedulerUpdaterMap.keySet());
            HashSet<String> tablesToAdd = new HashSet<String>();
            for (String tableNameWithType : tableNamesWithType) {
                if (existingTables.remove(tableNameWithType)) continue;
                tablesToAdd.add(tableNameWithType);
            }
            for (String tableNameWithType : tablesToAdd) {
                this.subscribeTableConfigChanges(tableNameWithType);
            }
            if (existingTables.isEmpty()) break block5;
            LOGGER.info("Found tables to clean up cron task scheduler: {}", existingTables);
            for (String tableNameWithType : existingTables) {
                this.cleanUpCronTaskSchedulerForTable(tableNameWithType);
            }
        }
    }

    private String getPropertyStorePathForTable(String tableWithType) {
        return TABLE_CONFIG_PATH_PREFIX + tableWithType;
    }

    private String getPropertyStorePathForTaskQueue(String taskType) {
        return String.format(TASK_QUEUE_PATH_PATTERN, taskType);
    }

    public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) {
        LOGGER.info("Cleaning up task in scheduler for table {}", (Object)tableWithType);
        TableTaskSchedulerUpdater tableTaskSchedulerUpdater = this._tableTaskSchedulerUpdaterMap.get(tableWithType);
        if (tableTaskSchedulerUpdater != null) {
            this._pinotHelixResourceManager.getPropertyStore().unsubscribeDataChanges(this.getPropertyStorePathForTable(tableWithType), (IZkDataListener)tableTaskSchedulerUpdater);
        }
        this.removeAllTasksFromCronExpressions(tableWithType);
        this._tableTaskSchedulerUpdaterMap.remove(tableWithType);
    }

    private synchronized void removeAllTasksFromCronExpressions(String tableWithType) {
        Set jobKeys;
        try {
            jobKeys = this._scheduler.getJobKeys(GroupMatcher.anyJobGroup());
        }
        catch (SchedulerException e) {
            LOGGER.error("Got exception when fetching all jobKeys", (Throwable)e);
            return;
        }
        for (JobKey jobKey : jobKeys) {
            if (!jobKey.getName().equals(tableWithType)) continue;
            try {
                this._scheduler.deleteJob(jobKey);
                this._controllerMetrics.addValueToTableGauge(PinotTaskManager.getCronJobName(tableWithType, jobKey.getGroup()), (AbstractMetrics.Gauge)ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
            }
            catch (SchedulerException e) {
                LOGGER.error("Got exception when deleting the scheduled job - {}", (Object)jobKey, (Object)e);
            }
        }
        this._tableTaskTypeToCronExpressionMap.remove(tableWithType);
    }

    public static String getCronJobName(String tableWithType, String taskType) {
        return String.format("%s.%s", tableWithType, taskType);
    }

    public synchronized void subscribeTableConfigChanges(String tableWithType) {
        if (this._tableTaskSchedulerUpdaterMap.containsKey(tableWithType)) {
            return;
        }
        TableTaskSchedulerUpdater tableTaskSchedulerUpdater = new TableTaskSchedulerUpdater(tableWithType, this);
        this._pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(this.getPropertyStorePathForTable(tableWithType), (IZkDataListener)tableTaskSchedulerUpdater);
        this._tableTaskSchedulerUpdaterMap.put(tableWithType, tableTaskSchedulerUpdater);
        try {
            this.updateCronTaskScheduler(tableWithType);
        }
        catch (Exception e) {
            LOGGER.error("Failed to create cron task in scheduler for table: {}", (Object)tableWithType, (Object)e);
        }
    }

    public synchronized void updateCronTaskScheduler(String tableWithType) {
        LOGGER.info("Trying to update task schedule for table: {}", (Object)tableWithType);
        TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableWithType);
        if (tableConfig == null) {
            LOGGER.info("tableConfig is null, trying to remove all the tasks for table {} if any", (Object)tableWithType);
            this.removeAllTasksFromCronExpressions(tableWithType);
            return;
        }
        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
        if (taskConfig == null) {
            LOGGER.info("taskConfig is null, trying to remove all the tasks for table {} if any", (Object)tableWithType);
            this.removeAllTasksFromCronExpressions(tableWithType);
            return;
        }
        Map taskTypeConfigsMap = taskConfig.getTaskTypeConfigsMap();
        if (taskTypeConfigsMap == null) {
            LOGGER.info("taskTypeConfigsMap is null, trying to remove all the tasks for table {} if any", (Object)tableWithType);
            this.removeAllTasksFromCronExpressions(tableWithType);
            return;
        }
        Map<String, String> taskToCronExpressionMap = this.getTaskToCronExpressionMap(taskTypeConfigsMap);
        LOGGER.info("Got taskToCronExpressionMap {} ", taskToCronExpressionMap);
        this.updateCronTaskScheduler(tableWithType, taskToCronExpressionMap);
    }

    private void updateCronTaskScheduler(String tableWithType, Map<String, String> taskToCronExpressionMap) {
        Map<String, String> existingScheduledTasks = this._tableTaskTypeToCronExpressionMap.get(tableWithType);
        if (existingScheduledTasks != null && !existingScheduledTasks.isEmpty()) {
            String newCronExpression;
            for (Map.Entry<String, String> entry : existingScheduledTasks.entrySet()) {
                String existingTaskType = entry.getKey();
                newCronExpression = taskToCronExpressionMap.get(existingTaskType);
                if (newCronExpression == null) {
                    try {
                        this._scheduler.deleteJob(JobKey.jobKey((String)tableWithType, (String)existingTaskType));
                        this._controllerMetrics.addValueToTableGauge(PinotTaskManager.getCronJobName(tableWithType, existingTaskType), (AbstractMetrics.Gauge)ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
                    }
                    catch (SchedulerException e) {
                        LOGGER.error("Failed to delete scheduled job for table {}, task type {}", new Object[]{tableWithType, existingTaskType, e});
                    }
                    continue;
                }
                if (entry.getValue().equalsIgnoreCase(newCronExpression)) continue;
                try {
                    TriggerKey triggerKey = TriggerKey.triggerKey((String)tableWithType, (String)existingTaskType);
                    Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule((ScheduleBuilder)CronScheduleBuilder.cronSchedule((String)newCronExpression)).build();
                    this._scheduler.rescheduleJob(triggerKey, trigger);
                }
                catch (SchedulerException e) {
                    LOGGER.error("Failed to update scheduled job for table {}, task type {}", new Object[]{tableWithType, existingTaskType, e});
                }
            }
            for (Map.Entry<String, String> entry : taskToCronExpressionMap.entrySet()) {
                String newTaskType = entry.getKey();
                if (existingScheduledTasks.containsKey(newTaskType)) continue;
                newCronExpression = entry.getValue();
                try {
                    this.scheduleJob(tableWithType, newTaskType, newCronExpression);
                }
                catch (SchedulerException e) {
                    LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", new Object[]{tableWithType, newTaskType, newCronExpression, e});
                }
            }
        } else {
            for (String taskType : taskToCronExpressionMap.keySet()) {
                String cronExpr = taskToCronExpressionMap.get(taskType);
                try {
                    this.scheduleJob(tableWithType, taskType, cronExpr);
                }
                catch (SchedulerException e) {
                    LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", new Object[]{tableWithType, taskType, cronExpr, e});
                }
            }
        }
        this._tableTaskTypeToCronExpressionMap.put(tableWithType, taskToCronExpressionMap);
    }

    private void scheduleJob(String tableWithType, String taskType, String cronExprStr) throws SchedulerException {
        boolean exists = false;
        try {
            exists = this._scheduler.checkExists(JobKey.jobKey((String)tableWithType, (String)taskType));
        }
        catch (SchedulerException e) {
            LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", new Object[]{tableWithType, taskType, e});
        }
        if (!exists) {
            LOGGER.info("Trying to schedule a job with cron expression: {} for table {}, task type: {}", new Object[]{cronExprStr, tableWithType, taskType});
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey((String)tableWithType, (String)taskType)).withSchedule((ScheduleBuilder)CronScheduleBuilder.cronSchedule((String)cronExprStr)).build();
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put(PINOT_TASK_MANAGER_KEY, (Object)this);
            jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, (Object)this._leadControllerManager);
            jobDataMap.put(SKIP_LATE_CRON_SCHEDULE, this._skipLateCronSchedule);
            jobDataMap.put(MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, this._maxCronScheduleDelayInSeconds);
            JobDetail jobDetail = JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap).build();
            try {
                this._scheduler.scheduleJob(jobDetail, trigger);
                this._controllerMetrics.addValueToTableGauge(PinotTaskManager.getCronJobName(tableWithType, taskType), (AbstractMetrics.Gauge)ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, 1L);
            }
            catch (Exception e) {
                LOGGER.error("Failed to parse Cron expression - " + cronExprStr, (Throwable)e);
                throw e;
            }
            Date nextRuntime = trigger.getNextFireTime();
            LOGGER.info("Scheduled task for table: {}, task type: {}, next runtime: {}", new Object[]{tableWithType, taskType, nextRuntime});
        }
    }

    private Map<String, String> getTaskToCronExpressionMap(Map<String, Map<String, String>> taskTypeConfigsMap) {
        HashMap<String, String> taskToCronExpressionMap = new HashMap<String, String>();
        for (String taskType : taskTypeConfigsMap.keySet()) {
            String cronExprStr;
            Map<String, String> taskTypeConfig = taskTypeConfigsMap.get(taskType);
            if (taskTypeConfig == null || !taskTypeConfig.containsKey(SCHEDULE_KEY) || (cronExprStr = taskTypeConfig.get(SCHEDULE_KEY)) == null) continue;
            taskToCronExpressionMap.put(taskType, cronExprStr);
        }
        return taskToCronExpressionMap;
    }

    public ClusterInfoAccessor getClusterInfoAccessor() {
        return this._clusterInfoAccessor;
    }

    public TaskGeneratorRegistry getTaskGeneratorRegistry() {
        return this._taskGeneratorRegistry;
    }

    public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
        this._taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
    }

    public synchronized Map<String, String> scheduleTasks() {
        return this.scheduleTasks(this._pinotHelixResourceManager.getAllTables(), false);
    }

    private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
        this._controllerMetrics.addMeteredGlobalValue((AbstractMetrics.Meter)ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
        HashMap<String, List> enabledTableConfigMap = new HashMap<String, List>();
        for (String tableNameWithType : tableNamesWithType) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableNameWithType);
            if (tableConfig == null || tableConfig.getTaskConfig() == null) continue;
            Set enabledTaskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
            for (String enabledTaskType : enabledTaskTypes) {
                enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new ArrayList()).add(tableConfig);
            }
        }
        HashMap<String, String> tasksScheduled = new HashMap<String, String>();
        for (Map.Entry entry : enabledTableConfigMap.entrySet()) {
            String taskType = (String)entry.getKey();
            List enabledTableConfigs = (List)entry.getValue();
            PinotTaskGenerator taskGenerator = this._taskGeneratorRegistry.getTaskGenerator(taskType);
            if (taskGenerator != null) {
                this._helixTaskResourceManager.ensureTaskQueueExists(taskType);
                this.addTaskTypeMetricsUpdaterIfNeeded(taskType);
                tasksScheduled.put(taskType, this.scheduleTask(taskGenerator, enabledTableConfigs, isLeader));
                continue;
            }
            ArrayList<String> enabledTables = new ArrayList<String>(enabledTableConfigs.size());
            for (TableConfig enabledTableConfig : enabledTableConfigs) {
                enabledTables.add(enabledTableConfig.getTableName());
            }
            LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", (Object)taskType, enabledTables);
            tasksScheduled.put(taskType, null);
        }
        return tasksScheduled;
    }

    @Nullable
    private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, boolean isLeader) {
        List<PinotTaskConfig> pinotTaskConfigs;
        LOGGER.info("Trying to schedule task type: {}, isLeader: {}", (Object)taskGenerator.getTaskType(), (Object)isLeader);
        try {
            pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
            long successRunTimestamp = System.currentTimeMillis();
            for (TableConfig tableConfig : enabledTableConfigs) {
                this._taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(), taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
                this._controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(), (AbstractMetrics.Gauge)ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION, () -> System.currentTimeMillis() - successRunTimestamp);
                this._controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(), (AbstractMetrics.Gauge)ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 0L);
            }
        }
        catch (Exception e) {
            StringWriter errors = new StringWriter();
            try (PrintWriter pw = new PrintWriter(errors);){
                e.printStackTrace(pw);
            }
            long successRunTimestamp = System.currentTimeMillis();
            for (TableConfig tableConfig : enabledTableConfigs) {
                this._taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(), taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp, errors.toString()));
                this._controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(), (AbstractMetrics.Gauge)ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 1L);
            }
            throw e;
        }
        if (!isLeader) {
            taskGenerator.nonLeaderCleanUp();
        }
        String taskType = taskGenerator.getTaskType();
        int numTasks = pinotTaskConfigs.size();
        if (numTasks > 0) {
            LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", new Object[]{numTasks, taskType, pinotTaskConfigs});
            this._controllerMetrics.addMeteredTableValue(taskType, (AbstractMetrics.Meter)ControllerMeter.NUMBER_TASKS_SUBMITTED, (long)numTasks);
            return this._helixTaskResourceManager.submitTask(pinotTaskConfigs, taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(), taskGenerator.getMaxAttemptsPerTask());
        }
        LOGGER.info("No task to schedule for task type: {}", (Object)taskType);
        return null;
    }

    public synchronized Map<String, String> scheduleTasks(String tableNameWithType) {
        return this.scheduleTasks(Collections.singletonList(tableNameWithType), false);
    }

    @Nullable
    public synchronized String scheduleTask(String taskType) {
        PinotTaskGenerator taskGenerator = this._taskGeneratorRegistry.getTaskGenerator(taskType);
        Preconditions.checkState((taskGenerator != null ? 1 : 0) != 0, (String)"Task type: %s is not registered", (Object)taskType);
        ArrayList<TableConfig> enabledTableConfigs = new ArrayList<TableConfig>();
        for (String tableNameWithType : this._pinotHelixResourceManager.getAllTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableNameWithType);
            if (tableConfig == null || tableConfig.getTaskConfig() == null || !tableConfig.getTaskConfig().isTaskTypeEnabled(taskType)) continue;
            enabledTableConfigs.add(tableConfig);
        }
        this._helixTaskResourceManager.ensureTaskQueueExists(taskType);
        this.addTaskTypeMetricsUpdaterIfNeeded(taskType);
        return this.scheduleTask(taskGenerator, enabledTableConfigs, false);
    }

    @Nullable
    public synchronized String scheduleTask(String taskType, String tableNameWithType) {
        PinotTaskGenerator taskGenerator = this._taskGeneratorRegistry.getTaskGenerator(taskType);
        Preconditions.checkState((taskGenerator != null ? 1 : 0) != 0, (String)"Task type: %s is not registered", (Object)taskType);
        TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableNameWithType);
        Preconditions.checkState((tableConfig != null ? 1 : 0) != 0, (String)"Failed to find table config for table: %s", (Object)tableNameWithType);
        Preconditions.checkState((tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType) ? 1 : 0) != 0, (String)"Table: %s does not have task type: %s enabled", (Object)tableNameWithType, (Object)taskType);
        this._helixTaskResourceManager.ensureTaskQueueExists(taskType);
        this.addTaskTypeMetricsUpdaterIfNeeded(taskType);
        return this.scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false);
    }

    @Override
    protected void processTables(List<String> tableNamesWithType, Properties taskProperties) {
        this.scheduleTasks(tableNamesWithType, true);
    }

    public void cleanUpTask() {
        LOGGER.info("Cleaning up all task generators");
        for (String taskType : this._taskGeneratorRegistry.getAllTaskTypes()) {
            this._taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
        }
    }

    @Nullable
    public Scheduler getScheduler() {
        return this._scheduler;
    }

    public synchronized void reportMetrics(String taskType) {
        for (Map.Entry<TaskState, Integer> entry : this._taskStateToCountMap.entrySet()) {
            entry.setValue(0);
        }
        if (this._helixTaskResourceManager.getTaskTypes().contains(taskType)) {
            Map<String, TaskState> taskStates = this._helixTaskResourceManager.getTaskStates(taskType);
            for (TaskState taskState : taskStates.values()) {
                this._taskStateToCountMap.merge(taskState, 1, Integer::sum);
            }
        }
        for (Map.Entry<TaskState, Integer> entry : this._taskStateToCountMap.entrySet()) {
            this._controllerMetrics.setValueOfTableGauge(String.format("%s.%s", taskType, entry.getKey()), (AbstractMetrics.Gauge)ControllerGauge.TASK_STATUS, (long)entry.getValue().intValue());
        }
    }

    private synchronized void addTaskTypeMetricsUpdaterIfNeeded(String taskType) {
        if (!this._taskTypeMetricsUpdaterMap.containsKey(taskType)) {
            TaskTypeMetricsUpdater taskTypeMetricsUpdater = new TaskTypeMetricsUpdater(taskType, this);
            this._pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(this.getPropertyStorePathForTaskQueue(taskType), (IZkDataListener)taskTypeMetricsUpdater);
            this._taskTypeMetricsUpdaterMap.put(taskType, taskTypeMetricsUpdater);
        }
    }

    private class ZkTableConfigChangeListener
    implements IZkChildListener {
        private ZkTableConfigChangeListener() {
        }

        public synchronized void handleChildChange(String path, List<String> tableNamesWithType) {
            PinotTaskManager.this.checkTableConfigChanges(tableNamesWithType);
        }
    }
}

