/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.writers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.writers.BaseUnorderedPartitionedKVWriter;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnorderedPartitionedKVWriter
extends BaseUnorderedPartitionedKVWriter {
    private static final Logger LOG = LoggerFactory.getLogger(UnorderedPartitionedKVWriter.class);
    // Size of one int in bytes; record metadata positions are converted to int indexes by dividing by this.
    private static final int INT_SIZE = 4;
    // Number of int metadata slots stored in front of every buffered record.
    private static final int NUM_META = 3;
    // Offsets (in ints) of the individual metadata slots relative to the start of a record's metadata.
    private static final int INDEX_KEYLEN = 0;
    private static final int INDEX_VALLEN = 1;
    private static final int INDEX_NEXT = 2;
    // Metadata bytes per record: NUM_META * INT_SIZE.
    private static final int META_SIZE = 12;
    // NOTE(review): appears to be the per-partition overhead used in spill size estimates
    // (the literal 150 is inlined at the use sites) - confirm.
    private static final int APPROX_HEADER_LENGTH = 150;
    // Cleaned destination vertex name, used as a prefix for log messages.
    private final String destNameTrimmed;
    // Total memory (bytes) handed to this writer for in-memory buffers.
    private final long availableMemory;
    // All buffers allocated so far; slots are filled lazily up to numBuffers.
    @VisibleForTesting
    final WrappedBuffer[] buffers;
    // Buffers that have been spilled and reset, ready to be reused by the writer.
    @VisibleForTesting
    final BlockingQueue<WrappedBuffer> availableBuffers;
    // Stream pair the key/value serializers are opened on.
    private final ByteArrayOutputStream baos;
    private final DataOutputStream dos;
    // Buffer currently receiving records.
    @VisibleForTesting
    WrappedBuffer currentBuffer;
    // Raw local file system used for spill and final output files.
    private final FileSystem rfs;
    // Spills recorded without an on-disk index file; consumed by the final merge.
    private final List<SpillInfo> spillInfoList = Collections.synchronizedList(new ArrayList());
    // Single-threaded executor that runs buffer spills in the background.
    private final ListeningExecutorService spillExecutor;
    // Record counts per partition accumulated across all buffers handed off for spilling.
    private final int[] numRecordsPerPartition;
    // Running total of spill sizes reported by completed background spills.
    private volatile long spilledSize = 0L;
    protected final TezCounter outputLargeRecordsCounter;
    @VisibleForTesting
    int numBuffers;
    @VisibleForTesting
    int sizePerBuffer;
    @VisibleForTesting
    int numInitializedBuffers;
    // Non-null once a spill has failed; checked by write() and close().
    // NOTE(review): presumably assigned by the spill callback's failure path, which is not visible here.
    private Throwable spillException;
    // Set by close(); write() rejects records afterwards.
    private AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Number of spill slots handed out so far (incremented each time spill paths are requested).
    @VisibleForTesting
    final AtomicInteger numSpills = new AtomicInteger(0);
    // Background spills that were submitted but have not completed yet.
    private final AtomicInteger pendingSpillCount = new AtomicInteger(0);
    @VisibleForTesting
    Path finalIndexPath;
    @VisibleForTesting
    Path finalOutPath;
    // Direct file writer, only non-null when skipBuffers is true.
    private final IFile.Writer writer;
    // True when there is a single partition and pipelined shuffle is off: records bypass the buffers.
    private final boolean skipBuffers;
    // Lock/condition pair close() uses to wait until pendingSpillCount drops to zero.
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillInProgress = this.spillLock.newCondition();
    // Value of tez.runtime.pipelined-shuffle.enabled.
    private final boolean pipelinedShuffle;
    // Estimated index file size: 24 bytes per partition.
    private final long indexFileSizeEstimate;

    /**
     * Creates a writer that buffers records per partition in memory and spills
     * them to local disk from a background thread.
     *
     * <p>When there is exactly one partition and pipelined shuffle is disabled,
     * buffering is skipped and records are appended straight to the final
     * output file.
     *
     * @param outputContext context of the output this writer belongs to
     * @param conf configuration to read runtime settings from
     * @param numOutputs number of partitions
     * @param availableMemoryBytes memory available for buffers; may be 0 only
     *        in the single-partition, non-pipelined case
     * @throws IOException if the local file system or the direct writer cannot be set up
     * @throws IllegalArgumentException if the memory argument is invalid
     */
    public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf, int numOutputs, long availableMemoryBytes) throws IOException {
        super(outputContext, conf, numOutputs);
        Preconditions.checkArgument(availableMemoryBytes >= 0L, "availableMemory should be >= 0 bytes");
        this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
        this.pipelinedShuffle = this.conf.getBoolean("tez.runtime.pipelined-shuffle.enabled", false);
        if (availableMemoryBytes == 0L) {
            // Without any buffer memory the only workable mode is writing directly to the file.
            boolean directWritePossible = this.numPartitions == 1 && !this.pipelinedShuffle;
            String message = "availableMemory can be set to 0 only when numPartitions=1 and tez.runtime.pipelined-shuffle.enabled is disabled. current numPartitions="
                    + this.numPartitions + ", tez.runtime.pipelined-shuffle.enabled=" + this.pipelinedShuffle;
            Preconditions.checkArgument(directWritePossible, message);
        }
        this.availableMemory = availableMemoryBytes;

        // Buffer layout: only the first buffer is allocated eagerly, the rest on demand.
        int maxSingleBufferSizeBytes = conf.getInt("tez.runtime.unordered.output.max-per-buffer.size-bytes", Integer.MAX_VALUE);
        this.computeNumBuffersAndSize(maxSingleBufferSizeBytes);
        this.availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
        this.buffers = new WrappedBuffer[this.numBuffers];
        this.buffers[0] = new WrappedBuffer(numOutputs, this.sizePerBuffer);
        this.numInitializedBuffers = 1;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.destNameTrimmed + ": Initializing Buffer #" + this.numInitializedBuffers + " with size=" + this.sizePerBuffer);
        }
        this.currentBuffer = this.buffers[0];

        // Both serializers share one output stream.
        this.baos = new ByteArrayOutputStream();
        this.dos = new DataOutputStream(this.baos);
        this.keySerializer.open(this.dos);
        this.valSerializer.open(this.dos);
        this.rfs = FileSystem.getLocal(this.conf).getRaw();

        // One daemon thread performs all background spills.
        String spillThreadName = "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "}";
        ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(spillThreadName).build());
        this.spillExecutor = MoreExecutors.listeningDecorator(executor);
        this.numRecordsPerPartition = new int[this.numPartitions];
        this.outputLargeRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
        this.indexFileSizeEstimate = this.numPartitions * 24;

        boolean writeDirectly = this.numPartitions == 1 && !this.pipelinedShuffle;
        if (!writeDirectly) {
            this.skipBuffers = false;
            this.writer = null;
        } else {
            this.finalOutPath = this.outputFileHandler.getOutputFileForWrite();
            this.finalIndexPath = this.outputFileHandler.getOutputIndexFileForWrite(this.indexFileSizeEstimate);
            this.skipBuffers = true;
            this.writer = new IFile.Writer(conf, this.rfs, this.finalOutPath, this.keyClass, this.valClass, this.codec, this.outputRecordsCounter, this.outputRecordBytesCounter);
        }
        LOG.info(this.destNameTrimmed + ": numBuffers=" + this.numBuffers + ", sizePerBuffer=" + this.sizePerBuffer + ", skipBuffers=" + this.skipBuffers + ", pipelinedShuffle=" + this.pipelinedShuffle + ", numPartitions=" + this.numPartitions);
    }

    /**
     * Derives {@code numBuffers} and {@code sizePerBuffer} from the available memory.
     *
     * <p>The buffer count is the available memory divided by the per-buffer limit,
     * rounded up, with a minimum of two. The memory is then split evenly and the
     * per-buffer size rounded down to a multiple of 4 bytes.
     *
     * @param bufferLimit maximum size of a single buffer in bytes
     */
    private void computeNumBuffersAndSize(int bufferLimit) {
        long limit = bufferLimit;
        int wholeBuffers = (int)(this.availableMemory / limit);
        boolean hasRemainder = this.availableMemory % limit != 0L;
        int requiredBuffers = hasRemainder ? wholeBuffers + 1 : wholeBuffers;
        this.numBuffers = Math.max(2, requiredBuffers);

        int evenShare = (int)(this.availableMemory / (long)this.numBuffers);
        // Keep the size int-aligned so metadata can be addressed as ints.
        this.sizePerBuffer = evenShare - evenShare % 4;
    }

    /**
     * Writes one key/value pair.
     *
     * <p>In direct mode the record is appended to the output file writer;
     * otherwise the partitioner picks a partition and the record is buffered.
     *
     * @throws RuntimeException if the writer has already been closed
     * @throws IOException if an earlier spill failed or the record cannot be written
     */
    @Override
    public void write(Object key, Object value) throws IOException {
        if (this.isShutdown.get()) {
            throw new RuntimeException("Writer already closed");
        }
        if (this.spillException != null) {
            throw new IOException("Exception during spill", new IOException(this.spillException));
        }
        if (!this.skipBuffers) {
            this.write(key, value, this.partitioner.getPartition(key, value, this.numPartitions));
            return;
        }
        this.writer.append(key, value);
        this.outputContext.notifyProgress();
    }

    /**
     * Buffers a single record for the given partition.
     *
     * <p>Each record occupies 12 bytes of metadata (key length, value length,
     * position of the previous record of the same partition) followed by the
     * serialized key and value. If the current buffer fills up while the record
     * is being serialized, the buffer is handed off via {@link #setupNextBuffer()}
     * and the write is retried; a record that does not fit even into an empty
     * buffer is written through {@link #writeLargeRecord(Object, Object, int)}.
     *
     * @param key record key
     * @param value record value
     * @param partition partition the record belongs to
     * @throws IOException if serialization or a triggered spill setup fails
     */
    private void write(Object key, Object value, int partition) throws IOException {
        int metaSkip;
        // Padding needed so the metadata starts on a 4-byte boundary (it is addressed as ints below).
        int mod = this.currentBuffer.nextPosition % 4;
        int n = metaSkip = mod == 0 ? 0 : 4 - mod;
        if (this.currentBuffer.availableSize < 12 + metaSkip || this.currentBuffer.full) {
            // Not enough room for the metadata: switch buffers.
            // NOTE(review): padding is dropped here, presumably because the next buffer starts at an aligned position - confirm.
            metaSkip = 0;
            this.setupNextBuffer();
        }
        this.currentBuffer.nextPosition += metaSkip;
        int metaStart = this.currentBuffer.nextPosition;
        // Reserve the metadata slots; they are filled in once both key and value are serialized.
        this.currentBuffer.availableSize -= 12 + metaSkip;
        this.currentBuffer.nextPosition += 12;
        // NOTE(review): the serializers presumably write into currentBuffer through the stream opened in the
        // constructor and set currentBuffer.full on overflow - that stream class is not visible here.
        this.keySerializer.serialize(key);
        if (this.currentBuffer.full) {
            if (metaStart == 0) {
                // The record started at position 0 and still did not fit: treat it as a large record.
                this.currentBuffer.reset();
                this.writeLargeRecord(key, value, partition);
                return;
            }
            // Otherwise move to another buffer and retry the whole record there.
            this.setupNextBuffer();
            this.write(key, value, partition);
            return;
        }
        int valStart = this.currentBuffer.nextPosition;
        this.valSerializer.serialize(value);
        if (this.currentBuffer.full) {
            if (metaStart == 0) {
                // Same large-record handling as for the key above.
                this.currentBuffer.reset();
                this.writeLargeRecord(key, value, partition);
                return;
            }
            this.setupNextBuffer();
            this.write(key, value, partition);
            return;
        }
        // Record fits: fill in the metadata slots (key length, value length, previous record of this partition).
        int metaIndex = metaStart / 4;
        int indexNext = this.currentBuffer.partitionPositions[partition];
        this.currentBuffer.metaBuffer.put(metaIndex + 0, valStart - (metaStart + 12));
        this.currentBuffer.metaBuffer.put(metaIndex + 1, this.currentBuffer.nextPosition - valStart);
        this.currentBuffer.metaBuffer.put(metaIndex + 2, indexNext);
        this.currentBuffer.skipSize += metaSkip;
        // Counters: payload bytes only, then payload plus metadata and padding.
        this.outputRecordBytesCounter.increment((long)(this.currentBuffer.nextPosition - (metaStart + 12)));
        this.outputBytesWithOverheadCounter.increment((long)(this.currentBuffer.nextPosition - metaStart + metaSkip));
        this.outputRecordsCounter.increment(1L);
        this.outputContext.notifyProgress();
        // This record becomes the new head of the partition's chain of records.
        ((WrappedBuffer)this.currentBuffer).partitionPositions[partition] = metaStart;
        int[] nArray = this.currentBuffer.recordsPerPartition;
        int n2 = partition;
        nArray[n2] = nArray[n2] + 1;
        this.currentBuffer.numRecords++;
    }

    /**
     * Makes a fresh buffer current.
     *
     * <p>A buffer without records is simply reset and reused. Otherwise its
     * statistics are folded into the global counts, a background spill is
     * submitted for it and the next available buffer becomes current.
     *
     * @throws IOException if spill paths cannot be obtained or waiting for a buffer is interrupted
     */
    private void setupNextBuffer() throws IOException {
        if (this.currentBuffer.numRecords == 0) {
            this.currentBuffer.reset();
            return;
        }
        LOG.info(this.destNameTrimmed + ": Moving to next buffer and triggering spill");
        this.updateGlobalStats(this.currentBuffer);
        this.pendingSpillCount.incrementAndGet();
        SpillPathDetails details = this.getSpillPathDetails(false, -1L);
        SpillCallable spillTask = new SpillCallable(this.currentBuffer, this.codec, this.spilledRecordsCounter, details);
        ListenableFuture spillFuture = this.spillExecutor.submit((Callable)((Object)spillTask));
        Futures.addCallback((ListenableFuture)spillFuture, (FutureCallback)new SpillCallback(details.spillIndex));
        this.currentBuffer = this.getNextAvailableBuffer();
    }

    /**
     * Adds the buffer's per-partition record counts to the writer-wide totals.
     *
     * @param buffer buffer whose counts are accumulated
     */
    private void updateGlobalStats(WrappedBuffer buffer) {
        for (int partition = 0; partition < this.numPartitions; partition++) {
            this.numRecordsPerPartition[partition] += buffer.recordsPerPartition[partition];
        }
    }

    /**
     * Returns a buffer to write into: a recycled one if available, else a newly
     * allocated one while the buffer limit is not reached, else blocks until a
     * spill returns a buffer.
     *
     * @return the buffer to use next
     * @throws IOException ({@code IOInterruptedException}) if interrupted while waiting;
     *         the thread's interrupt flag is restored first
     */
    private WrappedBuffer getNextAvailableBuffer() throws IOException {
        if (this.availableBuffers.peek() != null) {
            return (WrappedBuffer)this.availableBuffers.poll();
        }
        if (this.numInitializedBuffers >= this.numBuffers) {
            // Every buffer is allocated and in use: wait for a spill to hand one back.
            try {
                return this.availableBuffers.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOInterruptedException("Interrupted while waiting for next buffer", e);
            }
        }
        int slot = this.numInitializedBuffers;
        this.buffers[slot] = new WrappedBuffer(this.numPartitions, this.sizePerBuffer);
        this.numInitializedBuffers = slot + 1;
        return this.buffers[slot];
    }

    /**
     * Appends every buffered record of one partition to the given writer by
     * following the chain of record positions stored in the buffer metadata.
     *
     * @param pos position of the first record to write, or -1 for none
     * @param wrappedBuffer buffer holding the records
     * @param writer destination writer
     * @param keyBuffer reusable view onto the key bytes
     * @param valBuffer reusable view onto the value bytes
     * @throws IOException if appending to the writer fails
     */
    private void writePartition(int pos, WrappedBuffer wrappedBuffer, IFile.Writer writer, DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException {
        // Metadata slot 2 holds the position of the next record in the chain; -1 terminates it.
        for (int cursor = pos; cursor != -1; cursor = wrappedBuffer.metaBuffer.get(cursor / 4 + 2)) {
            int slot = cursor / 4;
            int keyLength = wrappedBuffer.metaBuffer.get(slot);
            int valLength = wrappedBuffer.metaBuffer.get(slot + 1);
            int keyOffset = cursor + 12;
            keyBuffer.reset(wrappedBuffer.buffer, keyOffset, keyLength);
            valBuffer.reset(wrappedBuffer.buffer, keyOffset + keyLength, valLength);
            writer.append(keyBuffer, valBuffer);
        }
    }

    /**
     * Computes the number of bytes this writer initially requests for buffering,
     * based on {@code tez.runtime.unordered.output.buffer.size-mb} (default 100).
     *
     * @param conf configuration to read the buffer size from
     * @param maxAvailableTaskMemory unused; kept for interface compatibility
     * @return the requested buffer size in bytes
     * @throws IllegalArgumentException if the configured size is not larger than 0
     */
    public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
        int initialMemRequestMb = conf.getInt("tez.runtime.unordered.output.buffer.size-mb", 100);
        Preconditions.checkArgument(initialMemRequestMb > 0, "tez.runtime.unordered.output.buffer.size-mb should be larger than 0");
        // Widen before shifting: an int shift overflows for sizes of 2048 MB and above.
        long reqBytes = (long)initialMemRequestMb << 20;
        LOG.info("Requested BufferSize (tez.runtime.unordered.output.buffer.size-mb) : " + initialMemRequestMb);
        return reqBytes;
    }

    /**
     * Closes the writer: waits for pending background spills, releases buffers
     * and produces the final output.
     *
     * <p>With pipelined shuffle the remaining buffer is spilled and announced
     * through a pipelined event, and the returned list is empty. Otherwise the
     * output is finalized (direct file, single final spill, or merge of all
     * spills) and a single data movement event is returned.
     *
     * @return the events describing the generated output
     * @throws IOException if a spill failed or the final output cannot be written
     * @throws InterruptedException if interrupted while waiting for spills
     */
    @Override
    public List<Event> close() throws IOException, InterruptedException {
        this.isShutdown.set(true);
        this.spillLock.lock();
        try {
            LOG.info(this.destNameTrimmed + ": Waiting for all spills to complete : Pending : " + this.pendingSpillCount.get());
            while (this.pendingSpillCount.get() != 0 && this.spillException == null) {
                this.spillInProgress.await();
            }
        }
        finally {
            this.spillLock.unlock();
        }

        if (this.spillException != null) {
            LOG.error(this.destNameTrimmed + ": Error during spill, throwing");
            this.cleanup();
            this.currentBuffer.cleanup();
            this.currentBuffer = null;
            if (this.spillException instanceof IOException) {
                throw (IOException)this.spillException;
            }
            throw new IOException(this.spillException);
        }

        LOG.info(this.destNameTrimmed + ": All spills complete");
        this.cleanup();

        if (this.pipelinedShuffle) {
            // Spills were announced individually; only the last buffer remains.
            List<Event> events = Lists.newLinkedList();
            if (this.finalSpill()) {
                this.sendPipelinedEventForSpill(this.currentBuffer.recordsPerPartition, this.numSpills.get() - 1, true);
            }
            this.cleanupCurrentBuffer();
            return events;
        }

        if (!this.skipBuffers) {
            if (this.numSpills.get() > 0) {
                this.mergeAll();
            } else {
                this.finalSpill();
            }
            this.cleanupCurrentBuffer();
            return Collections.singletonList(this.generateDMEvent());
        }

        // Direct mode: finish the file and write a one-entry index for it.
        this.writer.close();
        long rawLen = this.writer.getRawLength();
        long compLen = this.writer.getCompressedLength();
        TezSpillRecord spillRecord = new TezSpillRecord(1);
        spillRecord.putIndex(new TezIndexRecord(0L, rawLen, compLen), 0);
        spillRecord.writeToFile(this.finalIndexPath, this.conf);
        BitSet emptyPartitions = new BitSet();
        if (this.outputRecordsCounter.getValue() == 0L) {
            emptyPartitions.set(0);
        }
        this.cleanupCurrentBuffer();
        this.outputBytesWithOverheadCounter.increment(rawLen);
        this.fileOutputBytesCounter.increment(compLen + this.indexFileSizeEstimate);
        return Collections.singletonList(this.generateDMEvent(false, -1, false, this.outputContext.getUniqueIdentifier(), emptyPartitions));
    }

    /**
     * Builds a bit set with one bit set for every partition that has no records.
     *
     * @param recordsPerPartition record count per partition; must not be null
     * @return bit set marking the empty partitions
     * @throws IllegalArgumentException if {@code recordsPerPartition} is null
     */
    private BitSet getEmptyPartitions(int[] recordsPerPartition) {
        Preconditions.checkArgument(recordsPerPartition != null, "records per partition can not be null");
        BitSet withoutRecords = new BitSet();
        int partition = 0;
        while (partition < this.numPartitions) {
            if (recordsPerPartition[partition] == 0) {
                withoutRecords.set(partition);
            }
            partition++;
        }
        return withoutRecords;
    }

    /**
     * Creates the data movement event for the complete, non-pipelined output,
     * using the writer-wide per-partition record counts to mark empty partitions.
     *
     * @return the event describing the final output
     * @throws IOException if the event payload cannot be built
     */
    private Event generateDMEvent() throws IOException {
        BitSet partitionsWithoutRecords = this.getEmptyPartitions(this.numRecordsPerPartition);
        String uniqueId = this.outputContext.getUniqueIdentifier();
        return this.generateDMEvent(false, -1, false, uniqueId, partitionsWithoutRecords);
    }

    /**
     * Builds a composite data movement event covering all partitions.
     *
     * <p>The empty-partition set is included only when at least one partition is
     * empty; host, port and path component are included only when at least one
     * partition has data.
     *
     * @param addSpillDetails whether spill id and last-event flag are added to the payload
     * @param spillId id of the spill the event refers to
     * @param isLastSpill whether this is the last event for the output
     * @param pathComponent path component consumers use to locate the data
     * @param emptyPartitions bit set marking partitions without records
     * @return the event
     * @throws IOException if the payload cannot be built
     */
    private Event generateDMEvent(boolean addSpillDetails, int spillId, boolean isLastSpill, String pathComponent, BitSet emptyPartitions) throws IOException {
        this.outputContext.notifyProgress();
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder payloadBuilder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        String host = this.getHost();
        int emptyCount = emptyPartitions.cardinality();
        boolean anyEmpty = emptyCount != 0;
        boolean anyData = emptyCount != this.numPartitions;
        if (anyEmpty) {
            byte[] rawBits = TezUtilsInternal.toByteArray(emptyPartitions);
            payloadBuilder.setEmptyPartitions(TezCommonUtils.compressByteArrayToByteString(rawBits));
        }
        if (anyData) {
            payloadBuilder.setHost(host);
            payloadBuilder.setPort(this.getShufflePort());
            payloadBuilder.setPathComponent(pathComponent);
        }
        if (addSpillDetails) {
            payloadBuilder.setSpillId(spillId);
            payloadBuilder.setLastEvent(isLastSpill);
        }
        ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer();
        return CompositeDataMovementEvent.create(0, this.numPartitions, payload);
    }

    /** Cleans up the buffer currently in use and drops the reference to it. */
    private void cleanupCurrentBuffer() {
        WrappedBuffer finished = this.currentBuffer;
        finished.cleanup();
        this.currentBuffer = null;
    }

    /**
     * Stops the spill executor and releases every allocated buffer except the
     * current one, which is still needed for the final spill or merge.
     */
    private void cleanup() {
        if (this.spillExecutor != null) {
            this.spillExecutor.shutdownNow();
        }
        for (int slot = 0; slot < this.buffers.length; slot++) {
            WrappedBuffer candidate = this.buffers[slot];
            if (candidate != null && candidate != this.currentBuffer) {
                candidate.cleanup();
                this.buffers[slot] = null;
            }
        }
        this.availableBuffers.clear();
    }

    /**
     * Spills the current buffer synchronously as the last spill.
     *
     * <p>If the buffer is empty nothing is written; with pipelined shuffle an
     * event marking all partitions empty is sent instead.
     *
     * @return {@code true} if data was spilled, {@code false} if the buffer was empty
     * @throws IOException if the spill fails; other exceptions are wrapped in an IOException
     */
    private boolean finalSpill() throws IOException {
        if (this.currentBuffer.nextPosition != 0) {
            this.updateGlobalStats(this.currentBuffer);
            SpillPathDetails details = this.getSpillPathDetails(true, -1L);
            SpillCallable spillTask = new SpillCallable(this.currentBuffer, this.codec, null, details);
            try {
                SpillResult outcome = (SpillResult)spillTask.call();
                this.fileOutputBytesCounter.increment(outcome.spillSize);
                this.fileOutputBytesCounter.increment(this.indexFileSizeEstimate);
            }
            catch (IOException ioFailure) {
                throw ioFailure;
            }
            catch (Exception otherFailure) {
                throw new IOException(otherFailure);
            }
            return true;
        }
        if (this.pipelinedShuffle) {
            // Nothing left to spill: tell consumers that every partition of this last spill is empty.
            BitSet allEmpty = new BitSet(this.numPartitions);
            allEmpty.flip(0, this.numPartitions);
            Event lastEvent = this.generateDMEvent(true, this.numSpills.get(), true, null, allEmpty);
            this.outputContext.sendEvents(Collections.singletonList(lastEvent));
        }
        return false;
    }

    /**
     * Allocates the next spill number and resolves the files the spill writes to.
     *
     * <p>Without pipelined shuffle, a final spill targets the final output and
     * index files (also recorded in {@code finalOutPath}/{@code finalIndexPath}),
     * while an intermediate spill gets a spill data file and no index file. With
     * pipelined shuffle every spill gets its own data and index file.
     *
     * @param isFinalSpill whether this spill produces the final output
     * @param expectedSpillSize expected size in bytes, or a negative value to
     *        estimate it from the current buffer
     * @return paths and index of the spill
     * @throws IOException if a path cannot be allocated
     */
    private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize) throws IOException {
        int spillNumber = this.numSpills.getAndIncrement();
        long spillSize = expectedSpillSize;
        if (expectedSpillSize < 0L) {
            // Estimate in long arithmetic (150 bytes of header per partition); the int sum can overflow
            // for buffers close to Integer.MAX_VALUE.
            spillSize = (long)this.currentBuffer.nextPosition + (long)this.numPartitions * 150L;
        }
        Path outputFilePath = null;
        Path indexFilePath = null;
        if (!this.pipelinedShuffle) {
            if (isFinalSpill) {
                outputFilePath = this.outputFileHandler.getOutputFileForWrite(spillSize);
                indexFilePath = this.outputFileHandler.getOutputIndexFileForWrite(this.indexFileSizeEstimate);
                this.finalOutPath = outputFilePath;
                this.finalIndexPath = indexFilePath;
            } else {
                // Intermediate spill: its index is not written to a file.
                outputFilePath = this.outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
            }
        } else {
            outputFilePath = this.outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
            indexFilePath = this.outputFileHandler.getSpillIndexFileForWrite(spillNumber, this.indexFileSizeEstimate);
        }
        return new SpillPathDetails(outputFilePath, indexFilePath, spillNumber);
    }

    /**
     * Merges the records still held in the current buffer with all earlier
     * spills into the final output file, partition by partition, and writes the
     * final index file.
     *
     * <p>Partitions without records are skipped and get no index entry.
     *
     * @throws IOException if reading a spill or writing the final output fails
     */
    private void mergeAll() throws IOException {
        long expectedSize = this.spilledSize;
        if (this.currentBuffer.nextPosition != 0) {
            // Payload bytes in the buffer (position minus metadata and padding) plus 150 bytes of
            // header per partition; computed in long so large buffers cannot overflow int.
            expectedSize += (long)this.currentBuffer.nextPosition
                    - (long)this.currentBuffer.numRecords * 12L
                    - (long)this.currentBuffer.skipSize
                    + (long)this.numPartitions * 150L;
            this.updateGlobalStats(this.currentBuffer);
        }
        SpillPathDetails spillPathDetails = this.getSpillPathDetails(true, expectedSize);
        this.finalIndexPath = spillPathDetails.indexFilePath;
        this.finalOutPath = spillPathDetails.outputFilePath;
        TezSpillRecord finalSpillRecord = new TezSpillRecord(this.numPartitions);
        DataInputBuffer keyBuffer = new DataInputBuffer();
        DataInputBuffer valBuffer = new DataInputBuffer();
        DataInputBuffer keyBufferIFile = new DataInputBuffer();
        DataInputBuffer valBufferIFile = new DataInputBuffer();
        // The resource must be initialised in the header: resource variables are implicitly final.
        try (FSDataOutputStream out = this.rfs.create(this.finalOutPath)) {
            for (int i = 0; i < this.numPartitions; ++i) {
                long segmentStart = out.getPos();
                if (this.numRecordsPerPartition[i] == 0) {
                    LOG.info(this.destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
                    continue;
                }
                IFile.Writer writer = new IFile.Writer(this.conf, out, this.keyClass, this.valClass, this.codec, null, null);
                try {
                    // Records still in memory first ...
                    if (this.currentBuffer.nextPosition != 0 && this.currentBuffer.partitionPositions[i] != -1) {
                        this.writePartition(this.currentBuffer.partitionPositions[i], this.currentBuffer, writer, keyBuffer, valBuffer);
                    }
                    // ... then the matching segment of every earlier spill.
                    synchronized (this.spillInfoList) {
                        for (SpillInfo spillInfo : this.spillInfoList) {
                            this.outputContext.notifyProgress();
                            TezIndexRecord spillIndexRecord = spillInfo.spillRecord.getIndex(i);
                            if (spillIndexRecord.getPartLength() == 0L) {
                                continue;
                            }
                            FSDataInputStream in = this.rfs.open(spillInfo.outPath);
                            IFile.Reader reader = null;
                            try {
                                in.seek(spillIndexRecord.getStartOffset());
                                reader = new IFile.Reader((InputStream)in, spillIndexRecord.getPartLength(), this.codec, null, this.additionalSpillBytesReadCounter, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize);
                                while (reader.nextRawKey(keyBufferIFile)) {
                                    reader.nextRawValue(valBufferIFile);
                                    writer.append(keyBufferIFile, valBufferIFile);
                                }
                            }
                            finally {
                                // Release the spill file on failure as well. Once a reader exists it is
                                // closed exactly as before; otherwise the raw stream is closed directly.
                                if (reader != null) {
                                    reader.close();
                                } else {
                                    in.close();
                                }
                            }
                        }
                    }
                    writer.close();
                    this.fileOutputBytesCounter.increment(writer.getCompressedLength());
                    TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                    // Mark the writer as closed so the finally block does not close it again.
                    writer = null;
                    finalSpillRecord.putIndex(indexRecord, i);
                }
                finally {
                    if (writer != null) {
                        writer.close();
                    }
                }
            }
        }
        finalSpillRecord.writeToFile(this.finalIndexPath, this.conf);
        this.fileOutputBytesCounter.increment(this.indexFileSizeEstimate);
        LOG.info(this.destNameTrimmed + ": " + "Finished final spill after merging : " + this.numSpills.get() + " spills");
    }

    /**
     * Writes a record that does not fit into an empty buffer directly into a
     * spill file of its own.
     *
     * <p>Only the record's partition gets an index entry. With pipelined
     * shuffle all other partitions are reported as empty in the spill event.
     *
     * @param key record key
     * @param value record value
     * @param partition partition the record belongs to
     * @throws IOException if the spill file cannot be written
     */
    private void writeLargeRecord(Object key, Object value, int partition) throws IOException {
        this.numAdditionalSpillsCounter.increment(1L);
        // Size hint computed in long: sizePerBuffer may be close to Integer.MAX_VALUE, so adding
        // the per-partition header allowance in int arithmetic can overflow.
        long size = (long)this.sizePerBuffer
                - (long)this.currentBuffer.numRecords * 12L
                - (long)this.currentBuffer.skipSize
                + (long)this.numPartitions * 150L;
        SpillPathDetails spillPathDetails = this.getSpillPathDetails(false, size);
        int spillIndex = spillPathDetails.spillIndex;
        long outSize = 0L;
        TezSpillRecord spillRecord = new TezSpillRecord(this.numPartitions);
        Path outPath = spillPathDetails.outputFilePath;
        // The resource must be initialised in the header: resource variables are implicitly final.
        try (FSDataOutputStream out = this.rfs.create(outPath)) {
            BitSet emptyPartitions = null;
            if (this.pipelinedShuffle) {
                emptyPartitions = new BitSet(this.numPartitions);
            }
            for (int i = 0; i < this.numPartitions; ++i) {
                long recordStart = out.getPos();
                if (i != partition) {
                    if (emptyPartitions != null) {
                        emptyPartitions.set(i);
                    }
                    continue;
                }
                this.spilledRecordsCounter.increment(1L);
                IFile.Writer writer = null;
                try {
                    writer = new IFile.Writer(this.conf, out, this.keyClass, this.valClass, this.codec, null, null);
                    writer.append(key, value);
                    this.outputLargeRecordsCounter.increment(1L);
                    this.numRecordsPerPartition[i] = this.numRecordsPerPartition[i] + 1;
                    writer.close();
                    this.additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
                    TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(), writer.getCompressedLength());
                    spillRecord.putIndex(indexRecord, i);
                    outSize = writer.getCompressedLength();
                    // Mark the writer as closed so the finally block does not close it again.
                    writer = null;
                }
                finally {
                    if (writer != null) {
                        writer.close();
                    }
                }
            }
            this.handleSpillIndex(spillPathDetails, spillRecord);
            this.sendPipelinedEventForSpill(emptyPartitions, spillIndex, false);
            LOG.info(this.destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.destNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath=" + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath);
            }
        }
    }

    /**
     * Stores the index of a finished spill: written to the spill's index file
     * when one was allocated, otherwise remembered in {@code spillInfoList}
     * (counting an additional spill).
     *
     * @param spillPathDetails paths of the spill
     * @param spillRecord per-partition index of the spill
     * @throws IOException if the index file cannot be written
     */
    private void handleSpillIndex(SpillPathDetails spillPathDetails, TezSpillRecord spillRecord) throws IOException {
        if (spillPathDetails.indexFilePath == null) {
            this.spillInfoList.add(new SpillInfo(spillRecord, spillPathDetails.outputFilePath));
            this.numAdditionalSpillsCounter.increment(1L);
            return;
        }
        spillRecord.writeToFile(spillPathDetails.indexFilePath, this.conf);
    }

    /**
     * Sends the data movement event for one spill when pipelined shuffle is
     * enabled; does nothing otherwise. Failures while building the event are
     * logged and reported as a fatal error to the output context.
     *
     * @param emptyPartitions bit set marking partitions without records in the spill
     * @param spillNumber id of the spill
     * @param isFinalUpdate whether this is the last spill of the output
     */
    private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) {
        if (this.pipelinedShuffle) {
            try {
                String pathComponent = this.outputContext.getUniqueIdentifier() + "_" + spillNumber;
                Event spillEvent = this.generateDMEvent(true, spillNumber, isFinalUpdate, pathComponent, emptyPartitions);
                LOG.info(this.destNameTrimmed + ": Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
                this.outputContext.sendEvents(Collections.singletonList(spillEvent));
            }
            catch (IOException failure) {
                LOG.error(this.destNameTrimmed + ": Error in sending pipelined events", failure);
                this.outputContext.fatalError(failure, "Error in sending pipelined events");
            }
        }
    }

    /**
     * Variant that derives the empty partitions from per-partition record counts
     * before sending the spill event.
     *
     * @param recordsPerPartition record count per partition for the spill
     * @param spillNumber id of the spill
     * @param isFinalUpdate whether this is the last spill of the output
     */
    private void sendPipelinedEventForSpill(int[] recordsPerPartition, int spillNumber, boolean isFinalUpdate) {
        this.sendPipelinedEventForSpill(this.getEmptyPartitions(recordsPerPartition), spillNumber, isFinalUpdate);
    }

    /**
     * Returns the host name reported by the output context's execution context.
     */
    @VisibleForTesting
    String getHost() {
        OutputContext context = this.outputContext;
        return context.getExecutionContext().getHostName();
    }

    /**
     * Reads the shuffle port from the {@code mapreduce_shuffle} service metadata.
     *
     * @return the shuffle port
     * @throws IOException if the metadata cannot be deserialized
     */
    @VisibleForTesting
    int getShufflePort() throws IOException {
        return ShuffleUtils.deserializeShuffleProviderMetaData(
                this.outputContext.getServiceProviderMetaData("mapreduce_shuffle"));
    }

    /**
     * Immutable holder for the files and the index number of a single spill.
     * The index file path may be null when the spill has no index file.
     */
    @InterfaceAudience.Private
    static class SpillPathDetails {
        /** Index file of the spill, or null. */
        final Path indexFilePath;
        /** Data file of the spill. */
        final Path outputFilePath;
        /** Sequence number of the spill. */
        final int spillIndex;

        SpillPathDetails(Path outputFilePath, Path indexFilePath, int spillIndex) {
            this.spillIndex = spillIndex;
            this.indexFilePath = indexFilePath;
            this.outputFilePath = outputFilePath;
        }
    }

    /** Pairs the per-partition index of a spill with the file holding its data. */
    private static class SpillInfo {
        /** Per-partition index of the spill. */
        final TezSpillRecord spillRecord;
        /** File the spill data was written to. */
        final Path outPath;

        SpillInfo(TezSpillRecord spillRecord, Path outPath) {
            this.outPath = outPath;
            this.spillRecord = spillRecord;
        }
    }

    /** Outcome of a spill: the spill size and the buffer that was spilled. */
    private static class SpillResult {
        /** Size reported for the spill. */
        final long spillSize;
        /** Buffer whose contents were spilled. */
        final WrappedBuffer wrappedBuffer;

        SpillResult(long size, WrappedBuffer wrappedBuffer) {
            this.wrappedBuffer = wrappedBuffer;
            this.spillSize = size;
        }
    }

    private class SpillCallback
    implements FutureCallback<SpillResult> {
        private final int spillNumber;

        SpillCallback(int spillNumber) {
            this.spillNumber = spillNumber;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSuccess(SpillResult result) {
            UnorderedPartitionedKVWriter.this.spilledSize += result.spillSize;
            UnorderedPartitionedKVWriter.this.sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, this.spillNumber, false);
            try {
                result.wrappedBuffer.reset();
                UnorderedPartitionedKVWriter.this.availableBuffers.add(result.wrappedBuffer);
            }
            catch (Throwable e) {
                LOG.error(UnorderedPartitionedKVWriter.this.destNameTrimmed + ": " + "Failure while attempting to reset buffer after spill", e);
                UnorderedPartitionedKVWriter.this.outputContext.fatalError(e, "Failure while attempting to reset buffer after spill");
            }
            if (!UnorderedPartitionedKVWriter.this.pipelinedShuffle) {
                UnorderedPartitionedKVWriter.this.additionalSpillBytesWritternCounter.increment(result.spillSize);
            } else {
                UnorderedPartitionedKVWriter.this.fileOutputBytesCounter.increment(UnorderedPartitionedKVWriter.this.indexFileSizeEstimate);
                UnorderedPartitionedKVWriter.this.fileOutputBytesCounter.increment(result.spillSize);
            }
            UnorderedPartitionedKVWriter.this.spillLock.lock();
            try {
                if (UnorderedPartitionedKVWriter.this.pendingSpillCount.decrementAndGet() == 0) {
                    UnorderedPartitionedKVWriter.this.spillInProgress.signal();
                }
            }
            finally {
                UnorderedPartitionedKVWriter.this.spillLock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onFailure(Throwable t) {
            LOG.error(UnorderedPartitionedKVWriter.this.destNameTrimmed + ": " + "Failure while spilling to disk", t);
            UnorderedPartitionedKVWriter.this.spillException = t;
            UnorderedPartitionedKVWriter.this.outputContext.fatalError(t, "Failure while spilling to disk");
            UnorderedPartitionedKVWriter.this.spillLock.lock();
            try {
                UnorderedPartitionedKVWriter.this.spillInProgress.signal();
            }
            finally {
                UnorderedPartitionedKVWriter.this.spillLock.unlock();
            }
        }
    }

    /**
     * A byte buffer plus the per-partition bookkeeping kept alongside it. The same backing array
     * is also exposed as an {@link IntBuffer} view in native byte order.
     */
    private static class WrappedBuffer {
        /** Marker stored in {@code partitionPositions} for a partition with no position yet. */
        private static final int PARTITION_ABSENT_POSITION = -1;
        private final int[] partitionPositions;
        private final int[] recordsPerPartition;
        private final int numPartitions;
        private final int size;
        private byte[] buffer;
        private IntBuffer metaBuffer;
        private int numRecords = 0;
        private int skipSize = 0;
        private int nextPosition = 0;
        private int availableSize;
        private boolean full = false;

        /**
         * @param numPartitions number of partitions tracked by this buffer
         * @param size          requested capacity in bytes; rounded down to a multiple of 4
         */
        WrappedBuffer(int numPartitions, int size) {
            this.partitionPositions = new int[numPartitions];
            this.recordsPerPartition = new int[numPartitions];
            this.numPartitions = numPartitions;
            this.clearPartitionState();

            // Drop the remainder so the capacity is a whole number of 4-byte ints.
            int alignedSize = size - size % 4;
            this.size = alignedSize;
            this.buffer = new byte[alignedSize];
            this.metaBuffer = ByteBuffer.wrap(this.buffer).order(ByteOrder.nativeOrder()).asIntBuffer();
            this.availableSize = alignedSize;
        }

        /**
         * Returns all bookkeeping to its freshly-constructed state. The backing array itself is
         * left untouched.
         */
        void reset() {
            this.clearPartitionState();
            this.full = false;
            this.availableSize = this.size;
            this.skipSize = 0;
            this.nextPosition = 0;
            this.numRecords = 0;
        }

        /** Drops the references to the backing array and its int view. */
        void cleanup() {
            this.metaBuffer = null;
            this.buffer = null;
        }

        /** Marks every partition as absent and zeroes its record count. */
        private void clearPartitionState() {
            int partition = 0;
            while (partition < this.numPartitions) {
                this.partitionPositions[partition] = PARTITION_ABSENT_POSITION;
                this.recordsPerPartition[partition] = 0;
                ++partition;
            }
        }
    }

    /**
     * Output stream that appends into the enclosing writer's {@code currentBuffer}. When a write
     * does not fit in the remaining space, nothing is copied and the buffer is flagged as full;
     * once the buffer is flagged full, further writes are ignored.
     */
    private class ByteArrayOutputStream
    extends OutputStream {
        /** One-byte staging array reused by {@link #write(int)}. */
        private final byte[] scratch = new byte[1];

        private ByteArrayOutputStream() {
        }

        /** Writes a single byte by delegating to {@link #write(byte[], int, int)}. */
        @Override
        public void write(int v) throws IOException {
            this.scratch[0] = (byte)v;
            this.write(this.scratch, 0, 1);
        }

        /**
         * Copies {@code len} bytes from {@code b}, starting at {@code off}, into the current
         * buffer at its next position, provided the buffer is not full and has enough room.
         */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            WrappedBuffer target = UnorderedPartitionedKVWriter.this.currentBuffer;
            if (target.full) {
                return;
            }
            if (len > target.availableSize) {
                // Not enough room: record the overflow and drop this write.
                target.full = true;
                return;
            }
            System.arraycopy(b, off, target.buffer, target.nextPosition, len);
            target.nextPosition += len;
            target.availableSize -= len;
        }
    }

    /**
     * Task that writes the contents of one {@link WrappedBuffer} to the spill output file, one
     * IFile segment per non-empty partition, and then hands the resulting spill record to
     * {@code handleSpillIndex}.
     */
    private class SpillCallable
    extends CallableWithNdc<SpillResult> {
        private final WrappedBuffer wrappedBuffer;
        private final CompressionCodec codec;
        private final TezCounter numRecordsCounter;
        private final int spillIndex;
        private final SpillPathDetails spillPathDetails;

        /**
         * @param wrappedBuffer     buffer whose partitions are written out
         * @param codec             codec passed to each segment writer
         * @param numRecordsCounter counter passed to each segment writer
         * @param spillPathDetails  paths and index of this spill; its output file path must not be null
         * @throws IllegalArgumentException if the output file path is null
         */
        public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec, TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) {
            this.wrappedBuffer = wrappedBuffer;
            this.codec = codec;
            this.numRecordsCounter = numRecordsCounter;
            this.spillIndex = spillPathDetails.spillIndex;
            Preconditions.checkArgument(spillPathDetails.outputFilePath != null, "Spill output file path can not be null");
            this.spillPathDetails = spillPathDetails;
        }

        /**
         * Writes every non-empty partition of the buffer to the spill output file and records
         * one index entry per written partition.
         *
         * @return the total compressed length written, together with the spilled buffer
         * @throws IOException if creating, writing or closing the output fails
         */
        protected SpillResult callInternal() throws IOException {
            TezSpillRecord spillRecord = new TezSpillRecord(UnorderedPartitionedKVWriter.this.numPartitions);
            DataInputBuffer key = new DataInputBuffer();
            DataInputBuffer val = new DataInputBuffer();
            long compressedLength = 0L;
            // The output stream is shared by all segment writers, so it is closed exactly once
            // here, after the last segment, whether or not writing succeeded.
            try (FSDataOutputStream out = UnorderedPartitionedKVWriter.this.rfs.create(this.spillPathDetails.outputFilePath)) {
                for (int i = 0; i < UnorderedPartitionedKVWriter.this.numPartitions; ++i) {
                    UnorderedPartitionedKVWriter.this.outputContext.notifyProgress();
                    long segmentStart = out.getPos();
                    if (this.wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
                        // Nothing was written for this partition; no segment, no index entry.
                        continue;
                    }
                    IFile.Writer writer = new IFile.Writer(UnorderedPartitionedKVWriter.this.conf, out, UnorderedPartitionedKVWriter.this.keyClass, UnorderedPartitionedKVWriter.this.valClass, this.codec, this.numRecordsCounter, null);
                    try {
                        UnorderedPartitionedKVWriter.this.writePartition(this.wrappedBuffer.partitionPositions[i], this.wrappedBuffer, writer, key, val);
                        // Close before reading the lengths used for the index record.
                        writer.close();
                        compressedLength += writer.getCompressedLength();
                        TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                        spillRecord.putIndex(indexRecord, i);
                        // Already closed on the success path; prevent a second close below.
                        writer = null;
                    }
                    finally {
                        if (writer != null) {
                            writer.close();
                        }
                    }
                }
            }
            SpillResult spillResult = new SpillResult(compressedLength, this.wrappedBuffer);
            UnorderedPartitionedKVWriter.this.handleSpillIndex(this.spillPathDetails, spillRecord);
            LOG.info(UnorderedPartitionedKVWriter.this.destNameTrimmed + ": " + "Finished spill " + this.spillIndex);
            if (LOG.isDebugEnabled()) {
                LOG.debug(UnorderedPartitionedKVWriter.this.destNameTrimmed + ": " + "Spill=" + this.spillIndex + ", indexPath=" + this.spillPathDetails.indexFilePath + ", outputPath=" + this.spillPathDetails.outputFilePath);
            }
            return spillResult;
        }
    }
}

