package org.apache.heron.simulator;

import java.lang.Thread;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.api.Config;
import org.apache.heron.api.HeronTopology;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.config.SystemConfigKey;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.simulator.executors.InstanceExecutor;
import org.apache.heron.simulator.executors.MetricsExecutor;
import org.apache.heron.simulator.executors.StreamExecutor;
import org.apache.heron.simulator.utils.TopologyManager;

/* loaded from: input_file:org/apache/heron/simulator/Simulator.class */
public class Simulator {
    private static final Logger LOG = Logger.getLogger(Simulator.class.getName());
    private final List<InstanceExecutor> instanceExecutors;
    private final ExecutorService threadsPool;
    private SystemConfig systemConfig;
    private StreamExecutor streamExecutor;
    private MetricsExecutor metricsExecutor;

    /* loaded from: input_file:org/apache/heron/simulator/Simulator$DefaultExceptionHandler.class */
    public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler {
        public DefaultExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            try {
                handleException(thread, th);
            } catch (Throwable th2) {
                Simulator.LOG.log(Level.SEVERE, "Failed to handle exception. Process halting", th2);
                Runtime.getRuntime().halt(1);
            }
        }

        private void handleException(Thread thread, Throwable th) {
            Simulator.LOG.severe("Local Mode Process exiting.");
            Simulator.LOG.log(Level.SEVERE, "Exception caught in thread: " + thread.getName() + " with id: " + thread.getId(), th);
            for (Handler handler : Logger.getLogger("").getHandlers()) {
                handler.close();
            }
            Simulator.this.threadsPool.shutdownNow();
            Runtime.getRuntime().halt(1);
        }
    }

    public Simulator() {
        this(true);
    }

    public Simulator(boolean z) {
        this.instanceExecutors = new LinkedList();
        this.threadsPool = Executors.newCachedThreadPool();
        if (z) {
            init();
        }
    }

    protected void init() {
        this.systemConfig = getSystemConfig();
        synchronized (SingletonRegistry.INSTANCE) {
            if (isSystemConfigExisted()) {
                LOG.info("System config already existed.");
            } else {
                LOG.info("System config not existed. Registering...");
                registerSystemConfig(this.systemConfig);
                LOG.info("System config registered.");
            }
        }
    }

    protected boolean isSystemConfigExisted() {
        return SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
    }

    protected void registerSystemConfig(SystemConfig systemConfig) {
        SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
    }

    public void submitTopology(String str, Config config, HeronTopology heronTopology) {
        TopologyAPI.Topology topology = heronTopology.setConfig(config).setName(str).setState(TopologyAPI.TopologyState.RUNNING).getTopology();
        if (!TopologyUtils.verifyTopology(topology)) {
            throw new RuntimeException("Topology object is Malformed");
        }
        if (isTopologyStateful(config)) {
            throw new RuntimeException("Stateful topology is not supported");
        }
        TopologyManager topologyManager = new TopologyManager(topology);
        LOG.info("Physical Plan: \n" + topologyManager.getPhysicalPlan());
        this.streamExecutor = new StreamExecutor(topologyManager);
        this.metricsExecutor = new MetricsExecutor(this.systemConfig);
        Iterator<PhysicalPlans.Instance> it = topologyManager.getPhysicalPlan().getInstancesList().iterator();
        while (it.hasNext()) {
            InstanceExecutor instanceExecutor = new InstanceExecutor(topologyManager.getPhysicalPlan(), it.next().getInstanceId());
            this.streamExecutor.addInstanceExecutor(instanceExecutor);
            this.metricsExecutor.addInstanceExecutor(instanceExecutor);
            this.instanceExecutors.add(instanceExecutor);
        }
        Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
        this.threadsPool.execute(this.metricsExecutor);
        this.threadsPool.execute(this.streamExecutor);
        Iterator<InstanceExecutor> it2 = this.instanceExecutors.iterator();
        while (it2.hasNext()) {
            this.threadsPool.execute(it2.next());
        }
    }

    public void killTopology(String str) {
        LOG.info("To kill topology: " + str);
        stop();
        LOG.info("Topology killed successfully");
    }

    public void activate(String str) {
        LOG.info("To activate topology: " + str);
        Iterator<InstanceExecutor> it = this.instanceExecutors.iterator();
        while (it.hasNext()) {
            it.next().activate();
        }
        LOG.info("Activated topology: " + str);
    }

    public void deactivate(String str) {
        LOG.info("To deactivate topology: " + str);
        Iterator<InstanceExecutor> it = this.instanceExecutors.iterator();
        while (it.hasNext()) {
            it.next().deactivate();
        }
        LOG.info("Deactivated topology:" + str);
    }

    public void shutdown() {
        LOG.info("To shutdown thread pool");
        if (this.threadsPool.isShutdown()) {
            this.threadsPool.shutdownNow();
        }
        LOG.info("Heron simulator exited.");
    }

    public void stop() {
        Iterator<InstanceExecutor> it = this.instanceExecutors.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        LOG.info("To stop Stream Executor");
        this.streamExecutor.stop();
        LOG.info("To stop Metrics Executor");
        this.metricsExecutor.stop();
        this.threadsPool.shutdown();
    }

    protected SystemConfig getSystemConfig() {
        return SystemConfig.newBuilder(true).put(SystemConfigKey.INSTANCE_SET_DATA_TUPLE_CAPACITY, 256).put(SystemConfigKey.INSTANCE_SET_CONTROL_TUPLE_CAPACITY, 256).put(SystemConfigKey.HERON_METRICS_EXPORT_INTERVAL, 60).put(SystemConfigKey.INSTANCE_EXECUTE_BATCH_TIME, 16).put(SystemConfigKey.INSTANCE_EXECUTE_BATCH_SIZE, ByteAmount.fromBytes(32768L)).put(SystemConfigKey.INSTANCE_EMIT_BATCH_TIME, 16).put(SystemConfigKey.INSTANCE_EMIT_BATCH_SIZE, ByteAmount.fromBytes(32768L)).put(SystemConfigKey.INSTANCE_ACK_BATCH_TIME, 128).put(SystemConfigKey.INSTANCE_ACKNOWLEDGEMENT_NBUCKETS, 10).build();
    }

    private boolean isTopologyStateful(Config config) {
        return Config.TopologyReliabilityMode.EFFECTIVELY_ONCE.equals(Config.TopologyReliabilityMode.valueOf(String.valueOf(config.get(Config.TOPOLOGY_RELIABILITY_MODE))));
    }
}
