package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.phoenix.shaded.org.apache.commons.io.IOUtils;

/**
 * Streams {@link WAL.Entry} objects out of the WAL files held in a log queue.
 *
 * <p>The stream always reads from the head of the queue. Once the head log has been fully
 * consumed and the queue holds at least one more log, the head is removed from the queue and
 * reading continues with the next log. The stream is its own {@link Iterator}:
 * {@link #iterator()} returns {@code this}, so it can only be traversed once.
 *
 * <p>I/O problems encountered while iterating are surfaced from {@link #hasNext()} and
 * {@link #next()} as {@link WALEntryStreamRuntimeException}.
 */
@InterfaceStability.Evolving
@InterfaceAudience.Private
public class WALEntryStream implements Iterator<WAL.Entry>, Closeable, Iterable<WAL.Entry> {
    private static final Log LOG = LogFactory.getLog(WALEntryStream.class);

    /** Reader over {@link #currentPath}; {@code null} while no log is open. */
    private WAL.Reader reader;
    /** Path the current reader was opened on (may be the archived location of a queued log). */
    private Path currentPath;
    /** Entry read ahead by {@link #hasNext()} and not yet handed out by {@link #next()}. */
    private WAL.Entry currentEntry;
    /** Reader position after the last read; also the offset used when (re)seeking. */
    private long currentPosition;

    private final PriorityBlockingQueue<Path> logQueue;
    private final FileSystem fs;
    private final Configuration conf;
    private final MetricsSource metrics;

    /** Unchecked wrapper used to carry failures out of the {@link Iterator} methods. */
    @InterfaceAudience.Private
    public static class WALEntryStreamRuntimeException extends RuntimeException {
        private static final long serialVersionUID = -6298201811259982568L;

        public WALEntryStreamRuntimeException(Exception exc) {
            super(exc);
        }
    }

    /**
     * Creates a stream that starts reading the head log at position 0.
     *
     * @param logQueue queue of WAL paths to read, head first
     * @param fs       file system the logs live on
     * @param conf     configuration used to create readers and locate archived logs
     * @param metrics  replication metrics updated while reading
     * @throws IOException declared for API compatibility; not thrown by this constructor
     */
    public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, MetricsSource metrics) throws IOException {
        this(logQueue, fs, conf, 0L, metrics);
    }

    /**
     * Creates a stream that starts reading the head log at the given position.
     *
     * @param logQueue      queue of WAL paths to read, head first
     * @param fs            file system the logs live on
     * @param conf          configuration used to create readers and locate archived logs
     * @param startPosition offset in the head log at which reading starts
     * @param metrics       replication metrics updated while reading
     * @throws IOException declared for API compatibility; not thrown by this constructor
     */
    public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, long startPosition, MetricsSource metrics) throws IOException {
        this.logQueue = logQueue;
        this.fs = fs;
        this.conf = conf;
        this.currentPosition = startPosition;
        this.metrics = metrics;
    }

    /**
     * Reads ahead one entry if none is buffered.
     *
     * @return {@code true} if an entry is available for {@link #next()}
     * @throws WALEntryStreamRuntimeException wrapping any exception raised while advancing
     */
    @Override // java.util.Iterator
    public boolean hasNext() {
        if (this.currentEntry == null) {
            try {
                tryAdvanceEntry();
            } catch (Exception e) {
                throw new WALEntryStreamRuntimeException(e);
            }
        }
        return this.currentEntry != null;
    }

    /**
     * Hands out the buffered entry and clears the buffer.
     *
     * @return the next WAL entry
     * @throws NoSuchElementException if no entry is currently available
     */
    @Override // java.util.Iterator
    public WAL.Entry next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        WAL.Entry entry = this.currentEntry;
        this.currentEntry = null;
        return entry;
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override // java.util.Iterator
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /** Closes the underlying reader, if one is open. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        closeReader();
    }

    /** @return this stream itself; it is a single-pass iterator */
    @Override // java.lang.Iterable
    public Iterator<WAL.Entry> iterator() {
        return this;
    }

    /** @return the reader position recorded after the most recent read */
    public long getPosition() {
        return this.currentPosition;
    }

    /** @return the path of the log the current reader was opened on, or {@code null} */
    public Path getCurrentPath() {
        return this.currentPath;
    }

    /** Builds a short human-readable description of the current read location for log messages. */
    private String getCurrentPathStat() {
        StringBuilder sb = new StringBuilder();
        if (this.currentPath != null) {
            sb.append("currently replicating from: ").append(this.currentPath).append(" at position: ").append(this.currentPosition).append("\n");
        } else {
            sb.append("no replication ongoing, waiting for new log");
        }
        return sb.toString();
    }

    /**
     * Resets the open reader and seeks back to the current position. Does nothing when no
     * reader is open.
     *
     * @throws IOException if the reader cannot be reset or re-opened
     */
    public void reset() throws IOException {
        if (this.reader != null && this.currentPath != null) {
            resetReader();
        }
    }

    private void setPosition(long position) {
        this.currentPosition = position;
    }

    private void setCurrentPath(Path path) {
        this.currentPath = path;
    }

    /**
     * Tries to buffer the next entry, rolling over to the next queued log when the current one
     * is exhausted.
     */
    private void tryAdvanceEntry() throws IOException {
        if (!checkReader()) {
            return;
        }
        readNextEntryAndSetPosition();
        // Nothing more to do if we got an entry, or if the current log is the only one queued
        // (there is no next log to roll over to yet).
        if (this.currentEntry != null || this.logQueue.size() <= 1) {
            return;
        }
        // There are newer logs queued: reset the reader and retry once before giving up on
        // the current log.
        resetReader();
        readNextEntryAndSetPosition();
        if (this.currentEntry == null && checkAllBytesParsed()) {
            dequeueCurrentLog();
            if (openNextLog()) {
                readNextEntryAndSetPosition();
            }
        }
    }

    /**
     * Decides whether the current log can be considered finished by comparing the read
     * position (plus trailer size, if any) against the file length.
     *
     * @return {@code true} if the log is done and may be dequeued; {@code false} if reading was
     *         restarted from position 0 because too many bytes were left unparsed in a log
     *         that has a trailer
     */
    private boolean checkAllBytesParsed() throws IOException {
        // A negative trailer size means the reader found no trailer.
        long trailerSize = currentTrailerSize();
        FileStatus fileStatus = null;
        try {
            fileStatus = this.fs.getFileStatus(this.currentPath);
        } catch (IOException e) {
            // Best effort: without a length we cannot verify anything, so the log is treated as done.
            LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat(), e);
            this.metrics.incrUnknownFileLengthForClosedWAL();
        }
        if (fileStatus != null) {
            long fileLength = fileStatus.getLen();
            if (trailerSize < 0) {
                if (this.currentPosition < fileLength) {
                    long skipped = fileLength - this.currentPosition;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Reached the end of WAL file '" + this.currentPath + "'. It was not closed cleanly, so we did not parse " + skipped + " bytes of data. This is normally ok.");
                    }
                    this.metrics.incrUncleanlyClosedWALs();
                    this.metrics.incrBytesSkippedInUncleanlyClosedWALs(skipped);
                }
            } else if (this.currentPosition + trailerSize < fileLength) {
                LOG.warn("Processing end of WAL file '" + this.currentPath + "'. At position " + this.currentPosition + ", which is too far away from reported file length " + fileLength + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
                // Remember how far we had read BEFORE rewinding: those are the bytes that will
                // be read a second time. Reading the position after the rewind would report 0.
                long repeatedBytes = this.currentPosition;
                setPosition(0L);
                resetReader();
                this.metrics.incrRestartedWALReading();
                this.metrics.incrRepeatedFileBytes(repeatedBytes);
                return false;
            }
        }
        if (LOG.isTraceEnabled()) {
            String lengthText = fileStatus == null ? "N/A" : String.valueOf(fileStatus.getLen());
            LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + lengthText);
        }
        this.metrics.incrCompletedWAL();
        return true;
    }

    /** Closes the reader, removes the head of the queue and rewinds the position to 0. */
    private void dequeueCurrentLog() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reached the end of log " + this.currentPath);
        }
        closeReader();
        this.logQueue.remove();
        setPosition(0L);
        this.metrics.decrSizeOfLogQueue();
    }

    /**
     * Reads one entry from the reader into {@link #currentEntry} and records the reader's new
     * position. Read metrics are only updated when an entry was actually returned.
     */
    private void readNextEntryAndSetPosition() throws IOException {
        WAL.Entry entry = this.reader.next();
        long readerPosition = this.reader.getPosition();
        if (entry != null) {
            this.metrics.incrLogEditsRead();
            this.metrics.incrLogReadInBytes(readerPosition - this.currentPosition);
        }
        this.currentEntry = entry;
        setPosition(readerPosition);
    }

    private void closeReader() throws IOException {
        if (this.reader != null) {
            this.reader.close();
            this.reader = null;
        }
    }

    /** @return {@code true} if a reader is open, opening one on the head log if necessary */
    private boolean checkReader() throws IOException {
        return this.reader != null || openNextLog();
    }

    /** @return {@code true} if a reader is open on the head of the queue after this call */
    private boolean openNextLog() throws IOException {
        Path head = this.logQueue.peek();
        if (head == null) {
            return false;
        }
        openReader(head);
        return this.reader != null;
    }

    /**
     * Looks for a log of the same name in the old-logs directory under the root dir.
     *
     * @return the archived path if it exists, otherwise the {@code path} passed in
     */
    private Path getArchivedLog(Path path) throws IOException {
        Path oldLogDir = new Path(FSUtils.getRootDir(this.conf), HConstants.HREGION_OLDLOGDIR_NAME);
        Path archived = new Path(oldLogDir, path.getName());
        if (this.fs.exists(archived)) {
            LOG.info("Log " + path + " was moved to " + archived);
            return archived;
        }
        LOG.error("Couldn't locate log: " + path);
        return path;
    }

    /** Retries opening from the archived location, or rethrows if the log is not there either. */
    private void handleFileNotFound(Path path, FileNotFoundException fileNotFoundException) throws IOException {
        Path archivedLog = getArchivedLog(path);
        if (path.equals(archivedLog)) {
            throw fileNotFoundException;
        }
        openReader(archivedLog);
    }

    /**
     * Opens a reader on {@code path}, or resets the existing reader if it is already on that
     * path. On the recoverable failures handled below, {@link #reader} is left {@code null} so
     * the caller can retry later.
     */
    private void openReader(Path path) throws IOException {
        try {
            // Null-safe comparison: currentPath may still be unset while a reader exists
            // (e.g. a previous seek() failed after the reader was created).
            if (this.reader == null || !path.equals(this.currentPath)) {
                closeReader();
                this.reader = WALFactory.createReader(this.fs, path, this.conf);
                seek();
                setCurrentPath(path);
            } else {
                resetReader();
            }
        } catch (FileNotFoundException e) {
            handleFileNotFound(path, e);
        } catch (NullPointerException e) {
            LOG.warn("Got NPE opening reader, will retry.", e);
            this.reader = null;
        } catch (LeaseNotRecoveredException e) {
            // The file that failed to open is `path`; currentPath is only updated after a
            // successful open and would point at the previous log (or be null) here.
            LOG.warn("Try to recover the WAL lease " + path, e);
            recoverLease(this.conf, path);
            this.reader = null;
        } catch (RemoteException e) {
            IOException unwrapped = e.unwrapRemoteException(FileNotFoundException.class);
            if (!(unwrapped instanceof FileNotFoundException)) {
                throw unwrapped;
            }
            handleFileNotFound(path, (FileNotFoundException) unwrapped);
        }
    }

    /** Best-effort lease recovery on {@code path}; failures are logged and not propagated. */
    private void recoverLease(Configuration configuration, final Path path) {
        try {
            FileSystem dfs = FSUtils.getCurrentFileSystem(configuration);
            FSUtils fsUtils = FSUtils.getInstance(dfs, configuration);
            fsUtils.recoverFileLease(dfs, path, configuration, new CancelableProgressable() {
                @Override // org.apache.hadoop.hbase.util.CancelableProgressable
                public boolean progress() {
                    LOG.debug("recover WAL lease: " + path);
                    return true;
                }
            });
        } catch (IOException e) {
            LOG.warn("unable to recover lease for WAL: " + path, e);
        }
    }

    /**
     * Resets the reader and seeks back to the current position. If the file has disappeared,
     * falls back to its archived location.
     */
    private void resetReader() throws IOException {
        try {
            this.reader.reset();
            seek();
        } catch (FileNotFoundException e) {
            handleFileNotFound(this.currentPath, e);
        } catch (NullPointerException e) {
            throw new IOException("NPE resetting reader, likely HDFS-4380", e);
        }
    }

    /** Positions the reader at {@link #currentPosition}; a position of 0 needs no seek. */
    private void seek() throws IOException {
        if (this.currentPosition != 0) {
            this.reader.seek(this.currentPosition);
        }
    }

    /** @return the trailer size reported by a {@link ProtobufLogReader}, or -1 for any other reader */
    private long currentTrailerSize() {
        if (this.reader instanceof ProtobufLogReader) {
            return ((ProtobufLogReader) this.reader).trailerSize();
        }
        return -1L;
    }
}
