/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.engine.merge.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the chunks of a list of series, one batch of paths at a time.
 *
 * <p>For every batch of paths handed out by a {@link NaivePathSelector}, each sequence file of the
 * {@link MergeResource} is visited in order. The chunks of the batch found in that file are either
 * recorded as unmerged, copied unchanged, or rewritten (optionally interleaved with the points of
 * the unsequence readers) into the file's merge writer.
 *
 * <p>The chunk work of one sequence file is distributed over sub-tasks submitted through
 * {@link MergeManager}. Every path index is assigned to exactly one sub-task, so the per-path
 * slots of {@code ptWrittens} and {@code currTimeValuePairs} are each touched by a single
 * sub-task; the shared file reader and merge writer are accessed inside {@code synchronized}
 * blocks on the respective object.
 */
class MergeMultiChunkTask {
    private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
    /** Point threshold read once from the configuration; see its use in {@link #mergeChunkV2}. */
    private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();

    private final MergeLogger mergeLogger;
    private final List<Path> unmergedSeries;
    private final String taskName;
    private final MergeResource resource;
    private final boolean fullMerge;
    private final MergeContext mergeContext;
    private final int concurrentMergeSeriesNum;

    /** Chunks rewritten or copied for the sequence file currently being processed. */
    private final AtomicInteger mergedChunkNum = new AtomicInteger();
    /** Chunks left untouched for the sequence file currently being processed. */
    private final AtomicInteger unmergedChunkNum = new AtomicInteger();

    /** Head point of each unsequence reader, indexed like {@link #currMergingPaths}; null when exhausted. */
    private TimeValuePair[] currTimeValuePairs;
    private int mergedSeriesCnt;
    /** Last progress percentage that was logged. */
    private double progress;
    private List<Path> currMergingPaths = new ArrayList<>();

    MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger, MergeResource mergeResource, boolean fullMerge, List<Path> unmergedSeries, int concurrentMergeSeriesNum) {
        this.mergeContext = context;
        this.taskName = taskName;
        this.mergeLogger = mergeLogger;
        this.resource = mergeResource;
        this.fullMerge = fullMerge;
        this.unmergedSeries = unmergedSeries;
        this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
    }

    /**
     * Merges all unmerged series, device by device and batch by batch, and finally records the end
     * of all series in the merge log.
     *
     * @throws IOException if reading, writing, logging or a chunk sub-task fails
     */
    void mergeSeries() throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info("{} starts to merge {} series", taskName, unmergedSeries.size());
        }
        long startTime = System.currentTimeMillis();
        for (TsFileResource seqFile : resource.getSeqFiles()) {
            mergeContext.getUnmergedChunkStartTimes().put(seqFile, new HashMap<>());
        }
        List<List<Path>> devicePaths = MergeUtils.splitPathsByDevice(unmergedSeries);
        for (List<Path> pathList : devicePaths) {
            NaivePathSelector pathSelector = new NaivePathSelector(pathList, concurrentMergeSeriesNum);
            while (pathSelector.hasNext()) {
                @SuppressWarnings("unchecked")
                List<Path> nextPaths = (List<Path>) pathSelector.next();
                currMergingPaths = nextPaths;
                mergePaths();
                mergedSeriesCnt += currMergingPaths.size();
                logMergeProgress();
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} all series are merged after {}ms", taskName, System.currentTimeMillis() - startTime);
        }
        mergeLogger.logAllTsEnd();
    }

    /** Logs the merged percentage whenever it advanced by at least one point since the last log. */
    private void logMergeProgress() {
        if (!logger.isInfoEnabled()) {
            return;
        }
        // Floating-point multiplication avoids int overflow for very large series counts.
        double newProgress = 100.0 * mergedSeriesCnt / unmergedSeries.size();
        if (newProgress - progress >= 1.0) {
            progress = newProgress;
            logger.info("{} has merged {}% series", taskName, progress);
        }
    }

    /**
     * Merges the current batch of paths into every sequence file, bracketed by start/end records in
     * the merge log.
     */
    private void mergePaths() throws IOException {
        mergeLogger.logTSStart(currMergingPaths);
        IPointReader[] unseqReaders = resource.getUnseqReaders(currMergingPaths);
        currTimeValuePairs = new TimeValuePair[currMergingPaths.size()];
        for (int i = 0; i < currMergingPaths.size(); i++) {
            if (unseqReaders[i].hasNext()) {
                currTimeValuePairs[i] = unseqReaders[i].current();
            }
        }
        for (int i = 0; i < resource.getSeqFiles().size(); i++) {
            pathsMergeOneFile(i, unseqReaders);
        }
        mergeLogger.logTSEnd();
    }

    /**
     * Merges the current batch of paths into one sequence file.
     *
     * <p>Returns without doing anything when the file has no start time for the batch's device, or
     * when {@link #filterNoDataPaths} reports no path with data. When at least one chunk was merged,
     * the chunk group is closed, the writer position is logged and the device's start time in the
     * file is lowered to the smallest pending unsequence timestamp if that is earlier.
     *
     * @param seqFileIdx index of the file in {@code resource.getSeqFiles()}
     * @param unseqReaders unsequence readers, indexed like {@link #currMergingPaths}
     */
    // The modification list's element type is not imported in this file, hence the raw local.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders) throws IOException {
        TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx);
        String deviceId = currMergingPaths.get(0).getDevice();
        Long fileStartTime = currTsFile.getStartTimeMap().get(deviceId);
        if (fileStartTime == null) {
            return;
        }
        for (Path path : currMergingPaths) {
            mergeContext.getUnmergedChunkStartTimes().get(currTsFile).put(path, new ArrayList<>());
        }
        long currDeviceMinTime = fileStartTime;
        for (TimeValuePair timeValuePair : currTimeValuePairs) {
            if (timeValuePair != null && timeValuePair.getTimestamp() < currDeviceMinTime) {
                currDeviceMinTime = timeValuePair.getTimestamp();
            }
        }
        boolean isLastFile = seqFileIdx + 1 == resource.getSeqFiles().size();
        TsFileSequenceReader fileSequenceReader = resource.getFileReader(currTsFile);
        List<ChunkMetaData>[] seqChunkMeta = new List[currMergingPaths.size()];
        for (int i = 0; i < currMergingPaths.size(); i++) {
            Path path = currMergingPaths.get(i);
            List pathModifications = resource.getModifications(currTsFile, path);
            seqChunkMeta[i] = resource.queryChunkMetadata(path, currTsFile);
            QueryUtils.modifyChunkMetaData(seqChunkMeta[i], pathModifications);
        }
        List<Integer> unskippedPathIndices = filterNoDataPaths(seqChunkMeta, seqFileIdx);
        if (unskippedPathIndices.isEmpty()) {
            return;
        }
        RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile);
        for (Path path : currMergingPaths) {
            MeasurementSchema schema = resource.getSchema(path.getMeasurement());
            mergeFileWriter.addSchema(schema);
        }
        mergeFileWriter.startChunkGroup(deviceId);
        boolean dataWritten = mergeChunks(seqChunkMeta, isLastFile, fileSequenceReader, unseqReaders, mergeFileWriter, currTsFile);
        if (dataWritten) {
            mergeFileWriter.endChunkGroup(0L);
            mergeLogger.logFilePosition(mergeFileWriter.getFile());
            currTsFile.getStartTimeMap().put(deviceId, currDeviceMinTime);
        }
    }

    /**
     * Collects the indices of the paths that have something to merge in the given file.
     *
     * <p>A path is skipped when it has no chunk metadata in the file, unless the file is the last
     * sequence file and the path still has a pending unsequence point.
     *
     * @param seqChunkMeta chunk metadata per path, indexed like {@link #currMergingPaths}
     * @param seqFileIdx index of the file in {@code resource.getSeqFiles()}
     * @return indices of the paths that are not skipped
     */
    private List<Integer> filterNoDataPaths(List<ChunkMetaData>[] seqChunkMeta, int seqFileIdx) {
        boolean isLastFile = seqFileIdx + 1 == resource.getSeqFiles().size();
        List<Integer> ret = new ArrayList<>();
        for (int i = 0; i < currMergingPaths.size(); i++) {
            boolean hasPendingUnseq = isLastFile && currTimeValuePairs[i] != null;
            if (!seqChunkMeta[i].isEmpty() || hasPendingUnseq) {
                ret.add(i);
            }
        }
        return ret;
    }

    /**
     * Distributes the paths that have chunks in the file round-robin over the configured number of
     * sub-tasks, runs the sub-tasks through {@link MergeManager} and waits for all of them.
     *
     * <p>Afterwards the merged/unmerged chunk counters of this file are added to the merge context.
     *
     * @return {@code true} if at least one chunk was counted as merged for this file
     * @throws IOException if a sub-task fails or the wait is interrupted; in the latter case the
     *     interrupt flag of the current thread is restored first
     */
    private boolean mergeChunks(List<ChunkMetaData>[] seqChunkMeta, boolean isLastFile, TsFileSequenceReader reader, IPointReader[] unseqReaders, RestorableTsFileIOWriter mergeFileWriter, TsFileResource currFile) throws IOException {
        int[] ptWrittens = new int[seqChunkMeta.length];
        int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum();
        @SuppressWarnings("unchecked")
        PriorityQueue<MergeUtils.MetaListEntry>[] chunkMetaHeaps = new PriorityQueue[mergeChunkSubTaskNum];
        for (int i = 0; i < mergeChunkSubTaskNum; i++) {
            chunkMetaHeaps[i] = new PriorityQueue<>();
        }
        int idx = 0;
        for (int i = 0; i < currMergingPaths.size(); i++) {
            if (seqChunkMeta[i].isEmpty()) {
                continue;
            }
            MergeUtils.MetaListEntry entry = new MergeUtils.MetaListEntry(i, seqChunkMeta[i]);
            entry.next();
            chunkMetaHeaps[idx % mergeChunkSubTaskNum].add(entry);
            idx++;
        }
        mergedChunkNum.set(0);
        unmergedChunkNum.set(0);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < mergeChunkSubTaskNum; i++) {
            PriorityQueue<MergeUtils.MetaListEntry> heap = chunkMetaHeaps[i];
            futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
                mergeChunkHeap(heap, ptWrittens, reader, mergeFileWriter, unseqReaders, currFile, isLastFile);
                return null;
            }));
        }
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Keep the interruption visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            } catch (ExecutionException e) {
                throw new IOException(e);
            }
        }
        mergeContext.getMergedChunkCnt().compute(currFile, (tsFileResource, anInt) -> anInt == null ? mergedChunkNum.get() : anInt + mergedChunkNum.get());
        mergeContext.getUnmergedChunkCnt().compute(currFile, (tsFileResource, anInt) -> anInt == null ? unmergedChunkNum.get() : anInt + unmergedChunkNum.get());
        return mergedChunkNum.get() > 0;
    }

    /**
     * Sub-task body: drains one heap of per-path chunk-metadata cursors.
     *
     * <p>For each polled entry the current chunk is read from the file (synchronized on the reader)
     * and passed to {@link #mergeChunkV2}. An entry with more chunks is advanced and re-queued. After
     * the last chunk of a path, the remaining unsequence points are appended if this is the last
     * file, and any points still buffered in the chunk writer are flushed (synchronized on the merge
     * writer).
     *
     * @param ptWrittens number of buffered, unflushed points per path index
     */
    private void mergeChunkHeap(PriorityQueue<MergeUtils.MetaListEntry> chunkMetaHeap, int[] ptWrittens, TsFileSequenceReader reader, RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile) throws IOException {
        while (!chunkMetaHeap.isEmpty()) {
            MergeUtils.MetaListEntry metaListEntry = chunkMetaHeap.poll();
            ChunkMetaData currMeta = metaListEntry.current();
            int pathIdx = metaListEntry.getPathId();
            boolean isLastChunk = !metaListEntry.hasNext();
            Path path = currMergingPaths.get(pathIdx);
            MeasurementSchema measurementSchema = resource.getSchema(path.getMeasurement());
            IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
            boolean chunkOverflowed = MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
            boolean chunkTooSmall = MergeUtils.isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
            Chunk chunk;
            synchronized (reader) {
                chunk = reader.readMemChunk(currMeta);
            }
            ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk, ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter, currFile);
            if (!isLastChunk) {
                metaListEntry.next();
                chunkMetaHeap.add(metaListEntry);
                continue;
            }
            if (isLastFile && currTimeValuePairs[pathIdx] != null) {
                ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], Long.MAX_VALUE, pathIdx);
                mergedChunkNum.incrementAndGet();
            }
            if (ptWrittens[pathIdx] > 0) {
                synchronized (mergeFileWriter) {
                    chunkWriter.writeToFileWriter(mergeFileWriter);
                }
            }
        }
    }

    /**
     * Handles a single sequence chunk.
     *
     * <p>A chunk is "untouched" when nothing is buffered for its path and it is neither too small,
     * overflowed nor modified. An untouched chunk is recorded as unmerged (non-full merge) or copied
     * as-is to the merge writer (full merge). Any other chunk is re-encoded through the chunk writer,
     * merged with unsequence points when overflowed, and the chunk writer is flushed once the
     * buffered point count reaches {@code minChunkPointNum} (or on any point when the threshold is
     * negative).
     *
     * @param lastUnclosedChunkPoint points already buffered in {@code chunkWriter} for this path
     * @return the number of points buffered in {@code chunkWriter} after this call
     */
    private int mergeChunkV2(ChunkMetaData currMeta, boolean chunkOverflowed, boolean chunkTooSmall, Chunk chunk, int lastUnclosedChunkPoint, int pathIdx, TsFileIOWriter mergeFileWriter, IPointReader unseqReader, IChunkWriter chunkWriter, TsFileResource currFile) throws IOException {
        boolean chunkModified = currMeta.getDeletedAt() > Long.MIN_VALUE;
        boolean untouched = lastUnclosedChunkPoint == 0 && !chunkTooSmall && !chunkOverflowed && !chunkModified;
        if (untouched && !fullMerge) {
            unmergedChunkNum.incrementAndGet();
            mergeContext.getUnmergedChunkStartTimes().get(currFile).get(currMergingPaths.get(pathIdx)).add(currMeta.getStartTime());
            return 0;
        }
        if (untouched) {
            synchronized (mergeFileWriter) {
                mergeFileWriter.writeChunk(chunk, currMeta);
            }
            mergeContext.incTotalPointWritten(currMeta.getNumOfPoints());
            mergeContext.incTotalChunkWritten();
            mergedChunkNum.incrementAndGet();
            return 0;
        }
        int unclosedChunkPoint = lastUnclosedChunkPoint;
        if (chunkOverflowed) {
            unclosedChunkPoint += writeChunkWithUnseq(chunk, chunkWriter, unseqReader, currMeta.getEndTime(), pathIdx);
        } else {
            unclosedChunkPoint += MergeUtils.writeChunkWithoutUnseq(chunk, chunkWriter);
        }
        mergedChunkNum.incrementAndGet();
        mergeContext.incTotalPointWritten(unclosedChunkPoint - lastUnclosedChunkPoint);
        boolean thresholdReached = minChunkPointNum > 0 && unclosedChunkPoint >= minChunkPointNum;
        boolean flushEveryChunk = minChunkPointNum < 0 && unclosedChunkPoint > 0;
        if (thresholdReached || flushEveryChunk) {
            synchronized (mergeFileWriter) {
                chunkWriter.writeToFileWriter(mergeFileWriter);
            }
            unclosedChunkPoint = 0;
        }
        return unclosedChunkPoint;
    }

    /**
     * Writes the pending unsequence points of one path whose timestamp is strictly below
     * {@code timeLimit}, advancing the reader and the cached head point.
     *
     * @return the number of points written
     */
    private int writeRemainingUnseq(IChunkWriter chunkWriter, IPointReader unseqReader, long timeLimit, int pathIdx) throws IOException {
        int ptWritten = 0;
        while (currTimeValuePairs[pathIdx] != null && currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) {
            MergeUtils.writeTVPair(currTimeValuePairs[pathIdx], chunkWriter);
            ptWritten++;
            unseqReader.next();
            currTimeValuePairs[pathIdx] = unseqReader.hasNext() ? unseqReader.current() : null;
        }
        return ptWritten;
    }

    /**
     * Re-encodes a chunk batch by batch, interleaving unsequence points, then appends the
     * unsequence points that are still below {@code chunkLimitTime}.
     *
     * @return the number of points written
     */
    private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader, long chunkLimitTime, int pathIdx) throws IOException {
        int cnt = 0;
        ChunkReaderWithoutFilter chunkReader = new ChunkReaderWithoutFilter(chunk);
        while (chunkReader.hasNextBatch()) {
            BatchData batchData = chunkReader.nextBatch();
            cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
        }
        return cnt + writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
    }

    /**
     * Writes one batch of sequence points. Before each sequence point, all pending unsequence
     * points with a timestamp not greater than it are written; a sequence point is dropped when an
     * unsequence point with the same timestamp was written.
     *
     * @return the number of points written
     */
    private int mergeWriteBatch(BatchData batchData, IChunkWriter chunkWriter, IPointReader unseqReader, int pathIdx) throws IOException {
        int cnt = 0;
        for (int i = 0; i < batchData.length(); i++) {
            long time = batchData.getTimeByIndex(i);
            boolean overwriteSeqPoint = false;
            while (currTimeValuePairs[pathIdx] != null && currTimeValuePairs[pathIdx].getTimestamp() <= time) {
                MergeUtils.writeTVPair(currTimeValuePairs[pathIdx], chunkWriter);
                if (currTimeValuePairs[pathIdx].getTimestamp() == time) {
                    overwriteSeqPoint = true;
                }
                unseqReader.next();
                currTimeValuePairs[pathIdx] = unseqReader.hasNext() ? unseqReader.current() : null;
                cnt++;
            }
            if (!overwriteSeqPoint) {
                MergeUtils.writeBatchPoint(batchData, i, chunkWriter);
                cnt++;
            }
        }
        return cnt;
    }
}

