package org.apache.hama.bsp.message.queue;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
import org.apache.hama.bsp.message.bundle.HeapByteArrayBSPMessageBundle;
import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
import org.apache.hama.bsp.message.io.PreFetchCache;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;

/* loaded from: input_file:org/apache/hama/bsp/message/queue/SpillingQueue.class */
public class SpillingQueue<M extends Writable> extends ByteArrayMessageQueue<M> implements MessageTransferQueue<M> {
    private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
    private Configuration conf;
    private SpillingDataOutputBuffer spillOutputBuffer;
    private int numMessagesWritten;
    private int numMessagesRead;
    public static final String SPILLBUFFER_COUNT = "hama.io.spillbuffer.count";
    public static final String SPILLBUFFER_SIZE = "hama.io.spillbuffer.size";
    public static final String SPILLBUFFER_FILENAME = "hama.io.spillbuffer.filename";
    public static final String SPILLBUFFER_THRESHOLD = "hama.io.spillbuffer.threshold";
    public static final String SPILLBUFFER_DIRECT = "hama.io.spillbuffer.direct";
    public static final String ENABLE_PREFETCH = "hama.io.spillbuffer.enableprefetch";
    public static final String SPILLBUFFER_MSGCLASS = "hama.io.spillbuffer.msgclass";
    private int bufferCount;
    private int bufferSize;
    private String fileName;
    private int threshold;
    private boolean direct;
    private SpilledDataInputBuffer spilledInput;
    private boolean objectWritableMode;
    private ObjectWritable objectWritable;
    private Class<M> messageClass;
    private PreFetchCache<M> prefetchCache;
    private boolean enablePrefetch;

    /* loaded from: input_file:org/apache/hama/bsp/message/queue/SpillingQueue$SpillIterator.class */
    private class SpillIterator implements Iterator<M> {
        private boolean objectMode;
        private Class<M> classObject;
        private M messageHolder;

        public SpillIterator(boolean z, Class<M> cls, Configuration configuration) {
            this.objectMode = z;
            this.classObject = cls;
            if (this.classObject != null) {
                this.messageHolder = (M) ReflectionUtils.newInstance(cls, configuration);
            }
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return SpillingQueue.this.numMessagesRead != SpillingQueue.this.numMessagesWritten && SpillingQueue.this.numMessagesWritten > 0;
        }

        @Override // java.util.Iterator
        public M next() {
            return this.objectMode ? (M) SpillingQueue.this.poll() : (M) SpillingQueue.this.poll(this.messageHolder);
        }

        @Override // java.util.Iterator
        public void remove() {
        }
    }

    @Override // java.lang.Iterable
    public Iterator<M> iterator() {
        return new SpillIterator(this.objectWritableMode, this.messageClass, this.conf);
    }

    public void setConf(Configuration configuration) {
        this.conf = configuration;
        this.objectWritable.setConf(configuration);
    }

    public Configuration getConf() {
        return this.conf;
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void add(M m) {
        try {
            if (this.objectWritableMode) {
                this.objectWritable.set(m);
                this.objectWritable.write(this.spillOutputBuffer);
            } else {
                m.write(this.spillOutputBuffer);
            }
            this.spillOutputBuffer.markRecordEnd();
            this.numMessagesWritten++;
        } catch (IOException e) {
            LOG.error("Error adding message.", e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void addAll(Iterable<M> iterable) {
        Iterator<M> it = iterable.iterator();
        while (it.hasNext()) {
            add((SpillingQueue<M>) it.next());
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void addAll(MessageQueue<M> messageQueue) {
        Iterator<M> it = messageQueue.iterator();
        while (it.hasNext()) {
            add((SpillingQueue<M>) it.next());
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void clear() {
        try {
            this.spillOutputBuffer.close();
            this.spillOutputBuffer.clear();
            this.spilledInput.close();
            this.spilledInput.clear();
        } catch (IOException e) {
            LOG.error("Error clearing spill stream.", e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void close() {
        try {
            this.spillOutputBuffer.close();
            this.spilledInput.close();
            this.spilledInput.completeReading(true);
        } catch (IOException e) {
            LOG.error("Error closing the spilled input stream.", e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void init(Configuration configuration, TaskAttemptID taskAttemptID) {
        this.bufferCount = configuration.getInt(SPILLBUFFER_COUNT, 3);
        this.bufferSize = configuration.getInt(SPILLBUFFER_SIZE, Constants.BUFFER_DEFAULT_SIZE);
        this.direct = configuration.getBoolean(SPILLBUFFER_DIRECT, true);
        this.threshold = configuration.getInt(SPILLBUFFER_THRESHOLD, Constants.BUFFER_DEFAULT_SIZE);
        this.fileName = configuration.get(SPILLBUFFER_FILENAME, System.getProperty("java.io.tmpdir") + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32));
        this.messageClass = configuration.getClass(Constants.MESSAGE_CLASS, (Class) null);
        this.objectWritableMode = this.messageClass == null;
        try {
            CombineSpilledDataProcessor combineSpilledDataProcessor = new CombineSpilledDataProcessor(this.fileName);
            combineSpilledDataProcessor.init(configuration);
            this.spillOutputBuffer = new SpillingDataOutputBuffer(this.bufferCount, this.bufferSize, this.threshold, this.direct, combineSpilledDataProcessor);
            this.objectWritable = new ObjectWritable();
            this.objectWritable.setConf(configuration);
            this.conf = configuration;
        } catch (FileNotFoundException e) {
            LOG.error("Error initializing spilled data stream.", e);
            throw new RuntimeException(e);
        }
    }

    private void incReadMsgCount() {
        this.numMessagesRead++;
    }

    private M readDirect(M m) {
        if (this.numMessagesRead >= this.numMessagesWritten) {
            return null;
        }
        try {
            if (this.objectWritableMode) {
                this.objectWritable.readFields(this.spilledInput);
                incReadMsgCount();
                return (M) this.objectWritable.get();
            }
            m.readFields(this.spilledInput);
            incReadMsgCount();
            return m;
        } catch (IOException e) {
            LOG.error("Error getting values from spilled input", e);
            return null;
        }
    }

    public M poll(M m) {
        if (this.numMessagesRead >= this.numMessagesWritten) {
            return null;
        }
        return this.enablePrefetch ? readFromPrefetch(m) : readDirect(m);
    }

    private M readDirectObjectWritable() {
        if (!this.objectWritableMode) {
            throw new IllegalStateException("API call not supported. Set the configuration property 'hama.io.spillbuffer.newmsginit' to true.");
        }
        try {
            this.objectWritable.readFields(this.spilledInput);
            incReadMsgCount();
            return (M) this.objectWritable.get();
        } catch (IOException e) {
            LOG.error("Error getting values from spilled input", e);
            return null;
        }
    }

    private M readFromPrefetch(M m) {
        if (!this.objectWritableMode) {
            incReadMsgCount();
            return (M) this.prefetchCache.get();
        }
        this.objectWritable = this.prefetchCache.get();
        incReadMsgCount();
        return (M) this.objectWritable.get();
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public M poll() {
        if (this.numMessagesRead >= this.numMessagesWritten) {
            return null;
        }
        if (!this.enablePrefetch) {
            return readDirectObjectWritable();
        }
        M readFromPrefetch = readFromPrefetch(null);
        if (readFromPrefetch != null) {
            incReadMsgCount();
        }
        return readFromPrefetch;
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void prepareRead() {
        try {
            this.spillOutputBuffer.close();
            try {
                this.spilledInput = this.spillOutputBuffer.getInputStreamToRead(this.fileName);
                if (this.conf.getBoolean(ENABLE_PREFETCH, false)) {
                    this.prefetchCache = new PreFetchCache<>(this.numMessagesWritten);
                    this.enablePrefetch = true;
                    try {
                        this.prefetchCache.startFetching(this.messageClass, this.spilledInput, this.conf);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    } catch (InterruptedException e2) {
                        LOG.error("Error starting prefetch on message queue.", e2);
                        throw new RuntimeException(e2);
                    }
                }
            } catch (IOException e3) {
                LOG.error("Error initializing the input spilled stream", e3);
                throw new RuntimeException(e3);
            }
        } catch (IOException e4) {
            LOG.error("Error closing spilled buffer", e4);
            throw new RuntimeException(e4);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void prepareWrite() {
        this.numMessagesWritten = 0;
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public int size() {
        return this.numMessagesWritten;
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public boolean isMessageSerialized() {
        return true;
    }

    @Override // org.apache.hama.bsp.message.queue.MessageTransferQueue
    public MessageQueue<M> getSenderQueue(Configuration configuration) {
        return this;
    }

    @Override // org.apache.hama.bsp.message.queue.MessageTransferQueue
    public MessageQueue<M> getReceiverQueue(Configuration configuration) {
        return this;
    }

    @Override // org.apache.hama.bsp.message.queue.ByteArrayMessageQueue, org.apache.hama.bsp.message.queue.BSPMessageInterface
    public void add(BSPMessageBundle<M> bSPMessageBundle) {
        try {
            this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle) bSPMessageBundle).getBuffer());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
