package org.apache.hama.bsp.message.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

/* loaded from: input_file:org/apache/hama/bsp/message/io/PreFetchCache.class */
/**
 * Hands out messages that a background thread deserializes ahead of time
 * from a spilled data stream.
 *
 * <p>The cache owns a fixed number of message buffers, each holding up to
 * {@code capacity} messages. A {@link PreFetchThread} fills buffers from the
 * stream while the caller drains them through {@link #get()}. Which buffer
 * each side may use is decided by the shared {@code SpilledDataReadStatus}
 * instance (its exact blocking semantics are defined outside this file).
 *
 * <p>Message objects stored in the buffers are re-used: once a buffer slot
 * has been populated, later fills call {@code readFields} on the same object.
 * A message returned by {@link #get()} may therefore be overwritten by a
 * later fill of the same slot.
 *
 * <p>{@link #get()} and {@link #close()} are intended to be called from a
 * single consumer thread; the consumer-side indices are not synchronized.
 *
 * @param <M> the message type instantiated when reading from the stream
 */
public class PreFetchCache<M extends Writable> {
    private static final Log LOG = LogFactory.getLog(PreFetchCache.class);

    /** Number of buffers used when the caller does not specify one. */
    private static final int DEFAULT_BUFFER_COUNT = 2;

    /** Per-buffer message capacity used when the caller does not specify one. */
    private static final int DEFAULT_BUFFER_CAPACITY = 64;

    /** The message buffers shared between the consumer and the pre-fetch thread. */
    private final List<List<Writable>> objectListArr;
    private final long totalMessages;
    private final int capacity;
    /** Index of the buffer currently being drained by {@link #get()}. */
    private int arrIndex;
    /** Position of the next message to return within the current buffer. */
    private int listIndex;
    /** Background filler; {@code null} until {@link #startFetching} is called. */
    private PreFetchThread<M> preFetchThread;
    private final SpilledDataReadStatus status;
    private final BitSet bufferBitSet;

    /**
     * Background thread that deserializes messages from the spilled stream
     * into the shared buffers until all messages were read, reading is
     * stopped, or an error occurs.
     */
    private static class PreFetchThread<M extends Writable> extends Thread {
        /** Set by {@link #stopReading()}; volatile so the worker sees it promptly. */
        private volatile boolean stopReading;
        private final SpilledDataInputBuffer stream;
        /** Message class to instantiate; {@code null} means use {@code ObjectWritable}. */
        private final Class<M> objectClass;
        private final List<List<Writable>> listArr;
        private final SpilledDataReadStatus status;
        private final long totalMsgs;
        private final int listCapacity;
        private final Configuration conf;
        /** Messages deserialized so far; only touched by this thread. */
        private int messageCount = 0;

        public PreFetchThread(Class<M> cls, List<List<Writable>> buffers, int bufferCapacity,
                SpilledDataInputBuffer spilledDataInputBuffer, long totalMessageCount,
                SpilledDataReadStatus spilledDataReadStatus, Configuration configuration) {
            this.objectClass = cls;
            this.listArr = buffers;
            this.status = spilledDataReadStatus;
            this.conf = configuration;
            this.totalMsgs = totalMessageCount;
            this.stream = spilledDataInputBuffer;
            this.listCapacity = bufferCapacity;
        }

        /**
         * Reads up to {@code listCapacity} messages from the stream into the
         * buffer at {@code index}, stopping early when all messages were read
         * or reading was stopped.
         *
         * @param index index of the buffer to fill
         * @throws IOException if deserializing a message fails
         */
        private void fill(int index) throws IOException {
            List<Writable> buffer = this.listArr.get(index);
            for (int slot = 0; slot < this.listCapacity && this.messageCount < this.totalMsgs
                    && !this.stopReading; slot++) {
                if (slot < buffer.size()) {
                    // Slot was populated by an earlier fill: re-use the object.
                    buffer.get(slot).readFields(this.stream);
                } else if (this.objectClass == null) {
                    ObjectWritable wrapped = new ObjectWritable();
                    wrapped.readFields(this.stream);
                    buffer.add(slot, wrapped);
                } else {
                    Writable message = ReflectionUtils.newInstance(this.objectClass, this.conf);
                    message.readFields(this.stream);
                    buffer.add(slot, message);
                }
                this.messageCount++;
            }
        }

        /**
         * Fills buffers until every message was read, a negative buffer index
         * is handed out, or {@link #stopReading()} was called.
         *
         * <p>A read error or an interrupt ends the loop: after a failed read
         * {@code messageCount} can no longer reach {@code totalMsgs}, so
         * retrying would only repeat the failure. The status object is always
         * told that this side is finished, even on failure.
         */
        @Override
        public void run() {
            boolean interrupted = false;
            try {
                do {
                    int fileBufferIndex = this.status.getFileBufferIndex();
                    if (fileBufferIndex < 0 || this.stopReading) {
                        break;
                    }
                    fill(fileBufferIndex);
                    if (this.stopReading) {
                        break;
                    }
                } while (this.messageCount != this.totalMsgs);
            } catch (IOException e) {
                PreFetchCache.LOG.error("Error reading pre-fetch buffer index.", e);
            } catch (InterruptedException e) {
                PreFetchCache.LOG.error("Interrupted reading pre-fetch buffer index.", e);
                interrupted = true;
            } finally {
                this.status.closedBySpiller();
                this.status.completeReading();
                if (interrupted) {
                    // Restore the flag only after the status calls so they run undisturbed.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Asks the thread to stop; it exits at its next check of the flag. */
        public void stopReading() {
            this.stopReading = true;
        }
    }

    /**
     * Creates a cache with two buffers of 64 messages each.
     *
     * @param totalMessages total number of messages available in the stream
     */
    public PreFetchCache(long totalMessages) {
        this(DEFAULT_BUFFER_COUNT, totalMessages, DEFAULT_BUFFER_CAPACITY);
    }

    /**
     * Creates a cache with the given number of buffers of 64 messages each.
     *
     * @param numBuffers number of buffers to allocate
     * @param totalMessages total number of messages available in the stream
     */
    public PreFetchCache(int numBuffers, long totalMessages) {
        this(numBuffers, totalMessages, DEFAULT_BUFFER_CAPACITY);
    }

    /**
     * Creates a cache with the given number of buffers and per-buffer capacity.
     *
     * @param numBuffers number of buffers to allocate
     * @param totalMessages total number of messages available in the stream
     * @param capacity maximum number of messages held by one buffer
     */
    public PreFetchCache(int numBuffers, long totalMessages, int capacity) {
        this.objectListArr = new ArrayList<>(numBuffers);
        this.totalMessages = totalMessages;
        this.bufferBitSet = new BitSet();
        this.status = new SpilledDataReadStatus(numBuffers, this.bufferBitSet);
        for (int n = 0; n < numBuffers; n++) {
            this.objectListArr.add(new ArrayList<Writable>(capacity));
        }
        this.capacity = capacity;
    }

    /**
     * Starts the background pre-fetch thread and obtains the first buffer to
     * read from.
     *
     * @param cls message class to instantiate, or {@code null} to read
     *        messages as {@code ObjectWritable}
     * @param spilledDataInputBuffer stream the messages are read from
     * @param configuration configuration passed to newly created messages
     * @throws InterruptedException if interrupted while obtaining the first
     *         read buffer
     * @throws IOException if the status object reports that reading could not
     *         be started
     */
    public void startFetching(Class<M> cls, SpilledDataInputBuffer spilledDataInputBuffer, Configuration configuration) throws InterruptedException, IOException {
        this.preFetchThread = new PreFetchThread<>(cls, this.objectListArr, this.capacity, spilledDataInputBuffer, this.totalMessages, this.status, configuration);
        this.preFetchThread.start();
        if (!this.status.startReading()) {
            // Do not leave the worker filling buffers nobody will read.
            this.preFetchThread.stopReading();
            throw new IOException("Failed to start reading the spilled file: ");
        }
        this.arrIndex = this.status.getReadBufferIndex();
    }

    /**
     * Returns the next pre-fetched message.
     *
     * <p>When the current buffer has been fully consumed, the next readable
     * buffer is requested from the status object. This method does not track
     * how many messages were already returned; callers are expected to stop
     * after the total message count.
     *
     * @return the next message, or {@code null} if no buffer is available
     *         (negative buffer index) or the calling thread was interrupted;
     *         in the latter case the thread's interrupt flag is restored
     */
    public Writable get() {
        if (this.listIndex == this.capacity) {
            try {
                this.arrIndex = this.status.getReadBufferIndex();
            } catch (InterruptedException e) {
                LOG.error("Interrupted getting prefetched records.", e);
                Thread.currentThread().interrupt();
                return null;
            }
            if (this.arrIndex >= 0) {
                this.listIndex = 0;
            }
        }
        if (this.arrIndex < 0) {
            // Also covers a negative index handed out by startFetching.
            return null;
        }
        List<Writable> current = this.objectListArr.get(this.arrIndex);
        Writable next = current.get(this.listIndex);
        this.listIndex++;
        return next;
    }

    /**
     * Signals that reading is complete, stops the pre-fetch thread and waits
     * for it to terminate. Safe to call when fetching was never started. If
     * the wait is interrupted, the interrupt flag is restored.
     */
    public void close() {
        this.status.completeReading();
        PreFetchThread<M> worker = this.preFetchThread;
        if (worker == null) {
            return;
        }
        worker.stopReading();
        try {
            worker.join();
        } catch (InterruptedException e) {
            LOG.error("Prefetch thread was interrupted.", e);
            Thread.currentThread().interrupt();
        }
    }
}
