package org.apache.iotdb.db.pipe.extractor.historical;

import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.wal.node.WALNode;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.class */
public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDataRegionExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);
    private PipeTaskMeta pipeTaskMeta;
    private ProgressIndex startIndex;
    private int dataRegionId;
    private String pattern;
    private long historicalDataExtractionStartTime;
    private long historicalDataExtractionEndTime;
    private long historicalDataExtractionTimeLowerBound;
    private Queue<PipeTsFileInsertionEvent> pendingQueue;

    public void validate(PipeParameterValidator pipeParameterValidator) {
    }

    public void customize(PipeParameters pipeParameters, PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) {
        PipeTaskExtractorRuntimeEnvironment pipeTaskExtractorRuntimeEnvironment = (PipeTaskExtractorRuntimeEnvironment) pipeExtractorRuntimeConfiguration.getRuntimeEnvironment();
        this.pipeTaskMeta = pipeTaskExtractorRuntimeEnvironment.getPipeTaskMeta();
        this.startIndex = pipeTaskExtractorRuntimeEnvironment.getPipeTaskMeta().getProgressIndex();
        this.dataRegionId = pipeTaskExtractorRuntimeEnvironment.getRegionId();
        this.pattern = pipeParameters.getStringOrDefault(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root");
        boolean booleanOrDefault = pipeParameters.getBooleanOrDefault(PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY, true);
        this.historicalDataExtractionStartTime = (booleanOrDefault && pipeParameters.hasAttribute(PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME)) ? DateTimeUtils.convertDatetimeStrToLong(pipeParameters.getString(PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME), ZoneId.systemDefault()) : Long.MIN_VALUE;
        this.historicalDataExtractionEndTime = (booleanOrDefault && pipeParameters.hasAttribute(PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME)) ? DateTimeUtils.convertDatetimeStrToLong(pipeParameters.getString(PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME), ZoneId.systemDefault()) : WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
        this.historicalDataExtractionTimeLowerBound = pipeParameters.getBooleanOrDefault(PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY, true) ? Long.MIN_VALUE : pipeTaskExtractorRuntimeEnvironment.getCreationTime();
        if (this.historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
            flushDataRegionAllTsFiles();
        }
    }

    private void flushDataRegionAllTsFiles() {
        DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(this.dataRegionId));
        if (dataRegion == null) {
            return;
        }
        dataRegion.writeLock("Pipe: create historical TsFile extractor");
        try {
            dataRegion.syncCloseAllWorkingTsFileProcessors();
        } finally {
            dataRegion.writeUnlock();
        }
    }

    public synchronized void start() {
        DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(this.dataRegionId));
        if (dataRegion == null) {
            this.pendingQueue = new ArrayDeque();
            return;
        }
        dataRegion.writeLock("Pipe: start to extract historical TsFile");
        try {
            dataRegion.syncCloseAllWorkingTsFileProcessors();
            TsFileManager tsFileManager = dataRegion.getTsFileManager();
            tsFileManager.readLock();
            try {
                this.pendingQueue = new ArrayDeque(tsFileManager.size(true) + tsFileManager.size(false));
                this.pendingQueue.addAll((Collection) tsFileManager.getTsFileList(true).stream().filter(tsFileResource -> {
                    return !this.startIndex.isAfter(tsFileResource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(tsFileResource) && isTsFileGeneratedAfterExtractionTimeLowerBound(tsFileResource);
                }).map(tsFileResource2 -> {
                    return new PipeTsFileInsertionEvent(tsFileResource2, this.pipeTaskMeta, this.pattern, this.historicalDataExtractionStartTime, this.historicalDataExtractionEndTime);
                }).collect(Collectors.toList()));
                this.pendingQueue.addAll((Collection) tsFileManager.getTsFileList(false).stream().filter(tsFileResource3 -> {
                    return !this.startIndex.isAfter(tsFileResource3.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(tsFileResource3) && isTsFileGeneratedAfterExtractionTimeLowerBound(tsFileResource3);
                }).map(tsFileResource4 -> {
                    return new PipeTsFileInsertionEvent(tsFileResource4, this.pipeTaskMeta, this.pattern, this.historicalDataExtractionStartTime, this.historicalDataExtractionEndTime);
                }).collect(Collectors.toList()));
                this.pendingQueue.forEach(pipeTsFileInsertionEvent -> {
                    pipeTsFileInsertionEvent.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
                });
                tsFileManager.readUnlock();
            } catch (Throwable th) {
                tsFileManager.readUnlock();
                throw th;
            }
        } finally {
            dataRegion.writeUnlock();
        }
    }

    private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource tsFileResource) {
        return tsFileResource.getFileEndTime() >= this.historicalDataExtractionStartTime && this.historicalDataExtractionEndTime >= tsFileResource.getFileStartTime();
    }

    private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource tsFileResource) {
        try {
            return this.historicalDataExtractionTimeLowerBound <= TsFileNameGenerator.getTsFileName(tsFileResource.getTsFile().getName()).getTime();
        } catch (IOException e) {
            LOGGER.warn(String.format("failed to get the generation time of TsFile %s, extract it anyway", tsFileResource.getTsFilePath()), e);
            return true;
        }
    }

    public Event supply() {
        if (this.pendingQueue == null) {
            return null;
        }
        return this.pendingQueue.poll();
    }

    @Override // org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor
    public synchronized boolean hasConsumedAll() {
        return this.pendingQueue != null && this.pendingQueue.isEmpty();
    }

    public void close() {
        if (this.pendingQueue != null) {
            this.pendingQueue.clear();
            this.pendingQueue = null;
        }
    }
}
