package org.apache.iotdb.db.pipe.extractor.dataregion.historical;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.class */
/**
 * Historical-data extractor for one data region.
 *
 * <p>Lifecycle as implemented below: {@link #validate} parses the time-range and mod-file options,
 * {@link #customize} binds the extractor to a pipe / data region, {@link #start} snapshots the
 * region's TsFiles into a pending queue (pinning each one), {@link #supply} turns the queued files
 * into {@link PipeTsFileInsertionEvent}s one at a time, and {@link #close} unpins whatever is left.
 */
public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDataRegionExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);

    /**
     * Data region id -> wall-clock time (ms) of the last flush triggered by any extractor instance.
     * Every access is performed while holding this map's own monitor.
     */
    private static final Map<Integer, Long> DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();

    /** A region is not flushed again if the previous pipe-triggered flush is younger than this. */
    private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;

    private String pipeName;
    private PipeTaskMeta pipeTaskMeta;
    private ProgressIndex startIndex;
    private int dataRegionId;
    private PipePattern pipePattern;
    private boolean isDbNameCoveredByPattern = false;
    private boolean isHistoricalExtractorEnabled = false;
    private long historicalDataExtractionStartTime = Long.MIN_VALUE;
    // NOTE(review): this constant is used as the "no upper bound" end time; presumably it equals
    // Long.MAX_VALUE -- confirm against WALNode.
    private long historicalDataExtractionEndTime = WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
    private long historicalDataExtractionTimeLowerBound;
    private boolean sloppyTimeRange;
    private boolean shouldExtractInsertion;
    private boolean shouldTransferModFile;
    private Queue<TsFileResource> pendingQueue;

    /**
     * Parses the historical extraction options.
     *
     * <p>Two key families are understood. When any of {@code source.start-time},
     * {@code extractor.start-time}, {@code source.end-time} or {@code extractor.end-time} is present,
     * historical extraction is forced on and those keys define the range. Otherwise the
     * {@code *.history.*} keys are used, and extraction is enabled when {@code __system.restart} is
     * true or {@code *.history.enable} is true (default true).
     *
     * <p>The mod-file transfer flag is resolved on both paths; previously the first path returned
     * before resolving it, so {@code source.mods.enable} / {@code extractor.mods.enable} were ignored.
     *
     * @param pipeParameterValidator validator giving access to the pipe parameters
     * @throws PipeParameterNotValidException if a time cannot be parsed, the start time is greater
     *     than the end time, or any other exception is raised while parsing (its message is reused)
     */
    public void validate(PipeParameterValidator pipeParameterValidator) {
        final PipeParameters parameters = pipeParameterValidator.getParameters();

        final String[] startTimeKeys;
        final String[] endTimeKeys;
        final String invalidRangeFormat;
        if (parameters.hasAnyAttributes(new String[]{"source.start-time", "extractor.start-time", "source.end-time", "extractor.end-time"})) {
            this.isHistoricalExtractorEnabled = true;
            startTimeKeys = new String[]{"source.start-time", "extractor.start-time"};
            endTimeKeys = new String[]{"source.end-time", "extractor.end-time"};
            invalidRangeFormat = "%s or %s should be less than or equal to %s or %s.";
        } else {
            this.isHistoricalExtractorEnabled =
                parameters.getBooleanOrDefault("__system.restart", false)
                    || parameters.getBooleanOrDefault(Arrays.asList("extractor.history.enable", "source.history.enable"), true);
            startTimeKeys = new String[]{"extractor.history.start-time", "source.history.start-time"};
            endTimeKeys = new String[]{"extractor.history.end-time", "source.history.end-time"};
            invalidRangeFormat = "%s (%s) should be less than or equal to %s (%s).";
        }

        try {
            this.historicalDataExtractionStartTime =
                (this.isHistoricalExtractorEnabled && parameters.hasAnyAttributes(startTimeKeys))
                    ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(parameters.getStringByKeys(startTimeKeys))
                    : Long.MIN_VALUE;
            this.historicalDataExtractionEndTime =
                (this.isHistoricalExtractorEnabled && parameters.hasAnyAttributes(endTimeKeys))
                    ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(parameters.getStringByKeys(endTimeKeys))
                    : WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
            if (this.historicalDataExtractionStartTime > this.historicalDataExtractionEndTime) {
                throw new PipeParameterNotValidException(
                    String.format(invalidRangeFormat, startTimeKeys[0], startTimeKeys[1], endTimeKeys[0], endTimeKeys[1]));
            }
            // Default of the mods flag is the "deletion" half of the listening option pair.
            this.shouldTransferModFile =
                parameters.getBooleanOrDefault(
                    Arrays.asList("source.mods.enable", "extractor.mods.enable"),
                    ((Boolean) DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters).getRight()).booleanValue());
        } catch (Exception e) {
            // Every failure is reported as a parameter error carrying the original message.
            throw new PipeParameterNotValidException(e.getMessage());
        }
    }

    /**
     * Binds this extractor to its pipe and data region. Does nothing beyond resolving
     * {@code shouldExtractInsertion} when insertions are not to be extracted.
     *
     * <p>When historical extraction is disabled, the pipe creation time becomes the lower bound for
     * TsFile generation time, and the region is flushed here unless a pipe flushed it within the
     * last {@link #PIPE_MIN_FLUSH_INTERVAL_IN_MS} ms.
     *
     * @param pipeParameters pipe parameters (pattern and loose-range options are read here)
     * @param pipeExtractorRuntimeConfiguration supplies the runtime environment of the task
     * @throws IllegalPathException declared by the original signature; presumably raised by the
     *     pattern / option parsing helpers -- confirm against their declarations
     */
    public void customize(PipeParameters pipeParameters, PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) throws IllegalPathException {
        this.shouldExtractInsertion = ((Boolean) DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(pipeParameters).getLeft()).booleanValue();
        if (!this.shouldExtractInsertion) {
            return;
        }

        // Explicit cast: redundant if getRuntimeEnvironment() already returns this type.
        final PipeTaskExtractorRuntimeEnvironment runtimeEnvironment =
            (PipeTaskExtractorRuntimeEnvironment) pipeExtractorRuntimeConfiguration.getRuntimeEnvironment();
        this.pipeName = runtimeEnvironment.getPipeName();
        this.pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
        this.startIndex = runtimeEnvironment.getPipeTaskMeta().getProgressIndex();
        this.dataRegionId = runtimeEnvironment.getRegionId();
        synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(this.dataRegionId, 0L);
        }

        this.pipePattern = PipePattern.parsePipePatternFromSourceParameters(pipeParameters);
        final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(runtimeEnvironment.getRegionId()));
        if (Objects.nonNull(dataRegion)) {
            final String databaseName = dataRegion.getDatabaseName();
            if (Objects.nonNull(databaseName)) {
                this.isDbNameCoveredByPattern = this.pipePattern.coversDb(databaseName);
            }
        }

        this.historicalDataExtractionTimeLowerBound =
            this.isHistoricalExtractorEnabled ? Long.MIN_VALUE : runtimeEnvironment.getCreationTime();
        if (this.historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
            // NOTE(review): here the map monitor is taken before the region write lock (inside
            // flushDataRegionAllTsFiles), while start() takes them in the opposite order.
            synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
                final long lastFlushedTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(this.dataRegionId);
                if (System.currentTimeMillis() - lastFlushedTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
                    flushDataRegionAllTsFiles();
                    DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(this.dataRegionId, System.currentTimeMillis());
                }
            }
        }

        // Comma-separated option list; Locale.ROOT keeps the comparison independent of the JVM's
        // default locale (e.g. "TIME" must still match under a Turkish locale).
        final Set<String> looseRangeOptions =
            Arrays.stream(
                    pipeParameters
                        .getStringOrDefault(
                            Arrays.asList("extractor.history.loose-range", "source.history.loose-range"),
                            SubStringFunctionColumnTransformer.EMPTY_STRING)
                        .split(","))
                .map(String::trim)
                .map(option -> option.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
        this.sloppyTimeRange = looseRangeOptions.contains(SqlConstant.RESERVED_TIME);

        LOGGER.info(
            "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}",
            this.pipeName,
            this.dataRegionId,
            DateTimeUtils.convertLongToDate(this.historicalDataExtractionStartTime),
            this.historicalDataExtractionStartTime,
            DateTimeUtils.convertLongToDate(this.historicalDataExtractionEndTime),
            this.historicalDataExtractionEndTime,
            this.sloppyTimeRange);
    }

    /** Closes all working TsFile processors of this region under its write lock; no-op if the region is absent. */
    private void flushDataRegionAllTsFiles() {
        final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(this.dataRegionId));
        if (Objects.isNull(dataRegion)) {
            return;
        }
        dataRegion.writeLock("Pipe: create historical TsFile extractor");
        try {
            dataRegion.syncCloseAllWorkingTsFileProcessors();
        } finally {
            dataRegion.writeUnlock();
        }
    }

    /**
     * Builds the pending queue of TsFiles to extract.
     *
     * <p>Under the region write lock: flushes the region (unless recently flushed), selects the
     * sequence and unsequence TsFiles accepted by {@link #shouldExtract}, pins each selected file,
     * orders them, and stores them as the pending queue. A file that cannot be pinned is dropped
     * from the result so that {@link #supply} and {@link #close} never unpin a file this extractor
     * does not hold.
     */
    public synchronized void start() {
        if (!this.shouldExtractInsertion) {
            return;
        }
        final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(this.dataRegionId));
        if (Objects.isNull(dataRegion)) {
            this.pendingQueue = new ArrayDeque<>();
            return;
        }

        dataRegion.writeLock("Pipe: start to extract historical TsFile");
        final long startedAt = System.currentTimeMillis();
        try {
            LOGGER.info("Pipe {}@{}: start to flush data region", this.pipeName, this.dataRegionId);
            synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
                final long lastFlushedTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(this.dataRegionId);
                if (System.currentTimeMillis() - lastFlushedTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
                    dataRegion.syncCloseAllWorkingTsFileProcessors();
                    DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(this.dataRegionId, System.currentTimeMillis());
                    LOGGER.info(
                        "Pipe {}@{}: finish to flush data region, took {} ms",
                        this.pipeName,
                        this.dataRegionId,
                        System.currentTimeMillis() - startedAt);
                } else {
                    LOGGER.info(
                        "Pipe {}@{}: skip to flush data region, last flushed time {} ms ago",
                        this.pipeName,
                        this.dataRegionId,
                        System.currentTimeMillis() - lastFlushedTime);
                }
            }

            final TsFileManager tsFileManager = dataRegion.getTsFileManager();
            tsFileManager.readLock();
            try {
                final int originalSequenceCount = tsFileManager.size(true);
                final int originalUnsequenceCount = tsFileManager.size(false);
                final ArrayList<TsFileResource> resources = new ArrayList<>(originalSequenceCount + originalUnsequenceCount);
                LOGGER.info(
                    "Pipe {}@{}: start to extract historical TsFile, original sequence file count {}, original unsequence file count {}, start progress index {}",
                    this.pipeName,
                    this.dataRegionId,
                    originalSequenceCount,
                    originalUnsequenceCount,
                    this.startIndex);

                final Collection<TsFileResource> sequenceResources =
                    tsFileManager.getTsFileList(true).stream().filter(this::shouldExtract).collect(Collectors.toList());
                resources.addAll(sequenceResources);
                final Collection<TsFileResource> unsequenceResources =
                    tsFileManager.getTsFileList(false).stream().filter(this::shouldExtract).collect(Collectors.toList());
                resources.addAll(unsequenceResources);

                // Pin every selected file; the matching unpin happens in supply() or close().
                // Files that fail to pin are removed so they are never unpinned or supplied.
                resources.removeIf(resource -> {
                    try {
                        PipeResourceManager.tsfile().pinTsFileResource(resource, this.shouldTransferModFile);
                        return false;
                    } catch (IOException e) {
                        LOGGER.warn("Pipe: failed to pin TsFileResource {}", resource.getTsFilePath(), e);
                        return true;
                    }
                });

                resources.sort((left, right) ->
                    this.startIndex instanceof TimeWindowStateProgressIndex
                        ? Long.compare(left.getFileStartTime(), right.getFileStartTime())
                        : left.getMaxProgressIndex().topologicalCompareTo(right.getMaxProgressIndex()));
                this.pendingQueue = new ArrayDeque<>(resources);

                LOGGER.info(
                    "Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, extracted unsequence file count {}/{}, extracted file count {}/{}, took {} ms",
                    this.pipeName,
                    this.dataRegionId,
                    sequenceResources.size(),
                    originalSequenceCount,
                    unsequenceResources.size(),
                    originalUnsequenceCount,
                    resources.size(),
                    originalSequenceCount + originalUnsequenceCount,
                    System.currentTimeMillis() - startedAt);
            } finally {
                tsFileManager.readUnlock();
            }
        } finally {
            dataRegion.writeUnlock();
        }
    }

    /**
     * Selection predicate used by {@link #start}: an unclosed file is always selected; a closed file
     * is selected only if it may hold unprocessed data, overlaps the configured time range and was
     * generated no earlier than the extraction lower bound.
     */
    private boolean shouldExtract(TsFileResource tsFileResource) {
        return !tsFileResource.isClosed()
            || (mayTsFileContainUnprocessedData(tsFileResource)
                && isTsFileResourceOverlappedWithTimeRange(tsFileResource)
                && isTsFileGeneratedAfterExtractionTimeLowerBound(tsFileResource));
    }

    /**
     * Compares the file against the pipe's start progress index.
     *
     * @return for a {@link TimeWindowStateProgressIndex}, whether its min time is not after the
     *     file end time; for a {@link StateProgressIndex}, whether its inner index is neither after
     *     nor equal to the file's max progress index after close; otherwise whether the start index
     *     is not after that max progress index
     */
    private boolean mayTsFileContainUnprocessedData(TsFileResource tsFileResource) {
        if (this.startIndex instanceof TimeWindowStateProgressIndex) {
            return ((TimeWindowStateProgressIndex) this.startIndex).getMinTime() <= tsFileResource.getFileEndTime();
        }
        if (this.startIndex instanceof StateProgressIndex) {
            final ProgressIndex innerProgressIndex = ((StateProgressIndex) this.startIndex).getInnerProgressIndex();
            return !innerProgressIndex.isAfter(tsFileResource.getMaxProgressIndexAfterClose())
                && !innerProgressIndex.equals(tsFileResource.getMaxProgressIndexAfterClose());
        }
        return !this.startIndex.isAfter(tsFileResource.getMaxProgressIndexAfterClose());
    }

    /** @return true if [file start, file end] intersects the configured extraction range (bounds inclusive) */
    private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource tsFileResource) {
        return tsFileResource.getFileEndTime() >= this.historicalDataExtractionStartTime
            && this.historicalDataExtractionEndTime >= tsFileResource.getFileStartTime();
    }

    /** @return true if [file start, file end] lies entirely inside the configured extraction range */
    private boolean isTsFileResourceCoveredByTimeRange(TsFileResource tsFileResource) {
        return this.historicalDataExtractionStartTime <= tsFileResource.getFileStartTime()
            && this.historicalDataExtractionEndTime >= tsFileResource.getFileEndTime();
    }

    /**
     * Checks the time encoded in the TsFile's name against the extraction lower bound.
     *
     * @return true if the file name's time is at or after the lower bound, or if the name cannot be
     *     parsed (the file is then extracted anyway and a warning is logged)
     */
    private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource tsFileResource) {
        try {
            return this.historicalDataExtractionTimeLowerBound
                <= TsFileNameGenerator.getTsFileName(tsFileResource.getTsFile().getName()).getTime();
        } catch (IOException e) {
            LOGGER.warn(
                "Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway (historical data extraction time lower bound: {})",
                this.pipeName,
                this.dataRegionId,
                tsFileResource.getTsFilePath(),
                this.historicalDataExtractionTimeLowerBound,
                e);
            return true;
        }
    }

    /**
     * Takes the next pending TsFile and wraps it in an insertion event.
     *
     * <p>Pattern parsing is skipped when the database is covered by the pipe pattern; time parsing
     * is skipped when the loose time range option is set or the file is fully inside the range.
     * The event's reference count is increased before the extractor's own pin is released.
     *
     * @return the event, or {@code null} if the queue has not been built or is empty
     */
    public synchronized Event supply() {
        if (Objects.isNull(this.pendingQueue)) {
            return null;
        }
        final TsFileResource resource = this.pendingQueue.poll();
        if (resource == null) {
            return null;
        }

        final PipeTsFileInsertionEvent event =
            new PipeTsFileInsertionEvent(
                resource,
                this.shouldTransferModFile,
                false,
                false,
                this.pipeName,
                this.pipeTaskMeta,
                this.pipePattern,
                this.historicalDataExtractionStartTime,
                this.historicalDataExtractionEndTime);
        if (this.isDbNameCoveredByPattern) {
            event.skipParsingPattern();
        }
        if (this.sloppyTimeRange || isTsFileResourceCoveredByTimeRange(resource)) {
            event.skipParsingTime();
        }
        event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());

        try {
            PipeResourceManager.tsfile().unpinTsFileResource(resource);
        } catch (IOException e) {
            LOGGER.warn(
                "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}",
                this.pipeName,
                this.dataRegionId,
                resource.getTsFilePath(),
                e);
        }
        return event;
    }

    /** @return true if the pending queue has not been built or holds no more files */
    @Override // org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor
    public synchronized boolean hasConsumedAll() {
        return Objects.isNull(this.pendingQueue) || this.pendingQueue.isEmpty();
    }

    /**
     * @return number of files still pending, or 0 if the queue has not been built. The field is read
     *     once into a local so that {@link #close} nulling it between the check and the use cannot
     *     cause a NullPointerException; this method does not take the instance monitor.
     */
    @Override // org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor
    public int getPendingQueueSize() {
        final Queue<TsFileResource> queue = this.pendingQueue;
        return Objects.nonNull(queue) ? queue.size() : 0;
    }

    /** Unpins every file still pending (failures are logged, not thrown) and discards the queue. */
    public synchronized void close() {
        if (Objects.isNull(this.pendingQueue)) {
            return;
        }
        for (final TsFileResource resource : this.pendingQueue) {
            try {
                PipeResourceManager.tsfile().unpinTsFileResource(resource);
            } catch (IOException e) {
                LOGGER.warn(
                    "Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}",
                    this.pipeName,
                    this.dataRegionId,
                    resource.getTsFilePath(),
                    e);
            }
        }
        this.pendingQueue.clear();
        this.pendingQueue = null;
    }
}
