package org.apache.hama.bsp.message.io;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.Constants;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;

/* loaded from: input_file:org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.class */
public class SpillingDataOutputBuffer extends DataOutputStream {
    private static final Log LOG = LogFactory.getLog(SpillingDataOutputBuffer.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/bsp/message/io/SpillingDataOutputBuffer$ProcessSpilledDataThread.class */
    public static class ProcessSpilledDataThread implements Callable<Boolean> {
        private SpillWriteIndexStatus status_;
        private List<SpilledByteBuffer> bufferList_;
        private long fileWrittenSize_;
        private boolean closed = false;
        SpilledDataProcessor processor;

        ProcessSpilledDataThread(SpillWriteIndexStatus spillWriteIndexStatus, List<SpilledByteBuffer> list, SpilledDataProcessor spilledDataProcessor) {
            this.status_ = spillWriteIndexStatus;
            this.bufferList_ = list;
            this.processor = spilledDataProcessor;
        }

        private void keepProcessingData() throws IOException {
            do {
                try {
                    int nextProcessorBufferIndex = this.status_.getNextProcessorBufferIndex();
                    while (nextProcessorBufferIndex >= 0) {
                        SpilledByteBuffer spilledByteBuffer = this.bufferList_.get(nextProcessorBufferIndex);
                        this.processor.handleSpilledBuffer(spilledByteBuffer);
                        spilledByteBuffer.clear();
                        try {
                            nextProcessorBufferIndex = this.status_.getNextProcessorBufferIndex();
                        } catch (InterruptedException e) {
                            SpillingDataOutputBuffer.LOG.error("Interrupted getting next index to process data.", e);
                            throw new IOException(e);
                        }
                    }
                } catch (InterruptedException e2) {
                    throw new IOException(e2);
                }
            } while (!this.closed);
            if (SpillingDataOutputBuffer.LOG.isDebugEnabled()) {
                SpillingDataOutputBuffer.LOG.debug("Done handling spilling data.");
            }
        }

        public void completeSpill() {
            this.closed = true;
        }

        public long getFileWrittenSize() {
            return this.fileWrittenSize_;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            try {
                keepProcessingData();
                return Boolean.TRUE;
            } catch (Exception e) {
                SpillingDataOutputBuffer.LOG.error("Error handling spilled data.", e);
                this.status_.notifyError();
                return Boolean.FALSE;
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/message/io/SpillingDataOutputBuffer$SpillingStream.class */
    static class SpillingStream extends OutputStream {
        final byte[] b;
        final boolean direct_;
        private List<SpilledByteBuffer> bufferList_;
        private int bufferSize_;
        private BitSet bufferState_;
        private int numberBuffers_;
        private SpilledByteBuffer currentBuffer_;
        protected long bytesWritten_;
        protected long bytesWrittenToBuffer;
        private long bytesRemaining_;
        private SpillWriteIndexStatus spillStatus_;
        private int thresholdSize_;
        private boolean startedSpilling_;
        private ProcessSpilledDataThread spillThread_;
        private ExecutorService spillThreadService_;
        private Future<Boolean> spillThreadState_;
        private boolean closed_;
        private int interBufferEndOfRecord;
        private SpilledDataProcessor processor;
        protected byte[] buf;
        protected int count;
        protected int defaultBufferSize_;
        static final /* synthetic */ boolean $assertionsDisabled;

        SpillingStream(int i, int i2, int i3, boolean z, SpilledDataProcessor spilledDataProcessor) {
            this(i, i2, i3, z, spilledDataProcessor, 8192);
        }

        SpillingStream(int i, int i2, int i3, boolean z, SpilledDataProcessor spilledDataProcessor, int i4) {
            if (!$assertionsDisabled && i3 < i2) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && i3 >= i * i2) {
                throw new AssertionError();
            }
            this.defaultBufferSize_ = i4 > i2 ? i2 / 2 : i4;
            this.b = new byte[1];
            this.buf = new byte[this.defaultBufferSize_];
            this.count = 0;
            this.direct_ = z;
            this.numberBuffers_ = i;
            this.bufferSize_ = i2;
            this.bufferList_ = new ArrayList(this.numberBuffers_);
            this.bufferState_ = new BitSet(i);
            for (int i5 = 0; i5 < i / 2; i5++) {
                this.bufferList_.add(new SpilledByteBuffer(this.direct_, this.bufferSize_));
            }
            this.currentBuffer_ = this.bufferList_.get(0);
            this.bytesWritten_ = 0L;
            this.bytesRemaining_ = this.bufferSize_;
            this.spillStatus_ = new SpillWriteIndexStatus(i2, this.numberBuffers_, 0, -1, this.bufferState_);
            this.thresholdSize_ = i3;
            this.startedSpilling_ = false;
            this.spillThread_ = null;
            this.spillThreadState_ = null;
            this.processor = spilledDataProcessor;
            this.closed_ = false;
        }

        public void markEndOfRecord() {
            this.interBufferEndOfRecord = (int) (this.bytesWrittenToBuffer + this.count);
            if (this.currentBuffer_.capacity() > this.interBufferEndOfRecord) {
                this.currentBuffer_.markEndOfRecord(this.interBufferEndOfRecord);
            }
        }

        public void clear() throws IOException {
            close();
            this.startedSpilling_ = false;
            this.bufferState_.clear();
            Iterator<SpilledByteBuffer> it = this.bufferList_.iterator();
            while (it.hasNext()) {
                it.next().clear();
            }
            this.currentBuffer_ = this.bufferList_.get(0);
            this.bytesWritten_ = 0L;
            this.bytesRemaining_ = this.bufferSize_;
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            if (this.count >= this.buf.length - 1) {
                this.b[0] = (byte) (i & 255);
                write(this.b);
            } else {
                byte[] bArr = this.buf;
                int i2 = this.count;
                this.count = i2 + 1;
                bArr[i2] = (byte) (i & 255);
            }
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr) throws IOException {
            write(bArr, 0, bArr.length);
        }

        private void startSpilling() throws InterruptedException, IOException {
            synchronized (this) {
                this.spillThread_ = new ProcessSpilledDataThread(this.spillStatus_, this.bufferList_, this.processor);
                this.startedSpilling_ = true;
                this.spillThreadService_ = Executors.newFixedThreadPool(1);
                this.spillThreadState_ = this.spillThreadService_.submit(this.spillThread_);
                if (!this.spillStatus_.startSpilling()) {
                    throw new IOException("Could not start spilling on disk.");
                }
            }
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            if (i2 >= this.buf.length) {
                flushBuffer();
                writeInternal(bArr, i, i2);
            } else {
                if (i2 > this.buf.length - this.count) {
                    flushBuffer();
                }
                System.arraycopy(bArr, i, this.buf, this.count, i2);
                this.count += i2;
            }
        }

        private void writeInternalImperfect(byte[] bArr, int i, int i2) throws IOException {
            if (!this.startedSpilling_ && this.bytesWritten_ >= this.thresholdSize_) {
                try {
                    startSpilling();
                } catch (InterruptedException e) {
                    throw new IOException("Internal error occured writing to buffer.", e);
                }
            }
            if (i2 > this.bytesRemaining_) {
                this.currentBuffer_.flip();
                this.currentBuffer_ = getBuffer(this.spillStatus_.getNextBufferIndex());
                this.bytesRemaining_ = this.bufferSize_;
                this.bytesWrittenToBuffer = this.bytesWritten_;
            }
            this.currentBuffer_.put(bArr, i, i2);
            this.bytesRemaining_ -= i2;
            this.bytesWritten_ += i2;
        }

        public void writeInternal(byte[] bArr, int i, int i2) throws IOException {
            int remaining = this.currentBuffer_.remaining();
            while (true) {
                int i3 = remaining;
                if (i2 <= i3) {
                    this.currentBuffer_.put(bArr, i, i2);
                    this.bytesWritten_ += i2;
                    this.bytesRemaining_ -= i2;
                    if (!this.startedSpilling_) {
                        checkSpillStart();
                    }
                    this.bytesWrittenToBuffer += i2;
                    return;
                }
                this.currentBuffer_.put(bArr, i, i3);
                this.bytesWritten_ += i3;
                if (!this.startedSpilling_) {
                    checkSpillStart();
                }
                this.currentBuffer_.flip();
                this.currentBuffer_ = getBuffer(this.spillStatus_.getNextBufferIndex());
                if (this.currentBuffer_ == null) {
                    throw new IOException("Error writing to spilling buffer. Could not get free buffer.");
                }
                this.bytesRemaining_ = this.bufferSize_;
                this.bytesWrittenToBuffer = 0L;
                i += i3;
                i2 -= i3;
                remaining = this.currentBuffer_.remaining();
            }
        }

        private void checkSpillStart() throws IOException {
            if (this.bytesWritten_ >= this.thresholdSize_) {
                try {
                    startSpilling();
                } catch (InterruptedException e) {
                    throw new IOException("Internal error occured writing to buffer.", e);
                }
            }
        }

        private void flushBuffer() throws IOException {
            if (this.count > 0) {
                writeInternal(this.buf, 0, this.count);
                this.count = 0;
            }
        }

        SpilledByteBuffer getBuffer(int i) throws IOException {
            if (i < 0) {
                return null;
            }
            if (i >= this.bufferList_.size()) {
                this.bufferList_.add(new SpilledByteBuffer(this.direct_, this.bufferSize_));
            }
            return this.bufferList_.get(i);
        }

        @Override // java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            flushBuffer();
            flushInternal();
        }

        public void flushInternal() throws IOException {
            if (this.closed_) {
                return;
            }
            this.currentBuffer_.flip();
            this.spillStatus_.spillCompleted();
            if (this.startedSpilling_) {
                this.spillThread_.completeSpill();
                try {
                    try {
                        try {
                            if (!this.spillThreadState_.get().booleanValue()) {
                                throw new IOException("Spilling Thread failed to complete sucessfully.");
                            }
                        } catch (InterruptedException e) {
                            throw new IOException(e);
                        }
                    } catch (ExecutionException e2) {
                        throw new IOException(e2);
                    }
                } finally {
                    this.closed_ = true;
                    this.processor.close();
                    this.spillThreadService_.shutdownNow();
                }
            }
            this.closed_ = true;
        }

        static {
            $assertionsDisabled = !SpillingDataOutputBuffer.class.desiredAssertionStatus();
        }
    }

    public SpillingDataOutputBuffer(String str) throws FileNotFoundException {
        super(new SpillingStream(3, Constants.BUFFER_DEFAULT_SIZE, Constants.BUFFER_DEFAULT_SIZE, true, new WriteSpilledDataProcessor(str)));
    }

    public SpillingDataOutputBuffer(SpilledDataProcessor spilledDataProcessor) throws FileNotFoundException {
        super(new SpillingStream(3, Constants.BUFFER_DEFAULT_SIZE, Constants.BUFFER_DEFAULT_SIZE, true, spilledDataProcessor));
    }

    public SpillingDataOutputBuffer() throws FileNotFoundException {
        super(new SpillingStream(3, Constants.BUFFER_DEFAULT_SIZE, Constants.BUFFER_DEFAULT_SIZE, true, new WriteSpilledDataProcessor(System.getProperty("java.io.tmpdir") + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32))));
    }

    public SpillingDataOutputBuffer(int i, int i2, int i3, boolean z, SpilledDataProcessor spilledDataProcessor) {
        super(new SpillingStream(i, i2, i3, z, spilledDataProcessor));
    }

    public void clear() throws IOException {
        ((SpillingStream) this.out).clear();
    }

    public boolean hasSpilled() {
        return ((SpillingStream) this.out).startedSpilling_;
    }

    public void markRecordEnd() {
        ((SpillingStream) this.out).markEndOfRecord();
    }

    public SpilledDataInputBuffer getInputStreamToRead(String str) throws IOException {
        SpillingStream spillingStream = (SpillingStream) this.out;
        SpilledDataInputBuffer.SpilledInputStream spilledInputStream = new SpilledDataInputBuffer.SpilledInputStream(str, spillingStream.direct_, spillingStream.bufferList_, spillingStream.startedSpilling_);
        spilledInputStream.prepareRead();
        return new SpilledDataInputBuffer(spilledInputStream);
    }
}
