/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.engine.merge.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the chunks of a list of series, sequence file by sequence file, with the points supplied
 * by the unsequence readers of a {@link MergeResource}.
 *
 * <p>Series are grouped by device and then processed in batches of at most
 * {@code concurrentMergeSeriesNum} paths. For every batch and every sequence file the per-series
 * work is distributed over several {@link MergeChunkHeapTask}s that are submitted through
 * {@link MergeManager}.
 */
public class MergeMultiChunkTask {
    private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
    /** Chunk point-number threshold, read once from the configuration when the class is loaded. */
    private static final int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
    private final MergeLogger mergeLogger;
    private final List<PartialPath> unmergedSeries;
    private final String taskName;
    private final MergeResource resource;
    /** Head (next unwritten point) of each unsequence reader, indexed like {@link #currMergingPaths}; null when exhausted. */
    private TimeValuePair[] currTimeValuePairs;
    private final boolean fullMerge;
    private final MergeContext mergeContext;
    /** Counters for the sequence file currently being merged; reset at the start of {@link #mergeChunks}. */
    private final AtomicInteger mergedChunkNum = new AtomicInteger();
    private final AtomicInteger unmergedChunkNum = new AtomicInteger();
    private int mergedSeriesCnt;
    /** Last progress percentage that was logged. */
    private double progress;
    private final int concurrentMergeSeriesNum;
    /** The batch of paths that is currently being merged. */
    private List<PartialPath> currMergingPaths = new ArrayList<>();
    private final String storageGroupName;

    /**
     * Creates a task; nothing is merged until {@link #mergeSeries()} is called.
     *
     * @param context merge statistics and bookkeeping that this task updates
     * @param taskName name used in log messages and sub-task names
     * @param mergeLogger logger that records merge progress markers
     * @param mergeResource provider of sequence files, readers, writers and schemas
     * @param fullMerge when true, chunks that need no rewriting are still copied to the merge file
     * @param unmergedSeries the series to merge
     * @param concurrentMergeSeriesNum batch size passed to the path selector
     * @param storageGroupName storage group name reported by the sub-tasks
     */
    public MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger, MergeResource mergeResource, boolean fullMerge, List<PartialPath> unmergedSeries, int concurrentMergeSeriesNum, String storageGroupName) {
        this.mergeContext = context;
        this.taskName = taskName;
        this.mergeLogger = mergeLogger;
        this.resource = mergeResource;
        this.fullMerge = fullMerge;
        this.unmergedSeries = unmergedSeries;
        this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
        this.storageGroupName = storageGroupName;
    }

    /**
     * Merges every series of {@code unmergedSeries}, device by device and batch by batch.
     *
     * <p>If the current thread is found interrupted after a batch, the interrupt flag is restored
     * and the method returns early without logging the end of all series.
     *
     * @throws IOException if reading or writing fails
     */
    void mergeSeries() throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info("{} starts to merge {} series", this.taskName, this.unmergedSeries.size());
        }
        long startTime = System.currentTimeMillis();
        for (TsFileResource seqFile : this.resource.getSeqFiles()) {
            this.mergeContext.getUnmergedChunkStartTimes().put(seqFile, new HashMap<>());
        }
        List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(this.unmergedSeries);
        for (List<PartialPath> pathList : devicePaths) {
            NaivePathSelector pathSelector = new NaivePathSelector(pathList, this.concurrentMergeSeriesNum);
            while (pathSelector.hasNext()) {
                this.currMergingPaths = pathSelector.next();
                this.mergePaths();
                this.resource.clearChunkWriterCache();
                if (Thread.interrupted()) {
                    logger.info("MergeMultiChunkTask {} aborted", this.taskName);
                    // Thread.interrupted() cleared the flag; restore it for the caller.
                    Thread.currentThread().interrupt();
                    return;
                }
                this.mergedSeriesCnt += this.currMergingPaths.size();
                this.logMergeProgress();
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} all series are merged after {}ms", this.taskName, System.currentTimeMillis() - startTime);
        }
        this.mergeLogger.logAllTsEnd();
    }

    /** Logs the percentage of merged series whenever it advanced by at least one point since the last log. */
    private void logMergeProgress() {
        if (!logger.isInfoEnabled()) {
            return;
        }
        // Multiply in double: 100 * mergedSeriesCnt would overflow int for very large series counts.
        double newProgress = 100.0 * this.mergedSeriesCnt / this.unmergedSeries.size();
        if (newProgress - this.progress >= 1.0) {
            this.progress = newProgress;
            logger.info("{} has merged {}% series", this.taskName, this.progress);
        }
    }

    /**
     * @return a human-readable "processed/total" series count
     */
    public String getProgress() {
        return String.format("Processed %d/%d series", this.mergedSeriesCnt, this.unmergedSeries.size());
    }

    /**
     * Merges the current batch of paths into every sequence file, in file order.
     *
     * <p>Returns early (without logging the end of the batch) when the thread is interrupted.
     */
    private void mergePaths() throws IOException {
        this.mergeLogger.logTSStart(this.currMergingPaths);
        IPointReader[] unseqReaders = this.resource.getUnseqReaders(this.currMergingPaths);
        int pathCount = this.currMergingPaths.size();
        // Cache the first point of every unsequence reader; entries stay null for empty readers.
        this.currTimeValuePairs = new TimeValuePair[pathCount];
        for (int i = 0; i < pathCount; ++i) {
            if (unseqReaders[i].hasNextTimeValuePair()) {
                this.currTimeValuePairs[i] = unseqReaders[i].currentTimeValuePair();
            }
        }
        for (int fileIdx = 0; fileIdx < this.resource.getSeqFiles().size(); ++fileIdx) {
            this.pathsMergeOneFile(fileIdx, unseqReaders);
            if (Thread.interrupted()) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        this.mergeLogger.logTSEnd();
    }

    /**
     * Merges the current batch of paths into one sequence file.
     *
     * @param seqFileIdx index of the file in {@code resource.getSeqFiles()}
     * @param unseqReaders unsequence readers, indexed like {@link #currMergingPaths}
     */
    private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders) throws IOException {
        TsFileResource currTsFile = this.resource.getSeqFiles().get(seqFileIdx);
        // All paths of a batch come from the same device list, so the first path's device is used.
        String deviceId = this.currMergingPaths.get(0).getDevice();
        long currDeviceMinTime = currTsFile.getStartTime(deviceId);
        if (currDeviceMinTime == Long.MAX_VALUE) {
            // NOTE(review): presumably MAX_VALUE means the file has no data of this device - confirm.
            return;
        }
        for (PartialPath path : this.currMergingPaths) {
            this.mergeContext.getUnmergedChunkStartTimes().get(currTsFile).put(path, new ArrayList<>());
        }
        // An unsequence point may be earlier than the start time recorded for the device.
        for (TimeValuePair timeValuePair : this.currTimeValuePairs) {
            if (timeValuePair != null && timeValuePair.getTimestamp() < currDeviceMinTime) {
                currDeviceMinTime = timeValuePair.getTimestamp();
            }
        }
        boolean isLastFile = seqFileIdx + 1 == this.resource.getSeqFiles().size();
        TsFileSequenceReader fileSequenceReader = this.resource.getFileReader(currTsFile);
        int pathCount = this.currMergingPaths.size();
        // Generic array creation is impossible in Java; only List<ChunkMetadata> values are stored below.
        @SuppressWarnings("unchecked")
        List<ChunkMetadata>[] seqChunkMeta = new List[pathCount];
        for (int i = 0; i < pathCount; ++i) {
            PartialPath path = this.currMergingPaths.get(i);
            seqChunkMeta[i] = this.resource.queryChunkMetadata(path, currTsFile);
            QueryUtils.modifyChunkMetaData(seqChunkMeta[i], this.resource.getModifications(currTsFile, path));
            if (Thread.interrupted()) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (this.filterNoDataPaths(seqChunkMeta, seqFileIdx).isEmpty()) {
            // No path has anything to write into this file.
            return;
        }
        RestorableTsFileIOWriter mergeFileWriter = this.resource.getMergeFileWriter(currTsFile);
        for (PartialPath path : this.currMergingPaths) {
            MeasurementSchema schema = this.resource.getSchema(path);
            mergeFileWriter.addSchema(path, schema);
        }
        mergeFileWriter.startChunkGroup(deviceId);
        boolean dataWritten = this.mergeChunks(seqChunkMeta, isLastFile, fileSequenceReader, unseqReaders, mergeFileWriter, currTsFile);
        if (dataWritten) {
            mergeFileWriter.writeVersion(0L);
            mergeFileWriter.endChunkGroup();
            this.mergeLogger.logFilePosition(mergeFileWriter.getFile());
            currTsFile.putStartTime(deviceId, currDeviceMinTime);
        }
    }

    /**
     * Collects the indices of the paths that have something to merge into the given file.
     *
     * <p>A path is skipped when it has no chunk metadata in the file and either the file is not
     * the last sequence file or the path has no pending unsequence point.
     *
     * @param seqChunkMeta chunk metadata per path, indexed like {@link #currMergingPaths}
     * @param seqFileIdx index of the sequence file
     * @return indices of the paths that are not skipped
     */
    private List<Integer> filterNoDataPaths(List<ChunkMetadata>[] seqChunkMeta, int seqFileIdx) {
        boolean isLastFile = seqFileIdx + 1 == this.resource.getSeqFiles().size();
        List<Integer> unskipped = new ArrayList<>();
        for (int i = 0; i < this.currMergingPaths.size(); ++i) {
            boolean hasSeqData = !seqChunkMeta[i].isEmpty();
            boolean hasTrailingUnseqData = isLastFile && this.currTimeValuePairs[i] != null;
            if (hasSeqData || hasTrailingUnseqData) {
                unskipped.add(i);
            }
        }
        return unskipped;
    }

    /**
     * Distributes the current paths over sub-tasks, runs them and waits for their completion.
     *
     * @return true if at least one chunk was merged into the file; false if nothing was merged or
     *     the thread was interrupted
     * @throws IOException if a sub-task failed (the {@link ExecutionException} is the cause)
     */
    private boolean mergeChunks(List<ChunkMetadata>[] seqChunkMeta, boolean isLastFile, TsFileSequenceReader reader, IPointReader[] unseqReaders, RestorableTsFileIOWriter mergeFileWriter, TsFileResource currFile) throws IOException {
        // new int[] is zero-initialised, so every path starts with no unclosed points.
        int[] ptWrittens = new int[seqChunkMeta.length];
        // Clamp to 1: a configured value of 0 would divide by zero in the modulo below and a
        // negative one would fail the array allocation.
        int mergeChunkSubTaskNum = Math.max(1, IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum());
        MergeUtils.MetaListEntry[] metaListEntries = new MergeUtils.MetaListEntry[this.currMergingPaths.size()];
        @SuppressWarnings("unchecked")
        PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum];
        for (int i = 0; i < mergeChunkSubTaskNum; ++i) {
            chunkIdxHeaps[i] = new PriorityQueue<>();
        }
        // The target heap only advances after a path that has sequence chunks, so paths without
        // sequence data share a heap with the next path that has some.
        int heapCursor = 0;
        for (int pathIdx = 0; pathIdx < this.currMergingPaths.size(); ++pathIdx) {
            chunkIdxHeaps[heapCursor % mergeChunkSubTaskNum].add(pathIdx);
            if (seqChunkMeta[pathIdx].isEmpty()) {
                continue;
            }
            MergeUtils.MetaListEntry entry = new MergeUtils.MetaListEntry(pathIdx, seqChunkMeta[pathIdx]);
            entry.next();
            metaListEntries[pathIdx] = entry;
            ++heapCursor;
        }
        this.mergedChunkNum.set(0);
        this.unmergedChunkNum.set(0);
        List<Future<Void>> futures = new ArrayList<>(mergeChunkSubTaskNum);
        for (int i = 0; i < mergeChunkSubTaskNum; ++i) {
            futures.add(MergeManager.getINSTANCE().submitChunkSubTask(new MergeChunkHeapTask(chunkIdxHeaps[i], metaListEntries, ptWrittens, reader, mergeFileWriter, unseqReaders, currFile, isLastFile, i)));
            if (Thread.interrupted()) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        for (Future<Void> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (ExecutionException e) {
                throw new IOException(e);
            }
        }
        // Add this file's counters to the totals kept in the merge context.
        this.mergeContext.getMergedChunkCnt().merge(currFile, this.mergedChunkNum.get(), Integer::sum);
        this.mergeContext.getUnmergedChunkCnt().merge(currFile, this.unmergedChunkNum.get(), Integer::sum);
        return this.mergedChunkNum.get() > 0;
    }

    /**
     * Merges one sequence chunk of one path.
     *
     * <p>A chunk is "untouched" when there are no unclosed points before it and it is neither too
     * small, overflowed nor modified. Untouched chunks are recorded as unmerged (non-full merge)
     * or copied as-is to the merge file (full merge). Every other chunk is rewritten through
     * {@code chunkWriter}, together with unsequence points if it is overflowed.
     *
     * @param lastUnclosedChunkPoint points already buffered in {@code chunkWriter} for this path
     * @param pathIdx index of the path in {@link #currMergingPaths}
     * @return the number of points left buffered in {@code chunkWriter} after this call
     */
    private int mergeChunkV2(ChunkMetadata currMeta, boolean chunkOverflowed, boolean chunkTooSmall, Chunk chunk, int lastUnclosedChunkPoint, int pathIdx, TsFileIOWriter mergeFileWriter, IPointReader unseqReader, IChunkWriter chunkWriter, TsFileResource currFile) throws IOException {
        boolean chunkModified = currMeta.getDeleteIntervalList() != null && !currMeta.getDeleteIntervalList().isEmpty();
        boolean chunkUntouched = lastUnclosedChunkPoint == 0 && !chunkTooSmall && !chunkOverflowed && !chunkModified;
        if (chunkUntouched) {
            if (!this.fullMerge) {
                // Leave the chunk in the old file and remember where it starts.
                this.unmergedChunkNum.incrementAndGet();
                this.mergeContext.getUnmergedChunkStartTimes().get(currFile).get(this.currMergingPaths.get(pathIdx)).add(currMeta.getStartTime());
                return 0;
            }
            // The file writer is shared by all sub-tasks of this file.
            synchronized (mergeFileWriter) {
                mergeFileWriter.writeChunk(chunk, currMeta);
            }
            this.mergeContext.incTotalPointWritten(currMeta.getNumOfPoints());
            this.mergeContext.incTotalChunkWritten();
            this.mergedChunkNum.incrementAndGet();
            return 0;
        }
        int unclosedChunkPoint = lastUnclosedChunkPoint;
        if (chunkOverflowed) {
            unclosedChunkPoint += this.writeChunkWithUnseq(chunk, chunkWriter, unseqReader, currMeta.getEndTime(), pathIdx);
        } else {
            unclosedChunkPoint += MergeUtils.writeChunkWithoutUnseq(chunk, chunkWriter);
        }
        this.mergedChunkNum.incrementAndGet();
        this.mergeContext.incTotalPointWritten((long)unclosedChunkPoint - (long)lastUnclosedChunkPoint);
        // Flush once the threshold is reached; a negative threshold flushes any non-empty buffer.
        if (minChunkPointNum > 0 && unclosedChunkPoint >= minChunkPointNum || unclosedChunkPoint > 0 && minChunkPointNum < 0) {
            synchronized (mergeFileWriter) {
                chunkWriter.writeToFileWriter(mergeFileWriter);
            }
            unclosedChunkPoint = 0;
        }
        return unclosedChunkPoint;
    }

    /** Moves the unsequence reader of a path one point forward and refreshes the cached head point. */
    private void advanceUnseqReader(IPointReader unseqReader, int pathIdx) throws IOException {
        unseqReader.nextTimeValuePair();
        this.currTimeValuePairs[pathIdx] = unseqReader.hasNextTimeValuePair() ? unseqReader.currentTimeValuePair() : null;
    }

    /**
     * Writes the pending unsequence points of a path whose timestamp is strictly below
     * {@code timeLimit}.
     *
     * @return the number of points written
     */
    private int writeRemainingUnseq(IChunkWriter chunkWriter, IPointReader unseqReader, long timeLimit, int pathIdx) throws IOException {
        int ptWritten = 0;
        while (this.currTimeValuePairs[pathIdx] != null && this.currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) {
            MergeUtils.writeTVPair(this.currTimeValuePairs[pathIdx], chunkWriter);
            ++ptWritten;
            this.advanceUnseqReader(unseqReader, pathIdx);
        }
        return ptWritten;
    }

    /**
     * Rewrites a chunk page by page, interleaving unsequence points, then writes the unsequence
     * points that remain below {@code chunkLimitTime}.
     *
     * @return the number of points written
     */
    private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader, long chunkLimitTime, int pathIdx) throws IOException {
        int cnt = 0;
        ChunkReader chunkReader = new ChunkReader(chunk, null);
        while (chunkReader.hasNextSatisfiedPage()) {
            BatchData batchData = chunkReader.nextPageData();
            cnt += this.mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
        }
        return cnt + this.writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
    }

    /**
     * Writes one batch of sequence points merged with the unsequence points that are not later
     * than them. An unsequence point with the same timestamp replaces the sequence point.
     *
     * @return the number of points written
     */
    private int mergeWriteBatch(BatchData batchData, IChunkWriter chunkWriter, IPointReader unseqReader, int pathIdx) throws IOException {
        int cnt = 0;
        for (int i = 0; i < batchData.length(); ++i) {
            long time = batchData.getTimeByIndex(i);
            boolean overwriteSeqPoint = false;
            while (this.currTimeValuePairs[pathIdx] != null && this.currTimeValuePairs[pathIdx].getTimestamp() <= time) {
                TimeValuePair unseqPoint = this.currTimeValuePairs[pathIdx];
                MergeUtils.writeTVPair(unseqPoint, chunkWriter);
                if (unseqPoint.getTimestamp() == time) {
                    overwriteSeqPoint = true;
                }
                this.advanceUnseqReader(unseqReader, pathIdx);
                ++cnt;
            }
            if (!overwriteSeqPoint) {
                MergeUtils.writeBatchPoint(batchData, i, chunkWriter);
                ++cnt;
            }
        }
        return cnt;
    }

    /**
     * Sub-task that merges, for one sequence file, the chunks of the paths whose indices are in
     * its heap. The heap, the reader and the file writer objects are handed in by
     * {@link MergeMultiChunkTask#mergeChunks}; the reader and the writer are shared between
     * sub-tasks and are therefore only used inside {@code synchronized} blocks.
     */
    public class MergeChunkHeapTask
    implements Callable<Void> {
        private final PriorityQueue<Integer> chunkIdxHeap;
        private final MergeUtils.MetaListEntry[] metaListEntries;
        private final int[] ptWrittens;
        private final TsFileSequenceReader reader;
        private final RestorableTsFileIOWriter mergeFileWriter;
        private final IPointReader[] unseqReaders;
        private final TsFileResource currFile;
        private final boolean isLastFile;
        private final int taskNum;
        /** Size of the heap at construction time, used for progress reporting. */
        private final int totalSeriesNum;

        public MergeChunkHeapTask(PriorityQueue<Integer> chunkIdxHeap, MergeUtils.MetaListEntry[] metaListEntries, int[] ptWrittens, TsFileSequenceReader reader, RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile, int taskNum) {
            this.chunkIdxHeap = chunkIdxHeap;
            this.metaListEntries = metaListEntries;
            this.ptWrittens = ptWrittens;
            this.reader = reader;
            this.mergeFileWriter = mergeFileWriter;
            this.unseqReaders = unseqReaders;
            this.currFile = currFile;
            this.isLastFile = isLastFile;
            this.taskNum = taskNum;
            this.totalSeriesNum = chunkIdxHeap.size();
        }

        @Override
        public Void call() throws Exception {
            this.mergeChunkHeap();
            return null;
        }

        /**
         * Processes path indices from the heap until it is empty or the thread is interrupted.
         * A path is put back into the heap after each chunk until its last chunk in this file
         * has been handled.
         */
        private void mergeChunkHeap() throws IOException {
            while (!this.chunkIdxHeap.isEmpty()) {
                int pathIdx = this.chunkIdxHeap.poll();
                PartialPath path = MergeMultiChunkTask.this.currMergingPaths.get(pathIdx);
                MeasurementSchema measurementSchema = MergeMultiChunkTask.this.resource.getSchema(path);
                IChunkWriter chunkWriter = MergeMultiChunkTask.this.resource.getChunkWriter(measurementSchema);
                if (Thread.interrupted()) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Null when the path has no sequence chunk in this file.
                MergeUtils.MetaListEntry metaListEntry = this.metaListEntries[pathIdx];
                if (metaListEntry != null) {
                    ChunkMetadata currMeta = metaListEntry.current();
                    boolean isLastChunk = !metaListEntry.hasNext();
                    boolean chunkOverflowed = MergeUtils.isChunkOverflowed(MergeMultiChunkTask.this.currTimeValuePairs[pathIdx], currMeta);
                    boolean chunkTooSmall = MergeUtils.isChunkTooSmall(this.ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
                    Chunk chunk;
                    synchronized (this.reader) {
                        chunk = this.reader.readMemChunk(currMeta);
                    }
                    this.ptWrittens[pathIdx] = MergeMultiChunkTask.this.mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk, this.ptWrittens[pathIdx], pathIdx, this.mergeFileWriter, this.unseqReaders[pathIdx], chunkWriter, this.currFile);
                    if (!isLastChunk) {
                        metaListEntry.next();
                        this.chunkIdxHeap.add(pathIdx);
                        continue;
                    }
                }
                // Unsequence points left after the last chunk of the last file have no later
                // file to go to, so they are all written here.
                if (this.isLastFile && MergeMultiChunkTask.this.currTimeValuePairs[pathIdx] != null) {
                    this.ptWrittens[pathIdx] += MergeMultiChunkTask.this.writeRemainingUnseq(chunkWriter, this.unseqReaders[pathIdx], Long.MAX_VALUE, pathIdx);
                    MergeMultiChunkTask.this.mergedChunkNum.incrementAndGet();
                }
                // Flush whatever is still buffered for this path.
                if (this.ptWrittens[pathIdx] > 0) {
                    synchronized (this.mergeFileWriter) {
                        chunkWriter.writeToFileWriter(this.mergeFileWriter);
                    }
                }
            }
        }

        public String getStorageGroupName() {
            return MergeMultiChunkTask.this.storageGroupName;
        }

        public String getTaskName() {
            return MergeMultiChunkTask.this.taskName + "_" + this.taskNum;
        }

        /**
         * @return a human-readable count of series no longer in the heap versus the initial heap size
         */
        public String getProgress() {
            return String.format("Processed %d/%d series", this.totalSeriesNum - this.chunkIdxHeap.size(), this.totalSeriesNum);
        }
    }
}

