package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:hadoop-client-2.7.7/share/hadoop/client/lib/hadoop-hdfs-2.7.7.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.class */
public class FsDatasetCache {
    /** Logger shared by the cache and by its background caching/uncaching tasks. */
    private static final Logger LOG = LoggerFactory.getLogger(FsDatasetCache.class);

    /** Dataset whose block replicas are cached; assigned in the constructor. */
    private final FsDatasetImpl dataset;

    /** Executor that runs immediate (non-deferred) {@code UncachingTask}s. */
    private final ThreadPoolExecutor uncachingExecutor;

    /** Scheduler that runs and re-polls {@code UncachingTask}s that have to wait. */
    private final ScheduledThreadPoolExecutor deferredUncachingExecutor;

    /** Value read from {@code DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS}, in milliseconds. */
    private final long revocationMs;

    /** Value read from {@code DFS_DATANODE_CACHE_REVOCATION_POLLING_MS}, in milliseconds. */
    private final long revocationPollingMs;

    /** Running, page-rounded count of the bytes reserved for cached blocks. */
    private final UsedBytesCount usedBytesCount;

    /** Upper bound for {@link #usedBytesCount}; taken from the datanode's max locked memory. */
    private final long maxBytes;

    /**
     * Cache state of every tracked block, keyed by (block id, block pool id).
     * The code in this class reads and writes it while holding this object's monitor.
     */
    private final HashMap<ExtendedBlockId, Value> mappableBlockMap = new HashMap<>();

    /** Number of blocks currently cached. */
    private final AtomicLong numBlocksCached = new AtomicLong();

    /** Number of cache requests that were rejected or aborted. */
    final AtomicLong numBlocksFailedToCache = new AtomicLong();

    /** Number of uncache requests that were not acted upon. */
    final AtomicLong numBlocksFailedToUncache = new AtomicLong();

    /* loaded from: input_file:hadoop-client-2.7.7/share/hadoop/client/lib/hadoop-hdfs-2.7.7.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache$CachingTask.class */
    /**
     * Background task that loads a single block replica into the cache.
     *
     * <p>The task reserves cache space for the block, opens the block and
     * metadata streams, loads them through {@code MappableBlock.load} and, when
     * the block is still wanted, publishes it in {@code mappableBlockMap} as
     * {@code CACHED}. Every exit path that does not end in success goes
     * through one cleanup sequence in the {@code finally} block, so the
     * reservation is released and the failure counter is incremented exactly
     * once.
     */
    private class CachingTask implements Runnable {
        /** Identifies the block (block id + block pool id) in {@code mappableBlockMap}. */
        private final ExtendedBlockId key;
        /** File name handed to {@code MappableBlock.load}. */
        private final String blockFileName;
        /** Block length in bytes; also the amount reserved in the cache. */
        private final long length;
        /** Generation stamp used to build the {@code ExtendedBlock}. */
        private final long genstamp;

        CachingTask(ExtendedBlockId key, String blockFileName, long length, long genstamp) {
            this.key = key;
            this.blockFileName = blockFileName;
            this.length = length;
            this.genstamp = genstamp;
        }

        /**
         * Attempts to cache the block. Failures are logged and counted in
         * {@code numBlocksFailedToCache}; unchecked exceptions still propagate
         * to the executor after the cleanup has run.
         */
        @Override // java.lang.Runnable
        public void run() {
            boolean success = false;
            boolean reservedBytes = false;
            FileInputStream blockIn = null;
            FileInputStream metaIn = null;
            MappableBlock mappableBlock = null;
            ExtendedBlock extBlk = new ExtendedBlock(this.key.getBlockPoolId(), this.key.getBlockId(), this.length, this.genstamp);
            long newUsedBytes = FsDatasetCache.this.usedBytesCount.reserve(this.length);
            try {
                if (newUsedBytes < 0) {
                    FsDatasetCache.LOG.warn("Failed to cache " + this.key + ": could not reserve " + this.length + " more bytes in the cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + " of " + FsDatasetCache.this.maxBytes + " exceeded.");
                    return;
                }
                reservedBytes = true;

                // Phase 1: open the backing files. The handlers are scoped to
                // this phase only so that a failure in a later phase is never
                // reported as a file-opening problem.
                try {
                    blockIn = (FileInputStream) FsDatasetCache.this.dataset.getBlockInputStream(extBlk, 0L);
                    metaIn = DatanodeUtil.getMetaDataInputStream(extBlk, FsDatasetCache.this.dataset);
                } catch (ClassCastException e) {
                    FsDatasetCache.LOG.warn("Failed to cache " + this.key + ": Underlying blocks are not backed by files.", (Throwable) e);
                    return;
                } catch (FileNotFoundException e) {
                    FsDatasetCache.LOG.info("Failed to cache " + this.key + ": failed to find backing files.");
                    return;
                } catch (IOException e) {
                    FsDatasetCache.LOG.warn("Failed to cache " + this.key + ": failed to open file", (Throwable) e);
                    return;
                }

                // Phase 2: load the block.
                try {
                    mappableBlock = MappableBlock.load(this.length, blockIn, metaIn, this.blockFileName);
                } catch (ChecksumException e) {
                    FsDatasetCache.LOG.warn("Failed to cache " + this.key + ": checksum verification failed.");
                    return;
                } catch (IOException e) {
                    FsDatasetCache.LOG.warn("Failed to cache " + this.key, (Throwable) e);
                    return;
                }

                // Phase 3: publish the result, unless uncacheBlock() cancelled
                // the request while the block was being loaded.
                synchronized (FsDatasetCache.this) {
                    Value value = FsDatasetCache.this.mappableBlockMap.get(this.key);
                    Preconditions.checkNotNull(value);
                    Preconditions.checkState(value.state == State.CACHING || value.state == State.CACHING_CANCELLED);
                    if (value.state == State.CACHING_CANCELLED) {
                        FsDatasetCache.this.mappableBlockMap.remove(this.key);
                        FsDatasetCache.LOG.warn("Caching of " + this.key + " was cancelled.");
                        return;
                    }
                    FsDatasetCache.this.mappableBlockMap.put(this.key, new Value(mappableBlock, State.CACHED));
                    FsDatasetCache.LOG.debug("Successfully cached {}.  We are now caching {} bytes in total.", this.key, Long.valueOf(newUsedBytes));
                    FsDatasetCache.this.dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(this.key);
                    FsDatasetCache.this.numBlocksCached.addAndGet(1L);
                    FsDatasetCache.this.dataset.datanode.getMetrics().incrBlocksCached(1);
                    success = true;
                }
            } finally {
                // The streams are closed on every path, success included.
                IOUtils.closeQuietly(blockIn);
                IOUtils.closeQuietly(metaIn);
                if (!success) {
                    // Give back the reservation only if it was actually made.
                    if (reservedBytes) {
                        FsDatasetCache.this.usedBytesCount.release(this.length);
                    }
                    FsDatasetCache.LOG.debug("Caching of {} was aborted.  We are now caching only {} bytes in total.", this.key, Long.valueOf(FsDatasetCache.this.usedBytesCount.get()));
                    if (mappableBlock != null) {
                        mappableBlock.close();
                    }
                    FsDatasetCache.this.numBlocksFailedToCache.incrementAndGet();
                    // Drop the placeholder entry so the block can be requested again.
                    synchronized (FsDatasetCache.this) {
                        FsDatasetCache.this.mappableBlockMap.remove(this.key);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:hadoop-client-2.7.7/share/hadoop/client/lib/hadoop-hdfs-2.7.7.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache$PageRounder.class */
    /**
     * Rounds byte counts up to a whole number of operating-system pages, using
     * the page size reported by the native cache manipulator.
     */
    public static class PageRounder {
        private final long osPageSize = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();

        /**
         * Rounds {@code j} up to a multiple of the OS page size.
         *
         * @param j a byte count; NOTE(review): presumably non-negative, since
         *          integer division truncates toward zero for negative sums
         * @return the smallest multiple of the page size that is {@code >= j}
         *         for non-negative {@code j}
         */
        public long round(long j) {
            final long pageCount = (j + this.osPageSize - 1) / this.osPageSize;
            return pageCount * this.osPageSize;
        }
    }

    /* loaded from: input_file:hadoop-client-2.7.7/share/hadoop/client/lib/hadoop-hdfs-2.7.7.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache$State.class */
    /** Lifecycle states recorded for a block in {@code mappableBlockMap}. */
    private enum State {
        /** A caching task has been submitted for the block. */
        CACHING(false),
        /** An uncache request arrived while the block was still being cached. */
        CACHING_CANCELLED(false),
        /** The block has been loaded into the cache. */
        CACHED(true),
        /** An uncaching task has been submitted or scheduled for the block. */
        UNCACHING(false);

        /** Whether a block in this state is reported as cached. */
        private final boolean advertised;

        State(boolean advertised) {
            this.advertised = advertised;
        }

        /**
         * @return {@code true} only for {@link #CACHED}
         */
        public boolean shouldAdvertise() {
            return this.advertised;
        }
    }

    /* loaded from: input_file:hadoop-client-2.7.7/share/hadoop/client/lib/hadoop-hdfs-2.7.7.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache$UncachingTask.class */
    /**
     * Background task that removes a block from the cache.
     *
     * <p>A task created with a revocation delay of {@code 0} uncaches
     * immediately. Otherwise it keeps rescheduling itself on
     * {@code deferredUncachingExecutor} every {@code revocationPollingMs}
     * until the short-circuit registry accepts the munlock request or the
     * revocation deadline has passed, and then uncaches.
     */
    private class UncachingTask implements Runnable {
        /** Identifies the block (block id + block pool id) in {@code mappableBlockMap}. */
        private final ExtendedBlockId key;
        /** Requested revocation delay in milliseconds; {@code 0} means "do not wait". */
        private final long revocationDelayMs;
        /** Monotonic-clock deadline after which the block is uncached regardless; {@code 0} means "do not wait". */
        private final long revocationTimeMs;

        UncachingTask(ExtendedBlockId key, long revocationDelayMs) {
            this.key = key;
            this.revocationDelayMs = revocationDelayMs;
            this.revocationTimeMs = revocationDelayMs == 0 ? 0L : revocationDelayMs + Time.monotonicNow();
        }

        /**
         * Decides whether uncaching has to be postponed once more.
         *
         * @return {@code true} if the munlock request was refused and the
         *         revocation deadline has not passed yet
         */
        private boolean shouldDefer() {
            if (this.revocationTimeMs == 0) {
                return false;
            }
            boolean anchored = !FsDatasetCache.this.dataset.datanode.getShortCircuitRegistry().processBlockMunlockRequest(this.key);
            if (!anchored) {
                FsDatasetCache.LOG.debug("Uncaching {} now that it is no longer in use by any clients.", this.key);
                return false;
            }
            long remainingMs = this.revocationTimeMs - Time.monotonicNow();
            if (remainingMs < 0) {
                // Report how long the task was asked to wait, not the absolute deadline.
                FsDatasetCache.LOG.warn("Forcibly uncaching {} after {} because client(s) {} refused to stop using it.", this.key, DurationFormatUtils.formatDurationHMS(this.revocationDelayMs), FsDatasetCache.this.dataset.datanode.getShortCircuitRegistry().getClientNames(this.key));
                return false;
            }
            FsDatasetCache.LOG.info("Replica {} still can't be uncached because some clients continue to use it.  Will wait for {}", this.key, DurationFormatUtils.formatDurationHMS(remainingMs));
            return true;
        }

        /**
         * Uncaches the block, or reschedules this task when uncaching must
         * still be deferred. The map entry is expected to exist in state
         * {@code UNCACHING}; otherwise a precondition failure is thrown.
         */
        @Override // java.lang.Runnable
        public void run() {
            Value value;
            if (shouldDefer()) {
                FsDatasetCache.this.deferredUncachingExecutor.schedule(this, FsDatasetCache.this.revocationPollingMs, TimeUnit.MILLISECONDS);
                return;
            }
            synchronized (FsDatasetCache.this) {
                value = FsDatasetCache.this.mappableBlockMap.get(this.key);
            }
            Preconditions.checkNotNull(value);
            Preconditions.checkArgument(value.state == State.UNCACHING);
            IOUtils.closeQuietly(value.mappableBlock);
            synchronized (FsDatasetCache.this) {
                FsDatasetCache.this.mappableBlockMap.remove(this.key);
            }
            long newUsedBytes = FsDatasetCache.this.usedBytesCount.release(value.mappableBlock.getLength());
            FsDatasetCache.this.numBlocksCached.addAndGet(-1L);
            FsDatasetCache.this.dataset.datanode.getMetrics().incrBlocksUncached(1);
            // A non-zero deadline marks a task that was scheduled as deferred.
            if (this.revocationTimeMs != 0) {
                FsDatasetCache.LOG.debug("Deferred uncaching of {} completed. usedBytes = {}", this.key, Long.valueOf(newUsedBytes));
            } else {
                FsDatasetCache.LOG.debug("Uncaching of {} completed. usedBytes = {}", this.key, Long.valueOf(newUsedBytes));
            }
        }
    }

    /* loaded from: input_file:hadoop-client-2.7.7/share/hadoop/client/lib/hadoop-hdfs-2.7.7.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache$UsedBytesCount.class */
    /**
     * Counter of the bytes reserved for cached blocks. All amounts are rounded
     * up to whole OS pages before they are added or subtracted, and the total
     * is never allowed to exceed {@code maxBytes}.
     */
    private class UsedBytesCount {
        private final AtomicLong usedBytes = new AtomicLong(0L);
        private final PageRounder rounder = new PageRounder();

        private UsedBytesCount() {
        }

        /**
         * Tries to add the page-rounded value of {@code j} to the counter.
         *
         * @param j number of bytes to reserve
         * @return the new total, or {@code -1} if the reservation would push
         *         the total above {@code maxBytes}
         */
        long reserve(long j) {
            final long rounded = this.rounder.round(j);
            // Compare-and-set retry loop: start over if another thread changed the total.
            while (true) {
                final long current = this.usedBytes.get();
                final long proposed = current + rounded;
                if (proposed > FsDatasetCache.this.maxBytes) {
                    return -1L;
                }
                if (this.usedBytes.compareAndSet(current, proposed)) {
                    return proposed;
                }
            }
        }

        /**
         * Subtracts the page-rounded value of {@code j} from the counter.
         *
         * @param j number of bytes to release
         * @return the new total
         */
        long release(long j) {
            final long rounded = this.rounder.round(j);
            return this.usedBytes.addAndGet(-rounded);
        }

        /**
         * @return the current total of reserved bytes
         */
        long get() {
            return this.usedBytes.get();
        }
    }

    /* loaded from: input_file:hadoop-client-2.7.7/share/hadoop/client/lib/hadoop-hdfs-2.7.7.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache$Value.class */
    /**
     * Immutable entry of {@code mappableBlockMap}: the lifecycle state of a
     * block together with its mapped data. A state change is made by putting
     * a new {@code Value} into the map.
     */
    private static final class Value {
        /** Current lifecycle state of the block. */
        final State state;
        /** Mapped block data; {@code null} for the placeholder stored while caching is in progress. */
        final MappableBlock mappableBlock;

        Value(MappableBlock mappableBlock, State state) {
            this.state = state;
            this.mappableBlock = mappableBlock;
        }
    }

    /**
     * Creates the cache for the given dataset.
     *
     * <p>Reads the maximum locked memory from the datanode configuration, sets
     * up the immediate and deferred uncaching executors (both use daemon
     * threads) and reads the revocation timeout and polling interval.
     *
     * @param fsDatasetImpl the dataset whose blocks will be cached
     * @throws RuntimeException if the configured polling interval is more than
     *         half of the configured revocation timeout
     */
    public FsDatasetCache(FsDatasetImpl fsDatasetImpl) {
        this.dataset = fsDatasetImpl;
        this.maxBytes = fsDatasetImpl.datanode.getDnConf().getMaxLockedMemory();
        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FsDatasetCache-%d-" + fsDatasetImpl.toString()).build();
        this.usedBytesCount = new UsedBytesCount();
        // At most one worker thread; it is allowed to time out after 60 seconds of idleness.
        this.uncachingExecutor = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), workerThreadFactory);
        this.uncachingExecutor.allowCoreThreadTimeOut(true);
        this.deferredUncachingExecutor = new ScheduledThreadPoolExecutor(1, workerThreadFactory);
        this.revocationMs = fsDatasetImpl.datanode.getConf().getLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 900000L);
        long pollingMs = fsDatasetImpl.datanode.getConf().getLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 500L);
        // The polling interval may be at most half of the revocation timeout.
        long maxPollingMs = this.revocationMs / 2;
        if (maxPollingMs < pollingMs) {
            throw new RuntimeException("configured value " + pollingMs + " for " + DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS + " is too high.  It must not be more than half of the value of " + DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS + ".  Reconfigure this to " + maxPollingMs);
        }
        this.revocationPollingMs = pollingMs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Lists the blocks of one block pool that are currently advertised as cached.
     *
     * @param str the block pool id to filter on
     * @return a new list with the ids of all blocks in that pool whose state
     *         reports {@code shouldAdvertise()}; empty if there are none
     */
    public synchronized List<Long> getCachedBlocks(String str) {
        List<Long> cachedBlockIds = new ArrayList<>();
        for (Map.Entry<ExtendedBlockId, Value> entry : this.mappableBlockMap.entrySet()) {
            ExtendedBlockId blockKey = entry.getKey();
            if (blockKey.getBlockPoolId().equals(str) && entry.getValue().state.shouldAdvertise()) {
                cachedBlockIds.add(Long.valueOf(blockKey.getBlockId()));
            }
        }
        return cachedBlockIds;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts caching a block unless the cache already tracks it.
     *
     * <p>If an entry for the block exists in any state, the request is counted
     * in {@code numBlocksFailedToCache} and nothing else happens. Otherwise a
     * {@code CACHING} placeholder is stored and a {@code CachingTask} is
     * handed to {@code executor}.
     *
     * @param j        block id
     * @param str      block pool id
     * @param str2     block file name passed on to the caching task
     * @param j2       block length in bytes
     * @param j3       generation stamp of the block
     * @param executor executor that runs the caching task
     */
    public synchronized void cacheBlock(long j, String str, String str2, long j2, long j3, Executor executor) {
        final ExtendedBlockId blockKey = new ExtendedBlockId(j, str);
        final Value existing = this.mappableBlockMap.get(blockKey);
        if (existing != null) {
            LOG.debug("Block with id {}, pool {} already exists in the FsDatasetCache with state {}", Long.valueOf(j), str, existing.state);
            this.numBlocksFailedToCache.incrementAndGet();
            return;
        }
        // Store the placeholder before the task is submitted so the task finds it.
        this.mappableBlockMap.put(blockKey, new Value(null, State.CACHING));
        executor.execute(new CachingTask(blockKey, str2, j2, j3));
        LOG.debug("Initiating caching for Block with id {}, pool {}", Long.valueOf(j), str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Requests that a block be removed from the cache.
     *
     * <ul>
     *   <li>Block not tracked, or in a state other than {@code CACHING} /
     *       {@code CACHED}: counted in {@code numBlocksFailedToUncache}.</li>
     *   <li>{@code CACHING}: the entry is marked {@code CACHING_CANCELLED}.</li>
     *   <li>{@code CACHED}: the entry is marked {@code UNCACHING}; an
     *       {@code UncachingTask} runs immediately if the munlock request was
     *       accepted, and is scheduled on the deferred executor otherwise.</li>
     * </ul>
     *
     * @param str block pool id
     * @param j   block id
     */
    public synchronized void uncacheBlock(String str, long j) {
        final ExtendedBlockId blockKey = new ExtendedBlockId(j, str);
        final Value current = this.mappableBlockMap.get(blockKey);
        // The munlock request is issued for every call, before the entry is inspected.
        final boolean anchored = !this.dataset.datanode.getShortCircuitRegistry().processBlockMunlockRequest(blockKey);
        if (current == null) {
            LOG.debug("Block with id {}, pool {} does not need to be uncached, because it is not currently in the mappableBlockMap.", Long.valueOf(j), str);
            this.numBlocksFailedToUncache.incrementAndGet();
            return;
        }
        switch (current.state) {
            case CACHING:
                LOG.debug("Cancelling caching for block with id {}, pool {}.", Long.valueOf(j), str);
                this.mappableBlockMap.put(blockKey, new Value(current.mappableBlock, State.CACHING_CANCELLED));
                break;
            case CACHED:
                this.mappableBlockMap.put(blockKey, new Value(current.mappableBlock, State.UNCACHING));
                if (anchored) {
                    LOG.debug("{} is anchored, and can't be uncached now.  Scheduling it for uncaching in {} ", blockKey, DurationFormatUtils.formatDurationHMS(this.revocationPollingMs));
                    this.deferredUncachingExecutor.schedule(new UncachingTask(blockKey, this.revocationMs), this.revocationPollingMs, TimeUnit.MILLISECONDS);
                } else {
                    LOG.debug("{} has been scheduled for immediate uncaching.", blockKey);
                    this.uncachingExecutor.execute(new UncachingTask(blockKey, 0L));
                }
                break;
            default:
                LOG.debug("Block with id {}, pool {} does not need to be uncached, because it is in state {}.", Long.valueOf(j), str, current.state);
                this.numBlocksFailedToUncache.incrementAndGet();
                break;
        }
    }

    /**
     * @return the current value of the used-bytes counter
     */
    public long getCacheUsed() {
        final long usedBytes = usedBytesCount.get();
        return usedBytes;
    }

    /**
     * @return the maximum number of bytes the cache may reserve ({@code maxBytes})
     */
    public long getCacheCapacity() {
        final long capacity = maxBytes;
        return capacity;
    }

    /**
     * @return the current value of the failed-to-cache counter
     */
    public long getNumBlocksFailedToCache() {
        final long failedToCache = numBlocksFailedToCache.get();
        return failedToCache;
    }

    /**
     * @return the current value of the failed-to-uncache counter
     */
    public long getNumBlocksFailedToUncache() {
        final long failedToUncache = numBlocksFailedToUncache.get();
        return failedToUncache;
    }

    /**
     * @return the current value of the cached-blocks counter
     */
    public long getNumBlocksCached() {
        final long cachedBlocks = numBlocksCached.get();
        return cachedBlocks;
    }

    /**
     * Tells whether a block is currently advertised as cached.
     *
     * @param str block pool id
     * @param j   block id
     * @return {@code true} if the block is tracked and its state reports
     *         {@code shouldAdvertise()}; {@code false} otherwise
     */
    public synchronized boolean isCached(String str, long j) {
        final ExtendedBlockId blockKey = new ExtendedBlockId(j, str);
        final Value entry = mappableBlockMap.get(blockKey);
        if (entry == null) {
            return false;
        }
        return entry.state.shouldAdvertise();
    }
}
