/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.minion.generator;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PinotTaskGenerator} that produces tasks of type {@code RealtimeToOfflineSegmentsTask}.
 *
 * <p>For every eligible REALTIME table (low-level consumer only) at most one task is generated per
 * invocation. The task covers the time window {@code [watermarkMs, watermarkMs + bucketMs)} and lists
 * every completed (status {@code DONE}) segment whose time range overlaps that window.
 *
 * <p>Per-table settings read from the table's task config for this task type:
 * <ul>
 *   <li>{@code bucketTimePeriod} - size of the window, defaults to {@code 1d}</li>
 *   <li>{@code bufferTimePeriod} - a window is only processed once its end is at least this old,
 *       defaults to {@code 2d}</li>
 *   <li>{@code timeColumnTransformFunction}, {@code collectorType}, {@code maxNumRecordsPerSegment} and
 *       every key ending in {@code .aggregationType} - copied verbatim into the generated task</li>
 * </ul>
 */
@TaskGenerator
public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
    private static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
    private static final String DEFAULT_BUCKET_PERIOD = "1d";
    private static final String DEFAULT_BUFFER_PERIOD = "2d";
    private ClusterInfoAccessor _clusterInfoAccessor;

    /**
     * Stores the accessor used for all cluster lookups made by this generator.
     *
     * @param clusterInfoAccessor accessor for segment metadata, task state and task metadata
     */
    @Override
    public void init(ClusterInfoAccessor clusterInfoAccessor) {
        this._clusterInfoAccessor = clusterInfoAccessor;
    }

    /**
     * @return the task type handled by this generator, {@code "RealtimeToOfflineSegmentsTask"}
     */
    @Override
    public String getTaskType() {
        return TASK_TYPE;
    }

    /**
     * Generates at most one task config per table.
     *
     * <p>A table is skipped (with a log line) when any of the following holds:
     * <ul>
     *   <li>it is not a REALTIME table, or its stream config uses the high-level consumer type</li>
     *   <li>there are incomplete tasks of this type for the table</li>
     *   <li>it has no completed segments, or some partition has no completed segment yet</li>
     *   <li>the window end is newer than {@code now - bufferTimePeriod}</li>
     *   <li>for some partition, the latest completed segment overlaps the window but ends before the
     *       window end</li>
     *   <li>no completed segment overlaps the window</li>
     * </ul>
     *
     * @param tableConfigs configs of the tables to generate tasks for
     * @return the generated task configs, possibly empty
     * @throws IllegalStateException if a table that reaches the config stage has no task config for
     *     this task type
     */
    @Override
    public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
        List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
        for (TableConfig tableConfig : tableConfigs) {
            String realtimeTableName = tableConfig.getTableName();
            if (tableConfig.getTableType() != TableType.REALTIME) {
                LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", TASK_TYPE, realtimeTableName);
                continue;
            }
            StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
            if (streamConfig.hasHighLevelConsumerType()) {
                LOGGER.warn("Skip generating task: {} for HLC REALTIME table: {}", TASK_TYPE, realtimeTableName);
                continue;
            }
            LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, TASK_TYPE);

            // Only one task per table at a time: wait until earlier tasks are no longer incomplete.
            Map<String, TaskState> incompleteTasks = TaskGeneratorUtils.getIncompleteTasks(TASK_TYPE, realtimeTableName, this._clusterInfoAccessor);
            if (!incompleteTasks.isEmpty()) {
                LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.", incompleteTasks.keySet(), realtimeTableName);
                continue;
            }

            List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();
            Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
            Set<Integer> allPartitions = new HashSet<>();
            this.getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName, allPartitions);
            if (completedSegmentsMetadata.isEmpty()) {
                LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}", realtimeTableName, TASK_TYPE);
                continue;
            }
            // After this removal, allPartitions holds only the partitions without any completed segment.
            allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
            if (!allPartitions.isEmpty()) {
                LOGGER.info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.", allPartitions, realtimeTableName, TASK_TYPE);
                continue;
            }

            // Guava Preconditions only substitutes %s placeholders (not SLF4J-style {}).
            TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
            Preconditions.checkState(tableTaskConfig != null, "Table task config shouldn't be null for table: %s", realtimeTableName);
            Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(TASK_TYPE);
            Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s", realtimeTableName);

            String bucketTimePeriod = taskConfigs.getOrDefault("bucketTimePeriod", DEFAULT_BUCKET_PERIOD);
            String bufferTimePeriod = taskConfigs.getOrDefault("bufferTimePeriod", DEFAULT_BUFFER_PERIOD);
            long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
            long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);

            // The window to process is [windowStartMs, windowEndMs).
            long windowStartMs = this.getWatermarkMs(realtimeTableName, completedSegmentsMetadata, bucketMs);
            long windowEndMs = windowStartMs + bucketMs;
            if (windowEndMs > System.currentTimeMillis() - bufferMs) {
                LOGGER.info("Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, TASK_TYPE);
                continue;
            }

            List<String> segmentNames = new ArrayList<>();
            List<String> downloadURLs = new ArrayList<>();
            Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
            boolean skipGenerate = false;
            for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
                String segmentName = realtimeSegmentZKMetadata.getSegmentName();
                long segmentStartTimeMs = realtimeSegmentZKMetadata.getStartTimeMs();
                long segmentEndTimeMs = realtimeSegmentZKMetadata.getEndTimeMs();
                // Ignore segments whose [start, end] range does not overlap [windowStartMs, windowEndMs).
                if (windowStartMs > segmentEndTimeMs || segmentStartTimeMs >= windowEndMs) {
                    continue;
                }
                // The newest completed segment of a partition ends before the window does, so the rest
                // of the window for that partition is not covered by completed segments yet.
                if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
                    LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task generation: {}", segmentName, TASK_TYPE);
                    skipGenerate = true;
                    break;
                }
                segmentNames.add(segmentName);
                downloadURLs.add(realtimeSegmentZKMetadata.getDownloadUrl());
            }
            // NOTE(review): nothing in this class advances the watermark, so an empty window is looked
            // at again on the next invocation - confirm that is the intended behavior.
            if (segmentNames.isEmpty() || skipGenerate) {
                LOGGER.info("Found no eligible segments for task: {} with window [{} - {}). Skipping task generation", TASK_TYPE, windowStartMs, windowEndMs);
                continue;
            }

            Map<String, String> configs = new HashMap<>();
            configs.put("tableName", realtimeTableName);
            configs.put("segmentName", StringUtils.join(segmentNames, ","));
            configs.put("downloadURL", StringUtils.join(downloadURLs, ","));
            configs.put("uploadURL", this._clusterInfoAccessor.getVipUrl() + "/segments");
            configs.put("windowStartMs", String.valueOf(windowStartMs));
            configs.put("windowEndMs", String.valueOf(windowEndMs));

            // Optional settings are forwarded to the task only when the table config defines them.
            copyIfPresent(taskConfigs, configs, "timeColumnTransformFunction");
            copyIfPresent(taskConfigs, configs, "collectorType");
            for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
                if (entry.getKey().endsWith(".aggregationType")) {
                    configs.put(entry.getKey(), entry.getValue());
                }
            }
            copyIfPresent(taskConfigs, configs, "maxNumRecordsPerSegment");

            pinotTaskConfigs.add(new PinotTaskConfig(TASK_TYPE, configs));
            LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, TASK_TYPE);
        }
        return pinotTaskConfigs;
    }

    /**
     * Copies the value mapped to {@code key} from {@code source} into {@code target} when it is non-null.
     */
    private static void copyIfPresent(Map<String, String> source, Map<String, String> target, String key) {
        String value = source.get(key);
        if (value != null) {
            target.put(key, value);
        }
    }

    /**
     * Scans the LLC realtime segment metadata of a table and fills the three output collections.
     *
     * @param realtimeTableName table whose segment metadata is read
     * @param completedSegmentsMetadataList output: metadata of every segment with status {@code DONE}
     * @param partitionToLatestCompletedSegmentName output: for each partition that has a {@code DONE}
     *     segment, the name of the {@code DONE} segment with the highest sequence number
     * @param allPartitions output: partition ids of all segments seen, regardless of status
     */
    private void getCompletedSegmentsInfo(String realtimeTableName, List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadataList, Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer> allPartitions) {
        List<LLCRealtimeSegmentZKMetadata> realtimeSegmentsMetadataList = this._clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(realtimeTableName);
        Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
        for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : realtimeSegmentsMetadataList) {
            LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
            allPartitions.add(llcSegmentName.getPartitionId());
            if (!segmentZKMetadata.getStatus().equals(CommonConstants.Segment.Realtime.Status.DONE)) {
                continue;
            }
            completedSegmentsMetadataList.add(segmentZKMetadata);
            // Keep whichever DONE segment has the larger sequence number for this partition.
            latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> {
                if (latestLLCSegmentName == null || llcSegmentName.getSequenceNumber() > latestLLCSegmentName.getSequenceNumber()) {
                    return llcSegmentName;
                }
                return latestLLCSegmentName;
            });
        }
        for (Map.Entry<Integer, LLCSegmentName> entry : latestLLCSegmentNameMap.entrySet()) {
            partitionToLatestCompletedSegmentName.put(entry.getKey(), entry.getValue().getSegmentName());
        }
    }

    /**
     * Returns the watermark (start of the next window to process) for a table.
     *
     * <p>If task metadata already exists for the table, its watermark is returned. Otherwise the
     * watermark is initialised to the smallest start time among the completed segments, truncated to
     * a multiple of {@code bucketMs}, and the new metadata is stored via the cluster info accessor.
     *
     * @param realtimeTableName table to look up
     * @param completedSegmentsMetadata completed segments used to initialise a missing watermark
     * @param bucketMs window size in milliseconds
     * @return the watermark in milliseconds
     * @throws IllegalStateException if the watermark must be initialised but no minimum start time
     *     could be determined from {@code completedSegmentsMetadata}
     */
    private long getWatermarkMs(String realtimeTableName, List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata, long bucketMs) {
        RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = this._clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
        if (realtimeToOfflineSegmentsTaskMetadata == null) {
            // No metadata yet: derive the first watermark from the earliest completed segment.
            long minStartTimeMs = Long.MAX_VALUE;
            for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
                minStartTimeMs = Math.min(minStartTimeMs, realtimeSegmentZKMetadata.getStartTimeMs());
            }
            Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE, "Failed to find min start time of completed segments for table: %s", realtimeTableName);
            // Integer division truncates the start time to a bucket boundary.
            long watermarkMs = minStartTimeMs / bucketMs * bucketMs;
            realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
            this._clusterInfoAccessor.setRealtimeToOfflineSegmentsTaskMetadata(realtimeToOfflineSegmentsTaskMetadata);
        }
        return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
    }
}

