package org.apache.storm.scheduler.resource;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
import org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.testing.TestWordCounter;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/resource/TestResourceAwareScheduler.class */
public class TestResourceAwareScheduler {
    // Submitter name constant. Declared as an instance field; none of the test methods visible
    // in this part of the file reference it.
    private final String TOPOLOGY_SUBMITTER = "jerry";
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
    // Mutable integer seed; the magnitude suggests a Unix timestamp in seconds, presumably used
    // as a launch/up time by tests outside this excerpt -- TODO confirm against its users.
    private static int currentTime = 1450418597;
    // Baseline topology configuration shared by the tests; filled in by initConf() and copied
    // into a fresh Config by each test via putAll.
    private static final Config defaultTopologyConf = new Config();

    /**
     * Fills {@code defaultTopologyConf} with the baseline settings every test copies into its
     * own {@link Config}: the pluggable strategy class names plus the default per-component
     * CPU / memory requests, the worker heap ceiling and the topology priority.
     */
    @BeforeClass
    public static void initConf() {
        final Config conf = defaultTopologyConf;

        // Pluggable implementations used by the scheduler under test.
        conf.put("storm.network.topography.plugin", "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
        conf.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        conf.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        conf.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());

        // Default per-component resource requests (stored as boxed Doubles).
        conf.put("topology.component.cpu.pcore.percent", 10.0d);
        conf.put("topology.component.resources.onheap.memory.mb", 128.0d);
        conf.put("topology.component.resources.offheap.memory.mb", 0.0d);

        // Topology-wide limits.
        conf.put("topology.worker.max.heap.size.mb", 8192.0d);
        conf.put("topology.priority", 0);
    }

    /**
     * Walks a single {@link RAS_Node} ("sup-0") through four successive slot assignments
     * (two executors of "topology1", then two of "topology2") and finally
     * {@code freeAllSlots()}, checking the node's slot and running-topology bookkeeping
     * after every step.
     */
    @Test
    public void testRASNodeSlotAssign() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        // Raw map on purpose: the value type genSupervisors expects is not visible from this file.
        Map supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", Double.valueOf(400.0d));
        supervisorResources.put("supervisor.memory.capacity.mb", Double.valueOf(2000.0d));
        Map<String, SupervisorDetails> supervisors = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, supervisorResources);

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap<String, Object>());
        Topologies noTopologies = new Topologies(new HashMap<String, TopologyDetails>());
        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, noTopologies);
        Assert.assertEquals(5L, nodes.size());

        // Initial state of the node under test: alive and completely unused.
        RAS_Node node = nodes.get("sup-0");
        Assert.assertEquals("sup-0", node.getId());
        Assert.assertTrue(node.isAlive());
        assertSlotAccountingOnNode(node, 0L, true, 4L, 0L);

        // First executor of topology1 takes one slot.
        TopologyDetails firstTopology = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap<String, Object>(), 1, 0, 2, 0, 0, 0, "user");
        List<ExecutorDetails> batch = new ArrayList<ExecutorDetails>();
        batch.add(new ExecutorDetails(1, 1));
        node.assign((WorkerSlot) node.getFreeSlots().iterator().next(), firstTopology, batch);
        assertSlotAccountingOnNode(node, 1L, false, 3L, 1L);

        // Second executor of topology1: another slot, still a single running topology.
        batch = new ArrayList<ExecutorDetails>();
        batch.add(new ExecutorDetails(2, 2));
        node.assign((WorkerSlot) node.getFreeSlots().iterator().next(), firstTopology, batch);
        assertSlotAccountingOnNode(node, 1L, false, 2L, 2L);

        // First executor of topology2: the running-topology count grows to two.
        TopologyDetails secondTopology = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap<String, Object>(), 1, 0, 2, 0, 0, 0, "user");
        batch = new ArrayList<ExecutorDetails>();
        batch.add(new ExecutorDetails(1, 1));
        node.assign((WorkerSlot) node.getFreeSlots().iterator().next(), secondTopology, batch);
        assertSlotAccountingOnNode(node, 2L, false, 1L, 3L);

        // Second executor of topology2 consumes the last free slot.
        batch = new ArrayList<ExecutorDetails>();
        batch.add(new ExecutorDetails(2, 2));
        node.assign((WorkerSlot) node.getFreeSlots().iterator().next(), secondTopology, batch);
        assertSlotAccountingOnNode(node, 2L, false, 0L, 4L);

        // Releasing everything returns the node to its initial bookkeeping.
        node.freeAllSlots();
        assertSlotAccountingOnNode(node, 0L, true, 4L, 0L);
    }

    /**
     * Asserts the bookkeeping of a four-slot node: number of running topologies, whether the
     * node reports itself as totally free, and the free / used / total slot counts.
     */
    private static void assertSlotAccountingOnNode(RAS_Node node, long runningTopologies, boolean totallyFree, long freeSlots, long usedSlots) {
        Assert.assertEquals(runningTopologies, node.getRunningTopologies().size());
        if (totallyFree) {
            Assert.assertTrue(node.isTotallyFree());
        } else {
            Assert.assertFalse(node.isTotallyFree());
        }
        Assert.assertEquals(freeSlots, node.totalSlotsFree());
        Assert.assertEquals(usedSlots, node.totalSlotsUsed());
        Assert.assertEquals(4L, node.totalSlots());
    }

    /**
     * Smoke test: schedules one small generated topology on a cluster built from
     * {@code genSupervisors(1, 2, ...)} and expects all of its executors (two) to end up in
     * a single worker slot on a single node, with the topology reported as fully scheduled.
     */
    @Test
    public void sanityTestOfScheduling() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        // Raw map on purpose: the value type genSupervisors expects is not visible from this file.
        Map supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", Double.valueOf(400.0d));
        supervisorResources.put("supervisor.memory.capacity.mb", Double.valueOf(2000.0d));
        Map<String, SupervisorDetails> supervisors = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, supervisorResources);

        Config conf = new Config();
        conf.putAll(defaultTopologyConf);

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();

        TopologyDetails topology = TestUtilsForResourceAwareScheduler.getTopology("topology1", conf, 1, 1, 1, 1, 0, 0, "user");
        Map<String, TopologyDetails> byId = new HashMap<String, TopologyDetails>();
        byId.put(topology.getId(), topology);
        Topologies topologies = new Topologies(byId);

        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        // Inspect the assignment the scheduler produced.
        SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
        Set<WorkerSlot> usedSlots = assignment.getSlots();
        Set<String> usedNodes = new HashSet<String>();
        for (WorkerSlot slot : usedSlots) {
            usedNodes.add(slot.getNodeId());
        }
        Set<ExecutorDetails> scheduledExecutors = assignment.getExecutors();

        Assert.assertEquals(1L, usedSlots.size());
        Assert.assertEquals(1L, usedNodes.size());
        Assert.assertEquals(2L, scheduledExecutors.size());
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology.getId()));
    }

    /**
     * Schedules two topologies that each have more than one spout: "topology1" (two spouts
     * feeding a small bolt graph, seven executors in total) and "topology2" (two unconnected
     * spouts). Each is expected to be fully scheduled into exactly one worker slot on one node.
     */
    @Test
    public void testTopologyWithMultipleSpouts() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        // Raw map on purpose: the value type genSupervisors expects is not visible from this file.
        Map supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", Double.valueOf(400.0d));
        supervisorResources.put("supervisor.memory.capacity.mb", Double.valueOf(2000.0d));
        Map<String, SupervisorDetails> supervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, supervisorResources);

        // topology1: two spouts, five bolts, parallelism 1 everywhere.
        TopologyBuilder graphBuilder = new TopologyBuilder();
        graphBuilder.setSpout("wordSpout1", new TestWordSpout(), 1);
        graphBuilder.setSpout("wordSpout2", new TestWordSpout(), 1);
        graphBuilder.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
        graphBuilder.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
        graphBuilder.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
        graphBuilder.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
        graphBuilder.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
        StormTopology graph = graphBuilder.createTopology();

        // Both topologies and the cluster share this one configuration object.
        Config conf = new Config();
        conf.putAll(defaultTopologyConf);
        TopologyDetails graphTopology = new TopologyDetails("topology1", conf, graph, 0, TestUtilsForResourceAwareScheduler.genExecsAndComps(graph), 0, "user");

        // topology2: two spouts and nothing else.
        TopologyBuilder spoutsOnlyBuilder = new TopologyBuilder();
        spoutsOnlyBuilder.setSpout("wordSpoutX", new TestWordSpout(), 1);
        spoutsOnlyBuilder.setSpout("wordSpoutY", new TestWordSpout(), 1);
        StormTopology spoutsOnly = spoutsOnlyBuilder.createTopology();
        TopologyDetails spoutsOnlyTopology = new TopologyDetails("topology2", conf, spoutsOnly, 0, TestUtilsForResourceAwareScheduler.genExecsAndComps(spoutsOnly), 0, "user");

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        Map<String, TopologyDetails> byId = new HashMap<String, TopologyDetails>();
        byId.put(graphTopology.getId(), graphTopology);
        byId.put(spoutsOnlyTopology.getId(), spoutsOnlyTopology);
        Topologies topologies = new Topologies(byId);

        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        assertScheduledOnSingleSlot(cluster, graphTopology, 7L);
        assertScheduledOnSingleSlot(cluster, spoutsOnlyTopology, 2L);
    }

    /**
     * Asserts that {@code topology} occupies exactly one slot on exactly one node in
     * {@code cluster}, has {@code expectedExecutors} executors assigned, and carries the
     * "fully scheduled" status string.
     */
    private static void assertScheduledOnSingleSlot(Cluster cluster, TopologyDetails topology, long expectedExecutors) {
        SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
        Set<WorkerSlot> usedSlots = assignment.getSlots();
        Set<String> usedNodes = new HashSet<String>();
        for (WorkerSlot slot : usedSlots) {
            usedNodes.add(slot.getNodeId());
        }
        Set<ExecutorDetails> scheduledExecutors = assignment.getExecutors();

        Assert.assertEquals(1L, usedSlots.size());
        Assert.assertEquals(1L, usedNodes.size());
        Assert.assertEquals(expectedExecutors, scheduledExecutors.size());
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology.getId()));
    }

    /**
     * Schedules a spout + bolt topology whose components each request 20 CPU and 200 MB via
     * {@code setCPULoad} / {@code setMemoryLoad}, then checks that the single worker slot
     * used reports the summed allocation (400 MB memory, 40 CPU).
     */
    @Test
    public void testTopologySetCpuAndMemLoad() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        // Raw map on purpose: the value type genSupervisors expects is not visible from this file.
        Map supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", Double.valueOf(400.0d));
        supervisorResources.put("supervisor.memory.capacity.mb", Double.valueOf(2000.0d));
        Map<String, SupervisorDetails> supervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, supervisorResources);

        // Every component asks for the same explicit load.
        final Double cpuPerComponent = 20.0d;
        final Double memPerComponent = 200.0d;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordSpout", new TestWordSpout(), 1)
                .setCPULoad(cpuPerComponent)
                .setMemoryLoad(memPerComponent);
        builder.setBolt("wordCountBolt", new TestWordCounter(), 1)
                .shuffleGrouping("wordSpout")
                .setCPULoad(cpuPerComponent)
                .setMemoryLoad(memPerComponent);
        StormTopology stormTopology = builder.createTopology();

        Config conf = new Config();
        conf.putAll(defaultTopologyConf);
        TopologyDetails topology = new TopologyDetails("topology1", conf, stormTopology, 0, TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology), 0, "user");

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        Map<String, TopologyDetails> byId = new HashMap<String, TopologyDetails>();
        byId.put(topology.getId(), topology);
        Topologies topologies = new Topologies(byId);

        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        // Sum what the scheduler recorded on each slot it used.
        SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
        Set<WorkerSlot> usedSlots = assignment.getSlots();
        double allocatedMemory = 0.0d;
        double allocatedCpu = 0.0d;
        Set<String> usedNodes = new HashSet<String>();
        for (WorkerSlot slot : usedSlots) {
            usedNodes.add(slot.getNodeId());
            allocatedMemory += slot.getAllocatedMemOnHeap() + slot.getAllocatedMemOffHeap();
            allocatedCpu += slot.getAllocatedCpu();
        }
        Set<ExecutorDetails> scheduledExecutors = assignment.getExecutors();

        Assert.assertEquals(1L, usedSlots.size());
        Assert.assertEquals(1L, usedNodes.size());
        Assert.assertEquals(2L, scheduledExecutors.size());
        Assert.assertEquals(400.0d, allocatedMemory, 0.001d);
        Assert.assertEquals(40.0d, allocatedCpu, 0.001d);
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology.getId()));
    }

    /**
     * Schedules a topology whose components are too large to share one supervisor (two spout
     * executors at 250 CPU / 1200 MB each plus one bolt executor at 100 CPU / 600 MB, against
     * supervisors offering 400 CPU / 2000 MB) and verifies that:
     * <ul>
     *   <li>two slots on two different nodes are used and all three executors are placed;</li>
     *   <li>the per-executor CPU and memory requirements are the configured values;</li>
     *   <li>on every supervisor, the summed CPU and memory demand of the executors placed
     *       there does not exceed that supervisor's capacity;</li>
     *   <li>the topology is reported as fully scheduled.</li>
     * </ul>
     *
     * <p>The per-supervisor check sums CPU into the CPU total and memory into the memory total
     * and compares each against the supervisor directly. Capacities are not used as map keys
     * because every supervisor here has the same capacity and would collide.
     */
    @Test
    public void testResourceLimitation() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        // Raw map on purpose: the value type genSupervisors expects is not visible from this file.
        Map supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", Double.valueOf(400.0d));
        supervisorResources.put("supervisor.memory.capacity.mb", Double.valueOf(2000.0d));
        Map<String, SupervisorDetails> supervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, supervisorResources);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(Double.valueOf(250.0d)).setMemoryLoad(Double.valueOf(1000.0d), Double.valueOf(200.0d));
        builder.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(Double.valueOf(100.0d)).setMemoryLoad(Double.valueOf(500.0d), Double.valueOf(100.0d));
        StormTopology stormTopology = builder.createTopology();

        Config conf = new Config();
        conf.putAll(defaultTopologyConf);
        TopologyDetails topology = new TopologyDetails("topology1", conf, stormTopology, 2, TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology), 0, "user");

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        Map<String, TopologyDetails> byId = new HashMap<String, TopologyDetails>();
        byId.put(topology.getId(), topology);
        Topologies topologies = new Topologies(byId);

        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
        Set<WorkerSlot> usedSlots = assignment.getSlots();
        Set<String> usedNodes = new HashSet<String>();
        for (WorkerSlot slot : usedSlots) {
            usedNodes.add(slot.getNodeId());
        }
        Set<ExecutorDetails> scheduledExecutors = assignment.getExecutors();

        // Per-executor requirements, sorted so they can be compared positionally below.
        List<Double> memoryRequirements = new ArrayList<Double>();
        List<Double> cpuRequirements = new ArrayList<Double>();
        for (ExecutorDetails executor : scheduledExecutors) {
            memoryRequirements.add(topology.getTotalMemReqTask(executor));
            cpuRequirements.add(topology.getTotalCpuReqTask(executor));
        }
        Collections.sort(cpuRequirements);
        Collections.sort(memoryRequirements);

        // Group the executors by the supervisor whose slot they were assigned to.
        Map<SupervisorDetails, List<ExecutorDetails>> executorsBySupervisor = new HashMap<SupervisorDetails, List<ExecutorDetails>>();
        for (Map.Entry<ExecutorDetails, WorkerSlot> placement : assignment.getExecutorToSlot().entrySet()) {
            SupervisorDetails supervisor = cluster.getSupervisorById(placement.getValue().getNodeId());
            List<ExecutorDetails> onSupervisor = executorsBySupervisor.get(supervisor);
            if (onSupervisor == null) {
                onSupervisor = new ArrayList<ExecutorDetails>();
                executorsBySupervisor.put(supervisor, onSupervisor);
            }
            onSupervisor.add(placement.getKey());
        }

        Assert.assertEquals(2L, usedSlots.size());
        Assert.assertEquals(2L, usedNodes.size());
        Assert.assertEquals(3L, scheduledExecutors.size());
        Assert.assertEquals(100.0d, cpuRequirements.get(0).doubleValue(), 0.001d);
        Assert.assertEquals(250.0d, cpuRequirements.get(1).doubleValue(), 0.001d);
        Assert.assertEquals(250.0d, cpuRequirements.get(2).doubleValue(), 0.001d);
        Assert.assertEquals(600.0d, memoryRequirements.get(0).doubleValue(), 0.001d);
        Assert.assertEquals(1200.0d, memoryRequirements.get(1).doubleValue(), 0.001d);
        Assert.assertEquals(1200.0d, memoryRequirements.get(2).doubleValue(), 0.001d);

        // The actual resource-limitation check: demand placed on a supervisor must fit within it.
        for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : executorsBySupervisor.entrySet()) {
            SupervisorDetails supervisor = entry.getKey();
            double usedCpu = 0.0d;
            double usedMemory = 0.0d;
            for (ExecutorDetails executor : entry.getValue()) {
                usedCpu += topology.getTotalCpuReqTask(executor).doubleValue();
                usedMemory += topology.getTotalMemReqTask(executor).doubleValue();
            }
            Assert.assertTrue("CPU over-committed on " + supervisor.getId() + ": used " + usedCpu,
                    supervisor.getTotalCPU().doubleValue() - usedCpu >= 0.0d);
            Assert.assertTrue("Memory over-committed on " + supervisor.getId() + ": used " + usedMemory,
                    supervisor.getTotalMemory().doubleValue() - usedMemory >= 0.0d);
        }

        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology.getId()));
    }

    /**
     * Checks that re-running the scheduler leaves still-valid executor placements untouched.
     * Four scenarios run in sequence against the same scheduler instance:
     * <ol>
     *   <li>executors of one worker are dropped from an existing assignment ("worker failed");</li>
     *   <li>a cluster is rebuilt without supervisor "sup-0" while a pre-made assignment still
     *       references it ("supervisor failed");</li>
     *   <li>as in 2, and one executor is additionally removed from the mapping that was handed
     *       to the assignment ("supervisor and worker failed");</li>
     *   <li>a second topology is scheduled next to an already scheduled one.</li>
     * </ol>
     * In every scenario the placements recorded before the scheduling pass are compared with
     * the placements afterwards.
     *
     * <p>Statement order matters throughout: the snapshots are taken at specific points
     * relative to the mutations and scheduling calls.
     */
    @Test
    public void testScheduleResilience() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        // Raw map on purpose: the value type genSupervisors expects is not visible from this file.
        Map supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", Double.valueOf(400.0d));
        supervisorResources.put("supervisor.memory.capacity.mb", Double.valueOf(2000.0d));
        Map<String, SupervisorDetails> supervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, supervisorResources);

        // topology1: a single spout with parallelism 3, default resource requests.
        TopologyBuilder builder1 = new TopologyBuilder();
        builder1.setSpout("wordSpout1", new TestWordSpout(), 3);
        StormTopology stormTopology1 = builder1.createTopology();
        Config conf1 = new Config();
        conf1.putAll(defaultTopologyConf);
        TopologyDetails topology1 = new TopologyDetails("topology1", conf1, stormTopology1, 3, TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1), 0, "user");

        // topology2: a single spout with parallelism 2 and a 1280 MB on-heap request per component.
        TopologyBuilder builder2 = new TopologyBuilder();
        builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
        StormTopology stormTopology2 = builder2.createTopology();
        Config conf2 = new Config();
        conf2.putAll(defaultTopologyConf);
        conf2.put("topology.component.resources.onheap.memory.mb", Double.valueOf(1280.0d));
        TopologyDetails topology2 = new TopologyDetails("topology2", conf2, stormTopology2, 2, TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2), 0, "user");

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf1);
        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        Map<String, TopologyDetails> onlyTopology2 = new HashMap<String, TopologyDetails>();
        onlyTopology2.put(topology2.getId(), topology2);
        Topologies topologies = new Topologies(onlyTopology2);
        scheduler.prepare(conf1);
        scheduler.schedule(topologies, cluster);

        // ---- Scenario 1: a worker's executors disappear from the assignment ----
        // getAssignmentById is declared to return the SchedulerAssignment interface, so the
        // implementation type needs an explicit downcast.
        SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl) cluster.getAssignmentById(topology2.getId());
        WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
        // NOTE(review): the removals below only simulate a failure if getExecutorToSlot()
        // exposes the assignment's own map rather than a copy -- confirm.
        Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot();
        List<ExecutorDetails> failedExecutors = new ArrayList<ExecutorDetails>();
        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
            if (entry.getValue().equals(failedWorker)) {
                failedExecutors.add(entry.getKey());
            }
        }
        // Removed in a second pass so the map is not modified while being iterated.
        for (ExecutorDetails executor : failedExecutors) {
            executorToSlot.remove(executor);
        }
        Map<ExecutorDetails, WorkerSlot> survivingPlacements = new HashMap<ExecutorDetails, WorkerSlot>(executorToSlot);
        Set<ExecutorDetails> survivingExecutors = survivingPlacements.keySet();
        scheduler.schedule(topologies, cluster);
        Map<ExecutorDetails, WorkerSlot> placementsAfterWorkerFailure = cluster.getAssignmentById(topology2.getId()).getExecutorToSlot();
        for (ExecutorDetails executor : survivingExecutors) {
            Assert.assertEquals(survivingPlacements.get(executor), placementsAfterWorkerFailure.get(executor));
        }
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));

        // ---- Scenario 2: supervisor "sup-0" is missing from the cluster ----
        Map<ExecutorDetails, WorkerSlot> presetPlacements = new HashMap<ExecutorDetails, WorkerSlot>();
        presetPlacements.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 0));
        presetPlacements.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 1));
        presetPlacements.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1));
        Map<String, SchedulerAssignmentImpl> presetAssignments = new HashMap<String, SchedulerAssignmentImpl>();
        presetAssignments.put(topology1.getId(), new SchedulerAssignmentImpl(topology1.getId(), presetPlacements));
        Map<ExecutorDetails, WorkerSlot> expectedAfterSupervisorFailure = new HashMap<ExecutorDetails, WorkerSlot>(presetPlacements);
        Set<ExecutorDetails> presetExecutors = expectedAfterSupervisorFailure.keySet();
        Map<String, SupervisorDetails> supervisorsWithoutSup0 = new HashMap<String, SupervisorDetails>(supervisors);
        supervisorsWithoutSup0.remove("sup-0");
        Cluster clusterWithoutSup0 = new Cluster(nimbus, supervisorsWithoutSup0, presetAssignments, conf1);
        Map<String, TopologyDetails> onlyTopology1 = new HashMap<String, TopologyDetails>();
        onlyTopology1.put(topology1.getId(), topology1);
        scheduler.schedule(new Topologies(onlyTopology1), clusterWithoutSup0);
        Map<ExecutorDetails, WorkerSlot> placementsAfterSupervisorFailure = clusterWithoutSup0.getAssignmentById(topology1.getId()).getExecutorToSlot();
        for (ExecutorDetails executor : presetExecutors) {
            Assert.assertEquals(expectedAfterSupervisorFailure.get(executor), placementsAfterSupervisorFailure.get(executor));
        }
        Assert.assertEquals("Fully Scheduled", clusterWithoutSup0.getStatusMap().get(topology1.getId()));

        // ---- Scenario 3: "sup-0" is missing and one executor is dropped from the mapping ----
        Map<ExecutorDetails, WorkerSlot> presetPlacements2 = new HashMap<ExecutorDetails, WorkerSlot>();
        presetPlacements2.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 1));
        presetPlacements2.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 2));
        presetPlacements2.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1));
        Map<String, SchedulerAssignmentImpl> presetAssignments2 = new HashMap<String, SchedulerAssignmentImpl>();
        presetAssignments2.put(topology1.getId(), new SchedulerAssignmentImpl(topology1.getId(), presetPlacements2));
        // Removed AFTER the assignment was built from this map and BEFORE the snapshot is taken.
        // NOTE(review): whether the assignment sees this removal depends on whether
        // SchedulerAssignmentImpl copies the map it is given -- confirm.
        presetPlacements2.remove(new ExecutorDetails(1, 1));
        Map<ExecutorDetails, WorkerSlot> expectedAfterBothFailures = new HashMap<ExecutorDetails, WorkerSlot>(presetPlacements2);
        Set<ExecutorDetails> remainingExecutors = expectedAfterBothFailures.keySet();
        Map<String, SupervisorDetails> supervisorsWithoutSup0Again = new HashMap<String, SupervisorDetails>(supervisors);
        supervisorsWithoutSup0Again.remove("sup-0");
        Cluster clusterWithBothFailures = new Cluster(nimbus, supervisorsWithoutSup0Again, presetAssignments2, conf1);
        Map<String, TopologyDetails> onlyTopology1Again = new HashMap<String, TopologyDetails>();
        onlyTopology1Again.put(topology1.getId(), topology1);
        scheduler.schedule(new Topologies(onlyTopology1Again), clusterWithBothFailures);
        Map<ExecutorDetails, WorkerSlot> placementsAfterBothFailures = clusterWithBothFailures.getAssignmentById(topology1.getId()).getExecutorToSlot();
        for (ExecutorDetails executor : remainingExecutors) {
            Assert.assertEquals(expectedAfterBothFailures.get(executor), placementsAfterBothFailures.get(executor));
        }
        Assert.assertEquals("Fully Scheduled", clusterWithBothFailures.getStatusMap().get(topology1.getId()));

        // ---- Scenario 4: adding a second topology must not move the first one ----
        Cluster freshCluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf1);
        Map<String, TopologyDetails> growingTopologyMap = new HashMap<String, TopologyDetails>();
        growingTopologyMap.put(topology1.getId(), topology1);
        scheduler.schedule(new Topologies(growingTopologyMap), freshCluster);
        Map<ExecutorDetails, WorkerSlot> topology1Placements = new HashMap<ExecutorDetails, WorkerSlot>(freshCluster.getAssignmentById(topology1.getId()).getExecutorToSlot());
        // topology2 joins the same map; a new Topologies is built from it for the second pass.
        growingTopologyMap.put(topology2.getId(), topology2);
        scheduler.schedule(new Topologies(growingTopologyMap), freshCluster);
        Map<ExecutorDetails, WorkerSlot> topology1PlacementsAfter = freshCluster.getAssignmentById(topology1.getId()).getExecutorToSlot();
        for (ExecutorDetails executor : topology1Placements.keySet()) {
            Assert.assertEquals(topology1Placements.get(executor), topology1PlacementsAfter.get(executor));
        }
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", freshCluster.getStatusMap().get(topology1.getId()));
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", freshCluster.getStatusMap().get(topology2.getId()));
    }

    @Test
    public void testHeterogeneousCluster() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        HashMap hashMap = new HashMap();
        hashMap.put("supervisor.cpu.capacity", Double.valueOf(800.0d));
        hashMap.put("supervisor.memory.capacity.mb", Double.valueOf(4096.0d));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("supervisor.cpu.capacity", Double.valueOf(200.0d));
        hashMap2.put("supervisor.memory.capacity.mb", Double.valueOf(1024.0d));
        HashMap hashMap3 = new HashMap();
        int i = 0;
        while (i < 2) {
            LinkedList linkedList = new LinkedList();
            for (int i2 = 0; i2 < 4; i2++) {
                linkedList.add(Integer.valueOf(i2));
            }
            SupervisorDetails supervisorDetails = new SupervisorDetails("sup-" + i, "host-" + i, (Object) null, linkedList, i == 0 ? hashMap : hashMap2);
            hashMap3.put(supervisorDetails.getId(), supervisorDetails);
            i++;
        }
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordSpout1", new TestWordSpout(), 1).setCPULoad(Double.valueOf(300.0d)).setMemoryLoad(Double.valueOf(2000.0d), Double.valueOf(48.0d));
        StormTopology createTopology = topologyBuilder.createTopology();
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails = new TopologyDetails("topology1", config, createTopology, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "user");
        TopologyBuilder topologyBuilder2 = new TopologyBuilder();
        topologyBuilder2.setSpout("wordSpout2", new TestWordSpout(), 4).setCPULoad(Double.valueOf(100.0d)).setMemoryLoad(Double.valueOf(500.0d), Double.valueOf(12.0d));
        StormTopology createTopology2 = topologyBuilder2.createTopology();
        Config config2 = new Config();
        config2.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails2 = new TopologyDetails("topology2", config2, createTopology2, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology2), 0, "user");
        TopologyBuilder topologyBuilder3 = new TopologyBuilder();
        topologyBuilder3.setSpout("wordSpout3", new TestWordSpout(), 4).setCPULoad(Double.valueOf(20.0d)).setMemoryLoad(Double.valueOf(200.0d), Double.valueOf(56.0d));
        StormTopology createTopology3 = topologyBuilder3.createTopology();
        new Config().putAll(defaultTopologyConf);
        TopologyDetails topologyDetails3 = new TopologyDetails("topology3", config2, createTopology3, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology3), 0, "user");
        TopologyBuilder topologyBuilder4 = new TopologyBuilder();
        topologyBuilder4.setSpout("wordSpout4", new TestWordSpout(), 12).setCPULoad(Double.valueOf(30.0d)).setMemoryLoad(Double.valueOf(100.0d), Double.valueOf(0.0d));
        StormTopology createTopology4 = topologyBuilder4.createTopology();
        Config config3 = new Config();
        config3.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails4 = new TopologyDetails("topology4", config3, createTopology4, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology4), 0, "user");
        TopologyBuilder topologyBuilder5 = new TopologyBuilder();
        topologyBuilder5.setSpout("wordSpout5", new TestWordSpout(), 40).setCPULoad(Double.valueOf(25.0d)).setMemoryLoad(Double.valueOf(100.0d), Double.valueOf(28.0d));
        StormTopology createTopology5 = topologyBuilder5.createTopology();
        Config config4 = new Config();
        config4.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails5 = new TopologyDetails("topology5", config4, createTopology5, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology5), 0, "user");
        Cluster cluster = new Cluster(iNimbusTest, hashMap3, new HashMap(), config);
        ResourceAwareScheduler resourceAwareScheduler = new ResourceAwareScheduler();
        HashMap hashMap4 = new HashMap();
        hashMap4.put(topologyDetails.getId(), topologyDetails);
        hashMap4.put(topologyDetails2.getId(), topologyDetails2);
        hashMap4.put(topologyDetails3.getId(), topologyDetails3);
        Topologies topologies = new Topologies(hashMap4);
        resourceAwareScheduler.prepare(config);
        resourceAwareScheduler.schedule(topologies, cluster);
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails.getId()));
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails2.getId()));
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails3.getId()));
        Map<SupervisorDetails, Double> supervisorToCpuUsage = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
        Map<SupervisorDetails, Double> supervisorToMemoryUsage = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
        Double valueOf = Double.valueOf(1.0E-4d);
        for (SupervisorDetails supervisorDetails2 : hashMap3.values()) {
            Assert.assertTrue(Math.abs(supervisorDetails2.getTotalMemory().doubleValue() - supervisorToMemoryUsage.get(supervisorDetails2).doubleValue()) < valueOf.doubleValue() || Math.abs(supervisorDetails2.getTotalCPU().doubleValue() - supervisorToCpuUsage.get(supervisorDetails2).doubleValue()) < valueOf.doubleValue());
        }
        Cluster cluster2 = new Cluster(iNimbusTest, hashMap3, new HashMap(), config);
        HashMap hashMap5 = new HashMap();
        hashMap5.put(topologyDetails.getId(), topologyDetails);
        hashMap5.put(topologyDetails2.getId(), topologyDetails2);
        hashMap5.put(topologyDetails4.getId(), topologyDetails4);
        Topologies topologies2 = new Topologies(hashMap5);
        resourceAwareScheduler.prepare(config);
        resourceAwareScheduler.schedule(topologies2, cluster2);
        int i3 = ((String) cluster2.getStatusMap().get(topologyDetails.getId())).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy") ? 0 + 1 : 0;
        if (((String) cluster2.getStatusMap().get(topologyDetails2.getId())).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
            i3++;
        }
        if (((String) cluster2.getStatusMap().get(topologyDetails4.getId())).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
            i3++;
        }
        Assert.assertEquals(2L, i3);
        Cluster cluster3 = new Cluster(iNimbusTest, hashMap3, new HashMap(), config);
        HashMap hashMap6 = new HashMap();
        hashMap6.put(topologyDetails5.getId(), topologyDetails5);
        Topologies topologies3 = new Topologies(hashMap6);
        resourceAwareScheduler.prepare(config);
        resourceAwareScheduler.schedule(topologies3, cluster3);
        Map<SupervisorDetails, Double> supervisorToCpuUsage2 = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster3, topologies3);
        Map<SupervisorDetails, Double> supervisorToMemoryUsage2 = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster3, topologies3);
        for (SupervisorDetails supervisorDetails3 : hashMap3.values()) {
            Double totalCPU = supervisorDetails3.getTotalCPU();
            Double totalMemory = supervisorDetails3.getTotalMemory();
            Double d = supervisorToCpuUsage2.get(supervisorDetails3);
            Double d2 = supervisorToMemoryUsage2.get(supervisorDetails3);
            Assert.assertEquals(totalCPU.doubleValue(), d.doubleValue(), 1.0E-4d);
            Assert.assertEquals(totalMemory.doubleValue(), d2.doubleValue(), 1.0E-4d);
        }
    }

    /**
     * Exercises scheduling with {@code topology.worker.max.heap.size.mb} set to 128.
     *
     * <p>Two topologies, each consisting of a single {@link TestWordSpout} component, are
     * scheduled one after the other on fresh {@link Cluster} instances that share the same
     * generated supervisors. The topology with four executors is expected to be fully scheduled
     * onto four workers; the topology with five executors is expected to be rejected with all
     * five executors left unassigned.
     */
    @Test
    public void testTopologyWorkerMaxHeapSize() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, Number> supervisorResources = new HashMap<>();
        supervisorResources.put("supervisor.cpu.capacity", 400.0);
        supervisorResources.put("supervisor.memory.capacity.mb", 2000.0);
        // NOTE(review): presumably 2 supervisors with 2 ports each - confirm against genSupervisors.
        Map<String, SupervisorDetails> supervisors =
                TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, supervisorResources);

        // Case 1: four executors - expected to be scheduled completely.
        TopologyBuilder fittingBuilder = new TopologyBuilder();
        fittingBuilder.setSpout("wordSpout1", new TestWordSpout(), 4);
        StormTopology fittingTopology = fittingBuilder.createTopology();
        Config fittingConf = new Config();
        fittingConf.putAll(defaultTopologyConf);
        fittingConf.put("topology.worker.max.heap.size.mb", 128.0);
        TopologyDetails fittingDetails = new TopologyDetails("topology1", fittingConf, fittingTopology, 1,
                TestUtilsForResourceAwareScheduler.genExecsAndComps(fittingTopology), 0, "user");
        Cluster fittingCluster =
                new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), fittingConf);
        Map<String, TopologyDetails> fittingTopologies = new HashMap<>();
        fittingTopologies.put(fittingDetails.getId(), fittingDetails);

        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        scheduler.prepare(fittingConf);
        scheduler.schedule(new Topologies(fittingTopologies), fittingCluster);

        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy",
                fittingCluster.getStatusMap().get(fittingDetails.getId()));
        Assert.assertEquals(4L, fittingCluster.getAssignedNumWorkers(fittingDetails));

        // Case 2: five executors on a fresh cluster - expected not to be scheduled at all.
        TopologyBuilder oversizedBuilder = new TopologyBuilder();
        oversizedBuilder.setSpout("wordSpout2", new TestWordSpout(), 5);
        StormTopology oversizedTopology = oversizedBuilder.createTopology();
        Config oversizedConf = new Config();
        oversizedConf.putAll(defaultTopologyConf);
        oversizedConf.put("topology.worker.max.heap.size.mb", 128.0);
        TopologyDetails oversizedDetails = new TopologyDetails("topology2", oversizedConf, oversizedTopology, 1,
                TestUtilsForResourceAwareScheduler.genExecsAndComps(oversizedTopology), 0, "user");
        Cluster oversizedCluster =
                new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), oversizedConf);
        Map<String, TopologyDetails> oversizedTopologies = new HashMap<>();
        oversizedTopologies.put(oversizedDetails.getId(), oversizedDetails);

        // The same scheduler instance is reused, re-prepared with the second topology's config.
        scheduler.prepare(oversizedConf);
        scheduler.schedule(new Topologies(oversizedTopologies), oversizedCluster);

        Assert.assertEquals("Not enough resources to schedule - 0/5 executors scheduled",
                oversizedCluster.getStatusMap().get(oversizedDetails.getId()));
        Assert.assertEquals(5L, oversizedCluster.getUnassignedExecutors(oversizedDetails).size());
    }

    /**
     * Submits a topology whose {@code topology.component.resources.onheap.memory.mb} (129) is
     * larger than its {@code topology.worker.max.heap.size.mb} (128) and expects the submission
     * to be rejected with an {@link IllegalArgumentException}.
     *
     * @throws Exception declared because the submitter call may throw checked exceptions; the
     *     test passes only when an {@link IllegalArgumentException} is raised
     */
    @Test(expected = IllegalArgumentException.class)
    public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordSpout1", new TestWordSpout(), 4);
        StormTopology topology = builder.createTopology();

        Config conf = new Config();
        conf.putAll(defaultTopologyConf);
        // The on-heap request deliberately exceeds the worker heap limit by one.
        conf.put("topology.worker.max.heap.size.mb", 128.0);
        conf.put("topology.component.resources.onheap.memory.mb", 129.0);

        StormSubmitter.submitTopologyWithProgressBar("test", conf, topology);
    }

    /**
     * Reads {@code user-resource-pools.yaml} via {@code Utils.findAndReadConfigFile}, logs what
     * was read and hands it to {@code ConfigValidation.validateFields}. There is no explicit
     * assertion: the test fails only if one of these calls throws.
     */
    @Test
    public void TestReadInResourceAwareSchedulerUserPools() {
        final String userPoolsFile = "user-resource-pools.yaml";
        Map userPoolsConf = Utils.findAndReadConfigFile(userPoolsFile, false);
        LOG.info("fromFile: {}", userPoolsConf);
        ConfigValidation.validateFields(userPoolsConf);
    }

    /**
     * Schedules topologies for a user that has a configured resource pool ("jerry") and for a
     * user that has none ("bobby").
     *
     * <p>Expected outcome asserted below: all three of jerry's topologies are running, while
     * bobby ends up with one running topology and one in the "attempted" set. Neither user is
     * expected to have pending or invalid topologies.
     */
    @Test
    public void TestSubmitUsersWithNoGuarantees() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, Number> supervisorResources = new HashMap<>();
        supervisorResources.put("supervisor.cpu.capacity", 100.0);
        supervisorResources.put("supervisor.memory.capacity.mb", 1000.0);
        Map<String, SupervisorDetails> supervisors =
                TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, supervisorResources);

        Config conf = new Config();
        conf.putAll(Utils.readDefaultConfig());
        conf.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        conf.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        conf.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());
        conf.put("topology.component.cpu.pcore.percent", 100.0);
        conf.put("topology.component.resources.offheap.memory.mb", 500);
        conf.put("topology.component.resources.onheap.memory.mb", 500);

        // Only jerry gets a resource pool entry; bobby is intentionally absent from the pools.
        Map<String, Number> jerryPool = new HashMap<>();
        jerryPool.put("cpu", 200.0);
        jerryPool.put("memory", 2000.0);
        Map<String, Map<String, Number>> userPools = new HashMap<>();
        userPools.put("jerry", jerryPool);
        conf.put("resource.aware.scheduler.user.pools", userPools);

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);

        TopologyDetails[] submitted = {
            TestUtilsForResourceAwareScheduler.getTopology("topo-1", conf, 1, 0, 1, 0, currentTime - 2, 10, "jerry"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-2", conf, 1, 0, 1, 0, currentTime - 2, 20, "jerry"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-3", conf, 1, 0, 1, 0, currentTime - 2, 20, "jerry"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-4", conf, 1, 0, 1, 0, currentTime - 2, 10, "bobby"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-5", conf, 1, 0, 1, 0, currentTime - 2, 20, "bobby"),
        };
        Map<String, TopologyDetails> topologyMap = new HashMap<>();
        for (TopologyDetails topo : submitted) {
            topologyMap.put(topo.getId(), topo);
        }
        Topologies topologies = new Topologies(topologyMap);

        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        // jerry: everything is expected to be running.
        User jerry = scheduler.getUser("jerry");
        for (Object running : jerry.getTopologiesRunning()) {
            String status = (String) cluster.getStatusMap().get(((TopologyDetails) running).getId());
            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(status));
        }
        Assert.assertEquals("# of running topologies", 3L, jerry.getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, jerry.getTopologiesPending().size());
        Assert.assertEquals("# of attempted topologies", 0L, jerry.getTopologiesAttempted().size());
        Assert.assertEquals("# of invalid topologies", 0L, jerry.getTopologiesInvalid().size());

        // bobby: one topology running, the other one only attempted.
        User bobby = scheduler.getUser("bobby");
        for (Object running : bobby.getTopologiesRunning()) {
            String status = (String) cluster.getStatusMap().get(((TopologyDetails) running).getId());
            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(status));
        }
        Assert.assertEquals("# of running topologies", 1L, bobby.getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, bobby.getTopologiesPending().size());
        Assert.assertEquals("# of attempted topologies", 1L, bobby.getTopologiesAttempted().size());
        Assert.assertEquals("# of invalid topologies", 0L, bobby.getTopologiesInvalid().size());
    }

    /**
     * Verifies the iteration order of the running-topologies set kept for user "jerry".
     *
     * <p>Five topologies are scheduled first and the running set is expected to iterate as
     * topo-4, topo-1, topo-5, topo-3, topo-2. A sixth topology is then added and, after the
     * scheduler is re-prepared and run again on the same cluster, it is expected to iterate
     * first, ahead of the previous five in unchanged order. No topology may remain pending after
     * either pass.
     *
     * <p>NOTE(review): the last two numeric arguments of {@code getTopology} are presumably launch
     * time and priority; with that reading the expected order is ascending priority value, then
     * ascending launch time - confirm against the helper and the priority strategy.
     */
    @Test
    public void TestTopologySortedInCorrectOrder() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, Number> supervisorResources = new HashMap<>();
        supervisorResources.put("supervisor.cpu.capacity", 100.0);
        supervisorResources.put("supervisor.memory.capacity.mb", 1024.0);
        Map<String, SupervisorDetails> supervisors =
                TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, supervisorResources);

        Config conf = new Config();
        conf.putAll(Utils.readDefaultConfig());
        conf.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        conf.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        conf.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());
        conf.put("topology.component.cpu.pcore.percent", 10.0);
        conf.put("topology.component.resources.offheap.memory.mb", 128.0);
        conf.put("topology.component.resources.onheap.memory.mb", 0.0);

        // The pool values deliberately mix Integer and Double, exactly as in the original test.
        Map<String, Number> jerryPool = new HashMap<>();
        jerryPool.put("cpu", 1000);
        jerryPool.put("memory", 8192.0);
        Map<String, Number> bobbyPool = new HashMap<>();
        bobbyPool.put("cpu", 10000.0);
        bobbyPool.put("memory", 32768);
        Map<String, Number> derekPool = new HashMap<>();
        derekPool.put("cpu", 5000.0);
        derekPool.put("memory", 16384.0);
        Map<String, Map<String, Number>> userPools = new HashMap<>();
        userPools.put("jerry", jerryPool);
        userPools.put("bobby", bobbyPool);
        userPools.put("derek", derekPool);
        conf.put("resource.aware.scheduler.user.pools", userPools);

        TopologyDetails[] initialTopologies = {
            TestUtilsForResourceAwareScheduler.getTopology("topo-1", conf, 5, 15, 1, 1, currentTime - 2, 20, "jerry"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-2", conf, 5, 15, 1, 1, currentTime - 8, 30, "jerry"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-3", conf, 5, 15, 1, 1, currentTime - 16, 30, "jerry"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-4", conf, 5, 15, 1, 1, currentTime - 16, 20, "jerry"),
            TestUtilsForResourceAwareScheduler.getTopology("topo-5", conf, 5, 15, 1, 1, currentTime - 24, 30, "jerry"),
        };
        Map<String, TopologyDetails> topologyMap = new HashMap<>();
        for (TopologyDetails topo : initialTopologies) {
            topologyMap.put(topo.getId(), topo);
        }

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
        Topologies topologies = new Topologies(topologyMap);
        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        // JUnit's contract is assertEquals(message, expected, actual); keeping that order makes
        // failure output read correctly.
        Assert.assertEquals("check size", 0L, scheduler.getUser("jerry").getTopologiesPending().size());

        final String[] firstPassOrder = {"topo-4", "topo-1", "topo-5", "topo-3", "topo-2"};
        Set<?> runningAfterFirstPass = scheduler.getUser("jerry").getTopologiesRunning();
        Iterator<?> firstPass = runningAfterFirstPass.iterator();
        for (String expectedName : firstPassOrder) {
            // Fail with an assertion (not NoSuchElementException) if fewer topologies are running.
            Assert.assertTrue("running set ended before " + expectedName, firstPass.hasNext());
            TopologyDetails running = (TopologyDetails) firstPass.next();
            LOG.info("{} - {}", running.getName(), runningAfterFirstPass);
            Assert.assertEquals("check order", expectedName, running.getName());
        }

        // Second pass: add one more topology and reschedule on the same cluster.
        TopologyDetails lateTopology =
                TestUtilsForResourceAwareScheduler.getTopology("topo-6", conf, 5, 15, 1, 1, currentTime - 30, 10, "jerry");
        topologyMap.put(lateTopology.getId(), lateTopology);
        Topologies topologiesWithLate = new Topologies(topologyMap);
        scheduler.prepare(conf);
        scheduler.schedule(topologiesWithLate, cluster);

        final String[] secondPassOrder = {"topo-6", "topo-4", "topo-1", "topo-5", "topo-3", "topo-2"};
        Iterator<?> secondPass = scheduler.getUser("jerry").getTopologiesRunning().iterator();
        for (String expectedName : secondPassOrder) {
            Assert.assertTrue("running set ended before " + expectedName, secondPass.hasNext());
            Assert.assertEquals("check order", expectedName, ((TopologyDetails) secondPass.next()).getName());
        }
        Assert.assertEquals("check size", 0L, scheduler.getUser("jerry").getTopologiesPending().size());
    }

    /**
     * Schedules five topologies for each of three users ("jerry", "bobby", "derek"), all of whom
     * have a configured resource pool, and expects every topology to be scheduled successfully.
     *
     * <p>After scheduling, every user known to the scheduler must have zero pending and exactly
     * five running topologies.
     */
    @Test
    public void TestMultipleUsers() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, Number> supervisorResources = new HashMap<>();
        supervisorResources.put("supervisor.cpu.capacity", 1000.0);
        supervisorResources.put("supervisor.memory.capacity.mb", 10240.0);
        Map<String, SupervisorDetails> supervisors =
                TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, supervisorResources);

        Config conf = new Config();
        conf.putAll(Utils.readDefaultConfig());
        conf.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        conf.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        conf.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());

        // The pool values deliberately mix Integer and Double, exactly as in the original test.
        Map<String, Number> jerryPool = new HashMap<>();
        jerryPool.put("cpu", 1000);
        jerryPool.put("memory", 8192.0);
        Map<String, Number> bobbyPool = new HashMap<>();
        bobbyPool.put("cpu", 10000.0);
        bobbyPool.put("memory", 32768);
        Map<String, Number> derekPool = new HashMap<>();
        derekPool.put("cpu", 5000.0);
        derekPool.put("memory", 16384.0);
        Map<String, Map<String, Number>> userPools = new HashMap<>();
        userPools.put("jerry", jerryPool);
        userPools.put("bobby", bobbyPool);
        userPools.put("derek", derekPool);
        conf.put("resource.aware.scheduler.user.pools", userPools);

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);

        // Every user submits the same five (time offset, priority) combinations; topologies are
        // numbered consecutively: topo-1..5 for jerry, topo-6..10 for bobby, topo-11..15 for derek.
        final String[] submitters = {"jerry", "bobby", "derek"};
        final int[] launchTimeOffsets = {2, 8, 16, 16, 24};
        final int[] priorities = {20, 29, 29, 20, 29};
        Map<String, TopologyDetails> topologyMap = new HashMap<>();
        int topologyNumber = 0;
        for (String submitter : submitters) {
            for (int k = 0; k < launchTimeOffsets.length; k++) {
                topologyNumber++;
                TopologyDetails topo = TestUtilsForResourceAwareScheduler.getTopology("topo-" + topologyNumber, conf,
                        5, 15, 1, 1, currentTime - launchTimeOffsets[k], priorities[k], submitter);
                topologyMap.put(topo.getId(), topo);
            }
        }
        Topologies topologies = new Topologies(topologyMap);

        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        for (TopologyDetails topo : topologyMap.values()) {
            String status = (String) cluster.getStatusMap().get(topo.getId());
            Assert.assertTrue("assert topology success: " + topo.getName() + " has status '" + status + "'",
                    TestUtilsForResourceAwareScheduler.assertStatusSuccess(status));
        }

        // JUnit's contract is assertEquals(message, expected, actual); keeping that order makes
        // failure output read correctly.
        for (User user : scheduler.getUserMap().values()) {
            Assert.assertEquals("# of pending topologies", 0L, user.getTopologiesPending().size());
            Assert.assertEquals("# of running topologies", 5L, user.getTopologiesRunning().size());
        }
    }

    /**
     * Schedules two topologies owned by "jerry" on a cluster generated with
     * {@code genSupervisors(1, 4, ...)} and expects only one of them to be fully scheduled.
     *
     * <p>Asserted outcome: exactly one topology reports a success status, and jerry ends up with
     * one running, one attempted and zero pending topologies.
     */
    @Test
    public void testHandlingClusterSubscription() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, Number> supervisorResources = new HashMap<>();
        supervisorResources.put("supervisor.cpu.capacity", 200.0);
        supervisorResources.put("supervisor.memory.capacity.mb", 10240.0);
        Map<String, SupervisorDetails> supervisors =
                TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, supervisorResources);

        Config conf = new Config();
        conf.putAll(Utils.readDefaultConfig());
        conf.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        conf.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        conf.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());

        // Three user pools are configured although only jerry submits topologies here.
        Map<String, Number> jerryPool = new HashMap<>();
        jerryPool.put("cpu", 1000);
        jerryPool.put("memory", 8192.0);
        Map<String, Number> bobbyPool = new HashMap<>();
        bobbyPool.put("cpu", 10000.0);
        bobbyPool.put("memory", 32768);
        Map<String, Number> derekPool = new HashMap<>();
        derekPool.put("cpu", 5000.0);
        derekPool.put("memory", 16384.0);
        Map<String, Map<String, Number>> userPools = new HashMap<>();
        userPools.put("jerry", jerryPool);
        userPools.put("bobby", bobbyPool);
        userPools.put("derek", derekPool);
        conf.put("resource.aware.scheduler.user.pools", userPools);

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);

        TopologyDetails firstTopology =
                TestUtilsForResourceAwareScheduler.getTopology("topo-1", conf, 5, 15, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails secondTopology =
                TestUtilsForResourceAwareScheduler.getTopology("topo-2", conf, 5, 15, 1, 1, currentTime - 8, 29, "jerry");
        Map<String, TopologyDetails> topologyMap = new HashMap<>();
        topologyMap.put(firstTopology.getId(), firstTopology);
        topologyMap.put(secondTopology.getId(), secondTopology);
        Topologies topologies = new Topologies(topologyMap);

        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        // Count how many of the submitted topologies report a success status.
        int fullyScheduled = 0;
        for (TopologyDetails topo : topologyMap.values()) {
            String status = (String) cluster.getStatusMap().get(topo.getId());
            if (TestUtilsForResourceAwareScheduler.assertStatusSuccess(status)) {
                fullyScheduled++;
            }
        }
        Assert.assertEquals("# of Fully scheduled", 1L, fullyScheduled);

        User jerry = scheduler.getUser("jerry");
        Assert.assertEquals("# of topologies schedule attempted", 1L, jerry.getTopologiesAttempted().size());
        Assert.assertEquals("# of topologies running", 1L, jerry.getTopologiesRunning().size());
        Assert.assertEquals("# of topologies schedule pending", 0L, jerry.getTopologiesPending().size());
    }

    @Test
    public void TestFaultTolerance() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        HashMap hashMap = new HashMap();
        hashMap.put("supervisor.cpu.capacity", Double.valueOf(100.0d));
        hashMap.put("supervisor.memory.capacity.mb", Double.valueOf(1000.0d));
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(6, 4, hashMap);
        Config config = new Config();
        config.putAll(Utils.readDefaultConfig());
        config.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        config.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        config.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());
        config.put("topology.component.cpu.pcore.percent", Double.valueOf(100.0d));
        config.put("topology.component.resources.offheap.memory.mb", 500);
        config.put("topology.component.resources.onheap.memory.mb", 500);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("jerry", new HashMap());
        ((Map) hashMap2.get("jerry")).put("cpu", Double.valueOf(50.0d));
        ((Map) hashMap2.get("jerry")).put("memory", Double.valueOf(500.0d));
        hashMap2.put("bobby", new HashMap());
        ((Map) hashMap2.get("bobby")).put("cpu", Double.valueOf(200.0d));
        ((Map) hashMap2.get("bobby")).put("memory", Double.valueOf(2000.0d));
        hashMap2.put("derek", new HashMap());
        ((Map) hashMap2.get("derek")).put("cpu", Double.valueOf(100.0d));
        ((Map) hashMap2.get("derek")).put("memory", Double.valueOf(1000.0d));
        config.put("resource.aware.scheduler.user.pools", hashMap2);
        Cluster cluster = new Cluster(iNimbusTest, genSupervisors, new HashMap(), config);
        TopologyDetails topology = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20, "jerry");
        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20, "jerry");
        TopologyDetails topology3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10, "bobby");
        TopologyDetails topology4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10, "bobby");
        TopologyDetails topology5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29, "derek");
        TopologyDetails topology6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10, "derek");
        HashMap hashMap3 = new HashMap();
        hashMap3.put(topology.getId(), topology);
        hashMap3.put(topology2.getId(), topology2);
        hashMap3.put(topology3.getId(), topology3);
        hashMap3.put(topology4.getId(), topology4);
        hashMap3.put(topology5.getId(), topology5);
        hashMap3.put(topology6.getId(), topology6);
        Topologies topologies = new Topologies(hashMap3);
        ResourceAwareScheduler resourceAwareScheduler = new ResourceAwareScheduler();
        resourceAwareScheduler.prepare(config);
        resourceAwareScheduler.schedule(topologies, cluster);
        Iterator it = resourceAwareScheduler.getUser("jerry").getTopologiesRunning().iterator();
        while (it.hasNext()) {
            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess((String) cluster.getStatusMap().get(((TopologyDetails) it.next()).getId())));
        }
        Assert.assertEquals("# of running topologies", 2L, resourceAwareScheduler.getUser("jerry").getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, resourceAwareScheduler.getUser("jerry").getTopologiesPending().size());
        Assert.assertEquals("# of attempted topologies", 0L, resourceAwareScheduler.getUser("jerry").getTopologiesAttempted().size());
        Assert.assertEquals("# of invalid topologies", 0L, resourceAwareScheduler.getUser("jerry").getTopologiesInvalid().size());
        Iterator it2 = resourceAwareScheduler.getUser("derek").getTopologiesRunning().iterator();
        while (it2.hasNext()) {
            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess((String) cluster.getStatusMap().get(((TopologyDetails) it2.next()).getId())));
        }
        Assert.assertEquals("# of running topologies", 2L, resourceAwareScheduler.getUser("derek").getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, resourceAwareScheduler.getUser("derek").getTopologiesPending().size());
        Assert.assertEquals("# of attempted topologies", 0L, resourceAwareScheduler.getUser("derek").getTopologiesAttempted().size());
        Assert.assertEquals("# of invalid topologies", 0L, resourceAwareScheduler.getUser("derek").getTopologiesInvalid().size());
        Iterator it3 = resourceAwareScheduler.getUser("bobby").getTopologiesRunning().iterator();
        while (it3.hasNext()) {
            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess((String) cluster.getStatusMap().get(((TopologyDetails) it3.next()).getId())));
        }
        Assert.assertEquals("# of running topologies", 2L, resourceAwareScheduler.getUser("bobby").getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, resourceAwareScheduler.getUser("bobby").getTopologiesPending().size());
        Assert.assertEquals("# of invalid topologies", 0L, resourceAwareScheduler.getUser("bobby").getTopologiesInvalid().size());
        Assert.assertEquals("# of attempted topologies", 0L, resourceAwareScheduler.getUser("bobby").getTopologiesAttempted().size());
        SupervisorDetails supervisorDetails = (SupervisorDetails) cluster.getSupervisors().values().iterator().next();
        LOG.info("/***** failing supervisor: {} ****/", supervisorDetails.getHost());
        genSupervisors.remove(supervisorDetails.getId());
        HashMap hashMap4 = new HashMap();
        for (Map.Entry entry : cluster.getAssignments().entrySet()) {
            String str = (String) entry.getKey();
            SchedulerAssignment schedulerAssignment = (SchedulerAssignment) entry.getValue();
            HashMap hashMap5 = new HashMap();
            for (Map.Entry entry2 : schedulerAssignment.getExecutorToSlot().entrySet()) {
                ExecutorDetails executorDetails = (ExecutorDetails) entry2.getKey();
                WorkerSlot workerSlot = (WorkerSlot) entry2.getValue();
                if (!workerSlot.getNodeId().equals(supervisorDetails.getId())) {
                    hashMap5.put(executorDetails, workerSlot);
                }
            }
            hashMap4.put(str, new SchedulerAssignmentImpl(str, hashMap5));
        }
        Map statusMap = cluster.getStatusMap();
        Cluster cluster2 = new Cluster(iNimbusTest, genSupervisors, hashMap4, config);
        cluster2.setStatusMap(statusMap);
        resourceAwareScheduler.schedule(topologies, cluster2);
        Iterator it4 = resourceAwareScheduler.getUser("jerry").getTopologiesRunning().iterator();
        while (it4.hasNext()) {
            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess((String) cluster2.getStatusMap().get(((TopologyDetails) it4.next()).getId())));
        }
        Assert.assertEquals("# of running topologies", 1L, resourceAwareScheduler.getUser("jerry").getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, resourceAwareScheduler.getUser("jerry").getTopologiesPending().size());
        Assert.assertEquals("# of attempted topologies", 1L, resourceAwareScheduler.getUser("jerry").getTopologiesAttempted().size());
        Assert.assertEquals("# of invalid topologies", 0L, resourceAwareScheduler.getUser("jerry").getTopologiesInvalid().size());
        Iterator it5 = resourceAwareScheduler.getUser("derek").getTopologiesRunning().iterator();
        while (it5.hasNext()) {
            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess((String) cluster2.getStatusMap().get(((TopologyDetails) it5.next()).getId())));
        }
        Assert.assertEquals("# of running topologies", 2L, resourceAwareScheduler.getUser("derek").getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, resourceAwareScheduler.getUser("derek").getTopologiesPending().size());
        Assert.assertEquals("# of attempted topologies", 0L, resourceAwareScheduler.getUser("derek").getTopologiesAttempted().size());
        Assert.assertEquals("# of invalid topologies", 0L, resourceAwareScheduler.getUser("derek").getTopologiesInvalid().size());
        Iterator it6 = resourceAwareScheduler.getUser("bobby").getTopologiesRunning().iterator();
        while (it6.hasNext()) {
            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess((String) cluster2.getStatusMap().get(((TopologyDetails) it6.next()).getId())));
        }
        Assert.assertEquals("# of running topologies", 2L, resourceAwareScheduler.getUser("bobby").getTopologiesRunning().size());
        Assert.assertEquals("# of pending topologies", 0L, resourceAwareScheduler.getUser("bobby").getTopologiesPending().size());
        Assert.assertEquals("# of invalid topologies", 0L, resourceAwareScheduler.getUser("bobby").getTopologiesInvalid().size());
        Assert.assertEquals("# of attempted topologies", 0L, resourceAwareScheduler.getUser("bobby").getTopologiesAttempted().size());
    }

    /**
     * Checks that freeing a worker slot on its {@code RAS_Node} gives the worker's resources back.
     *
     * <p>Two topologies are scheduled on a cluster built from {@code genSupervisors(4, 4, ...)}.
     * For every slot in every resulting assignment the test reads the node's available memory/CPU
     * and the memory/CPU attributed to the worker, frees the slot, and asserts that the node's
     * available memory and CPU each grew by exactly the worker's share and that the slot no
     * longer appears in the assignment's slot-to-executors map.
     */
    @Test
    public void TestNodeFreeSlot() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        HashMap resourceMap = new HashMap();
        resourceMap.put("supervisor.cpu.capacity", Double.valueOf(100.0d));
        resourceMap.put("supervisor.memory.capacity.mb", Double.valueOf(1000.0d));
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
        Config config = new Config();
        config.putAll(Utils.readDefaultConfig());
        config.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        config.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        config.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());
        config.put("topology.component.cpu.pcore.percent", Double.valueOf(100.0d));
        config.put("topology.component.resources.offheap.memory.mb", 500);
        config.put("topology.component.resources.onheap.memory.mb", 500);
        Cluster cluster = new Cluster(iNimbusTest, genSupervisors, new HashMap(), config);
        TopologyDetails topology = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29, "user");
        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10, "user");
        HashMap topologyMap = new HashMap();
        topologyMap.put(topology.getId(), topology);
        topologyMap.put(topology2.getId(), topology2);
        Topologies topologies = new Topologies(topologyMap);
        ResourceAwareScheduler resourceAwareScheduler = new ResourceAwareScheduler();
        resourceAwareScheduler.prepare(config);
        resourceAwareScheduler.schedule(topologies, cluster);
        Map<?, ?> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
        for (SchedulerAssignment schedulerAssignment : cluster.getAssignments().values()) {
            for (WorkerSlot workerSlot : schedulerAssignment.getSlots()) {
                // Look the owning node up once; every check below is against this same node.
                RAS_Node node = (RAS_Node) nodes.get(workerSlot.getNodeId());
                double memoryBefore = node.getAvailableMemoryResources().doubleValue();
                double cpuBefore = node.getAvailableCpuResources().doubleValue();

                // Expected per-worker figures equal the component settings configured above
                // (500 on-heap + 500 off-heap MB, 100% CPU).
                // NOTE(review): presumably one executor lands in each worker — confirm.
                double memoryUsedByWorker = node.getMemoryUsedByWorker(workerSlot);
                Assert.assertEquals("Check if memory used by worker is calculated correctly", 1000.0d, memoryUsedByWorker, 0.001d);
                double cpuUsedByWorker = node.getCpuUsedByWorker(workerSlot);
                Assert.assertEquals("Check if CPU used by worker is calculated correctly", 100.0d, cpuUsedByWorker, 0.001d);

                node.free(workerSlot);

                double memoryAfter = node.getAvailableMemoryResources().doubleValue();
                double cpuAfter = node.getAvailableCpuResources().doubleValue();
                Assert.assertEquals("Check if free correctly frees amount of memory", memoryBefore + memoryUsedByWorker, memoryAfter, 0.001d);
                // This message previously said "memory", which mislabelled CPU accounting failures.
                Assert.assertEquals("Check if free correctly frees amount of CPU", cpuBefore + cpuUsedByWorker, cpuAfter, 0.001d);
                Assert.assertFalse("Check if worker was removed from assignments", schedulerAssignment.getSlotToExecutors().containsKey(workerSlot));
            }
        }
    }

    /**
     * Schedules three topologies owned by "jerry" in a single pass and verifies that topo-2 and
     * topo-3 each receive a full assignment (4 and 3 executors respectively).
     *
     * <p>No assertion is made about topo-1. NOTE(review): judging by the test name, topo-1 is
     * presumably the topology that is expected to fail scheduling — confirm.
     */
    @Test
    public void TestSchedulingAfterFailedScheduling() {
        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();

        // Capacity settings handed to the supervisor generator.
        HashMap supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", 100.0d);
        supervisorResources.put("supervisor.memory.capacity.mb", 1000.0d);
        Map<String, SupervisorDetails> supervisors =
                TestUtilsForResourceAwareScheduler.genSupervisors(8, 4, supervisorResources);

        // Defaults first, then the scheduler strategies and per-component resource settings.
        Config conf = new Config();
        conf.putAll(Utils.readDefaultConfig());
        conf.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        conf.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        conf.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());
        conf.put("topology.component.cpu.pcore.percent", 100.0d);
        conf.put("topology.component.resources.offheap.memory.mb", 500);
        conf.put("topology.component.resources.onheap.memory.mb", 500);

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap(), conf);

        int launchTime = currentTime - 2;
        TopologyDetails topo1 =
                TestUtilsForResourceAwareScheduler.getTopology("topo-1", conf, 8, 0, 2, 0, launchTime, 10, "jerry");
        TopologyDetails topo2 =
                TestUtilsForResourceAwareScheduler.getTopology("topo-2", conf, 2, 0, 2, 0, launchTime, 20, "jerry");
        TopologyDetails topo3 =
                TestUtilsForResourceAwareScheduler.getTopology("topo-3", conf, 1, 2, 1, 1, launchTime, 20, "jerry");

        HashMap topologiesById = new HashMap();
        topologiesById.put(topo1.getId(), topo1);
        topologiesById.put(topo2.getId(), topo2);
        topologiesById.put(topo3.getId(), topo3);
        Topologies topologies = new Topologies(topologiesById);

        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        SchedulerAssignment topo2Assignment = cluster.getAssignmentById(topo2.getId());
        Assert.assertNotNull("Topo-2 scheduled?", topo2Assignment);
        Assert.assertEquals("Topo-2 all executors scheduled?", 4, topo2Assignment.getExecutorToSlot().size());

        SchedulerAssignment topo3Assignment = cluster.getAssignmentById(topo3.getId());
        Assert.assertNotNull("Topo-3 scheduled?", topo3Assignment);
        Assert.assertEquals("Topo-3 all executors scheduled?", 3, topo3Assignment.getExecutorToSlot().size());
    }

    /**
     * Schedules a single topology that has two spouts and a cycle among its bolts
     * (bolt-1 -> bolt-2 -> bolt-3 -> bolt-1) and verifies that it is assigned with all
     * 25 executors placed (five components, each declared with a parallelism hint of 5).
     */
    @Test
    public void TestMultipleSpoutsAndCyclicTopologies() {
        // Wiring: spout-1 feeds bolt-1, spout-2 feeds bolt-3, and the three bolts form a loop.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout-1", new TestUtilsForResourceAwareScheduler.TestSpout(), 5);
        builder.setSpout("spout-2", new TestUtilsForResourceAwareScheduler.TestSpout(), 5);
        builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(), 5)
                .shuffleGrouping("spout-1")
                .shuffleGrouping("bolt-3");
        builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(), 5)
                .shuffleGrouping("bolt-1");
        builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(), 5)
                .shuffleGrouping("bolt-2")
                .shuffleGrouping("spout-2");

        TestUtilsForResourceAwareScheduler.INimbusTest nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();

        // Capacity settings handed to the supervisor generator.
        HashMap supervisorResources = new HashMap();
        supervisorResources.put("supervisor.cpu.capacity", 100.0d);
        supervisorResources.put("supervisor.memory.capacity.mb", 1000.0d);
        Map<String, SupervisorDetails> supervisors =
                TestUtilsForResourceAwareScheduler.genSupervisors(25, 1, supervisorResources);

        Config conf = new Config();
        conf.putAll(Utils.readDefaultConfig());
        conf.put("resource.aware.scheduler.eviction.strategy", DefaultEvictionStrategy.class.getName());
        conf.put("resource.aware.scheduler.priority.strategy", DefaultSchedulingPriorityStrategy.class.getName());
        conf.put("topology.scheduler.strategy", DefaultResourceAwareStrategy.class.getName());
        conf.put("topology.component.cpu.pcore.percent", 100.0d);
        conf.put("topology.component.resources.offheap.memory.mb", 500);
        conf.put("topology.component.resources.onheap.memory.mb", 500);
        conf.put("topology.worker.max.heap.size.mb", Double.MAX_VALUE);

        StormTopology stormTopology = builder.createTopology();
        TopologyDetails topo = new TopologyDetails(
                "topo-1",
                conf,
                stormTopology,
                0,
                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology),
                0,
                "jerry");

        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap(), conf);

        HashMap topologiesById = new HashMap();
        topologiesById.put(topo.getId(), topo);
        Topologies topologies = new Topologies(topologiesById);

        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
        scheduler.prepare(conf);
        scheduler.schedule(topologies, cluster);

        SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
        Assert.assertNotNull("Topo scheduled?", assignment);
        Assert.assertEquals("Topo all executors scheduled?", 25, assignment.getExecutorToSlot().size());
    }
}
