package org.apache.hama.bsp.message.queue;

import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.TaskAttemptID;

/* loaded from: input_file:org/apache/hama/bsp/message/queue/DiskQueue.class */
public final class DiskQueue<M extends Writable> extends POJOMessageQueue<M> {
    public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
    private static final int MAX_RETRIES = 4;
    private static final Log LOG = LogFactory.getLog(DiskQueue.class);
    private static volatile int ONGOING_SEQUENCE_NUMBER = 0;
    private Configuration conf;
    private FileSystem fs;
    private FSDataOutputStream writer;
    private FSDataInputStream reader;
    private Path queuePath;
    private TaskAttemptID id;
    private int size = 0;
    private final ObjectWritable writable = new ObjectWritable();

    /* loaded from: input_file:org/apache/hama/bsp/message/queue/DiskQueue$DiskIterator.class */
    private class DiskIterator implements Iterator<M> {
        public DiskIterator() {
            DiskQueue.this.prepareRead();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (DiskQueue.this.size == 0) {
                DiskQueue.this.closeInternal(false);
            }
            return DiskQueue.this.size != 0;
        }

        @Override // java.util.Iterator
        public M next() {
            return (M) DiskQueue.this.poll();
        }

        @Override // java.util.Iterator
        public void remove() {
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void init(Configuration configuration, TaskAttemptID taskAttemptID) {
        this.id = taskAttemptID;
        this.writable.setConf(configuration);
        try {
            this.fs = FileSystem.get(configuration);
            Path queueDir = getQueueDir(configuration, taskAttemptID, configuration.get(DISK_QUEUE_PATH_KEY));
            this.fs.mkdirs(queueDir);
            StringBuilder sb = new StringBuilder();
            int i = ONGOING_SEQUENCE_NUMBER;
            ONGOING_SEQUENCE_NUMBER = i + 1;
            this.queuePath = new Path(queueDir, sb.append(i).append("_messages.seq").toString());
            prepareWrite();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void close() {
        closeInternal(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeInternal(boolean z) {
        try {
            try {
                if (this.writer != null) {
                    this.writer.flush();
                    this.writer.close();
                    this.writer = null;
                }
            } catch (IOException e) {
                LOG.error(e);
                if (this.fs != null && z) {
                    try {
                        this.fs.delete(this.queuePath, true);
                    } catch (IOException e2) {
                        LOG.error(e2);
                    }
                }
                if (this.writer != null) {
                    try {
                        this.writer.flush();
                        this.writer.close();
                        this.writer = null;
                    } catch (IOException e3) {
                        LOG.error(e3);
                    }
                }
            }
            try {
                try {
                    if (this.reader != null) {
                        this.reader.close();
                        this.reader = null;
                    }
                    if (this.reader != null) {
                        try {
                            this.reader.close();
                            this.reader = null;
                        } catch (IOException e4) {
                            LOG.error(e4);
                        }
                    }
                } catch (Throwable th) {
                    if (this.reader != null) {
                        try {
                            this.reader.close();
                            this.reader = null;
                        } catch (IOException e5) {
                            LOG.error(e5);
                        }
                    }
                    throw th;
                }
            } catch (IOException e6) {
                LOG.error(e6);
                if (this.reader != null) {
                    try {
                        this.reader.close();
                        this.reader = null;
                    } catch (IOException e7) {
                        LOG.error(e7);
                    }
                }
            }
        } finally {
            if (this.fs != null && z) {
                try {
                    this.fs.delete(this.queuePath, true);
                } catch (IOException e8) {
                    LOG.error(e8);
                }
            }
            if (this.writer != null) {
                try {
                    this.writer.flush();
                    this.writer.close();
                    this.writer = null;
                } catch (IOException e9) {
                    LOG.error(e9);
                }
            }
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void prepareRead() {
        closeInternal(false);
        try {
            this.reader = this.fs.open(this.queuePath);
        } catch (IOException e) {
            LOG.error(e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void prepareWrite() {
        try {
            this.writer = this.fs.create(this.queuePath);
        } catch (IOException e) {
            LOG.error(e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public final void addAll(Iterable<M> iterable) {
        Iterator<M> it = iterable.iterator();
        while (it.hasNext()) {
            add((DiskQueue<M>) it.next());
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public void addAll(MessageQueue<M> messageQueue) {
        while (true) {
            M poll = messageQueue.poll();
            if (poll == null) {
                return;
            } else {
                add((DiskQueue<M>) poll);
            }
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public final void add(M m) {
        this.size++;
        try {
            new ObjectWritable(m).write(this.writer);
        } catch (IOException e) {
            LOG.error(e);
        }
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public final void clear() {
        closeInternal(true);
        this.size = 0;
        init(this.conf, this.id);
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public final M poll() {
        if (this.size == 0) {
            return null;
        }
        this.size--;
        int i = 1;
        while (i <= 4) {
            try {
                this.writable.readFields(this.reader);
                if (this.size > 0) {
                    return (M) this.writable.get();
                }
                closeInternal(true);
                return (M) this.writable.get();
            } catch (IOException e) {
                LOG.error("Retrying for the " + i + "th time!", e);
                i++;
            }
        }
        throw new RuntimeException("Message couldn't be read for " + i + " times! Giving up!");
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public int size() {
        return this.size;
    }

    @Override // java.lang.Iterable
    public Iterator<M> iterator() {
        return new DiskIterator();
    }

    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public static Path getQueueDir(Configuration configuration, TaskAttemptID taskAttemptID, String str) {
        Path createDiskQueuePath;
        if (str == null) {
            String str2 = configuration.get("hama.tmp.dir");
            createDiskQueuePath = str2 != null ? createDiskQueuePath(taskAttemptID, str2) : createDiskQueuePath(taskAttemptID, "/tmp/messageStorage/");
        } else {
            createDiskQueuePath = createDiskQueuePath(taskAttemptID, str);
        }
        return createDiskQueuePath;
    }

    private static Path createDiskQueuePath(TaskAttemptID taskAttemptID, String str) {
        return new Path(new Path(new Path(str, "diskqueue"), taskAttemptID.getJobID().toString()), taskAttemptID.getTaskID().toString());
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public boolean isMessageSerialized() {
        return false;
    }

    @Override // org.apache.hama.bsp.message.queue.MessageQueue
    public boolean isMemoryBasedQueue() {
        return false;
    }
}
