package org.apache.iotdb.db.engine.merge.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.class */
class MergeMultiChunkTask {
    private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
    private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
    private MergeLogger mergeLogger;
    private List<Path> unmergedSeries;
    private String taskName;
    private MergeResource resource;
    private TimeValuePair[] currTimeValuePairs;
    private boolean fullMerge;
    private MergeContext mergeContext;
    private int mergedSeriesCnt;
    private double progress;
    private int concurrentMergeSeriesNum;
    private AtomicInteger mergedChunkNum = new AtomicInteger();
    private AtomicInteger unmergedChunkNum = new AtomicInteger();
    private List<Path> currMergingPaths = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    public MergeMultiChunkTask(MergeContext mergeContext, String str, MergeLogger mergeLogger, MergeResource mergeResource, boolean z, List<Path> list, int i) {
        this.mergeContext = mergeContext;
        this.taskName = str;
        this.mergeLogger = mergeLogger;
        this.resource = mergeResource;
        this.fullMerge = z;
        this.unmergedSeries = list;
        this.concurrentMergeSeriesNum = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void mergeSeries() throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info("{} starts to merge {} series", this.taskName, Integer.valueOf(this.unmergedSeries.size()));
        }
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<TsFileResource> it = this.resource.getSeqFiles().iterator();
        while (it.hasNext()) {
            this.mergeContext.getUnmergedChunkStartTimes().put(it.next(), new HashMap());
        }
        Iterator<List<Path>> it2 = MergeUtils.splitPathsByDevice(this.unmergedSeries).iterator();
        while (it2.hasNext()) {
            NaivePathSelector naivePathSelector = new NaivePathSelector(it2.next(), this.concurrentMergeSeriesNum);
            while (naivePathSelector.hasNext()) {
                this.currMergingPaths = naivePathSelector.next();
                mergePaths();
                this.mergedSeriesCnt += this.currMergingPaths.size();
                logMergeProgress();
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} all series are merged after {}ms", this.taskName, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        }
        this.mergeLogger.logAllTsEnd();
    }

    private void logMergeProgress() {
        if (logger.isInfoEnabled()) {
            double size = (100 * this.mergedSeriesCnt) / this.unmergedSeries.size();
            if (size - this.progress >= 1.0d) {
                this.progress = size;
                logger.info("{} has merged {}% series", this.taskName, Double.valueOf(this.progress));
            }
        }
    }

    private void mergePaths() throws IOException {
        this.mergeLogger.logTSStart(this.currMergingPaths);
        IPointReader[] unseqReaders = this.resource.getUnseqReaders(this.currMergingPaths);
        this.currTimeValuePairs = new TimeValuePair[this.currMergingPaths.size()];
        for (int i = 0; i < this.currMergingPaths.size(); i++) {
            if (unseqReaders[i].hasNext()) {
                this.currTimeValuePairs[i] = unseqReaders[i].current();
            }
        }
        for (int i2 = 0; i2 < this.resource.getSeqFiles().size(); i2++) {
            pathsMergeOneFile(i2, unseqReaders);
        }
        this.mergeLogger.logTSEnd();
    }

    private void pathsMergeOneFile(int i, IPointReader[] iPointReaderArr) throws IOException {
        TsFileResource tsFileResource = this.resource.getSeqFiles().get(i);
        String device = this.currMergingPaths.get(0).getDevice();
        Long l = tsFileResource.getStartTimeMap().get(device);
        if (l == null) {
            return;
        }
        Iterator<Path> it = this.currMergingPaths.iterator();
        while (it.hasNext()) {
            this.mergeContext.getUnmergedChunkStartTimes().get(tsFileResource).put(it.next(), new ArrayList());
        }
        for (TimeValuePair timeValuePair : this.currTimeValuePairs) {
            if (timeValuePair != null && timeValuePair.getTimestamp() < l.longValue()) {
                l = Long.valueOf(timeValuePair.getTimestamp());
            }
        }
        boolean z = i + 1 == this.resource.getSeqFiles().size();
        TsFileSequenceReader fileReader = this.resource.getFileReader(tsFileResource);
        List[] listArr = new List[this.currMergingPaths.size()];
        List[] listArr2 = new List[this.currMergingPaths.size()];
        for (int i2 = 0; i2 < this.currMergingPaths.size(); i2++) {
            listArr[i2] = this.resource.getModifications(tsFileResource, this.currMergingPaths.get(i2));
            listArr2[i2] = this.resource.queryChunkMetadata(this.currMergingPaths.get(i2), tsFileResource);
            QueryUtils.modifyChunkMetaData(listArr2[i2], listArr[i2]);
        }
        if (filterNoDataPaths(listArr2, i).isEmpty()) {
            return;
        }
        RestorableTsFileIOWriter mergeFileWriter = this.resource.getMergeFileWriter(tsFileResource);
        Iterator<Path> it2 = this.currMergingPaths.iterator();
        while (it2.hasNext()) {
            mergeFileWriter.addSchema(this.resource.getSchema(it2.next().getMeasurement()));
        }
        mergeFileWriter.startChunkGroup(device);
        if (mergeChunks(listArr2, z, fileReader, iPointReaderArr, mergeFileWriter, tsFileResource)) {
            mergeFileWriter.endChunkGroup(0L);
            this.mergeLogger.logFilePosition(mergeFileWriter.getFile());
            tsFileResource.getStartTimeMap().put(device, l);
        }
    }

    private List<Integer> filterNoDataPaths(List[] listArr, int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < this.currMergingPaths.size(); i2++) {
            if (!listArr[i2].isEmpty() || (i + 1 == this.resource.getSeqFiles().size() && this.currTimeValuePairs[i2] != null)) {
                arrayList.add(Integer.valueOf(i2));
            }
        }
        return arrayList;
    }

    private boolean mergeChunks(List<ChunkMetaData>[] listArr, boolean z, TsFileSequenceReader tsFileSequenceReader, IPointReader[] iPointReaderArr, RestorableTsFileIOWriter restorableTsFileIOWriter, TsFileResource tsFileResource) throws IOException {
        int[] iArr = new int[listArr.length];
        int mergeChunkSubThreadNum = IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum();
        PriorityQueue[] priorityQueueArr = new PriorityQueue[mergeChunkSubThreadNum];
        for (int i = 0; i < mergeChunkSubThreadNum; i++) {
            priorityQueueArr[i] = new PriorityQueue();
        }
        int i2 = 0;
        for (int i3 = 0; i3 < this.currMergingPaths.size(); i3++) {
            if (!listArr[i3].isEmpty()) {
                MergeUtils.MetaListEntry metaListEntry = new MergeUtils.MetaListEntry(i3, listArr[i3]);
                metaListEntry.next();
                priorityQueueArr[i2 % mergeChunkSubThreadNum].add(metaListEntry);
                i2++;
                iArr[i3] = 0;
            }
        }
        this.mergedChunkNum.set(0);
        this.unmergedChunkNum.set(0);
        ArrayList arrayList = new ArrayList();
        for (int i4 = 0; i4 < mergeChunkSubThreadNum; i4++) {
            int i5 = i4;
            arrayList.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
                mergeChunkHeap(priorityQueueArr[i5], iArr, tsFileSequenceReader, restorableTsFileIOWriter, iPointReaderArr, tsFileResource, z);
                return null;
            }));
        }
        for (int i6 = 0; i6 < mergeChunkSubThreadNum; i6++) {
            try {
                ((Future) arrayList.get(i6)).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IOException(e);
            }
        }
        this.mergeContext.getMergedChunkCnt().compute(tsFileResource, (tsFileResource2, num) -> {
            return Integer.valueOf(num == null ? this.mergedChunkNum.get() : num.intValue() + this.mergedChunkNum.get());
        });
        this.mergeContext.getUnmergedChunkCnt().compute(tsFileResource, (tsFileResource3, num2) -> {
            return Integer.valueOf(num2 == null ? this.unmergedChunkNum.get() : num2.intValue() + this.unmergedChunkNum.get());
        });
        return this.mergedChunkNum.get() > 0;
    }

    private void mergeChunkHeap(PriorityQueue<MergeUtils.MetaListEntry> priorityQueue, int[] iArr, TsFileSequenceReader tsFileSequenceReader, RestorableTsFileIOWriter restorableTsFileIOWriter, IPointReader[] iPointReaderArr, TsFileResource tsFileResource, boolean z) throws IOException {
        Chunk readMemChunk;
        while (!priorityQueue.isEmpty()) {
            MergeUtils.MetaListEntry poll = priorityQueue.poll();
            ChunkMetaData current = poll.current();
            int pathId = poll.getPathId();
            boolean z2 = !poll.hasNext();
            IChunkWriter chunkWriter = this.resource.getChunkWriter(this.resource.getSchema(this.currMergingPaths.get(pathId).getMeasurement()));
            boolean isChunkOverflowed = MergeUtils.isChunkOverflowed(this.currTimeValuePairs[pathId], current);
            boolean isChunkTooSmall = MergeUtils.isChunkTooSmall(iArr[pathId], current, z2, minChunkPointNum);
            synchronized (tsFileSequenceReader) {
                readMemChunk = tsFileSequenceReader.readMemChunk(current);
            }
            iArr[pathId] = mergeChunkV2(current, isChunkOverflowed, isChunkTooSmall, readMemChunk, iArr[pathId], pathId, restorableTsFileIOWriter, iPointReaderArr[pathId], chunkWriter, tsFileResource);
            if (z2) {
                if (z && this.currTimeValuePairs[pathId] != null) {
                    iArr[pathId] = iArr[pathId] + writeRemainingUnseq(chunkWriter, iPointReaderArr[pathId], Long.MAX_VALUE, pathId);
                    this.mergedChunkNum.incrementAndGet();
                }
                if (iArr[pathId] > 0) {
                    synchronized (restorableTsFileIOWriter) {
                        chunkWriter.writeToFileWriter(restorableTsFileIOWriter);
                    }
                } else {
                    continue;
                }
            } else {
                poll.next();
                priorityQueue.add(poll);
            }
        }
    }

    private int mergeChunkV2(ChunkMetaData chunkMetaData, boolean z, boolean z2, Chunk chunk, int i, int i2, TsFileIOWriter tsFileIOWriter, IPointReader iPointReader, IChunkWriter iChunkWriter, TsFileResource tsFileResource) throws IOException {
        int writeChunkWithUnseq;
        boolean z3 = chunkMetaData.getDeletedAt() > Long.MIN_VALUE;
        if (!this.fullMerge && i == 0 && !z2 && !z && !z3) {
            this.unmergedChunkNum.incrementAndGet();
            this.mergeContext.getUnmergedChunkStartTimes().get(tsFileResource).get(this.currMergingPaths.get(i2)).add(Long.valueOf(chunkMetaData.getStartTime()));
            return 0;
        }
        if (this.fullMerge && i == 0 && !z2 && !z && !z3) {
            synchronized (tsFileIOWriter) {
                tsFileIOWriter.writeChunk(chunk, chunkMetaData);
            }
            this.mergeContext.incTotalPointWritten(chunkMetaData.getNumOfPoints());
            this.mergeContext.incTotalChunkWritten();
            this.mergedChunkNum.incrementAndGet();
            return 0;
        }
        if (z) {
            writeChunkWithUnseq = i + writeChunkWithUnseq(chunk, iChunkWriter, iPointReader, chunkMetaData.getEndTime(), i2);
            this.mergedChunkNum.incrementAndGet();
        } else {
            writeChunkWithUnseq = i + MergeUtils.writeChunkWithoutUnseq(chunk, iChunkWriter);
            this.mergedChunkNum.incrementAndGet();
        }
        this.mergeContext.incTotalPointWritten(writeChunkWithUnseq - i);
        if ((minChunkPointNum > 0 && writeChunkWithUnseq >= minChunkPointNum) || (writeChunkWithUnseq > 0 && minChunkPointNum < 0)) {
            synchronized (tsFileIOWriter) {
                iChunkWriter.writeToFileWriter(tsFileIOWriter);
            }
            writeChunkWithUnseq = 0;
        }
        return writeChunkWithUnseq;
    }

    private int writeRemainingUnseq(IChunkWriter iChunkWriter, IPointReader iPointReader, long j, int i) throws IOException {
        int i2 = 0;
        while (this.currTimeValuePairs[i] != null && this.currTimeValuePairs[i].getTimestamp() < j) {
            MergeUtils.writeTVPair(this.currTimeValuePairs[i], iChunkWriter);
            i2++;
            iPointReader.next();
            this.currTimeValuePairs[i] = iPointReader.hasNext() ? iPointReader.current() : null;
        }
        return i2;
    }

    private int writeChunkWithUnseq(Chunk chunk, IChunkWriter iChunkWriter, IPointReader iPointReader, long j, int i) throws IOException {
        int i2 = 0;
        ChunkReaderWithoutFilter chunkReaderWithoutFilter = new ChunkReaderWithoutFilter(chunk);
        while (chunkReaderWithoutFilter.hasNextBatch()) {
            i2 += mergeWriteBatch(chunkReaderWithoutFilter.nextBatch(), iChunkWriter, iPointReader, i);
        }
        return i2 + writeRemainingUnseq(iChunkWriter, iPointReader, j, i);
    }

    private int mergeWriteBatch(BatchData batchData, IChunkWriter iChunkWriter, IPointReader iPointReader, int i) throws IOException {
        int i2 = 0;
        for (int i3 = 0; i3 < batchData.length(); i3++) {
            long timeByIndex = batchData.getTimeByIndex(i3);
            boolean z = false;
            while (this.currTimeValuePairs[i] != null && this.currTimeValuePairs[i].getTimestamp() <= timeByIndex) {
                MergeUtils.writeTVPair(this.currTimeValuePairs[i], iChunkWriter);
                if (this.currTimeValuePairs[i].getTimestamp() == timeByIndex) {
                    z = true;
                }
                iPointReader.next();
                this.currTimeValuePairs[i] = iPointReader.hasNext() ? iPointReader.current() : null;
                i2++;
            }
            if (!z) {
                MergeUtils.writeBatchPoint(batchData, i3, iChunkWriter);
                i2++;
            }
        }
        return i2;
    }
}
