/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.minion.generator;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TaskGenerator
public class SegmentGenerationAndPushTaskGenerator
implements PinotTaskGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
    private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE = BatchConfigProperties.SegmentPushType.TAR;
    private ClusterInfoAccessor _clusterInfoAccessor;

    @Override
    public void init(ClusterInfoAccessor clusterInfoAccessor) {
        this._clusterInfoAccessor = clusterInfoAccessor;
    }

    @Override
    public String getTaskType() {
        return "SegmentGenerationAndPushTask";
    }

    @Override
    public int getNumConcurrentTasksPerInstance() {
        String numConcurrentTasksPerInstanceStr = this._clusterInfoAccessor.getClusterConfig("SegmentGenerationAndPushTask.numConcurrentTasksPerInstance");
        if (numConcurrentTasksPerInstanceStr != null) {
            try {
                return Integer.parseInt(numConcurrentTasksPerInstanceStr);
            }
            catch (Exception e) {
                LOGGER.error("Failed to parse cluster config: {}", (Object)"SegmentGenerationAndPushTask.numConcurrentTasksPerInstance", (Object)e);
            }
        }
        return 1;
    }

    @Override
    public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
        ArrayList<PinotTaskConfig> pinotTaskConfigs = new ArrayList<PinotTaskConfig>();
        for (TableConfig tableConfig : tableConfigs) {
            int tableNumTasks;
            int tableMaxNumTasks;
            String offlineTableName = tableConfig.getTableName();
            if (tableConfig.getTableType() != TableType.OFFLINE) {
                LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", (Object)offlineTableName);
                continue;
            }
            TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
            Preconditions.checkNotNull((Object)tableTaskConfig);
            Map taskConfigs = tableTaskConfig.getConfigsForTaskType("SegmentGenerationAndPushTask");
            Preconditions.checkNotNull((Object)taskConfigs, (String)"Task config shouldn't be null for Table: {}", (Object)offlineTableName);
            String tableMaxNumTasksConfig = (String)taskConfigs.get("tableMaxNumTasks");
            if (tableMaxNumTasksConfig != null) {
                try {
                    tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
                }
                catch (NumberFormatException e) {
                    tableMaxNumTasks = Integer.MAX_VALUE;
                }
            } else {
                tableMaxNumTasks = Integer.MAX_VALUE;
            }
            if ((tableNumTasks = 0) == tableMaxNumTasks) break;
            String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType((TableConfig)tableConfig);
            String batchSegmentIngestionFrequency = IngestionConfigUtils.getBatchSegmentIngestionFrequency((TableConfig)tableConfig);
            BatchIngestionConfig batchIngestionConfig = tableConfig.getIngestionConfig().getBatchIngestionConfig();
            List batchConfigMaps = batchIngestionConfig.getBatchConfigMaps();
            for (Map batchConfigMap : batchConfigMaps) {
                try {
                    URI inputDirURI = SegmentGenerationUtils.getDirectoryURI((String)((String)batchConfigMap.get("inputDirURI")));
                    this.updateRecordReaderConfigs(batchConfigMap);
                    List<OfflineSegmentZKMetadata> offlineSegmentsMetadata = Collections.emptyList();
                    if (BatchConfigProperties.SegmentIngestionType.APPEND.name().equalsIgnoreCase(batchSegmentIngestionType)) {
                        offlineSegmentsMetadata = this._clusterInfoAccessor.getOfflineSegmentsMetadata(offlineTableName);
                    }
                    Set<String> existingSegmentInputFiles = this.getExistingSegmentInputFiles(offlineSegmentsMetadata);
                    Set<String> inputFilesFromRunningTasks = this.getInputFilesFromRunningTasks();
                    existingSegmentInputFiles.addAll(inputFilesFromRunningTasks);
                    LOGGER.info("Trying to extract input files from path: {}, and exclude input files from existing segments metadata: {}, and input files from running tasks: {}", new Object[]{inputDirURI, existingSegmentInputFiles, inputFilesFromRunningTasks});
                    List<URI> inputFileURIs = this.getInputFilesFromDirectory(batchConfigMap, inputDirURI, existingSegmentInputFiles);
                    LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
                    for (URI inputFileURI : inputFileURIs) {
                        Map<String, String> singleFileGenerationTaskConfig = this.getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI);
                        pinotTaskConfigs.add(new PinotTaskConfig("SegmentGenerationAndPushTask", singleFileGenerationTaskConfig));
                        if (++tableNumTasks != tableMaxNumTasks) continue;
                    }
                }
                catch (Exception e) {
                    LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]", new Object[]{tableConfig, taskConfigs, e});
                }
            }
        }
        return pinotTaskConfigs;
    }

    private Set<String> getInputFilesFromRunningTasks() {
        HashSet<String> inputFilesFromRunningTasks = new HashSet<String>();
        Map<String, TaskState> taskStates = this._clusterInfoAccessor.getTaskStates("SegmentGenerationAndPushTask");
        block3: for (String taskName : taskStates.keySet()) {
            switch (taskStates.get(taskName)) {
                case FAILED: 
                case ABORTED: 
                case STOPPED: 
                case COMPLETED: {
                    continue block3;
                }
            }
            List<PinotTaskConfig> taskConfigs = this._clusterInfoAccessor.getTaskConfigs(taskName);
            for (PinotTaskConfig taskConfig : taskConfigs) {
                String inputFileURI;
                if (!"SegmentGenerationAndPushTask".equalsIgnoreCase(taskConfig.getTaskType()) || (inputFileURI = (String)taskConfig.getConfigs().get("input.data.file.uri")) == null) continue;
                inputFilesFromRunningTasks.add(inputFileURI);
            }
        }
        return inputFilesFromRunningTasks;
    }

    private Map<String, String> getSingleFileGenerationTaskConfig(String offlineTableName, int sequenceID, Map<String, String> batchConfigMap, URI inputFileURI) throws URISyntaxException {
        URI inputDirURI = SegmentGenerationUtils.getDirectoryURI((String)batchConfigMap.get("inputDirURI"));
        URI outputDirURI = null;
        if (batchConfigMap.containsKey("outputDirURI")) {
            outputDirURI = SegmentGenerationUtils.getDirectoryURI((String)batchConfigMap.get("outputDirURI"));
        }
        String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
        HashMap<String, String> singleFileGenerationTaskConfig = new HashMap<String, String>(batchConfigMap);
        singleFileGenerationTaskConfig.put("tableName", TableNameBuilder.OFFLINE.tableNameWithType(offlineTableName));
        singleFileGenerationTaskConfig.put("input.data.file.uri", inputFileURI.toString());
        if (outputDirURI != null) {
            URI outputSegmentDirURI = SegmentGenerationUtils.getRelativeOutputPath((URI)inputDirURI, (URI)inputFileURI, (URI)outputDirURI);
            singleFileGenerationTaskConfig.put("output.segment.dir.uri", outputSegmentDirURI.toString());
        }
        singleFileGenerationTaskConfig.put("sequenceId", String.valueOf(sequenceID));
        singleFileGenerationTaskConfig.put("segmentNameGenerator.type", "simple");
        if (outputDirURI == null || pushMode == null) {
            singleFileGenerationTaskConfig.put("push.mode", DEFAULT_SEGMENT_PUSH_TYPE.toString());
        } else {
            singleFileGenerationTaskConfig.put("push.mode", pushMode);
        }
        singleFileGenerationTaskConfig.put("push.controllerUri", this._clusterInfoAccessor.getVipUrl());
        return singleFileGenerationTaskConfig;
    }

    private void updateRecordReaderConfigs(Map<String, String> batchConfigMap) {
        String recordReaderConfigClassName;
        String inputFormat = batchConfigMap.get("inputFormat");
        String recordReaderClassName = PluginManager.get().getRecordReaderClassName(inputFormat);
        if (recordReaderClassName != null) {
            batchConfigMap.putIfAbsent("recordReader.className", recordReaderClassName);
        }
        if ((recordReaderConfigClassName = PluginManager.get().getRecordReaderConfigClassName(inputFormat)) != null) {
            batchConfigMap.putIfAbsent("recordReader.configClassName", recordReaderConfigClassName);
        }
    }

    private List<URI> getInputFilesFromDirectory(Map<String, String> batchConfigMap, URI inputDirURI, Set<String> existingSegmentInputFileURIs) {
        String[] files;
        String inputDirURIScheme = inputDirURI.getScheme();
        if (!PinotFSFactory.isSchemeSupported((String)inputDirURIScheme)) {
            String fsClass = batchConfigMap.get("input.fs.className");
            PinotConfiguration fsProps = IngestionConfigUtils.getInputFsProps(batchConfigMap);
            PinotFSFactory.register((String)inputDirURIScheme, (String)fsClass, (PinotConfiguration)fsProps);
        }
        PinotFS inputDirFS = PinotFSFactory.create((String)inputDirURIScheme);
        String includeFileNamePattern = batchConfigMap.get("includeFileNamePattern");
        String excludeFileNamePattern = batchConfigMap.get("excludeFileNamePattern");
        try {
            files = inputDirFS.listFiles(inputDirURI, true);
        }
        catch (IOException e) {
            LOGGER.error("Unable to list files under URI: " + inputDirURI, (Throwable)e);
            return Collections.emptyList();
        }
        PathMatcher includeFilePathMatcher = null;
        if (includeFileNamePattern != null) {
            includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(includeFileNamePattern);
        }
        PathMatcher excludeFilePathMatcher = null;
        if (excludeFileNamePattern != null) {
            excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(excludeFileNamePattern);
        }
        ArrayList<URI> inputFileURIs = new ArrayList<URI>();
        for (String file : files) {
            LOGGER.debug("Processing file: {}", (Object)file);
            if (includeFilePathMatcher != null && !includeFilePathMatcher.matches(Paths.get(file, new String[0]))) {
                LOGGER.debug("Exclude file {} as it's not matching includeFilePathMatcher: {}", (Object)file, (Object)includeFileNamePattern);
                continue;
            }
            if (excludeFilePathMatcher != null && excludeFilePathMatcher.matches(Paths.get(file, new String[0]))) {
                LOGGER.debug("Exclude file {} as it's matching excludeFilePathMatcher: {}", (Object)file, (Object)excludeFileNamePattern);
                continue;
            }
            try {
                URI inputFileURI = SegmentGenerationUtils.getFileURI((String)file, (URI)inputDirURI);
                if (existingSegmentInputFileURIs.contains(inputFileURI.toString())) {
                    LOGGER.debug("Skipping already processed inputFileURI: {}", (Object)inputFileURI);
                    continue;
                }
                if (inputDirFS.isDirectory(inputFileURI)) {
                    LOGGER.debug("Skipping directory: {}", (Object)inputFileURI);
                    continue;
                }
                inputFileURIs.add(inputFileURI);
            }
            catch (Exception e) {
                LOGGER.error("Failed to construct inputFileURI for path: {}, parent directory URI: {}", new Object[]{file, inputDirURI, e});
            }
        }
        return inputFileURIs;
    }

    private Set<String> getExistingSegmentInputFiles(List<OfflineSegmentZKMetadata> offlineSegmentsMetadata) {
        HashSet<String> existingSegmentInputFiles = new HashSet<String>();
        for (OfflineSegmentZKMetadata metadata : offlineSegmentsMetadata) {
            if (metadata.getCustomMap() == null || !metadata.getCustomMap().containsKey("input.data.file.uri")) continue;
            existingSegmentInputFiles.add((String)metadata.getCustomMap().get("input.data.file.uri"));
        }
        return existingSegmentInputFiles;
    }
}

