/*
 * Decompiled with CFR 0.152.
 */
package water;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import water.H2O;
import water.Job;
import water.Key;
import water.MemoryManager;
import water.fvec.Frame;
import water.fvec.NewChunk;
import water.parser.FVecParseWriter;
import water.util.Log;

public class FrameSizeMonitor
implements Runnable,
Thread.UncaughtExceptionHandler {
    private static final String LOG_LEVEL_PROP = "util.frameSizeMonitor.logLevel";
    private static final String ENABLED_PROP = "util.frameSizeMonitor.enabled";
    private static final String SAFE_COEF_PROP = "util.frameSizeMonitor.safetyCoefficient";
    private static final String SAFE_FREE_MEM_DEFAULT_COEF = "0.2";
    private static final int LOG_LEVEL;
    private static final boolean ENABLED;
    private static final float SAFE_FREE_MEM_COEF;
    private static final int SLEEP_MS = 100;
    private static final int MB = 0x100000;
    private static final float FIRST_CHECK_PROGRESS = 0.02f;
    private static final ConcurrentMap<Key<Job>, FrameSizeMonitor> registry;
    private final Key<Job> jobKey;
    private final Set<FVecParseWriter> writers = new HashSet<FVecParseWriter>();
    private final long totalMemory = this.getTotalMemory();
    private long committedMemory = 0L;

    FrameSizeMonitor(Key<Job> jobKey) {
        this.jobKey = jobKey;
    }

    public static void get(Key<Job> jobKey, Consumer<FrameSizeMonitor> c) {
        if (!ENABLED) {
            return;
        }
        FrameSizeMonitor monitor = registry.computeIfAbsent(jobKey, key -> {
            if (((Job)jobKey.get()).stop_requested()) {
                throw new IllegalStateException("Memory is running low. Forcefully terminating.");
            }
            FrameSizeMonitor m = new FrameSizeMonitor(jobKey);
            Thread t = new Thread((Runnable)m, "FrameSizeMonitor-" + ((Job)jobKey.get())._result);
            t.setUncaughtExceptionHandler(m);
            t.start();
            return m;
        });
        c.accept(monitor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void finish(Key<Job> jobKey) {
        ConcurrentMap<Key<Job>, FrameSizeMonitor> concurrentMap = registry;
        synchronized (concurrentMap) {
            registry.remove(jobKey);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        float nextProgress = 0.02f;
        Job job = this.jobKey.get();
        while (job.isRunning() && nextProgress < 1.0f) {
            if (!MemoryManager.canAlloc()) {
                Log.info("FrameSizeMonitor: MemoryManager is running low on memory, stopping job " + this.jobKey + " writing frame " + job._result);
                job.fail(new RuntimeException("Aborting due to critically low memory."));
                break;
            }
            float currentProgress = job.progress();
            if (currentProgress >= nextProgress) {
                if (this.isMemoryUsageOverLimit() && this.isFrameSizeOverLimit(currentProgress, job)) {
                    job.fail(new RuntimeException("Aborting due to projected memory usage too high."));
                    break;
                }
                nextProgress = nextProgress < 0.1f ? currentProgress + 0.01f : currentProgress + 0.1f;
            } else if (Log.isLoggingFor(LOG_LEVEL)) {
                Log.log(LOG_LEVEL, "FrameSizeMonitor: waiting for progress " + currentProgress + " to jump over " + nextProgress);
            }
            FrameSizeMonitor frameSizeMonitor = this;
            synchronized (frameSizeMonitor) {
                try {
                    this.wait(100L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (Log.isLoggingFor(LOG_LEVEL)) {
            if (!job.isStopped()) {
                job.get();
            }
            if (job.isDone()) {
                Log.log(LOG_LEVEL, "FrameSizeMonitor: finished monitoring job " + this.jobKey + ", final frame size is " + ((Frame)job._result.get()).byteSize() / 0x100000L + " MB");
            }
        }
        FrameSizeMonitor.finish(this.jobKey);
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        Log.err(e);
        FrameSizeMonitor.finish(this.jobKey);
    }

    private boolean isMemoryUsageOverLimit() {
        long minimumAvailableMemory;
        long availableMemory = this.getAvailableMemory();
        if (availableMemory < (minimumAvailableMemory = (long)((float)(this.totalMemory * 2L) * SAFE_FREE_MEM_COEF))) {
            Log.log(LOG_LEVEL, "FrameSizeMonitor: Checking output of job " + this.jobKey + " because the available memory " + availableMemory / 0x100000L + " MB is lower than threshold " + minimumAvailableMemory / 0x100000L + " MB (" + SAFE_FREE_MEM_COEF + " of " + this.totalMemory / 0x100000L + " MB total memory)");
            return true;
        }
        Log.log(LOG_LEVEL, "FrameSizeMonitor: Overall memory usage is ok, still have " + availableMemory / 0x100000L + " MB available of " + minimumAvailableMemory / 0x100000L + " MB required.");
        return false;
    }

    private boolean isFrameSizeOverLimit(float progress, Job job) {
        long currentCommittedMemory = this.committedMemory;
        long currentInProgressMemory = this.getInProgressMemory();
        long projectedTotalFrameSize = (long)((float)currentInProgressMemory + (float)currentCommittedMemory / progress);
        long projectedAdditionalFrameSize = projectedTotalFrameSize - currentCommittedMemory - currentInProgressMemory;
        long availableMemory = this.getAvailableMemory();
        long usableMemory = (long)((float)availableMemory - (float)this.totalMemory * SAFE_FREE_MEM_COEF);
        if (Log.isLoggingFor(LOG_LEVEL)) {
            Log.log(LOG_LEVEL, "FrameSizeMonitor: Frame " + job._result + ": \n committed: " + currentCommittedMemory / 0x100000L + " MB\n loading: " + currentInProgressMemory / 0x100000L + " MB\n progress: " + progress + "\n projected additional: " + projectedAdditionalFrameSize / 0x100000L + " MB\n projected total: " + projectedTotalFrameSize / 0x100000L + " MB\n availableMemory: " + availableMemory / 0x100000L + " MB\n totalMemory: " + this.totalMemory / 0x100000L + " MB\n usableMemory: " + usableMemory / 0x100000L + " MB\n enough: " + (projectedAdditionalFrameSize <= usableMemory));
        }
        if (projectedAdditionalFrameSize > usableMemory) {
            Log.err("FrameSizeMonitor: Stopping job " + this.jobKey + " writing frame " + job._result + " because the projected size of " + projectedAdditionalFrameSize / 0x100000L + " MB  does not safely fit in " + availableMemory / 0x100000L + " MB of available memory.");
            return true;
        }
        if (Log.isLoggingFor(LOG_LEVEL)) {
            Log.log(LOG_LEVEL, "FrameSizeMonitor: Projected memory " + projectedAdditionalFrameSize / 0x100000L + "MB for frame " + job._result + " fits safely into " + availableMemory / 0x100000L + " MB of available memory.");
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long getInProgressMemory() {
        long usedMemory = 0L;
        Set<FVecParseWriter> set = this.writers;
        synchronized (set) {
            for (FVecParseWriter writer : this.writers) {
                NewChunk[] nvs = writer.getNvs();
                if (nvs == null) continue;
                usedMemory += this.getUsedMemory(nvs);
            }
        }
        return usedMemory;
    }

    private long getUsedMemory(NewChunk[] nvs) {
        long usedMemory = 0L;
        for (NewChunk nv : nvs) {
            if (nv == null) continue;
            usedMemory += nv.byteSize();
        }
        return usedMemory;
    }

    private long getTotalMemory() {
        return H2O.SELF._heartbeat.get_kv_mem() + H2O.SELF._heartbeat.get_pojo_mem() + H2O.SELF._heartbeat.get_free_mem();
    }

    private long getAvailableMemory() {
        return H2O.SELF._heartbeat.get_free_mem();
    }

    public static void register(Key<Job> jobKey, FVecParseWriter writer) {
        FrameSizeMonitor.get(jobKey, t -> t.register(writer));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void register(FVecParseWriter writer) {
        Set<FVecParseWriter> set = this.writers;
        synchronized (set) {
            this.writers.add(writer);
        }
    }

    public static void closed(Key<Job> jobKey, FVecParseWriter writer, long mem) {
        FrameSizeMonitor.get(jobKey, t -> t.closed(writer, mem));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closed(FVecParseWriter writer, long mem) {
        Set<FVecParseWriter> set = this.writers;
        synchronized (set) {
            this.writers.remove(writer);
            this.committedMemory += mem;
        }
    }

    static {
        registry = new ConcurrentHashMap<Key<Job>, FrameSizeMonitor>();
        LOG_LEVEL = Integer.parseInt(H2O.getSysProperty(LOG_LEVEL_PROP, String.valueOf(4)));
        ENABLED = H2O.getSysBoolProperty(ENABLED_PROP, false);
        SAFE_FREE_MEM_COEF = Float.parseFloat(H2O.getSysProperty(SAFE_COEF_PROP, SAFE_FREE_MEM_DEFAULT_COEF));
    }
}

