package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.class */
public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
    private static final Log LOG = LogFactory.getLog(MergeManagerImpl.class);
    private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT = 0.25f;
    private final TaskAttemptID reduceId;
    private final JobConf jobConf;
    private final FileSystem localFS;
    private final FileSystem rfs;
    private final LocalDirAllocator localDirAllocator;
    protected MapOutputFile mapOutputFile;
    private MergeManagerImpl<K, V>.IntermediateMemoryToMemoryMerger memToMemMerger;
    private final MergeThread<InMemoryMapOutput<K, V>, K, V> inMemoryMerger;
    private final MergeManagerImpl<K, V>.OnDiskMerger onDiskMerger;

    @VisibleForTesting
    final long memoryLimit;
    private long usedMemory;
    private long commitMemory;

    @VisibleForTesting
    final long maxSingleShuffleLimit;
    private final int memToMemMergeOutputsThreshold;
    private final long mergeThreshold;
    private final int ioSortFactor;
    private final Reporter reporter;
    private final ExceptionReporter exceptionReporter;
    private final Class<? extends Reducer> combinerClass;
    private final Task.CombineOutputCollector<K, V> combineCollector;
    private final Counters.Counter spilledRecordsCounter;
    private final Counters.Counter reduceCombineInputCounter;
    private final Counters.Counter mergedMapOutputsCounter;
    private final CompressionCodec codec;
    private final Progress mergePhase;
    Set<InMemoryMapOutput<K, V>> inMemoryMergedMapOutputs = new TreeSet(new MapOutput.MapOutputComparator());
    Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs = new TreeSet(new MapOutput.MapOutputComparator());
    Set<CompressAwarePath> onDiskMapOutputs = new TreeSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl$CompressAwarePath.class */
    public static class CompressAwarePath extends Path {
        private long rawDataLength;
        private long compressedSize;

        public CompressAwarePath(Path path, long j, long j2) {
            super(path.toUri());
            this.rawDataLength = j;
            this.compressedSize = j2;
        }

        public long getRawDataLength() {
            return this.rawDataLength;
        }

        public long getCompressedSize() {
            return this.compressedSize;
        }

        @Override // org.apache.hadoop.fs.Path
        public boolean equals(Object obj) {
            return super.equals(obj);
        }

        @Override // org.apache.hadoop.fs.Path
        public int hashCode() {
            return super.hashCode();
        }

        @Override // org.apache.hadoop.fs.Path, java.lang.Comparable
        public int compareTo(Object obj) {
            if (obj instanceof CompressAwarePath) {
                CompressAwarePath compressAwarePath = (CompressAwarePath) obj;
                if (this.compressedSize < compressAwarePath.getCompressedSize()) {
                    return -1;
                }
                if (getCompressedSize() > compressAwarePath.getCompressedSize()) {
                    return 1;
                }
            }
            return super.compareTo(obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl$InMemoryMerger.class */
    public class InMemoryMerger extends MergeThread<InMemoryMapOutput<K, V>, K, V> {
        public InMemoryMerger(MergeManagerImpl<K, V> mergeManagerImpl) {
            super(mergeManagerImpl, Integer.MAX_VALUE, MergeManagerImpl.this.exceptionReporter);
            setName("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
            setDaemon(true);
        }

        @Override // org.apache.hadoop.mapreduce.task.reduce.MergeThread
        public void merge(List<InMemoryMapOutput<K, V>> list) throws IOException {
            if (list == null || list.size() == 0) {
                return;
            }
            TaskID taskID = list.get(0).getMapId().getTaskID();
            ArrayList arrayList = new ArrayList();
            long createInMemorySegments = MergeManagerImpl.this.createInMemorySegments(list, arrayList, 0L);
            int size = arrayList.size();
            Path suffix = MergeManagerImpl.this.mapOutputFile.getInputFileForWrite(taskID, createInMemorySegments).suffix(Task.MERGED_OUTPUT_PREFIX);
            IFile.Writer<K, V> writer = new IFile.Writer<>(MergeManagerImpl.this.jobConf, CryptoUtils.wrapIfNecessary(MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs.create(suffix)), MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), MergeManagerImpl.this.codec, null, true);
            try {
                MergeManagerImpl.LOG.info("Initiating in-memory merge with " + size + " segments...");
                RawKeyValueIterator merge = Merger.merge(MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs, MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), arrayList, arrayList.size(), new Path(MergeManagerImpl.this.reduceId.toString()), MergeManagerImpl.this.jobConf.getOutputKeyComparator(), MergeManagerImpl.this.reporter, MergeManagerImpl.this.spilledRecordsCounter, null, null);
                if (null == MergeManagerImpl.this.combinerClass) {
                    Merger.writeFile(merge, writer, MergeManagerImpl.this.reporter, MergeManagerImpl.this.jobConf);
                } else {
                    MergeManagerImpl.this.combineCollector.setWriter(writer);
                    MergeManagerImpl.this.combineAndSpill(merge, MergeManagerImpl.this.reduceCombineInputCounter);
                }
                writer.close();
                CompressAwarePath compressAwarePath = new CompressAwarePath(suffix, writer.getRawLength(), writer.getCompressedLength());
                MergeManagerImpl.LOG.info(MergeManagerImpl.this.reduceId + " Merge of the " + size + " files in-memory complete. Local file is " + suffix + " of size " + MergeManagerImpl.this.localFS.getFileStatus(suffix).getLen());
                MergeManagerImpl.this.closeOnDiskFile(compressAwarePath);
            } catch (IOException e) {
                MergeManagerImpl.this.localFS.delete(suffix, true);
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl$IntermediateMemoryToMemoryMerger.class */
    public class IntermediateMemoryToMemoryMerger extends MergeThread<InMemoryMapOutput<K, V>, K, V> {
        public IntermediateMemoryToMemoryMerger(MergeManagerImpl<K, V> mergeManagerImpl, int i) {
            super(mergeManagerImpl, i, MergeManagerImpl.this.exceptionReporter);
            setName("InMemoryMerger - Thread to do in-memory merge of in-memory shuffled map-outputs");
            setDaemon(true);
        }

        @Override // org.apache.hadoop.mapreduce.task.reduce.MergeThread
        public void merge(List<InMemoryMapOutput<K, V>> list) throws IOException {
            if (list == null || list.size() == 0) {
                return;
            }
            TaskAttemptID mapId = list.get(0).getMapId();
            ArrayList arrayList = new ArrayList();
            long createInMemorySegments = MergeManagerImpl.this.createInMemorySegments(list, arrayList, 0L);
            int size = arrayList.size();
            InMemoryMapOutput<K, V> unconditionalReserve = MergeManagerImpl.this.unconditionalReserve(mapId, createInMemorySegments, false);
            InMemoryWriter inMemoryWriter = new InMemoryWriter(unconditionalReserve.getArrayStream());
            MergeManagerImpl.LOG.info("Initiating Memory-to-Memory merge with " + size + " segments of total-size: " + createInMemorySegments);
            Merger.writeFile(Merger.merge(MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs, MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), arrayList, arrayList.size(), new Path(MergeManagerImpl.this.reduceId.toString()), MergeManagerImpl.this.jobConf.getOutputKeyComparator(), MergeManagerImpl.this.reporter, null, null, null), inMemoryWriter, MergeManagerImpl.this.reporter, MergeManagerImpl.this.jobConf);
            inMemoryWriter.close();
            MergeManagerImpl.LOG.info(MergeManagerImpl.this.reduceId + " Memory-to-Memory merge of the " + size + " files in-memory complete.");
            MergeManagerImpl.this.closeInMemoryMergedFile(unconditionalReserve);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl$OnDiskMerger.class */
    public class OnDiskMerger extends MergeThread<CompressAwarePath, K, V> {
        public OnDiskMerger(MergeManagerImpl<K, V> mergeManagerImpl) {
            super(mergeManagerImpl, MergeManagerImpl.this.ioSortFactor, MergeManagerImpl.this.exceptionReporter);
            setName("OnDiskMerger - Thread to merge on-disk map-outputs");
            setDaemon(true);
        }

        @Override // org.apache.hadoop.mapreduce.task.reduce.MergeThread
        public void merge(List<CompressAwarePath> list) throws IOException {
            if (list == null || list.isEmpty()) {
                MergeManagerImpl.LOG.info("No ondisk files to merge...");
                return;
            }
            long j = 0;
            int i = MergeManagerImpl.this.jobConf.getInt("io.bytes.per.checksum", 512);
            MergeManagerImpl.LOG.info("OnDiskMerger: We have  " + list.size() + " map outputs on disk. Triggering merge...");
            Iterator<CompressAwarePath> it = list.iterator();
            while (it.hasNext()) {
                j += MergeManagerImpl.this.localFS.getFileStatus(it.next()).getLen();
            }
            long checksumLength = j + ChecksumFileSystem.getChecksumLength(j, i);
            Path suffix = MergeManagerImpl.this.localDirAllocator.getLocalPathForWrite(list.get(0).toString(), checksumLength, MergeManagerImpl.this.jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
            IFile.Writer writer = new IFile.Writer(MergeManagerImpl.this.jobConf, CryptoUtils.wrapIfNecessary(MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs.create(suffix)), MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), MergeManagerImpl.this.codec, null, true);
            try {
                Merger.writeFile(Merger.merge((Configuration) MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs, (Class) MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), (Class) MergeManagerImpl.this.jobConf.getMapOutputValueClass(), MergeManagerImpl.this.codec, (Path[]) list.toArray(new Path[list.size()]), true, MergeManagerImpl.this.ioSortFactor, new Path(MergeManagerImpl.this.reduceId.toString()), MergeManagerImpl.this.jobConf.getOutputKeyComparator(), (Progressable) MergeManagerImpl.this.reporter, MergeManagerImpl.this.spilledRecordsCounter, (Counters.Counter) null, MergeManagerImpl.this.mergedMapOutputsCounter, (Progress) null), writer, MergeManagerImpl.this.reporter, MergeManagerImpl.this.jobConf);
                writer.close();
                MergeManagerImpl.this.closeOnDiskFile(new CompressAwarePath(suffix, writer.getRawLength(), writer.getCompressedLength()));
                MergeManagerImpl.LOG.info(MergeManagerImpl.this.reduceId + " Finished merging " + list.size() + " map output files on disk of total-size " + checksumLength + ". Local output file is " + suffix + " of size " + MergeManagerImpl.this.localFS.getFileStatus(suffix).getLen());
            } catch (IOException e) {
                MergeManagerImpl.this.localFS.delete(suffix, true);
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl$RawKVIteratorReader.class */
    public class RawKVIteratorReader extends IFile.Reader<K, V> {
        private final RawKeyValueIterator kvIter;

        public RawKVIteratorReader(RawKeyValueIterator rawKeyValueIterator, long j) throws IOException {
            super((Configuration) null, (FSDataInputStream) null, j, (CompressionCodec) null, MergeManagerImpl.this.spilledRecordsCounter);
            this.kvIter = rawKeyValueIterator;
        }

        @Override // org.apache.hadoop.mapred.IFile.Reader
        public boolean nextRawKey(DataInputBuffer dataInputBuffer) throws IOException {
            if (!this.kvIter.next()) {
                return false;
            }
            DataInputBuffer key = this.kvIter.getKey();
            int position = key.getPosition();
            int length = key.getLength() - position;
            dataInputBuffer.reset(key.getData(), position, length);
            this.bytesRead += length;
            return true;
        }

        @Override // org.apache.hadoop.mapred.IFile.Reader
        public void nextRawValue(DataInputBuffer dataInputBuffer) throws IOException {
            DataInputBuffer value = this.kvIter.getValue();
            int position = value.getPosition();
            int length = value.getLength() - position;
            dataInputBuffer.reset(value.getData(), position, length);
            this.bytesRead += length;
        }

        @Override // org.apache.hadoop.mapred.IFile.Reader
        public long getPosition() throws IOException {
            return this.bytesRead;
        }

        @Override // org.apache.hadoop.mapred.IFile.Reader
        public void close() throws IOException {
            this.kvIter.close();
        }
    }

    public MergeManagerImpl(TaskAttemptID taskAttemptID, JobConf jobConf, FileSystem fileSystem, LocalDirAllocator localDirAllocator, Reporter reporter, CompressionCodec compressionCodec, Class<? extends Reducer> cls, Task.CombineOutputCollector<K, V> combineOutputCollector, Counters.Counter counter, Counters.Counter counter2, Counters.Counter counter3, ExceptionReporter exceptionReporter, Progress progress, MapOutputFile mapOutputFile) {
        this.reduceId = taskAttemptID;
        this.jobConf = jobConf;
        this.localDirAllocator = localDirAllocator;
        this.exceptionReporter = exceptionReporter;
        this.reporter = reporter;
        this.codec = compressionCodec;
        this.combinerClass = cls;
        this.combineCollector = combineOutputCollector;
        this.reduceCombineInputCounter = counter2;
        this.spilledRecordsCounter = counter;
        this.mergedMapOutputsCounter = counter3;
        this.mapOutputFile = mapOutputFile;
        this.mapOutputFile.setConf(jobConf);
        this.localFS = fileSystem;
        this.rfs = ((LocalFileSystem) fileSystem).getRaw();
        float f = jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.7f);
        if (f > 1.0d || f < CMAESOptimizer.DEFAULT_STOPFITNESS) {
            throw new IllegalArgumentException("Invalid value for mapreduce.reduce.shuffle.input.buffer.percent: " + f);
        }
        this.memoryLimit = ((float) jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Runtime.getRuntime().maxMemory())) * f;
        this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
        float f2 = jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.25f);
        if (f2 <= 0.0f || f2 > 1.0f) {
            throw new IllegalArgumentException("Invalid value for mapreduce.reduce.shuffle.memory.limit.percent: " + f2);
        }
        this.usedMemory = 0L;
        this.commitMemory = 0L;
        long j = ((float) this.memoryLimit) * f2;
        if (j > 2147483647L) {
            j = 2147483647L;
            LOG.info("The max number of bytes for a single in-memory shuffle cannot be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE");
        }
        this.maxSingleShuffleLimit = j;
        this.memToMemMergeOutputsThreshold = jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, this.ioSortFactor);
        this.mergeThreshold = ((float) this.memoryLimit) * jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 0.9f);
        LOG.info("MergerManager: memoryLimit=" + this.memoryLimit + Strings.DEFAULT_KEYVALUE_SEPARATOR + "maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + Strings.DEFAULT_KEYVALUE_SEPARATOR + "mergeThreshold=" + this.mergeThreshold + Strings.DEFAULT_KEYVALUE_SEPARATOR + "ioSortFactor=" + this.ioSortFactor + Strings.DEFAULT_KEYVALUE_SEPARATOR + "memToMemMergeOutputsThreshold=" + this.memToMemMergeOutputsThreshold);
        if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
            throw new RuntimeException("Invalid configuration: maxSingleShuffleLimit should be less than mergeThreshold maxSingleShuffleLimit: " + this.maxSingleShuffleLimit + "mergeThreshold: " + this.mergeThreshold);
        }
        if (jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false)) {
            this.memToMemMerger = new IntermediateMemoryToMemoryMerger(this, this.memToMemMergeOutputsThreshold);
            this.memToMemMerger.start();
        } else {
            this.memToMemMerger = null;
        }
        this.inMemoryMerger = createInMemoryMerger();
        this.inMemoryMerger.start();
        this.onDiskMerger = new OnDiskMerger(this);
        this.onDiskMerger.start();
        this.mergePhase = progress;
    }

    protected MergeThread<InMemoryMapOutput<K, V>, K, V> createInMemoryMerger() {
        return new InMemoryMerger(this);
    }

    protected MergeThread<CompressAwarePath, K, V> createOnDiskMerger() {
        return new OnDiskMerger(this);
    }

    TaskAttemptID getReduceId() {
        return this.reduceId;
    }

    @VisibleForTesting
    ExceptionReporter getExceptionReporter() {
        return this.exceptionReporter;
    }

    @Override // org.apache.hadoop.mapreduce.task.reduce.MergeManager
    public void waitForResource() throws InterruptedException {
        this.inMemoryMerger.waitForMerge();
    }

    @Override // org.apache.hadoop.mapreduce.task.reduce.MergeManager
    public synchronized MapOutput<K, V> reserve(TaskAttemptID taskAttemptID, long j, int i) throws IOException {
        if (j > this.maxSingleShuffleLimit) {
            LOG.info(taskAttemptID + ": Shuffling to disk since " + j + " is greater than maxSingleShuffleLimit (" + this.maxSingleShuffleLimit + ")");
            return new OnDiskMapOutput(taskAttemptID, this.reduceId, this, j, this.jobConf, this.mapOutputFile, i, true);
        }
        if (this.usedMemory > this.memoryLimit) {
            LOG.debug(taskAttemptID + ": Stalling shuffle since usedMemory (" + this.usedMemory + ") is greater than memoryLimit (" + this.memoryLimit + "). CommitMemory is (" + this.commitMemory + ")");
            return null;
        }
        LOG.debug(taskAttemptID + ": Proceeding with shuffle since usedMemory (" + this.usedMemory + ") is lesser than memoryLimit (" + this.memoryLimit + ").CommitMemory is (" + this.commitMemory + ")");
        return unconditionalReserve(taskAttemptID, j, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized InMemoryMapOutput<K, V> unconditionalReserve(TaskAttemptID taskAttemptID, long j, boolean z) {
        this.usedMemory += j;
        return new InMemoryMapOutput<>(this.jobConf, taskAttemptID, this, (int) j, this.codec, z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void unreserve(long j) {
        this.usedMemory -= j;
    }

    public synchronized void closeInMemoryFile(InMemoryMapOutput<K, V> inMemoryMapOutput) {
        this.inMemoryMapOutputs.add(inMemoryMapOutput);
        LOG.info("closeInMemoryFile -> map-output of size: " + inMemoryMapOutput.getSize() + ", inMemoryMapOutputs.size() -> " + this.inMemoryMapOutputs.size() + ", commitMemory -> " + this.commitMemory + ", usedMemory ->" + this.usedMemory);
        this.commitMemory += inMemoryMapOutput.getSize();
        if (this.commitMemory >= this.mergeThreshold) {
            LOG.info("Starting inMemoryMerger's merge since commitMemory=" + this.commitMemory + " > mergeThreshold=" + this.mergeThreshold + ". Current usedMemory=" + this.usedMemory);
            this.inMemoryMapOutputs.addAll(this.inMemoryMergedMapOutputs);
            this.inMemoryMergedMapOutputs.clear();
            this.inMemoryMerger.startMerge(this.inMemoryMapOutputs);
            this.commitMemory = 0L;
        }
        if (this.memToMemMerger == null || this.inMemoryMapOutputs.size() < this.memToMemMergeOutputsThreshold) {
            return;
        }
        this.memToMemMerger.startMerge(this.inMemoryMapOutputs);
    }

    public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K, V> inMemoryMapOutput) {
        this.inMemoryMergedMapOutputs.add(inMemoryMapOutput);
        LOG.info("closeInMemoryMergedFile -> size: " + inMemoryMapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + this.inMemoryMergedMapOutputs.size());
    }

    public synchronized void closeOnDiskFile(CompressAwarePath compressAwarePath) {
        this.onDiskMapOutputs.add(compressAwarePath);
        if (this.onDiskMapOutputs.size() >= (2 * this.ioSortFactor) - 1) {
            this.onDiskMerger.startMerge(this.onDiskMapOutputs);
        }
    }

    @Override // org.apache.hadoop.mapreduce.task.reduce.MergeManager
    public RawKeyValueIterator close() throws Throwable {
        if (this.memToMemMerger != null) {
            this.memToMemMerger.close();
        }
        this.inMemoryMerger.close();
        this.onDiskMerger.close();
        ArrayList arrayList = new ArrayList(this.inMemoryMergedMapOutputs);
        this.inMemoryMergedMapOutputs.clear();
        arrayList.addAll(this.inMemoryMapOutputs);
        this.inMemoryMapOutputs.clear();
        ArrayList arrayList2 = new ArrayList(this.onDiskMapOutputs);
        this.onDiskMapOutputs.clear();
        return finalMerge(this.jobConf, this.rfs, arrayList, arrayList2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void combineAndSpill(RawKeyValueIterator rawKeyValueIterator, Counters.Counter counter) throws IOException {
        JobConf jobConf = this.jobConf;
        Reducer reducer = (Reducer) ReflectionUtils.newInstance(this.combinerClass, jobConf);
        Class<?> mapOutputKeyClass = jobConf.getMapOutputKeyClass();
        Class<?> mapOutputValueClass = jobConf.getMapOutputValueClass();
        try {
            Task.CombineValuesIterator combineValuesIterator = new Task.CombineValuesIterator(rawKeyValueIterator, jobConf.getCombinerKeyGroupingComparator(), mapOutputKeyClass, mapOutputValueClass, jobConf, Reporter.NULL, counter);
            while (combineValuesIterator.more()) {
                reducer.reduce(combineValuesIterator.getKey(), combineValuesIterator, this.combineCollector, Reporter.NULL);
                combineValuesIterator.nextKey();
            }
        } finally {
            reducer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long createInMemorySegments(List<InMemoryMapOutput<K, V>> list, List<Merger.Segment<K, V>> list2, long j) throws IOException {
        long j2 = 0;
        long j3 = 0;
        while (list.iterator().hasNext()) {
            j3 += r0.next().getMemory().length;
        }
        while (j3 > j) {
            InMemoryMapOutput<K, V> remove = list.remove(0);
            byte[] memory = remove.getMemory();
            long length = memory.length;
            j2 += length;
            j3 -= length;
            list2.add(new Merger.Segment<>((IFile.Reader) new InMemoryReader(this, remove.getMapId(), memory, 0, (int) length, this.jobConf), true, remove.isPrimaryMapOutput() ? this.mergedMapOutputsCounter : null));
        }
        return j2;
    }

    @VisibleForTesting
    final long getMaxInMemReduceLimit() {
        float f = this.jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0.0f);
        if (f > 1.0d || f < CMAESOptimizer.DEFAULT_STOPFITNESS) {
            throw new RuntimeException(f + ": " + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + " must be a float between 0 and 1.0");
        }
        return ((float) this.memoryLimit) * f;
    }

    /* JADX WARN: Finally extract failed */
    private RawKeyValueIterator finalMerge(JobConf jobConf, FileSystem fileSystem, List<InMemoryMapOutput<K, V>> list, List<CompressAwarePath> list2) throws IOException {
        LOG.info("finalMerge called with " + list.size() + " in-memory map-outputs and " + list2.size() + " on-disk map-outputs");
        long maxInMemReduceLimit = getMaxInMemReduceLimit();
        Class<?> mapOutputKeyClass = jobConf.getMapOutputKeyClass();
        Class<?> mapOutputValueClass = jobConf.getMapOutputValueClass();
        boolean keepFailedTaskFiles = jobConf.getKeepFailedTaskFiles();
        Path path = new Path(this.reduceId.toString());
        RawComparator outputKeyComparator = jobConf.getOutputKeyComparator();
        ArrayList arrayList = new ArrayList();
        long j = 0;
        boolean z = false;
        if (list.size() > 0) {
            TaskID taskID = list.get(0).getMapId().getTaskID();
            j = createInMemorySegments(list, arrayList, maxInMemReduceLimit);
            int size = arrayList.size();
            if (size > 0 && this.ioSortFactor > list2.size()) {
                z = true;
                Path suffix = this.mapOutputFile.getInputFileForWrite(taskID, j).suffix(Task.MERGED_OUTPUT_PREFIX);
                RawKeyValueIterator merge = Merger.merge(jobConf, fileSystem, mapOutputKeyClass, mapOutputValueClass, arrayList, size, path, outputKeyComparator, this.reporter, this.spilledRecordsCounter, null, this.mergePhase);
                IFile.Writer writer = new IFile.Writer(jobConf, CryptoUtils.wrapIfNecessary(jobConf, fileSystem.create(suffix)), mapOutputKeyClass, mapOutputValueClass, this.codec, null, true);
                try {
                    try {
                        Merger.writeFile(merge, writer, this.reporter, jobConf);
                        writer.close();
                        list2.add(new CompressAwarePath(suffix, writer.getRawLength(), writer.getCompressedLength()));
                        writer = null;
                        if (0 != 0) {
                            writer.close();
                        }
                        LOG.info("Merged " + size + " segments, " + j + " bytes to disk to satisfy reduce memory limit");
                        j = 0;
                        arrayList.clear();
                    } catch (Throwable th) {
                        if (null != writer) {
                            writer.close();
                        }
                        throw th;
                    }
                } catch (IOException e) {
                    if (null != suffix) {
                        try {
                            fileSystem.delete(suffix, true);
                        } catch (IOException e2) {
                        }
                    }
                    throw e;
                }
            } else if (j != 0) {
                LOG.info("Keeping " + size + " segments, " + j + " bytes in memory for intermediate, on-disk merge");
            }
        }
        ArrayList arrayList2 = new ArrayList();
        long j2 = j;
        long j3 = j;
        CompressAwarePath[] compressAwarePathArr = (CompressAwarePath[]) list2.toArray(new CompressAwarePath[list2.size()]);
        for (CompressAwarePath compressAwarePath : compressAwarePathArr) {
            long len = fileSystem.getFileStatus(compressAwarePath).getLen();
            j2 += len;
            j3 += compressAwarePath.getRawDataLength() > 0 ? compressAwarePath.getRawDataLength() : len;
            LOG.debug("Disk file: " + compressAwarePath + " Length is " + len);
            arrayList2.add(new Merger.Segment(jobConf, fileSystem, compressAwarePath, this.codec, keepFailedTaskFiles, compressAwarePath.toString().endsWith(Task.MERGED_OUTPUT_PREFIX) ? null : this.mergedMapOutputsCounter, compressAwarePath.getRawDataLength()));
        }
        LOG.info("Merging " + compressAwarePathArr.length + " files, " + j2 + " bytes from disk");
        Collections.sort(arrayList2, new Comparator<Merger.Segment<K, V>>() { // from class: org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.1
            @Override // java.util.Comparator
            public int compare(Merger.Segment<K, V> segment, Merger.Segment<K, V> segment2) {
                if (segment.getLength() == segment2.getLength()) {
                    return 0;
                }
                return segment.getLength() < segment2.getLength() ? -1 : 1;
            }
        });
        ArrayList arrayList3 = new ArrayList();
        LOG.info("Merging " + arrayList3.size() + " segments, " + createInMemorySegments(list, arrayList3, 0L) + " bytes from memory into reduce");
        if (0 != j2) {
            int size2 = arrayList.size();
            arrayList2.addAll(0, arrayList);
            arrayList.clear();
            RawKeyValueIterator merge2 = Merger.merge((Configuration) jobConf, fileSystem, (Class) mapOutputKeyClass, (Class) mapOutputValueClass, this.codec, (List) arrayList2, this.ioSortFactor, size2, path, outputKeyComparator, (Progressable) this.reporter, false, this.spilledRecordsCounter, (Counters.Counter) null, z ? null : this.mergePhase);
            arrayList2.clear();
            if (0 == arrayList3.size()) {
                return merge2;
            }
            arrayList3.add(new Merger.Segment<>((IFile.Reader) new RawKVIteratorReader(merge2, j2), true, j3));
        }
        return Merger.merge(jobConf, fileSystem, mapOutputKeyClass, mapOutputValueClass, arrayList3, arrayList3.size(), path, outputKeyComparator, this.reporter, this.spilledRecordsCounter, null, null);
    }
}
