package org.apache.flink.runtime.instance;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.topology.NetworkNode;
import org.apache.flink.runtime.topology.NetworkTopology;

/* loaded from: input_file:org/apache/flink/runtime/instance/DefaultInstanceManager.class */
/**
 * Default {@link InstanceManager} implementation.
 *
 * <p>Keeps a map of registered task managers keyed by their {@link InstanceConnectionInfo},
 * allocates slots of those instances to jobs, and runs a periodic timer task that removes every
 * instance for which {@code Instance.isStillAlive(cleanUpInterval)} returns {@code false}.
 *
 * <p>All access to the instance map made by this class happens while holding a single internal
 * lock. The one exception is {@link #getInstances()}, which hands out the live map itself.
 */
public class DefaultInstanceManager implements InstanceManager {
    private static final Log LOG = LogFactory.getLog(DefaultInstanceManager.class);

    /** Default cleanup interval, in seconds. */
    private static final int DEFAULT_CLEANUP_INTERVAL = 120;

    /** Configuration key of the cleanup interval (value is read as seconds). */
    private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";

    /** Initial delay and period, in milliseconds, of the stale-instance check. */
    private static final long CLEANUP_CHECK_PERIOD_MILLIS = 1000L;

    /** Cleanup interval in milliseconds, as passed to {@code Instance.isStillAlive(long)}. */
    private final long cleanUpInterval;

    private final NetworkTopology networkTopology;

    /** Daemon timer driving {@link #cleanupStaleMachines}; cancelled in {@link #shutdown()}. */
    private final Timer cleanupTimer;

    private InstanceListener instanceListener;

    private boolean shutdown;

    /** Guards {@link #registeredHosts}, {@link #instanceListener} and {@link #shutdown}. */
    private final Object lock = new Object();

    private final Map<InstanceConnectionInfo, Instance> registeredHosts =
            new HashMap<InstanceConnectionInfo, Instance>();

    /** Periodic task that delegates to {@link #removeStaleInstances()}. */
    private final TimerTask cleanupStaleMachines = new TimerTask() {
        @Override
        public void run() {
            try {
                removeStaleInstances();
            } catch (RuntimeException e) {
                // An exception escaping a TimerTask terminates the timer thread, which would
                // silently disable all future cleanup runs. Log and keep the timer alive.
                LOG.error("Cleanup of stale instances failed.", e);
            }
        }
    };

    /**
     * Creates the instance manager, reads the cleanup interval from the global configuration and
     * starts the periodic stale-instance check on a daemon timer thread.
     *
     * <p>A configured interval that is not positive is rejected with a warning and replaced by
     * the default of {@value #DEFAULT_CLEANUP_INTERVAL} seconds.
     */
    public DefaultInstanceManager() {
        int intervalSeconds = GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL);
        if (intervalSeconds <= 0) {
            LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of 120 secs.");
            intervalSeconds = DEFAULT_CLEANUP_INTERVAL;
        }
        // Convert to milliseconds in long arithmetic so that large values cannot overflow, and so
        // that the fallback is 120 seconds rather than 120 milliseconds.
        this.cleanUpInterval = intervalSeconds * 1000L;
        this.networkTopology = NetworkTopology.createEmptyTopology();
        this.cleanupTimer = new Timer(true);
        this.cleanupTimer.schedule(this.cleanupStaleMachines, CLEANUP_CHECK_PERIOD_MILLIS, CLEANUP_CHECK_PERIOD_MILLIS);
    }

    /**
     * Removes every registered instance that is no longer alive and reports the resources that
     * were allocated on those instances to the instance listener, grouped by job.
     */
    private void removeStaleInstances() {
        synchronized (this.lock) {
            Map<JobID, List<AllocatedResource>> deadResourcesByJob = new HashMap<JobID, List<AllocatedResource>>();

            Iterator<Instance> hosts = this.registeredHosts.values().iterator();
            while (hosts.hasNext()) {
                Instance instance = hosts.next();
                if (instance.isStillAlive(this.cleanUpInterval)) {
                    continue;
                }
                for (AllocatedSlot slot : instance.removeAllocatedSlots()) {
                    JobID jobID = slot.getJobID();
                    List<AllocatedResource> deadResources = deadResourcesByJob.get(jobID);
                    if (deadResources == null) {
                        deadResources = new ArrayList<AllocatedResource>();
                        deadResourcesByJob.put(jobID, deadResources);
                    }
                    deadResources.add(new AllocatedResource(instance, slot.getAllocationID()));
                }
                // Removing through the iterator also removes the mapping from the backing map.
                hosts.remove();
            }

            for (Map.Entry<JobID, List<AllocatedResource>> entry : deadResourcesByJob.entrySet()) {
                // Re-checked on every iteration: the listener field may be changed by a callback.
                if (this.instanceListener != null) {
                    this.instanceListener.allocatedResourcesDied(entry.getKey(), entry.getValue());
                }
            }
        }
    }

    /**
     * Stops the periodic cleanup, destroys the proxies of all registered instances and clears the
     * instance map. Calling this method more than once has no further effect.
     */
    @Override
    public void shutdown() {
        synchronized (this.lock) {
            if (this.shutdown) {
                return;
            }
            this.cleanupStaleMachines.cancel();
            // Also stop the timer itself so that its thread terminates promptly.
            this.cleanupTimer.cancel();
            for (Instance instance : this.registeredHosts.values()) {
                instance.destroyProxies();
            }
            this.registeredHosts.clear();
            this.shutdown = true;
        }
    }

    /**
     * Releases the slot identified by the given resource on the instance that owns it.
     *
     * @param allocatedResource the resource whose slot is released
     * @throws InstanceException declared by the {@link InstanceManager} contract
     */
    @Override
    public void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException {
        synchronized (this.lock) {
            allocatedResource.getInstance().releaseSlot(allocatedResource.getAllocationID());
        }
    }

    /**
     * Creates the {@link Instance} object for a newly registered task manager and places it in the
     * network topology.
     *
     * <p>The topology is searched for an existing node named like the host: first by the full host
     * name, then by successively shorter names obtained by stripping the last dot-separated
     * component, and finally by the textual address with all {@code '/'} characters removed. If
     * such a node exists it is removed and its parent (when it has one) becomes the parent of the
     * new instance; otherwise the topology's root node is used.
     *
     * @param instanceConnectionInfo connection info of the task manager
     * @param hardwareDescription hardware description handed to the new instance
     * @param i value handed to the {@code Instance} constructor (presumably the number of slots;
     *     confirm against {@code Instance})
     * @return the newly created instance, never {@code null}
     */
    private Instance createNewHost(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int i) {
        String candidateName = instanceConnectionInfo.hostname();
        NetworkNode parentNode = this.networkTopology.getRootNode();

        NetworkNode stubNode = this.networkTopology.getNodeByName(candidateName);
        while (stubNode == null) {
            int lastDot = candidateName.lastIndexOf('.');
            if (lastDot == -1) {
                // Nothing left to strip: give up on name-based matching.
                break;
            }
            candidateName = candidateName.substring(0, lastDot);
            stubNode = this.networkTopology.getNodeByName(candidateName);
        }

        if (stubNode == null) {
            String addressName = instanceConnectionInfo.address().toString().replace("/", "");
            stubNode = this.networkTopology.getNodeByName(addressName);
        }

        if (stubNode != null) {
            if (stubNode.getParentNode() != null) {
                parentNode = stubNode.getParentNode();
            }
            stubNode.remove();
        }

        LOG.info("Creating instance for " + instanceConnectionInfo + ", parent is " + parentNode.getName());
        return new Instance(instanceConnectionInfo, parentNode, this.networkTopology, hardwareDescription, i);
    }

    /**
     * Records a heart beat for the task manager with the given connection info. An error is
     * logged if no such task manager is registered.
     *
     * @param instanceConnectionInfo connection info identifying the task manager
     */
    @Override
    public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo) {
        synchronized (this.lock) {
            Instance instance = this.registeredHosts.get(instanceConnectionInfo);
            if (instance == null) {
                LOG.error("Task manager with connection info " + instanceConnectionInfo + " has not been registered.");
                return;
            }
            instance.reportHeartBeat();
        }
    }

    /**
     * Registers a task manager and records an initial heart beat for it. If a task manager with
     * the same connection info is already registered, an error is logged and nothing changes.
     *
     * @param instanceConnectionInfo connection info identifying the task manager
     * @param hardwareDescription hardware description of the task manager
     * @param i value handed to the {@code Instance} constructor (presumably the number of slots;
     *     confirm against {@code Instance})
     */
    @Override
    public void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int i) {
        synchronized (this.lock) {
            if (this.registeredHosts.containsKey(instanceConnectionInfo)) {
                LOG.error("Task manager with connection info " + instanceConnectionInfo + " has already been registered.");
                return;
            }
            Instance host = createNewHost(instanceConnectionInfo, hardwareDescription, i);
            this.registeredHosts.put(instanceConnectionInfo, host);
            LOG.info("New number of registered hosts is " + this.registeredHosts.size());
            host.reportHeartBeat();
        }
    }

    /**
     * Allocates {@code i} slots for the given job across the registered instances and, if an
     * instance listener is set, starts an {@code InstanceNotifier} for the allocated resources.
     *
     * <p>The number of free slots is checked before anything is allocated, so a request that
     * cannot be satisfied fails without leaving slots allocated to the job.
     *
     * @param jobID the job the slots are allocated for
     * @param configuration not used by this implementation
     * @param i the number of slots required
     * @throws InstanceException if fewer than {@code i} slots are available
     */
    @Override
    public void requestInstance(JobID jobID, Configuration configuration, int i) throws InstanceException {
        synchronized (this.lock) {
            long availableSlots = 0L;
            for (Instance instance : this.registeredHosts.values()) {
                availableSlots += instance.getNumberOfAvailableSlots();
            }
            if (availableSlots < i) {
                throw new InstanceException("Cannot allocate the required number of slots: " + i + ".");
            }

            List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
            for (Instance instance : this.registeredHosts.values()) {
                while (allocatedResources.size() < i && instance.getNumberOfAvailableSlots() > 0) {
                    allocatedResources.add(instance.allocateSlot(jobID));
                }
            }
            // Defensive: should be unreachable after the availability check above.
            if (allocatedResources.size() < i) {
                throw new InstanceException("Cannot allocate the required number of slots: " + i + ".");
            }

            if (this.instanceListener != null) {
                new InstanceNotifier(this.instanceListener, jobID, allocatedResources).start();
            }
        }
    }

    /**
     * Returns the network topology managed by this instance manager.
     *
     * @param jobID not used by this implementation; the same topology is returned for every job
     * @return the network topology
     */
    @Override
    public NetworkTopology getNetworkTopology(JobID jobID) {
        return this.networkTopology;
    }

    /**
     * Sets the listener that is informed about allocated and died resources.
     *
     * @param instanceListener the listener, or {@code null} to disable notifications
     */
    @Override
    public void setInstanceListener(InstanceListener instanceListener) {
        synchronized (this.lock) {
            this.instanceListener = instanceListener;
        }
    }

    /**
     * Looks up a registered instance by the value of {@code Instance.getName()}.
     *
     * @param str the instance name to look for, must not be {@code null}
     * @return the first registered instance with that name, or {@code null} if there is none
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    @Override
    public Instance getInstanceByName(String str) {
        if (str == null) {
            throw new IllegalArgumentException("Argument name must not be null");
        }
        synchronized (this.lock) {
            for (Instance instance : this.registeredHosts.values()) {
                if (str.equals(instance.getName())) {
                    return instance;
                }
            }
            return null;
        }
    }

    /**
     * Returns the number of currently registered task managers.
     *
     * @return the size of the instance map
     */
    @Override
    public int getNumberOfTaskTrackers() {
        synchronized (this.lock) {
            return this.registeredHosts.size();
        }
    }

    /**
     * Returns the sum of {@code Instance.getNumberOfSlots()} over all registered instances.
     *
     * @return the total number of slots
     */
    @Override
    public int getNumberOfSlots() {
        // Iterate under the lock: the cleanup timer thread removes entries concurrently.
        synchronized (this.lock) {
            int totalSlots = 0;
            for (Instance instance : this.registeredHosts.values()) {
                totalSlots += instance.getNumberOfSlots();
            }
            return totalSlots;
        }
    }

    /**
     * Returns the map of registered instances.
     *
     * <p>NOTE(review): this is the live internal map, not a copy. It is modified by other methods
     * of this class (including the cleanup timer thread) under an internal lock that callers do
     * not hold, so iterating over it concurrently is not safe.
     *
     * @return the internal map from connection info to instance
     */
    @Override
    public Map<InstanceConnectionInfo, Instance> getInstances() {
        return this.registeredHosts;
    }
}
