/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.io.Closeable;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time;

@InterfaceAudience.LimitedPrivate(value={"HDFS"})
public class CacheReplicationMonitor
extends Thread
implements Closeable {
    private static final Log LOG = LogFactory.getLog(CacheReplicationMonitor.class);
    private final FSNamesystem namesystem;
    private final BlockManager blockManager;
    private final CacheManager cacheManager;
    private final GSet<CachedBlock, CachedBlock> cachedBlocks;
    private static final Random random = new Random();
    private final long intervalMs;
    private final ReentrantLock lock;
    private final Condition doRescan;
    private final Condition scanFinished;
    private boolean needsRescan = true;
    private boolean isScanning = false;
    private long scanCount = 0L;
    private boolean shutdown = false;
    private boolean mark = false;
    private int scannedDirectives;
    private long scannedBlocks;

    public CacheReplicationMonitor(FSNamesystem namesystem, CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
        this.namesystem = namesystem;
        this.blockManager = namesystem.getBlockManager();
        this.cacheManager = cacheManager;
        this.cachedBlocks = cacheManager.getCachedBlocks();
        this.intervalMs = intervalMs;
        this.lock = lock;
        this.doRescan = this.lock.newCondition();
        this.scanFinished = this.lock.newCondition();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        long startTimeMs = 0L;
        Thread.currentThread().setName("CacheReplicationMonitor(" + System.identityHashCode(this) + ")");
        LOG.info((Object)("Starting CacheReplicationMonitor with interval " + this.intervalMs + " milliseconds"));
        try {
            long curTimeMs = Time.monotonicNow();
            while (true) {
                this.lock.lock();
                try {
                    while (true) {
                        if (this.shutdown) {
                            LOG.info((Object)"Shutting down CacheReplicationMonitor");
                            return;
                        }
                        if (this.needsRescan) {
                            LOG.info((Object)"Rescanning because of pending operations");
                            break;
                        }
                        long delta = startTimeMs + this.intervalMs - curTimeMs;
                        if (delta <= 0L) {
                            LOG.info((Object)("Rescanning after " + (curTimeMs - startTimeMs) + " milliseconds"));
                            break;
                        }
                        this.doRescan.await(delta, TimeUnit.MILLISECONDS);
                        curTimeMs = Time.monotonicNow();
                    }
                    this.isScanning = true;
                    this.needsRescan = false;
                }
                finally {
                    this.lock.unlock();
                }
                startTimeMs = curTimeMs;
                this.mark = !this.mark;
                this.rescan();
                curTimeMs = Time.monotonicNow();
                this.lock.lock();
                try {
                    this.isScanning = false;
                    ++this.scanCount;
                    this.scanFinished.signalAll();
                }
                finally {
                    this.lock.unlock();
                }
                LOG.info((Object)("Scanned " + this.scannedDirectives + " directive(s) and " + this.scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " + "millisecond(s)."));
            }
        }
        catch (InterruptedException e) {
            LOG.info((Object)"Shutting down CacheReplicationMonitor.");
            return;
        }
        catch (Throwable t) {
            LOG.fatal((Object)"Thread exiting", t);
            ExitUtil.terminate(1, t);
            return;
        }
    }

    public void waitForRescanIfNeeded() {
        Preconditions.checkArgument(!this.namesystem.hasWriteLock(), "Must not hold the FSN write lock when waiting for a rescan.");
        Preconditions.checkArgument(this.lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan.");
        if (!this.needsRescan) {
            return;
        }
        if (!this.isScanning) {
            this.doRescan.signal();
        }
        long startCount = this.scanCount;
        while (!this.shutdown && startCount >= this.scanCount) {
            try {
                this.scanFinished.await();
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting for CacheReplicationMonitor rescan", (Throwable)e);
                break;
            }
        }
    }

    public void setNeedsRescan() {
        Preconditions.checkArgument(this.lock.isHeldByCurrentThread(), "Must hold the CRM lock when setting the needsRescan bit.");
        this.needsRescan = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        Preconditions.checkArgument(this.namesystem.hasWriteLock());
        this.lock.lock();
        try {
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
            this.doRescan.signalAll();
            this.scanFinished.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rescan() throws InterruptedException {
        this.scannedDirectives = 0;
        this.scannedBlocks = 0L;
        this.namesystem.writeLock();
        try {
            if (this.shutdown) {
                throw new InterruptedException("CacheReplicationMonitor was shut down.");
            }
            this.resetStatistics();
            this.rescanCacheDirectives();
            this.rescanCachedBlockMap();
            this.blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
        }
        finally {
            this.namesystem.writeUnlock();
        }
    }

    private void resetStatistics() {
        for (CachePool pool : this.cacheManager.getCachePools()) {
            pool.resetStatistics();
        }
        for (CacheDirective directive : this.cacheManager.getCacheDirectives()) {
            directive.resetStatistics();
        }
    }

    private void rescanCacheDirectives() {
        FSDirectory fsDir = this.namesystem.getFSDirectory();
        long now = new Date().getTime();
        for (CacheDirective directive : this.cacheManager.getCacheDirectives()) {
            INode node;
            ++this.scannedDirectives;
            if (directive.getExpiryTime() > 0L && directive.getExpiryTime() <= now) {
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug((Object)("Directive " + directive.getId() + ": the directive " + "expired at " + directive.getExpiryTime() + " (now = " + now + ")"));
                continue;
            }
            String path = directive.getPath();
            try {
                node = fsDir.getINode(path);
            }
            catch (UnresolvedLinkException e) {
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug((Object)("Directive " + directive.getId() + ": got UnresolvedLinkException while resolving path " + path));
                continue;
            }
            if (node == null) {
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug((Object)("Directive " + directive.getId() + ": No inode found at " + path));
                continue;
            }
            if (node.isDirectory()) {
                INodeDirectory dir = node.asDirectory();
                ReadOnlyList<INode> children = dir.getChildrenList(0x7FFFFFFE);
                for (INode child : children) {
                    if (!child.isFile()) continue;
                    this.rescanFile(directive, child.asFile());
                }
                continue;
            }
            if (node.isFile()) {
                this.rescanFile(directive, node.asFile());
                continue;
            }
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug((Object)("Directive " + directive.getId() + ": ignoring non-directive, non-file inode " + node));
        }
    }

    private void rescanFile(CacheDirective directive, INodeFile file) {
        BlockInfo[] blockInfos = file.getBlocks();
        directive.addFilesNeeded(1L);
        long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() * (long)directive.getReplication();
        directive.addBytesNeeded(neededTotal);
        CachePool pool = directive.getPool();
        if (pool.getBytesNeeded() > pool.getLimit()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)String.format("Directive %d: not scanning file %s because bytesNeeded for pool %s is %d, but the pool's limit is %d", directive.getId(), file.getFullPathName(), pool.getPoolName(), pool.getBytesNeeded(), pool.getLimit()));
            }
            return;
        }
        long cachedTotal = 0L;
        for (BlockInfo blockInfo : blockInfos) {
            if (!blockInfo.getBlockUCState().equals((Object)HdfsServerConstants.BlockUCState.COMPLETE)) {
                if (!LOG.isTraceEnabled()) continue;
                LOG.trace((Object)("Directive " + directive.getId() + ": can't cache " + "block " + blockInfo + " because it is in state " + (Object)((Object)blockInfo.getBlockUCState()) + ", not COMPLETE."));
                continue;
            }
            Block block = new Block(blockInfo.getBlockId());
            CachedBlock ncblock = new CachedBlock(block.getBlockId(), directive.getReplication(), this.mark);
            CachedBlock ocblock = this.cachedBlocks.get(ncblock);
            if (ocblock == null) {
                this.cachedBlocks.put(ncblock);
                ocblock = ncblock;
            } else {
                List<DatanodeDescriptor> cachedOn = ocblock.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.CACHED);
                long cachedByBlock = (long)Math.min(cachedOn.size(), directive.getReplication()) * blockInfo.getNumBytes();
                cachedTotal += cachedByBlock;
                if (this.mark != ocblock.getMark() || ocblock.getReplication() < directive.getReplication()) {
                    ocblock.setReplicationAndMark(directive.getReplication(), this.mark);
                }
            }
            if (!LOG.isTraceEnabled()) continue;
            LOG.trace((Object)("Directive " + directive.getId() + ": setting replication " + "for block " + blockInfo + " to " + ocblock.getReplication()));
        }
        directive.addBytesCached(cachedTotal);
        if (cachedTotal == neededTotal) {
            directive.addFilesCached(1L);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Directive " + directive.getId() + ": caching " + file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal + " bytes"));
        }
    }

    private String findReasonForNotCaching(CachedBlock cblock, BlockInfo blockInfo) {
        if (blockInfo == null) {
            return "not tracked by the BlockManager";
        }
        if (!blockInfo.isComplete()) {
            return "not complete";
        }
        if (cblock.getReplication() == 0) {
            return "not needed by any directives";
        }
        if (cblock.getMark() != this.mark) {
            cblock.setReplicationAndMark((short)0, this.mark);
            return "no longer needed by any directives";
        }
        return null;
    }

    private void rescanCachedBlockMap() {
        Iterator cbIter = this.cachedBlocks.iterator();
        while (cbIter.hasNext()) {
            int neededUncached;
            DatanodeDescriptor datanode;
            Iterator<DatanodeDescriptor> iter;
            int numCached;
            ++this.scannedBlocks;
            CachedBlock cblock = (CachedBlock)cbIter.next();
            List<DatanodeDescriptor> pendingCached = cblock.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.PENDING_CACHED);
            List<DatanodeDescriptor> cached = cblock.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.CACHED);
            List<DatanodeDescriptor> pendingUncached = cblock.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.PENDING_UNCACHED);
            Iterator<DatanodeDescriptor> iter2 = pendingUncached.iterator();
            while (iter2.hasNext()) {
                DatanodeDescriptor datanode2 = iter2.next();
                if (cblock.isInList(datanode2.getCached())) continue;
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)("Block " + cblock.getBlockId() + ": removing from " + "PENDING_UNCACHED for node " + datanode2.getDatanodeUuid() + "because the DataNode uncached it."));
                }
                datanode2.getPendingUncached().remove(cblock);
                iter2.remove();
            }
            BlockInfo blockInfo = this.blockManager.getStoredBlock(new Block(cblock.getBlockId()));
            String reason = this.findReasonForNotCaching(cblock, blockInfo);
            short neededCached = 0;
            if (reason != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)("Block " + cblock.getBlockId() + ": can't cache " + "block because it is " + reason));
                }
            } else {
                neededCached = cblock.getReplication();
            }
            if ((numCached = cached.size()) >= neededCached) {
                iter = pendingCached.iterator();
                while (iter.hasNext()) {
                    datanode = iter.next();
                    datanode.getPendingCached().remove(cblock);
                    iter.remove();
                    if (!LOG.isTraceEnabled()) continue;
                    LOG.trace((Object)("Block " + cblock.getBlockId() + ": removing from " + "PENDING_CACHED for node " + datanode.getDatanodeUuid() + "because we already have " + numCached + " cached " + "replicas and we only need " + neededCached));
                }
            }
            if (numCached < neededCached) {
                iter = pendingUncached.iterator();
                while (iter.hasNext()) {
                    datanode = iter.next();
                    datanode.getPendingUncached().remove(cblock);
                    iter.remove();
                    if (!LOG.isTraceEnabled()) continue;
                    LOG.trace((Object)("Block " + cblock.getBlockId() + ": removing from " + "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() + "because we only have " + numCached + " cached replicas " + "and we need " + neededCached));
                }
            }
            if ((neededUncached = numCached - (pendingUncached.size() + neededCached)) > 0) {
                this.addNewPendingUncached(neededUncached, cblock, cached, pendingUncached);
            } else {
                int additionalCachedNeeded = neededCached - (numCached + pendingCached.size());
                if (additionalCachedNeeded > 0) {
                    this.addNewPendingCached(additionalCachedNeeded, cblock, cached, pendingCached);
                }
            }
            if (neededCached != 0 || !pendingUncached.isEmpty() || !pendingCached.isEmpty()) continue;
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("Block " + cblock.getBlockId() + ": removing from " + "cachedBlocks, since neededCached == 0, and " + "pendingUncached and pendingCached are empty."));
            }
            cbIter.remove();
        }
    }

    private void addNewPendingUncached(int neededUncached, CachedBlock cachedBlock, List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingUncached) {
        LinkedList<DatanodeDescriptor> possibilities = new LinkedList<DatanodeDescriptor>();
        for (DatanodeDescriptor datanode : cached) {
            if (pendingUncached.contains(datanode)) continue;
            possibilities.add(datanode);
        }
        while (neededUncached > 0) {
            if (possibilities.isEmpty()) {
                LOG.warn((Object)("Logic error: we're trying to uncache more replicas than actually exist for " + cachedBlock));
                return;
            }
            DatanodeDescriptor datanode = (DatanodeDescriptor)possibilities.remove(random.nextInt(possibilities.size()));
            pendingUncached.add(datanode);
            boolean added = datanode.getPendingUncached().add(cachedBlock);
            assert (added);
            --neededUncached;
        }
    }

    private void addNewPendingCached(int neededCached, CachedBlock cachedBlock, List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingCached) {
        BlockInfo blockInfo = this.blockManager.getStoredBlock(new Block(cachedBlock.getBlockId()));
        if (blockInfo == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Block " + cachedBlock.getBlockId() + ": can't add new " + "cached replicas, because there is no record of this block " + "on the NameNode."));
            }
            return;
        }
        if (!blockInfo.isComplete()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Block " + cachedBlock.getBlockId() + ": can't cache this " + "block, because it is not yet complete."));
            }
            return;
        }
        LinkedList<DatanodeDescriptor> possibilities = new LinkedList<DatanodeDescriptor>();
        int numReplicas = blockInfo.getCapacity();
        Collection<DatanodeDescriptor> corrupt = this.blockManager.getCorruptReplicas(blockInfo);
        int outOfCapacity = 0;
        for (int i = 0; i < numReplicas; ++i) {
            BlockInfo info;
            DatanodeDescriptor datanode = blockInfo.getDatanode(i);
            if (datanode == null || datanode.isDecommissioned() || datanode.isDecommissionInProgress() || corrupt != null && corrupt.contains(datanode) || pendingCached.contains(datanode) || cached.contains(datanode)) continue;
            long pendingCapacity = datanode.getCacheRemaining();
            for (CachedBlock cBlock : datanode.getPendingCached()) {
                info = this.blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
                if (info == null) continue;
                pendingCapacity -= info.getNumBytes();
            }
            for (CachedBlock cBlock : datanode.getPendingUncached()) {
                info = this.blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
                if (info == null) continue;
                pendingCapacity += info.getNumBytes();
            }
            if (pendingCapacity < blockInfo.getNumBytes()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)("Block " + blockInfo.getBlockId() + ": DataNode " + datanode.getDatanodeUuid() + " is not a valid possibility " + "because the block has size " + blockInfo.getNumBytes() + ", but " + "the DataNode only has " + datanode.getCacheRemaining() + " " + "bytes of cache remaining."));
                }
                ++outOfCapacity;
                continue;
            }
            possibilities.add(datanode);
        }
        List<DatanodeDescriptor> chosen = CacheReplicationMonitor.chooseDatanodesForCaching(possibilities, neededCached, this.blockManager.getDatanodeManager().getStaleInterval());
        for (DatanodeDescriptor datanode : chosen) {
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("Block " + blockInfo.getBlockId() + ": added to " + "PENDING_CACHED on DataNode " + datanode.getDatanodeUuid()));
            }
            pendingCached.add(datanode);
            boolean added = datanode.getPendingCached().add(cachedBlock);
            assert (added);
        }
        if (neededCached > chosen.size() && LOG.isDebugEnabled()) {
            LOG.debug((Object)("Block " + blockInfo.getBlockId() + ": we only have " + (cachedBlock.getReplication() - neededCached + chosen.size()) + " of " + cachedBlock.getReplication() + " cached replicas.  " + outOfCapacity + " DataNodes have insufficient cache capacity."));
        }
    }

    private static List<DatanodeDescriptor> chooseDatanodesForCaching(List<DatanodeDescriptor> possibilities, int neededCached, long staleInterval) {
        AbstractList targets = new ArrayList<DatanodeDescriptor>(possibilities);
        LinkedList<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>();
        LinkedList<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>();
        Iterator it = targets.iterator();
        while (it.hasNext()) {
            DatanodeDescriptor d = (DatanodeDescriptor)it.next();
            if (!d.isStale(staleInterval)) continue;
            it.remove();
            stale.add(d);
        }
        while (chosen.size() < neededCached) {
            if (targets.isEmpty()) {
                if (stale.isEmpty()) break;
                targets = stale;
            }
            DatanodeDescriptor target = CacheReplicationMonitor.chooseRandomDatanodeByRemainingCapacity(targets);
            chosen.add(target);
            targets.remove(target);
        }
        return chosen;
    }

    private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(List<DatanodeDescriptor> targets) {
        float total = 0.0f;
        for (DatanodeDescriptor d : targets) {
            total += d.getCacheRemainingPercent();
        }
        TreeMap<Integer, DatanodeDescriptor> lottery = new TreeMap<Integer, DatanodeDescriptor>();
        int offset = 0;
        for (DatanodeDescriptor d : targets) {
            int weight = Math.max(1, (int)(d.getCacheRemainingPercent() / total * 1000000.0f));
            lottery.put(offset += weight, d);
        }
        DatanodeDescriptor winner = (DatanodeDescriptor)lottery.higherEntry(random.nextInt(offset)).getValue();
        return winner;
    }
}

