package org.apache.sysds.runtime.lineage;

import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.data.LineageObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
import org.apache.sysds.runtime.util.CommonThreadPool;

/* loaded from: input_file:org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.class */
public class LineageSparkCacheEviction {
    private static long SPARK_STORAGE_LIMIT = 0;
    private static long _sparkStorageSize = 0;
    private static TreeSet<LineageCacheEntry> weightedQueue = new TreeSet<>(LineageCacheConfig.LineageCacheComparator);
    protected static final Map<LineageItem, Integer> RDDHitCountLocal = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/lineage/LineageSparkCacheEviction$TriggerRemoteTask.class */
    public static class TriggerRemoteTask implements Runnable {
        JavaPairRDD<?, ?> rdd;

        public TriggerRemoteTask(JavaPairRDD<?, ?> javaPairRDD) {
            this.rdd = javaPairRDD;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.rdd.count();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void resetEviction() {
        _sparkStorageSize = 0L;
        weightedQueue.clear();
        RDDHitCountLocal.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void addEntry(LineageCacheEntry lineageCacheEntry, long j) {
        if (lineageCacheEntry.isNullVal()) {
            return;
        }
        lineageCacheEntry.initiateScoreSpark(LineageCacheEviction._removelist, j);
        weightedQueue.add(lineageCacheEntry);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void maintainOrder(LineageCacheEntry lineageCacheEntry) {
        if (LineageCacheConfig.isTimeBased() && weightedQueue.remove(lineageCacheEntry)) {
            lineageCacheEntry.updateTimestamp();
            weightedQueue.add(lineageCacheEntry);
        }
        if (LineageCacheConfig.isCostNsize() && weightedQueue.remove(lineageCacheEntry)) {
            lineageCacheEntry.updateScore(true);
            weightedQueue.add(lineageCacheEntry);
        }
    }

    protected static void removeSingleEntry(Map<LineageItem, LineageCacheEntry> map, LineageCacheEntry lineageCacheEntry) {
        lineageCacheEntry.setCacheStatus(LineageCacheConfig.LineageCacheStatus.TOPERSISTRDD);
        lineageCacheEntry.getRDDObject().getRDD().unpersist(false);
        _sparkStorageSize -= lineageCacheEntry.getSize();
        LineageCacheEviction._removelist.merge(lineageCacheEntry._key, 1, (v0, v1) -> {
            return Integer.sum(v0, v1);
        });
        if (DMLScript.STATISTICS) {
            LineageCacheStatistics.incrementRDDUnpersists();
        }
    }

    private static void removeEntry(Map<LineageItem, LineageCacheEntry> map, LineageCacheEntry lineageCacheEntry) {
        if (lineageCacheEntry._origItem == null) {
            removeSingleEntry(map, lineageCacheEntry);
            return;
        }
        lineageCacheEntry.setCacheStatus(LineageCacheConfig.LineageCacheStatus.TODELETE);
        boolean z = false;
        LineageCacheEntry lineageCacheEntry2 = map.get(lineageCacheEntry._origItem);
        while (true) {
            LineageCacheEntry lineageCacheEntry3 = lineageCacheEntry2;
            if (lineageCacheEntry3 != null) {
                if (lineageCacheEntry3.getCacheStatus() != LineageCacheConfig.LineageCacheStatus.TODELETE) {
                    return;
                }
                z |= lineageCacheEntry3.getCacheStatus() == LineageCacheConfig.LineageCacheStatus.TODELETE;
                lineageCacheEntry2 = lineageCacheEntry3._nextEntry;
            } else {
                if (!z) {
                    return;
                }
                LineageCacheEntry lineageCacheEntry4 = map.get(lineageCacheEntry._origItem);
                while (true) {
                    LineageCacheEntry lineageCacheEntry5 = lineageCacheEntry4;
                    if (lineageCacheEntry5 == null) {
                        return;
                    }
                    removeSingleEntry(map, lineageCacheEntry5);
                    lineageCacheEntry4 = lineageCacheEntry5._nextEntry;
                }
            }
        }
    }

    private static void setSparkStorageLimit() {
        if (SPARK_STORAGE_LIMIT == 0) {
            SPARK_STORAGE_LIMIT = (long) (((long) SparkExecutionContext.getDataMemoryBudget(false, true)) * 0.7d);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static double getSparkStorageLimit() {
        if (SPARK_STORAGE_LIMIT == 0) {
            setSparkStorageLimit();
        }
        return SPARK_STORAGE_LIMIT;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void updateSize(long j, boolean z) {
        _sparkStorageSize += z ? j : -j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static boolean isBelowThreshold(long j) {
        if (!(((double) (j + _sparkStorageSize)) <= getSparkStorageLimit())) {
            _sparkStorageSize = SparkExecutionContext.getStorageSpaceUsed();
        }
        return ((double) (j + _sparkStorageSize)) <= getSparkStorageLimit();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void makeSpace(Map<LineageItem, LineageCacheEntry> map, long j) {
        LineageCacheEntry pollFirst;
        while (j + _sparkStorageSize > getSparkStorageLimit() && (pollFirst = weightedQueue.pollFirst()) != null) {
            removeEntry(map, pollFirst);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void cleanupChildRDDs(LineageCacheEntry lineageCacheEntry) {
        if (lineageCacheEntry.getCacheStatus() == LineageCacheConfig.LineageCacheStatus.PERSISTEDRDD) {
            for (LineageObject lineageObject : lineageCacheEntry.getRDDObject().getLineageChilds()) {
                lineageObject.decrementNumReferences();
                rCleanupChildRDDs(lineageObject);
            }
            lineageCacheEntry.getRDDObject().removeAllChild();
        }
    }

    protected static void rCleanupChildRDDs(LineageObject lineageObject) {
        if (lineageObject.getNumReferences() <= 0 && !lineageObject.hasBackReference()) {
            if ((lineageObject instanceof RDDObject) && lineageObject.isInLineageCache() && SparkExecutionContext.isRDDCached(((RDDObject) lineageObject).getRDD().id())) {
                return;
            }
            SparkExecutionContext.cleanupSingleLineageObject(lineageObject);
            for (LineageObject lineageObject2 : lineageObject.getLineageChilds()) {
                lineageObject2.decrementNumReferences();
                rCleanupChildRDDs(lineageObject2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void moveToSpark(LineageCacheEntry lineageCacheEntry) {
        RDDHitCountLocal.merge(lineageCacheEntry._key, 1, (v0, v1) -> {
            return Integer.sum(v0, v1);
        });
        if (RDDHitCountLocal.get(lineageCacheEntry._key).intValue() > 3) {
            RDDHitCountLocal.remove(lineageCacheEntry._key);
            CommonThreadPool.getDynamicPool().submit(new TriggerRemoteTask(lineageCacheEntry.getRDDObject().getRDD()));
        }
    }
}
